Java源码示例:org.apache.flink.streaming.api.operators.StreamOperator

示例1
/**
 * Creates a {@link StreamNode} for the given vertex ID and registers it in {@code streamNodes}.
 *
 * <p>The node is created with a fresh, empty list of output selectors.
 *
 * @param vertexID ID under which the node is registered; must not be registered yet
 * @param slotSharingGroup slot sharing group passed to the node
 * @param coLocationGroup co-location group passed to the node, may be {@code null}
 * @param vertexClass invokable class passed to the node
 * @param operatorObject operator passed to the node
 * @param operatorName operator name passed to the node
 * @return the node that was created and registered
 * @throws RuntimeException if a node with {@code vertexID} is already registered
 */
protected StreamNode addNode(
		Integer vertexID,
		String slotSharingGroup,
		@Nullable String coLocationGroup,
		Class<? extends AbstractInvokable> vertexClass,
		StreamOperator<?> operatorObject,
		String operatorName) {

	final boolean alreadyRegistered = streamNodes.containsKey(vertexID);
	if (alreadyRegistered) {
		throw new RuntimeException("Duplicate vertexID " + vertexID);
	}

	final StreamNode node = new StreamNode(
		environment,
		vertexID,
		slotSharingGroup,
		coLocationGroup,
		operatorObject,
		operatorName,
		new ArrayList<OutputSelector<?>>(),
		vertexClass);
	streamNodes.put(vertexID, node);
	return node;
}
 
示例2
/**
 * Fills the JSON {@code node} with the descriptive fields of the stream node
 * identified by {@code vertexID}.
 *
 * <p>Written fields: {@code ID}, {@code TYPE} and {@code CONTENTS} (both the operator name),
 * {@code PACT} ("Data Source", "Data Sink" or "Operator", depending on whether the vertex ID
 * is listed among the graph's source IDs or sink IDs) and {@code PARALLELISM}.
 *
 * @param vertexID ID of the stream node to describe
 * @param node JSON object that receives the fields
 */
private void decorateNode(Integer vertexID, ObjectNode node) {
	// Look the stream node up once and reuse it for every field below.
	StreamNode vertex = streamGraph.getStreamNode(vertexID);
	String operatorName = vertex.getOperatorName();

	node.put(ID, vertexID);
	node.put(TYPE, operatorName);

	// Source membership is checked first, so a vertex listed as both source and sink
	// is reported as a source.
	if (streamGraph.getSourceIDs().contains(vertexID)) {
		node.put(PACT, "Data Source");
	} else if (streamGraph.getSinkIDs().contains(vertexID)) {
		node.put(PACT, "Data Sink");
	} else {
		node.put(PACT, "Operator");
	}

	node.put(CONTENTS, operatorName);
	node.put(PARALLELISM, vertex.getParallelism());
}
 
示例3
/**
 * Creates a harness for {@code operator} by delegating to another constructor of this class.
 *
 * <p>The delegated call receives the operator itself, a {@code SimpleOperatorFactory} created
 * from it, and a mock environment built with {@code MockEnvironmentBuilder} using the task name
 * {@code "MockTask"}, a managed memory size of {@code 3 * 1024 * 1024}, a
 * {@code MockInputSplitProvider}, a buffer size of {@code 1024} and the parallelism settings
 * given here.
 *
 * @param operator operator under test
 * @param maxParallelism max parallelism configured on the mock environment
 * @param parallelism parallelism configured on the mock environment
 * @param subtaskIndex subtask index configured on the mock environment
 * @param operatorID operator ID forwarded to the delegated constructor
 * @throws Exception declared by this constructor; presumably propagated from the delegated one
 */
public AbstractStreamOperatorTestHarness(
		StreamOperator<OUT> operator,
		int maxParallelism,
		int parallelism,
		int subtaskIndex,
		OperatorID operatorID) throws Exception {
	this(
			operator,
			SimpleOperatorFactory.of(operator),
			new MockEnvironmentBuilder()
					.setTaskName("MockTask")
					// NOTE(review): 3 * 1024 * 1024 looks like 3 MiB expressed in bytes -- confirm the unit.
					.setManagedMemorySize(3 * 1024 * 1024)
					.setInputSplitProvider(new MockInputSplitProvider())
					.setBufferSize(1024)
					.setMaxParallelism(maxParallelism)
					.setParallelism(parallelism)
					.setSubtaskIndex(subtaskIndex)
					.build(),
			// NOTE(review): the meaning of this flag is defined by the delegated constructor, not visible here.
			true,
			operatorID);
}
 
