Java源码示例:org.apache.flink.streaming.api.operators.StreamOperator
示例1
/**
 * Creates a {@link StreamNode} for the given vertex ID and registers it in {@code streamNodes}.
 *
 * @param vertexID ID under which the node is registered
 * @param slotSharingGroup slot sharing group handed to the node
 * @param coLocationGroup co-location group handed to the node, may be {@code null}
 * @param vertexClass task class handed to the node
 * @param operatorObject operator handed to the node
 * @param operatorName operator name handed to the node
 * @return the newly created and registered node
 * @throws RuntimeException if a node is already registered under {@code vertexID}
 */
protected StreamNode addNode(Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperator<?> operatorObject,
        String operatorName) {

    final boolean alreadyRegistered = streamNodes.containsKey(vertexID);
    if (alreadyRegistered) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // Every new node starts out with an empty list of output selectors.
    final ArrayList<OutputSelector<?>> outputSelectors = new ArrayList<OutputSelector<?>>();

    final StreamNode createdNode = new StreamNode(
            environment,
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorObject,
            operatorName,
            outputSelectors,
            vertexClass);

    streamNodes.put(vertexID, createdNode);
    return createdNode;
}
示例2
/**
 * Writes the descriptive attributes of one stream graph vertex into the given JSON node.
 *
 * <p>The attributes written are, in order: {@code ID}, {@code TYPE} (operator name),
 * {@code PACT} ("Data Source", "Data Sink" or "Operator", depending on whether the vertex ID
 * is listed among the graph's source or sink IDs), {@code CONTENTS} (operator name again)
 * and {@code PARALLELISM}.
 *
 * @param vertexID ID of the vertex to describe
 * @param node JSON object that receives the attributes
 */
private void decorateNode(Integer vertexID, ObjectNode node) {
    // Look the vertex up once and reuse it for every attribute below.
    StreamNode vertex = streamGraph.getStreamNode(vertexID);

    node.put(ID, vertexID);
    node.put(TYPE, vertex.getOperatorName());

    if (streamGraph.getSourceIDs().contains(vertexID)) {
        node.put(PACT, "Data Source");
    } else if (streamGraph.getSinkIDs().contains(vertexID)) {
        node.put(PACT, "Data Sink");
    } else {
        node.put(PACT, "Operator");
    }

    node.put(CONTENTS, vertex.getOperatorName());
    node.put(PARALLELISM, vertex.getParallelism());
}
示例3
/**
 * Creates a test harness for the given operator with an explicit operator ID.
 *
 * <p>Delegates to another constructor of this class, handing over the operator, a
 * {@code SimpleOperatorFactory} wrapping that operator, a freshly built mock environment,
 * the literal flag {@code true} and the operator ID.
 *
 * @param operator the operator under test
 * @param maxParallelism max parallelism configured on the mock environment
 * @param parallelism parallelism configured on the mock environment
 * @param subtaskIndex subtask index configured on the mock environment
 * @param operatorID operator ID forwarded to the delegate constructor
 * @throws Exception declared by this constructor; the visible code does not show which call raises it
 */
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
int maxParallelism,
int parallelism,
int subtaskIndex,
OperatorID operatorID) throws Exception {
this(
operator,
SimpleOperatorFactory.of(operator),
// Mock environment for the harness, configured with fixed sizes and the caller's parallelism settings.
new MockEnvironmentBuilder()
.setTaskName("MockTask")
// 3 * 1024 * 1024 = 3145728; presumably a byte count (3 MiB) - confirm against the builder.
.setManagedMemorySize(3 * 1024 * 1024)
.setInputSplitProvider(new MockInputSplitProvider())
.setBufferSize(1024)
.setMaxParallelism(maxParallelism)
.setParallelism(parallelism)
.setSubtaskIndex(subtaskIndex)
.build(),
// NOTE(review): the meaning of this flag is defined by the delegate constructor, which is not visible here.
true,
operatorID);
}
示例4
/**
 * Builds the {@link StreamConfig} for the given operator.
 *
 * <p>The config is backed by a copy of the execution environment's configuration, marked as
 * chain start, and has checkpointing enabled in {@code EXACTLY_ONCE} mode. When a key type is
 * set, the state key serializer and the state partitioner are configured as well.
 *
 * @param operatorID ID of the operator; its hex string is also used as the operator name
 * @param stateBackend state backend stored in the config
 * @param operator operator stored in the config
 * @return the populated stream config
 */
