Java源码示例:org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator
示例1
private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
if (!inputType2.isTupleType()) {
throw new InvalidParameterException("Should not happen.");
}
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanLeftUnwrappingCoGroupOperator<>(
function,
keys1,
logicalKeyPositions2,
name,
outputType,
typeInfoWithKey1,
inputType2);
cogroup.setFirstInput(keyedInput1);
cogroup.setSecondInput(input2);
return cogroup;
}
示例2
private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
if (!inputType2.isTupleType()) {
throw new InvalidParameterException("Should not happen.");
}
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanLeftUnwrappingCoGroupOperator<>(
function,
keys1,
logicalKeyPositions2,
name,
outputType,
typeInfoWithKey1,
inputType2);
cogroup.setFirstInput(keyedInput1);
cogroup.setSecondInput(input2);
return cogroup;
}
示例3
private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
if (!inputType2.isTupleType()) {
throw new InvalidParameterException("Should not happen.");
}
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanLeftUnwrappingCoGroupOperator<>(
function,
keys1,
logicalKeyPositions2,
name,
outputType,
typeInfoWithKey1,
inputType2);
cogroup.setFirstInput(keyedInput1);
cogroup.setSecondInput(input2);
return cogroup;
}