示例4
/**
 * Builds the {@link StreamConfig} for the given operator.
 *
 * <p>The config is backed by a copy of the execution environment's configuration, marked as
 * chain start, and has checkpointing enabled in {@code EXACTLY_ONCE} mode. When a key type is
 * set, the key serializer and the state partitioner for input {@code 0} are configured as well.
 *
 * @param operatorID ID of the operator; its hex string is also used as the operator name
 * @param stateBackend state backend stored in the config
 * @param operator operator stored in the config
 * @return the populated stream config
 */
@VisibleForTesting
StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
	// Copy the environment configuration up front: sharing it would result in undefined
	// behavior when deploying with multiple bootstrap transformations.
	final StreamConfig streamConfig = new StreamConfig(
		new Configuration(dataSet.getExecutionEnvironment().getConfiguration()));

	streamConfig.setChainStart();
	streamConfig.setCheckpointingEnabled(true);
	streamConfig.setCheckpointMode(CheckpointingMode.EXACTLY_ONCE);

	final boolean keyed = keyType != null;
	if (keyed) {
		TypeSerializer<?> serializerForKeys =
			keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
		streamConfig.setStateKeySerializer(serializerForKeys);
		streamConfig.setStatePartitioner(0, originalKeySelector);
	}

	streamConfig.setStreamOperator(operator);
	streamConfig.setOperatorName(operatorID.toHexString());
	streamConfig.setOperatorID(operatorID);
	streamConfig.setStateBackend(stateBackend);

	return streamConfig;
}
 
示例5
/**
 * Creates a harness for {@code operator} by delegating to another constructor of this class.
 *
 * <p>The delegated call receives the operator and a mock environment built with
 * {@code MockEnvironmentBuilder} using the task name {@code "MockTask"}, a memory size of
 * {@code 3 * 1024 * 1024}, a {@code MockInputSplitProvider}, a buffer size of {@code 1024}
 * and the parallelism settings given here.
 *
 * @param operator operator under test
 * @param maxParallelism max parallelism configured on the mock environment
 * @param parallelism parallelism configured on the mock environment
 * @param subtaskIndex subtask index configured on the mock environment
 * @param operatorID operator ID forwarded to the delegated constructor
 * @throws Exception declared by this constructor; presumably propagated from the delegated one
 */
public AbstractStreamOperatorTestHarness(
		StreamOperator<OUT> operator,
		int maxParallelism,
		int parallelism,
		int subtaskIndex,
		OperatorID operatorID) throws Exception {
	this(
		operator,
		new MockEnvironmentBuilder()
			.setTaskName("MockTask")
			// NOTE(review): 3 * 1024 * 1024 looks like 3 MiB expressed in bytes -- confirm the unit.
			.setMemorySize(3 * 1024 * 1024)
			.setInputSplitProvider(new MockInputSplitProvider())
			.setBufferSize(1024)
			.setMaxParallelism(maxParallelism)
			.setParallelism(parallelism)
			.setSubtaskIndex(subtaskIndex)
			.build(),
		// NOTE(review): the meaning of this flag is defined by the delegated constructor, not visible here.
		true,
		operatorID);
}
 
示例6
/**
 * Finalizes the chain configuration and returns the owner.
 *
 * <p>A single placeholder {@link StreamEdge} between two placeholder {@link StreamNode}s
 * (both carrying only {@code chainIndex}) is installed as the out edge of the tail and head
 * configs. The tail config is additionally marked as chain end with exactly one output, and
 * the head config receives the transitive chained task configs.
 *
 * @return the owner of this builder
 */
public OWNER finish() {
	final List<StreamEdge> outEdges = new LinkedList<StreamEdge>();

	// Both endpoints are placeholders: only the chain index is set, everything else is null.
	final StreamNode edgeSource =
		new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
	final StreamNode edgeTarget =
		new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
	final StreamEdge placeholderEdge = new StreamEdge(
		edgeSource,
		edgeTarget,
		0,
		Collections.<String>emptyList(),
		new BroadcastPartitioner<Object>(),
		null);
	outEdges.add(placeholderEdge);

	tailConfig.setChainEnd();
	tailConfig.setOutputSelectors(Collections.emptyList());
	tailConfig.setNumberOfOutputs(1);
	tailConfig.setOutEdgesInOrder(outEdges);
	tailConfig.setNonChainedOutputs(outEdges);

	headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
	headConfig.setOutEdgesInOrder(outEdges);

	return owner;
}
 