@VisibleForTesting
StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
    // Eagerly perform a deep copy of the configuration, otherwise it will result in undefined
    // behavior when deploying with multiple bootstrap transformations.
    final StreamConfig streamConfig =
            new StreamConfig(new Configuration(dataSet.getExecutionEnvironment().getConfiguration()));

    streamConfig.setChainStart();
    streamConfig.setCheckpointingEnabled(true);
    streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);

    final boolean keyed = keyType != null;
    if (keyed) {
        TypeSerializer<?> serializerForKeys =
                keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
        streamConfig.setStateKeySerializer(serializerForKeys);
        streamConfig.setStatePartitioner(0, originalKeySelector);
    }

    streamConfig.setStreamOperator(operator);
    streamConfig.setOperatorName(operatorID.toHexString());
    streamConfig.setOperatorID(operatorID);
    streamConfig.setStateBackend(stateBackend);

    return streamConfig;
}
示例5
/**
 * Creates a test harness for the given operator with an explicit operator ID.
 *
 * <p>Delegates to another constructor of this class, handing over the operator, a freshly
 * built mock environment, the literal flag {@code true} and the operator ID.
 *
 * @param operator the operator under test
 * @param maxParallelism max parallelism configured on the mock environment
 * @param parallelism parallelism configured on the mock environment
 * @param subtaskIndex subtask index configured on the mock environment
 * @param operatorID operator ID forwarded to the delegate constructor
 * @throws Exception declared by this constructor; the visible code does not show which call raises it
 */
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
int maxParallelism,
int parallelism,
int subtaskIndex,
OperatorID operatorID) throws Exception {
this(
operator,
// Mock environment for the harness, configured with fixed sizes and the caller's parallelism settings.
new MockEnvironmentBuilder()
.setTaskName("MockTask")
// 3 * 1024 * 1024 = 3145728; presumably a byte count (3 MiB) - confirm against the builder.
.setMemorySize(3 * 1024 * 1024)
.setInputSplitProvider(new MockInputSplitProvider())
.setBufferSize(1024)
.setMaxParallelism(maxParallelism)
.setParallelism(parallelism)
.setSubtaskIndex(subtaskIndex)
.build(),
// NOTE(review): the meaning of this flag is defined by the delegate constructor, which is not visible here.
true,
operatorID);
}
示例6
/**
 * Completes the chain configuration and hands control back to the owner.
 *
 * <p>A single placeholder edge (both endpoints are stub nodes carrying {@code chainIndex},
 * partitioned with a {@link BroadcastPartitioner}) is installed as the output of the tail
 * config and as the ordered out-edges of the head config. The tail is marked as chain end,
 * and the head receives the transitive chained configs.
 *
 * @return the owner of this builder
 */
public OWNER finish() {
    final List<StreamEdge> placeholderEdges = new LinkedList<StreamEdge>();

    final StreamNode edgeSource =
            new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
    final StreamNode edgeTarget =
            new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
    final StreamEdge placeholderEdge = new StreamEdge(
            edgeSource,
            edgeTarget,
            0,
            Collections.<String>emptyList(),
            new BroadcastPartitioner<Object>(),
            null);
    placeholderEdges.add(placeholderEdge);

    // Tail of the chain: one output and no selectors.
    tailConfig.setChainEnd();
    tailConfig.setOutputSelectors(Collections.emptyList());
    tailConfig.setNumberOfOutputs(1);
    tailConfig.setOutEdgesInOrder(placeholderEdges);
    tailConfig.setNonChainedOutputs(placeholderEdges);

    // Head of the chain: knows all chained configs and the same out-edges.
    headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
    headConfig.setOutEdgesInOrder(placeholderEdges);

    return owner;
}
示例7
/**
 * Runs a FULL join through the sort-merge join harness and compares the task output
 * against the four expected rows.
 */