示例7
/**
 * Runs a sort-merge join built with {@code FlinkJoinType.FULL} and checks that the harness
 * output equals the four expected records, in order.
 */
@Test
public void testFullJoin() throws Exception {
	StreamOperator fullJoin = newOperator(FlinkJoinType.FULL, leftIsSmall);
	TwoInputStreamTaskTestHarness<BinaryRow, BinaryRow, JoinedRow> harness =
			buildSortMergeJoin(fullJoin);

	// Expected records, listed as the two arguments passed to newRow.
	final String[][] expectedRows = {
		{"a", "02"},
		{"b", "14"},
		{"c", "2null"},
		{"d", "0null"},
	};
	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
	for (String[] row : expectedRows) {
		expected.add(new StreamRecord<>(newRow(row[0], row[1])));
	}

	harness.waitForTaskCompletion();
	TestHarnessUtil.assertOutputEquals(
			"Output was not correct.",
			expected,
			transformToBinary(harness.getOutput()));
}
 
示例8
/**
 * Calls {@link StreamOperator#dispose()} on each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 *
 * <p>Nothing happens when there is no operator chain or the operators were already disposed.
 * A failing operator does not stop the disposal of the remaining ones: exceptions are
 * combined via {@code ExceptionUtils.firstOrSuppressed} and the result is thrown after all
 * operators have been visited and the disposed flag has been set.
 *
 * @throws Exception the combined disposal failure, if any operator failed to dispose
 */
private void disposeAllOperators() throws Exception {
	if (operatorChain == null || disposedOperators) {
		return;
	}

	Exception collectedFailure = null;
	for (StreamOperatorWrapper<?, ?> wrapper : operatorChain.getAllOperators(true)) {
		StreamOperator<?> current = wrapper.getStreamOperator();
		try {
			current.dispose();
		} catch (Exception e) {
			collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
		}
	}

	// Mark as disposed before rethrowing so that a later call does not dispose again.
	disposedOperators = true;

	if (collectedFailure != null) {
		throw collectedFailure;
	}
}
 
示例9
/**
 * Builds the {@link StreamConfig} for the given operator.
 *
 * <p>A {@code BoundedStreamConfig} is created with the key serializer and the original key
 * selector when a key type is set, and without arguments otherwise.
 *
 * @param operatorID ID of the operator; its hex string is also used as the operator name
 * @param stateBackend state backend stored in the config
 * @param operator operator stored in the config
 * @return the populated stream config
 */
@VisibleForTesting
StreamConfig getConfig(OperatorID operatorID, StateBackend stateBackend, StreamOperator<TaggedOperatorSubtaskState> operator) {
	final StreamConfig boundedConfig;
	if (keyType != null) {
		TypeSerializer<?> serializerForKeys =
			keyType.createSerializer(dataSet.getExecutionEnvironment().getConfig());
		boundedConfig = new BoundedStreamConfig(serializerForKeys, originalKeySelector);
	} else {
		boundedConfig = new BoundedStreamConfig();
	}

	boundedConfig.setStreamOperator(operator);
	boundedConfig.setOperatorName(operatorID.toHexString());
	boundedConfig.setOperatorID(operatorID);
	boundedConfig.setStateBackend(stateBackend);

	return boundedConfig;
}
 
示例10
/**
 * Utility for users of the test harness who test a single operator only: prepares the
 * stream config, including the outgoing network connection of the operator.
 *
 * <p>For more advanced test cases, such as chains of multiple operators, configure the
 * stream config manually instead.
 *
 * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
 *     the harness was already set up -- confirm against the imported Preconditions
 */
public void setupOutputForSingletonOperatorChain() {
	Preconditions.checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	streamConfig.setChainStart();
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);
	streamConfig.setOperatorID(new OperatorID(4711L, 123L));

	// An operator without any behavior of its own, shared by both placeholder vertices.
	StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	List<StreamEdge> outEdges = new LinkedList<>();
	StreamNode dummySource = new StreamNode(
		0, "group", null, placeholderOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1, "group", null, placeholderOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);

	StreamEdge dummyEdge = new StreamEdge(
		dummySource, dummyTarget, 0, new LinkedList<>(), new BroadcastPartitioner<>(), null /* output tag */);
	outEdges.add(dummyEdge);

	// The single edge serves both as the ordered out edge and as the non-chained output.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);
}
 