@Test
public void testFullJoin() throws Exception {
    StreamOperator fullJoin = newOperator(FlinkJoinType.FULL, leftIsSmall);
    TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> harness =
            buildSortMergeJoin(fullJoin);

    // Rows the full join is expected to emit.
    ConcurrentLinkedQueue<Object> expectedRows = new ConcurrentLinkedQueue<>();
    expectedRows.add(new StreamRecord<>(newRow("a", "02")));
    expectedRows.add(new StreamRecord<>(newRow("b", "14")));
    expectedRows.add(new StreamRecord<>(newRow("c", "2null")));
    expectedRows.add(new StreamRecord<>(newRow("d", "0null")));

    harness.waitForTaskCompletion();

    TestHarnessUtil.assertOutputEquals(
            "Output was not correct.",
            expectedRows,
            transformToBinary(harness.getOutput()));
}
示例8
/**
 * Execute {@link StreamOperator#dispose()} of each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 *
 * <p>Does nothing when there is no operator chain or the operators were already disposed.
 * A failing {@code dispose()} does not stop the loop: exceptions are merged through
 * {@code ExceptionUtils.firstOrSuppressed} and the result is thrown once every operator has
 * been visited and {@code disposedOperators} has been set.
 *
 * @throws Exception the merged disposal failure, if any operator failed to dispose
 */
private void disposeAllOperators() throws Exception {
    if (operatorChain == null || disposedOperators) {
        return;
    }

    Exception collectedFailure = null;
    for (StreamOperatorWrapper<?, ?> wrapper : operatorChain.getAllOperators(true)) {
        // Resolved outside the try block: a failure here propagates immediately.
        StreamOperator<?> current = wrapper.getStreamOperator();
        try {
            current.dispose();
        } catch (Exception e) {
            collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
        }
    }
    disposedOperators = true;

    if (collectedFailure != null) {
        throw collectedFailure;
    }
}
示例9
/**
 * Builds the {@link StreamConfig} for the given operator.
 *
 * <p>A {@code BoundedStreamConfig} is created with the key serializer and the original key
 * selector when a key type is set, and with no arguments otherwise.
 *
 * @param operatorID ID of the operator; its hex string is also used as the operator name
 * @param stateBackend state backend stored in the config
 * @param operator operator stored in the config
 * @return the populated stream config
 */
@VisibleForTesting
StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
    final StreamConfig streamConfig;
    if (keyType != null) {
        TypeSerializer<?> serializerForKeys =
                keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
        streamConfig = new BoundedStreamConfig(serializerForKeys, originalKeySelector);
    } else {
        streamConfig = new BoundedStreamConfig();
    }

    streamConfig.setStreamOperator(operator);
    streamConfig.setOperatorName(operatorID.toHexString());
    streamConfig.setOperatorID(operatorID);
    streamConfig.setStateBackend(stateBackend);

    return streamConfig;
}
示例10
/**
 * Convenience setup of the stream config for harness users that test exactly one operator.
 * It configures the operator as chain start and wires a single dummy outgoing edge for it.
 *
 * <p>Tests that exercise a chain of several operators should configure the stream config
 * manually instead of calling this method.
 *
 * @throws IllegalStateException if the harness was already set up (assuming
 *     {@code Preconditions.checkState} follows the usual contract)
 */
public void setupOutputForSingletonOperatorChain() {
    Preconditions.checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    // Basic settings of the single operator.
    streamConfig.setChainStart();
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);
    streamConfig.setOperatorID(new OperatorID(4711L, 123L));

    // Placeholder operator shared by both dummy vertices of the output edge.
    StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };

    StreamNode dummySource = new StreamNode(
            0, "group", null, placeholderOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
    StreamNode dummyTarget = new StreamNode(
            1, "group", null, placeholderOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
    StreamEdge dummyEdge = new StreamEdge(
            dummySource, dummyTarget, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */);

    List<StreamEdge> singleOutEdge = new LinkedList<>();
    singleOutEdge.add(dummyEdge);

    streamConfig.setOutEdgesInOrder(singleOutEdge);
    streamConfig.setNonChainedOutputs(singleOutEdge);
}
示例11
/**
 * Creates a {@code WatermarkAssignerOperator} and sets it up with the containing task,
 * stream config and output taken from the given parameters.
 *
 * <p>The watermark generator is instantiated from {@code generatedWatermarkGenerator} using
 * the user code class loader of the containing task.
 *
 * @param initializer parameters supplying the containing task, stream config and output
 * @return the operator, already set up
 */
@SuppressWarnings("unchecked")
@Override
public StreamOperator createStreamOperator(StreamOperatorParameters initializer) {
    ClassLoader userCodeClassLoader = initializer.getContainingTask().getUserCodeClassLoader();
    WatermarkGenerator generator = generatedWatermarkGenerator.newInstance(userCodeClassLoader);

    WatermarkAssignerOperator assigner = new WatermarkAssignerOperator(
            rowtimeFieldIndex,
            generator,
            idleTimeout,
            processingTimeService);

    assigner.setup(
            initializer.getContainingTask(),
            initializer.getStreamConfig(),
            initializer.getOutput());

    return assigner;
}
示例12
/**
 * Convenience setup of the stream config for harness users that test exactly one operator.
 * It configures the operator as chain start and wires a single dummy outgoing edge for it.
 *
 * <p>Tests that exercise a chain of several operators should configure the stream config
 * manually instead of calling this method.
 *
 * @throws IllegalStateException if the harness was already set up (assuming
 *     {@code Preconditions.checkState} follows the usual contract)
 */
public void setupOutputForSingletonOperatorChain() {
    Preconditions.checkState(!setupCalled, "This harness was already setup.");
    setupCalled = true;

    // Basic settings of the single operator.
    streamConfig.setChainStart();
    streamConfig.setBufferTimeout(0);
    streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
    streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
    streamConfig.setNumberOfOutputs(1);
    streamConfig.setTypeSerializerOut(outputSerializer);
    streamConfig.setVertexID(0);
    streamConfig.setOperatorID(new OperatorID(4711L, 123L));

    // Placeholder operator shared by both dummy vertices of the output edge.
    StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
        private static final long serialVersionUID = 1L;
    };

    StreamNode dummySource = new StreamNode(
            0, "group", null, placeholderOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamNode dummyTarget = new StreamNode(
            1, "group", null, placeholderOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
    StreamEdge dummyEdge = new StreamEdge(
            dummySource, dummyTarget, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */);

    List<StreamEdge> singleOutEdge = new LinkedList<StreamEdge>();
    singleOutEdge.add(dummyEdge);

    streamConfig.setOutEdgesInOrder(singleOutEdge);
    streamConfig.setNonChainedOutputs(singleOutEdge);
}
示例13
/**
 * Completes the chain configuration.
 *
 * <p>A single placeholder edge (both endpoints are stub nodes carrying {@code chainIndex},
 * partitioned with a {@link BroadcastPartitioner}) is installed as the output of the tail
 * config and as the ordered out-edges of the head config. The tail gets a buffer timeout of
 * 0 and is marked as chain end, and the head receives the transitive chained configs.
 */
public void finish() {
    final List<StreamEdge> placeholderEdges = new LinkedList<StreamEdge>();

    final StreamNode edgeSource =
            new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
    final StreamNode edgeTarget =
            new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
    final StreamEdge placeholderEdge = new StreamEdge(
            edgeSource,
            edgeTarget,
            0,
            Collections.<String>emptyList(),
            new BroadcastPartitioner<Object>(),
            null);
    placeholderEdges.add(placeholderEdge);

    // Tail of the chain: one output and no selectors.
    tailConfig.setBufferTimeout(0);
    tailConfig.setChainEnd();
    tailConfig.setOutputSelectors(Collections.emptyList());
    tailConfig.setNumberOfOutputs(1);
    tailConfig.setOutEdgesInOrder(placeholderEdges);
    tailConfig.setNonChainedOutputs(placeholderEdges);

    // Head of the chain: knows all chained configs and the same out-edges.
    headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
    headConfig.setOutEdgesInOrder(placeholderEdges);
}
示例14
/**
 * Runs a LEFT join through the sort-merge join harness and compares the task output
 * against the three expected rows.
 */
@Test
public void testLeftOuterJoin() throws Exception {
    StreamOperator leftJoin = newOperator(FlinkJoinType.LEFT, leftIsSmall);
    TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> harness =
            buildSortMergeJoin(leftJoin);

    // Rows the left outer join is expected to emit.
    ConcurrentLinkedQueue<Object> expectedRows = new ConcurrentLinkedQueue<>();
    expectedRows.add(new StreamRecord<>(newRow("a", "02")));
    expectedRows.add(new StreamRecord<>(newRow("b", "14")));
    expectedRows.add(new StreamRecord<>(newRow("d", "0null")));

    harness.waitForTaskCompletion();

    TestHarnessUtil.assertOutputEquals(
            "Output was not correct.",
            expectedRows,
            transformToBinary(harness.getOutput()));
}
示例15
/**
 * Initializes the state of every operator in the chain and then opens it, walking the chain
 * from <b>tail to head</b>. This is the opposite direction of
 * {@link StreamOperator#close()}, which runs <b>head to tail</b>
 * (see {@link #closeOperators(StreamTaskActionExecutor)}).
 *
 * @param streamTaskStateInitializer initializer handed to each operator's
 *     {@code initializeState} call
 * @throws Exception if initializing or opening an operator fails; operators later in the
 *     iteration are then not touched
 */
protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> wrapper : getAllOperators(true)) {
        final StreamOperator<?> current = wrapper.getStreamOperator();

        // State has to be in place before the operator is opened.
        current.initializeState(streamTaskStateInitializer);
        current.open();
    }
}
示例16
/**
 * Stores the given operator in this config: its class under {@code USER_FUNCTION} and the
 * serialized operator object under {@code SERIALIZEDUDF}.
 *
 * @param operator operator to store; a {@code null} value leaves the config untouched
 * @throws StreamTaskException if the operator object cannot be serialized
 */