示例11
/**
 * Creates a {@code WatermarkAssignerOperator} and sets it up with the containing task,
 * stream config and output taken from {@code initializer}.
 *
 * <p>The watermark generator is obtained from {@code generatedWatermarkGenerator.newInstance}
 * using the user-code class loader of the containing task.
 *
 * @param initializer parameters supplying the containing task, stream config and output
 * @return the operator after {@code setup} has been called on it
 */
@SuppressWarnings("unchecked")
@Override
public StreamOperator createStreamOperator(StreamOperatorParameters initializer) {
	WatermarkGenerator generator = generatedWatermarkGenerator.newInstance(
		initializer.getContainingTask().getUserCodeClassLoader());

	WatermarkAssignerOperator assigner = new WatermarkAssignerOperator(
		rowtimeFieldIndex, generator, idleTimeout, processingTimeService);

	assigner.setup(
		initializer.getContainingTask(), initializer.getStreamConfig(), initializer.getOutput());

	return assigner;
}
 
示例12
/**
 * Utility for users of the test harness who test a single operator only: prepares the
 * stream config, including the outgoing network connection of the operator.
 *
 * <p>For more advanced test cases, such as chains of multiple operators, configure the
 * stream config manually instead.
 *
 * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
 *     the harness was already set up -- confirm against the imported Preconditions
 */
public void setupOutputForSingletonOperatorChain() {
	Preconditions.checkState(!setupCalled, "This harness was already setup.");
	setupCalled = true;

	streamConfig.setChainStart();
	streamConfig.setBufferTimeout(0);
	streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
	streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
	streamConfig.setNumberOfOutputs(1);
	streamConfig.setTypeSerializerOut(outputSerializer);
	streamConfig.setVertexID(0);
	streamConfig.setOperatorID(new OperatorID(4711L, 123L));

	// An operator without any behavior of its own, shared by both placeholder vertices.
	StreamOperator<OUT> placeholderOperator = new AbstractStreamOperator<OUT>() {
		private static final long serialVersionUID = 1L;
	};

	List<StreamEdge> outEdges = new LinkedList<StreamEdge>();
	StreamNode dummySource = new StreamNode(
		0, "group", null, placeholderOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
	StreamNode dummyTarget = new StreamNode(
		1, "group", null, placeholderOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);

	StreamEdge dummyEdge = new StreamEdge(
		dummySource, dummyTarget, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>(), null /* output tag */);
	outEdges.add(dummyEdge);

	// The single edge serves both as the ordered out edge and as the non-chained output.
	streamConfig.setOutEdgesInOrder(outEdges);
	streamConfig.setNonChainedOutputs(outEdges);
}
 
示例13
/**
 * Finalizes the chain configuration.
 *
 * <p>A single placeholder {@link StreamEdge} between two placeholder {@link StreamNode}s
 * (both carrying only {@code chainIndex}) is installed as the out edge of the tail and head
 * configs. The tail config additionally gets a buffer timeout of {@code 0}, is marked as
 * chain end and is given exactly one output; the head config receives the transitive
 * chained task configs.
 */
public void finish() {
	final List<StreamEdge> outEdges = new LinkedList<StreamEdge>();

	// Both endpoints are placeholders: only the chain index is set, everything else is null.
	final StreamNode edgeSource =
		new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
	final StreamNode edgeTarget =
		new StreamNode(chainIndex, null, null, (StreamOperator<?>) null, null, null, null);
	final StreamEdge placeholderEdge = new StreamEdge(
		edgeSource,
		edgeTarget,
		0,
		Collections.<String>emptyList(),
		new BroadcastPartitioner<Object>(),
		null);
	outEdges.add(placeholderEdge);

	tailConfig.setBufferTimeout(0);
	tailConfig.setChainEnd();
	tailConfig.setOutputSelectors(Collections.emptyList());
	tailConfig.setNumberOfOutputs(1);
	tailConfig.setOutEdgesInOrder(outEdges);
	tailConfig.setNonChainedOutputs(outEdges);

	headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
	headConfig.setOutEdgesInOrder(outEdges);
}
 
示例14
/**
 * Runs a sort-merge join built with {@code FlinkJoinType.LEFT} and checks that the harness
 * output equals the three expected records, in order.
 */
@Test
public void testLeftOuterJoin() throws Exception {
	StreamOperator leftJoin = newOperator(FlinkJoinType.LEFT, leftIsSmall);
	TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> harness =
			buildSortMergeJoin(leftJoin);

	// Expected records, listed as the two arguments passed to newRow.
	final String[][] expectedRows = {
		{"a", "02"},
		{"b", "14"},
		{"d", "0null"},
	};
	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
	for (String[] row : expectedRows) {
		expected.add(new StreamRecord<>(newRow(row[0], row[1])));
	}

	harness.waitForTaskCompletion();
	TestHarnessUtil.assertOutputEquals(
			"Output was not correct.",
			expected,
			transformToBinary(harness.getOutput()));
}
 
示例15
/**
 * Initializes the state of every operator in the chain and opens it, going from
 * <b>tail to head</b>. This is the opposite of {@link StreamOperator#close()}, which happens
 * <b>head to tail</b> (see {@link #closeOperators(StreamTaskActionExecutor)}).
 *
 * @param streamTaskStateInitializer initializer handed to each operator's
 *     {@code initializeState} call
 * @throws Exception if initializing or opening an operator fails; later operators are then
 *     not visited
 */
protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
	for (StreamOperatorWrapper<?, ?> wrapper : getAllOperators(true)) {
		final StreamOperator<?> current = wrapper.getStreamOperator();
		// State is initialized first; only then is the operator opened.
		current.initializeState(streamTaskStateInitializer);
		current.open();
	}
}
 