public void setStreamOperator(StreamOperator<?> operator) {
    if (operator == null) {
        return;
    }

    config.setClass(USER_FUNCTION, operator.getClass());

    try {
        InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
    } catch (IOException e) {
        throw new StreamTaskException(
                "Cannot serialize operator object " + operator.getClass() + ".", e);
    }
}
示例17
/**
 * Adds an operator through {@code addOperator} and then records its vertex ID as a source.
 *
 * @param vertexID ID of the new vertex
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject the source operator
 * @param inTypeInfo input type information
 * @param outTypeInfo output type information
 * @param operatorName name of the operator
 */
public <IN, OUT> void addSource(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // Register the vertex first; it is only remembered as a source once that succeeded.
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorObject,
            inTypeInfo,
            outTypeInfo,
            operatorName);

    sources.add(vertexID);
}
示例18
/**
 * Adds an operator through {@code addOperator} and then records its vertex ID as a sink.
 *
 * @param vertexID ID of the new vertex
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject the sink operator
 * @param inTypeInfo input type information
 * @param outTypeInfo output type information
 * @param operatorName name of the operator
 */
public <IN, OUT> void addSink(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // Register the vertex first; it is only remembered as a sink once that succeeded.
    addOperator(
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorObject,
            inTypeInfo,
            outTypeInfo,
            operatorName);

    sinks.add(vertexID);
}
示例19
/**
 * Adds a vertex for the given operator and configures its serializers and type hooks.
 *
 * <p>The task class of the vertex depends on the operator: {@code StoppableSourceStreamTask}
 * for a {@code StoppableStreamSource}, {@code SourceStreamTask} for any other
 * {@code StreamSource}, and {@code OneInputStreamTask} otherwise. Serializers are created
 * only from type information that is present and not a {@code MissingTypeInfo}.
 *
 * @param vertexID ID of the new vertex
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject the operator
 * @param inTypeInfo input type information, may be {@code null}
 * @param outTypeInfo output type information, may be {@code null}
 * @param operatorName name of the operator
 */
public <IN, OUT> void addOperator(
        Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        StreamOperator<OUT> operatorObject,
        TypeInformation<IN> inTypeInfo,
        TypeInformation<OUT> outTypeInfo,
        String operatorName) {

    // The stoppable check comes first, ahead of the general StreamSource check.
    if (operatorObject instanceof StoppableStreamSource) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
    } else if (operatorObject instanceof StreamSource) {
        addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);
    } else {
        addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);
    }

    TypeSerializer<IN> serializerIn = null;
    if (inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo)) {
        serializerIn = inTypeInfo.createSerializer(executionConfig);
    }

    TypeSerializer<OUT> serializerOut = null;
    if (outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo)) {
        serializerOut = outTypeInfo.createSerializer(executionConfig);
    }

    setSerializers(vertexID, serializerIn, null, serializerOut);

    if (operatorObject instanceof OutputTypeConfigurable && outTypeInfo != null) {
        @SuppressWarnings("unchecked")
        OutputTypeConfigurable<OUT> outputAware = (OutputTypeConfigurable<OUT>) operatorObject;
        // The output type must be known at StreamGraph creation time.
        outputAware.setOutputType(outTypeInfo, executionConfig);
    }

    if (operatorObject instanceof InputTypeConfigurable) {
        InputTypeConfigurable inputAware = (InputTypeConfigurable) operatorObject;
        inputAware.setInputType(inTypeInfo, executionConfig);
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: {}", vertexID);
    }
}
示例20
/**
 * Runs a RIGHT join through the sort-merge join harness and compares the task output
 * against the three expected rows.
 */
@Test
public void testRightOuterJoin() throws Exception {
    StreamOperator rightJoin = newOperator(FlinkJoinType.RIGHT, leftIsSmall);
    TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> harness =
            buildSortMergeJoin(rightJoin);

    // Rows the right outer join is expected to emit.
    ConcurrentLinkedQueue<Object> expectedRows = new ConcurrentLinkedQueue<>();
    expectedRows.add(new StreamRecord<>(newRow("a", "02")));
    expectedRows.add(new StreamRecord<>(newRow("b", "14")));
    expectedRows.add(new StreamRecord<>(newRow("c", "2null")));

    harness.waitForTaskCompletion();

    TestHarnessUtil.assertOutputEquals(
            "Output was not correct.",
            expectedRows,
            transformToBinary(harness.getOutput()));
}
示例21
/**
 * Execute {@link StreamOperator#close()} of each operator in the chain of this
 * {@link StreamTask}. Closing happens from <b>head to tail</b> operator in the chain,
 * contrary to {@link StreamOperator#open()} which happens <b>tail to head</b>
 * (see {@link #openAllOperators()}).
 *
 * <p>{@code null} entries of the operator array are skipped.
 *
 * @throws Exception if closing an operator fails; the remaining operators are then not closed
 */
private void closeAllOperators() throws Exception {
    // Operators are closed first to last, since upstream operators in the chain might emit
    // elements in their close methods. The array is walked from its last index down to 0.
    final StreamOperator<?>[] chained = operatorChain.getAllOperators();

    int position = chained.length;
    while (position > 0) {
        position--;
        final StreamOperator<?> current = chained[position];
        if (current == null) {
            continue;
        }
        current.close();
    }
}
示例22
/**
 * Execute {@link StreamOperator#dispose()} of each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain,
 * i.e. in the order in which {@code operatorChain.getAllOperators()} yields the operators.
 *
 * <p>{@code null} entries are skipped.
 *
 * @throws Exception if disposing an operator fails; the remaining operators are then not
 *     disposed
 */