示例16
/**
 * Stores the given operator in this config: its class under {@code USER_FUNCTION} and the
 * serialized operator object under {@code SERIALIZEDUDF}.
 *
 * @param operator operator to store; a {@code null} operator leaves the config untouched
 * @throws StreamTaskException if writing the operator object to the config fails with an
 *     {@link IOException}
 */
public void setStreamOperator(StreamOperator<?> operator) {
	if (operator == null) {
		return;
	}

	config.setClass(USER_FUNCTION, operator.getClass());

	try {
		InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
	} catch (IOException e) {
		final String message = "Cannot serialize operator object "
				+ operator.getClass() + ".";
		throw new StreamTaskException(message, e);
	}
}
 
示例17
/**
 * Adds an operator via {@code addOperator} and then records its vertex ID in {@code sources}.
 *
 * <p>All arguments are forwarded unchanged to {@code addOperator}. If that call throws, the
 * vertex ID is not recorded.
 *
 * @param vertexID ID of the vertex to add
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject operator of the vertex
 * @param inTypeInfo input type information
 * @param outTypeInfo output type information
 * @param operatorName name of the operator
 */
public <IN, OUT> void addSource(
		Integer vertexID,
		String slotSharingGroup,
		@Nullable String coLocationGroup,
		StreamOperator<OUT> operatorObject,
		TypeInformation<IN> inTypeInfo,
		TypeInformation<OUT> outTypeInfo,
		String operatorName) {

	addOperator(
		vertexID,
		slotSharingGroup,
		coLocationGroup,
		operatorObject,
		inTypeInfo,
		outTypeInfo,
		operatorName);

	sources.add(vertexID);
}
 
示例18
/**
 * Adds an operator via {@code addOperator} and then records its vertex ID in {@code sinks}.
 *
 * <p>All arguments are forwarded unchanged to {@code addOperator}. If that call throws, the
 * vertex ID is not recorded.
 *
 * @param vertexID ID of the vertex to add
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject operator of the vertex
 * @param inTypeInfo input type information
 * @param outTypeInfo output type information
 * @param operatorName name of the operator
 */
public <IN, OUT> void addSink(
		Integer vertexID,
		String slotSharingGroup,
		@Nullable String coLocationGroup,
		StreamOperator<OUT> operatorObject,
		TypeInformation<IN> inTypeInfo,
		TypeInformation<OUT> outTypeInfo,
		String operatorName) {

	addOperator(
		vertexID,
		slotSharingGroup,
		coLocationGroup,
		operatorObject,
		inTypeInfo,
		outTypeInfo,
		operatorName);

	sinks.add(vertexID);
}
 