private void tryDisposeAllOperators() throws Exception {
    for (StreamOperator<?> current : operatorChain.getAllOperators()) {
        if (current == null) {
            continue;
        }
        current.dispose();
    }
}
示例23
/**
 * Execute {@link StreamOperator#dispose()} of each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 *
 * <p>Unlike {@link #tryDisposeAllOperators()}, a failure while disposing one operator is
 * caught and logged, and disposal continues with the next operator. Nothing happens when
 * there is no operator chain.
 */
private void disposeAllOperators() {
    if (operatorChain == null) {
        return;
    }

    for (StreamOperator<?> current : operatorChain.getAllOperators()) {
        if (current == null) {
            continue;
        }
        try {
            current.dispose();
        } catch (Throwable t) {
            LOG.error("Error during disposal of stream operator.", t);
        }
    }
}
示例24
/**
 * Creates a fresh mailbox, a counting action executor and a stream task whose
 * {@code init} and {@code processInput} are no-ops, then gives the task an operator chain
 * backed by a {@code NonRecordWriter}.
 */
@Before
public void before() throws Exception {
    mailbox = new TaskMailboxImpl();
    decorator = new CountingStreamTaskActionExecutor();

    StreamTaskTest.DeclineDummyEnvironment environment = new StreamTaskTest.DeclineDummyEnvironment();
    task = new StreamTask<Object, StreamOperator<Object>>(
            environment,
            null,
            FatalExitExceptionHandler.INSTANCE,
            decorator,
            mailbox) {

        @Override
        protected void init() {
            // intentionally empty
        }

        @Override
        protected void processInput(MailboxDefaultAction.Controller controller) {
            // intentionally empty
        }
    };

    task.operatorChain = new OperatorChain<>(task, new NonRecordWriter<>());
}
示例25
/**
 * Test-only constructor that builds an operator chain directly from its parts.
 *
 * @param allOperators all operators of the chain, must not be {@code null}
 * @param streamOutputs the stream outputs of the chain, must not be {@code null}
 * @param chainEntryPoint the output used as entry point of the chain, must not be {@code null}
 * @param headOperator the head operator of the chain, must not be {@code null}
 */
@VisibleForTesting
OperatorChain(
        StreamOperator<?>[] allOperators,
        RecordWriterOutput<?>[] streamOutputs,
        WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
        OP headOperator) {

    // Validate every argument (in declaration order) before touching any field.
    checkNotNull(allOperators);
    checkNotNull(streamOutputs);
    checkNotNull(chainEntryPoint);
    checkNotNull(headOperator);

    this.allOperators = allOperators;
    this.streamOutputs = streamOutputs;
    this.chainEntryPoint = chainEntryPoint;
    this.headOperator = headOperator;
}
示例26
/**
 * Tells every operator of the chain to prepare for the checkpoint with the given ID.
 *
 * <p>The operator array is visited from its last index down to index 0, and {@code null}
 * entries are skipped.
 * NOTE(review): the original comment calls this going "forward" through the chain, which
 * holds only if the array stores the chain tail-first - confirm.
 *
 * @param checkpointId ID of the checkpoint being prepared
 * @throws Exception if an operator fails to prepare; later operators are then not notified
 */
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
    // go forward through the operator chain and tell each operator
    // to prepare the checkpoint
    final StreamOperator<?>[] chained = this.allOperators;

    for (int idx = chained.length; idx-- > 0; ) {
        final StreamOperator<?> current = chained[idx];
        if (current == null) {
            continue;
        }
        current.prepareSnapshotPreBarrier(checkpointId);
    }
}
示例27
/**
 * Creates a mocked {@link StreamOperator} that reports a new {@code OperatorID} and answers
 * every {@code snapshotState} call with the given snapshot futures.
 *
 * @param operatorSnapshotResult futures returned by the stubbed {@code snapshotState}
 * @return the stubbed operator mock
 */
private static StreamOperator<?> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception {
    final StreamOperator<?> stubbedOperator = mock(StreamOperator.class);

    when(stubbedOperator.getOperatorID())
            .thenReturn(new OperatorID());
    when(stubbedOperator.snapshotState(
                    anyLong(),
                    anyLong(),
                    any(CheckpointOptions.class),
                    any(CheckpointStreamFactory.class)))
            .thenReturn(operatorSnapshotResult);

    return stubbedOperator;
}
示例28
/**
 * Starts the snapshot of one operator and attaches the channel state futures to it.
 *
 * <p>The snapshot is obtained from {@code checkpointStreamOperator}. If the operator is the
 * head of the chain, the input channel state future of the write result is attached; if it
 * is the tail, the result subpartition state future is attached. Both are compared by
 * identity, and an operator that is head and tail at once gets both futures.
 *
 * @param checkpointMetaData metadata of the checkpoint
 * @param checkpointOptions options of the checkpoint
 * @param operatorChain chain used to identify head and tail operator
 * @param op operator to snapshot
 * @param isCanceled supplier forwarded to {@code checkpointStreamOperator}
 * @param channelStateWriteResult source of the channel state handle futures
 * @param storage checkpoint stream factory forwarded to {@code checkpointStreamOperator}
 * @return the snapshot futures of the operator
 */
private OperatorSnapshotFutures buildOperatorSnapshotFutures(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        OperatorChain<?, ?> operatorChain,
        StreamOperator<?> op,
        Supplier<Boolean> isCanceled,
        ChannelStateWriteResult channelStateWriteResult,
        CheckpointStreamFactory storage) throws Exception {

    final OperatorSnapshotFutures futures =
            checkpointStreamOperator(op, checkpointMetaData, checkpointOptions, storage, isCanceled);

    final boolean isHead = op == operatorChain.getHeadOperator();
    if (isHead) {
        futures.setInputChannelStateFuture(
                channelStateWriteResult
                        .getInputChannelStateHandles()
                        .thenApply(StateObjectCollection::new)
                        .thenApply(SnapshotResult::of));
    }

    final boolean isTail = op == operatorChain.getTailOperator();
    if (isTail) {
        futures.setResultSubpartitionStateFuture(
                channelStateWriteResult
                        .getResultSubpartitionStateHandles()
                        .thenApply(StateObjectCollection::new)
                        .thenApply(SnapshotResult::of));
    }

    return futures;
}
示例29
/**
 * Creates a test harness for the given operator, using a newly created {@code OperatorID}.
 *
 * <p>Delegates to the constructor of this class that additionally takes an operator ID.
 *
 * @param operator the operator under test
 * @param maxParallelism max parallelism forwarded to the delegate constructor
 * @param parallelism parallelism forwarded to the delegate constructor
 * @param subtaskIndex subtask index forwarded to the delegate constructor
 * @throws Exception declared by this constructor; the visible code does not show which call raises it
 */
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
int maxParallelism,
int parallelism,
int subtaskIndex) throws Exception {
this(
operator,
maxParallelism,
parallelism,
subtaskIndex,
// No ID supplied by the caller, so a fresh one is created here.
new OperatorID());
}
示例30
/**
 * Creates a test harness for the given operator, using a newly created {@code OperatorID}.
 *
 * <p>Delegates to the constructor of this class that additionally takes an operator ID.
 *
 * @param operator the operator under test
 * @param maxParallelism max parallelism forwarded to the delegate constructor
 * @param parallelism parallelism forwarded to the delegate constructor
 * @param subtaskIndex subtask index forwarded to the delegate constructor
 * @throws Exception declared by this constructor; the visible code does not show which call raises it
 */
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
int maxParallelism,
int parallelism,
int subtaskIndex) throws Exception {
this(
operator,
maxParallelism,
parallelism,
subtaskIndex,
// No ID supplied by the caller, so a fresh one is created here.
new OperatorID());
}