示例19
/**
 * Adds a node for the given operator and configures its serializers and type information.
 *
 * <p>The task class of the node depends on the operator: {@code StoppableSourceStreamTask}
 * for a {@code StoppableStreamSource}, {@code SourceStreamTask} for any other
 * {@code StreamSource}, and {@code OneInputStreamTask} otherwise. A serializer is created for
 * the input and the output type unless the type information is {@code null} or a
 * {@code MissingTypeInfo}, in which case {@code null} is used.
 *
 * @param vertexID ID of the vertex to add
 * @param slotSharingGroup slot sharing group of the vertex
 * @param coLocationGroup co-location group of the vertex, may be {@code null}
 * @param operatorObject operator of the vertex
 * @param inTypeInfo input type information, may be {@code null}
 * @param outTypeInfo output type information, may be {@code null}
 * @param operatorName name of the operator
 */
public <IN, OUT> void addOperator(
		Integer vertexID,
		String slotSharingGroup,
		@Nullable String coLocationGroup,
		StreamOperator<OUT> operatorObject,
		TypeInformation<IN> inTypeInfo,
		TypeInformation<OUT> outTypeInfo,
		String operatorName) {

	// The stoppable check must come first; the plain source check would otherwise be reached
	// only for operators that are not stoppable sources.
	if (operatorObject instanceof StoppableStreamSource) {
		addNode(vertexID, slotSharingGroup, coLocationGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
	} else if (operatorObject instanceof StreamSource) {
		addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorObject, operatorName);
	} else {
		addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorObject, operatorName);
	}

	TypeSerializer<IN> inputSerializer = null;
	if (inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo)) {
		inputSerializer = inTypeInfo.createSerializer(executionConfig);
	}

	TypeSerializer<OUT> outputSerializer = null;
	if (outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo)) {
		outputSerializer = outTypeInfo.createSerializer(executionConfig);
	}

	setSerializers(vertexID, inputSerializer, null, outputSerializer);

	if (operatorObject instanceof OutputTypeConfigurable && outTypeInfo != null) {
		@SuppressWarnings("unchecked")
		OutputTypeConfigurable<OUT> outputConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
		// The output type must be known at StreamGraph creation time, so it is set here.
		outputConfigurable.setOutputType(outTypeInfo, executionConfig);
	}

	if (operatorObject instanceof InputTypeConfigurable) {
		// Unlike the output type, the input type is passed on even when it is null.
		InputTypeConfigurable inputConfigurable = (InputTypeConfigurable) operatorObject;
		inputConfigurable.setInputType(inTypeInfo, executionConfig);
	}

	if (LOG.isDebugEnabled()) {
		LOG.debug("Vertex: {}", vertexID);
	}
}
 
示例20
/**
 * Runs a sort-merge join built with {@code FlinkJoinType.RIGHT} and checks that the harness
 * output equals the three expected records, in order.
 */
@Test
public void testRightOuterJoin() throws Exception {
	StreamOperator rightJoin = newOperator(FlinkJoinType.RIGHT, leftIsSmall);
	TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> harness =
			buildSortMergeJoin(rightJoin);

	// Expected records, listed as the two arguments passed to newRow.
	final String[][] expectedRows = {
		{"a", "02"},
		{"b", "14"},
		{"c", "2null"},
	};
	ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
	for (String[] row : expectedRows) {
		expected.add(new StreamRecord<>(newRow(row[0], row[1])));
	}

	harness.waitForTaskCompletion();
	TestHarnessUtil.assertOutputEquals(
			"Output was not correct.",
			expected,
			transformToBinary(harness.getOutput()));
}
 
示例21
/**
 * Calls {@link StreamOperator#close()} on each operator in the chain of this
 * {@link StreamTask}. Closing happens from <b>head to tail</b> operator in the chain,
 * contrary to {@link StreamOperator#open()} which happens <b>tail to head</b>
 * (see {@link #openAllOperators()}).
 *
 * @throws Exception if closing an operator fails; the remaining operators are then not closed
 */
private void closeAllOperators() throws Exception {
	// Operators are closed first to last, since upstream operators in the chain might emit
	// elements in their close methods.
	// NOTE(review): the array is walked from its last index to index 0, so the head operator
	// is presumably stored last -- confirm against OperatorChain.
	final StreamOperator<?>[] operators = operatorChain.getAllOperators();
	int index = operators.length;
	while (index > 0) {
		index--;
		final StreamOperator<?> current = operators[index];
		if (current == null) {
			continue;
		}
		current.close();
	}
}
 
示例22
/**
 * Calls {@link StreamOperator#dispose()} on each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 *
 * @throws Exception if disposing an operator fails; the remaining operators are then not
 *     disposed by this call
 */
private void tryDisposeAllOperators() throws Exception {
	for (StreamOperator<?> current : operatorChain.getAllOperators()) {
		// Empty slots in the operator collection are skipped.
		if (current == null) {
			continue;
		}
		current.dispose();
	}
}
 
示例23
/**
 * Calls {@link StreamOperator#dispose()} on each operator in the chain of this
 * {@link StreamTask}. Disposing happens from <b>tail to head</b> operator in the chain.
 *
 * <p>The difference to {@link #tryDisposeAllOperators()} is that a failure while disposing
 * an operator is caught and logged, and disposal continues with the next operator.
 */
private void disposeAllOperators() {
	if (operatorChain == null) {
		return;
	}

	for (StreamOperator<?> current : operatorChain.getAllOperators()) {
		if (current == null) {
			continue;
		}
		try {
			current.dispose();
		} catch (Throwable t) {
			// Best effort: log and keep going so the remaining operators still get disposed.
			LOG.error("Error during disposal of stream operator.", t);
		}
	}
}
 
示例24
/**
 * Test fixture setup: creates a fresh mailbox, a counting action executor and a
 * {@code StreamTask} whose {@code init()} and {@code processInput(...)} do nothing, then
 * installs an {@code OperatorChain} built from that task and a {@code NonRecordWriter}.
 */
@Before
public void before() throws Exception {
	mailbox = new TaskMailboxImpl();
	decorator = new CountingStreamTaskActionExecutor();
	// The task is wired to the mailbox and decorator created above; its second constructor
	// argument is null.
	task = new StreamTask<Object, StreamOperator<Object>>(new StreamTaskTest.DeclineDummyEnvironment(), null, FatalExitExceptionHandler.INSTANCE, decorator, mailbox) {
		@Override
		protected void init() {
			// Intentionally empty.
		}

		@Override
		protected void processInput(MailboxDefaultAction.Controller controller) {
			// Intentionally empty.
		}
	};
	// The chain is assigned directly to the task's field.
	task.operatorChain = new OperatorChain<>(task, new NonRecordWriter<>());
}
 
示例25
/**
 * Test-only constructor that stores the given parts of the chain directly.
 *
 * <p>Every argument is passed through {@code checkNotNull} before it is stored.
 * NOTE(review): {@code checkNotNull} presumably throws {@code NullPointerException} for a
 * {@code null} argument -- confirm against the imported helper.
 *
 * @param allOperators operators of the chain
 * @param streamOutputs stream outputs of the chain
 * @param chainEntryPoint entry point of the chain
 * @param headOperator head operator of the chain
 */
@VisibleForTesting
OperatorChain(
		StreamOperator<?>[] allOperators,
		RecordWriterOutput<?>[] streamOutputs,
		WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainEntryPoint,
		OP headOperator) {

	this.allOperators = checkNotNull(allOperators);
	this.streamOutputs = checkNotNull(streamOutputs);
	this.chainEntryPoint = checkNotNull(chainEntryPoint);
	this.headOperator = checkNotNull(headOperator);
}
 
示例26
/**
 * Tells every operator in the chain to prepare the checkpoint with the given ID.
 *
 * @param checkpointId ID of the checkpoint, forwarded to each operator
 * @throws Exception if an operator fails to prepare; the remaining operators are then not
 *     visited
 */
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
	// Go forward through the operator chain and tell each operator to prepare the checkpoint.
	// NOTE(review): the array is walked from its last index to index 0, so "forward"
	// presumably means the head operator is stored last -- confirm.
	final StreamOperator<?>[] chainOperators = this.allOperators;
	int index = chainOperators.length;
	while (index > 0) {
		index--;
		final StreamOperator<?> current = chainOperators[index];
		if (current == null) {
			continue;
		}
		current.prepareSnapshotPreBarrier(checkpointId);
	}
}
 
示例27
/**
 * Creates a mocked {@link StreamOperator} that reports a fresh {@code OperatorID} and returns
 * the given futures from {@code snapshotState}, whatever arguments it is called with.
 *
 * @param operatorSnapshotResult futures the mock returns from {@code snapshotState}
 * @return the configured mock
 */
private static StreamOperator<?> streamOperatorWithSnapshot(OperatorSnapshotFutures operatorSnapshotResult) throws Exception {
	final StreamOperator<?> mockedOperator = mock(StreamOperator.class);

	when(mockedOperator.getOperatorID())
		.thenReturn(new OperatorID());

	when(mockedOperator.snapshotState(
			anyLong(),
			anyLong(),
			any(CheckpointOptions.class),
			any(CheckpointStreamFactory.class)))
		.thenReturn(operatorSnapshotResult);

	return mockedOperator;
}
 
示例28
/**
 * Checkpoints a single operator and attaches channel state futures where applicable.
 *
 * <p>The futures come from {@code checkpointStreamOperator}. When {@code op} is the head
 * operator of the chain, the input channel state future is set from
 * {@code channelStateWriteResult}; when it is the tail operator, the result subpartition
 * state future is set. An operator that is both head and tail gets both.
 *
 * @param checkpointMetaData meta data of the checkpoint
 * @param checkpointOptions options of the checkpoint
 * @param operatorChain chain used to identify the head and tail operators
 * @param op operator to checkpoint
 * @param isCanceled cancellation flag supplier, forwarded to {@code checkpointStreamOperator}
 * @param channelStateWriteResult source of the channel state handles
 * @param storage checkpoint stream factory, forwarded to {@code checkpointStreamOperator}
 * @return the snapshot futures of the operator
 */
private OperatorSnapshotFutures buildOperatorSnapshotFutures(
		CheckpointMetaData checkpointMetaData,
		CheckpointOptions checkpointOptions,
		OperatorChain<?, ?> operatorChain,
		StreamOperator<?> op,
		Supplier<Boolean> isCanceled,
		ChannelStateWriteResult channelStateWriteResult,
		CheckpointStreamFactory storage) throws Exception {

	final OperatorSnapshotFutures futures =
		checkpointStreamOperator(op, checkpointMetaData, checkpointOptions, storage, isCanceled);

	// Identity comparison: only the very same operator instance counts as head or tail.
	final boolean isHead = op == operatorChain.getHeadOperator();
	if (isHead) {
		futures.setInputChannelStateFuture(
			channelStateWriteResult
				.getInputChannelStateHandles()
				.thenApply(StateObjectCollection::new)
				.thenApply(SnapshotResult::of));
	}

	final boolean isTail = op == operatorChain.getTailOperator();
	if (isTail) {
		futures.setResultSubpartitionStateFuture(
			channelStateWriteResult
				.getResultSubpartitionStateHandles()
				.thenApply(StateObjectCollection::new)
				.thenApply(SnapshotResult::of));
	}

	return futures;
}
 
示例29
/**
 * Creates a harness for {@code operator}, delegating to the constructor that additionally
 * takes an operator ID and passing it a newly created {@code OperatorID}.
 *
 * @param operator operator under test
 * @param maxParallelism max parallelism forwarded to the delegated constructor
 * @param parallelism parallelism forwarded to the delegated constructor
 * @param subtaskIndex subtask index forwarded to the delegated constructor
 * @throws Exception declared by this constructor; presumably propagated from the delegated one
 */
public AbstractStreamOperatorTestHarness(
		StreamOperator<OUT> operator,
		int maxParallelism,
		int parallelism,
		int subtaskIndex) throws Exception {
	this(
		operator,
		maxParallelism,
		parallelism,
		subtaskIndex,
		// A fresh operator ID is created for every harness built through this constructor.
		new OperatorID());
}
 
示例30
/**
 * Creates a harness for {@code operator}, delegating to the constructor that additionally
 * takes an operator ID and passing it a newly created {@code OperatorID}.
 *
 * @param operator operator under test
 * @param maxParallelism max parallelism forwarded to the delegated constructor
 * @param parallelism parallelism forwarded to the delegated constructor
 * @param subtaskIndex subtask index forwarded to the delegated constructor
 * @throws Exception declared by this constructor; presumably propagated from the delegated one
 */
public AbstractStreamOperatorTestHarness(
		StreamOperator<OUT> operator,
		int maxParallelism,
		int parallelism,
		int subtaskIndex) throws Exception {
	this(
		operator,
		maxParallelism,
		parallelism,
		subtaskIndex,
		// A fresh operator ID is created for every harness built through this constructor.
		new OperatorID());
}