Java源码示例:org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup

示例1
/**
 * Checks the input/output watermark gauges of a one-input stream task that runs a chain of two
 * {@code WatermarkMetricOperator}s.
 *
 * <p>The task-level and operator-level metric groups are replaced with intercepting
 * implementations so the registered gauges can be fetched by metric name. The expected values
 * below imply that each {@code WatermarkMetricOperator} emits a watermark twice as large as the
 * one it received (the operator itself is defined outside this method).
 *
 * @throws Exception if the test harness fails while running the task
 */
@Test
@SuppressWarnings("unchecked") // the casts to Gauge<Long> on the looked-up metrics are unchecked
public void testWatermarkMetrics() throws Exception {
	final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator();
	OperatorID headOperatorId = new OperatorID();

	OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator();
	OperatorID chainedOperatorId = new OperatorID();

	testHarness.setupOperatorChain(headOperatorId, headOperator)
		.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
		.finish();

	InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	// Hand out the intercepting operator groups for the two known operator ids so that the
	// gauges registered by each operator can be inspected separately.
	InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
		@Override
		public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
			if (id.equals(headOperatorId)) {
				return headOperatorMetricGroup;
			} else if (id.equals(chainedOperatorId)) {
				return chainedOperatorMetricGroup;
			} else {
				return super.getOrAddOperator(id, name);
			}
		}
	};

	// Environment whose only customization is returning the intercepting task metric group.
	StreamMockEnvironment env = new StreamMockEnvironment(
		testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
		@Override
		public TaskMetricGroup getMetricGroup() {
			return taskMetricGroup;
		}
	};

	testHarness.invoke(env);
	testHarness.waitForTaskRunning();

	Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
	Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);

	// All five gauges must be distinct objects; a smaller set means one gauge was reused.
	Assert.assertEquals("A metric was registered multiple times.",
		5,
		new HashSet<>(Arrays.asList(
			taskInputWatermarkGauge,
			headInputWatermarkGauge,
			headOutputWatermarkGauge,
			chainedInputWatermarkGauge,
			chainedOutputWatermarkGauge))
			.size());

	// Before any watermark has been processed every gauge reports Long.MIN_VALUE.
	Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

	// Watermark 1 enters the task: expected 1 -> 2 after the head, 2 -> 4 after the chained operator.
	testHarness.processElement(new Watermark(1L));
	testHarness.waitForInputProcessing();
	Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());

	// Watermark 2 enters the task: expected 2 -> 4 after the head, 4 -> 8 after the chained operator.
	testHarness.processElement(new Watermark(2L));
	testHarness.waitForInputProcessing();
	Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue());

	testHarness.endInput();
	testHarness.waitForTaskCompletion();
}
 
示例2
/**
 * Checks the watermark gauges of a two-input stream task whose head operator is a
 * {@code CoStreamMap} followed by a chained {@code WatermarkMetricOperator}.
 *
 * <p>The task-level and operator-level metric groups are replaced with intercepting
 * implementations so the registered gauges can be fetched by metric name. As the expected values
 * show, the per-input gauges follow each input individually, while the combined input watermark
 * (and everything downstream of it) only advances to the smaller of the two input watermarks.
 * The chained operator is expected to emit twice the watermark it receives.
 *
 * @throws Exception if the test harness fails while running the task
 */
@Test
@SuppressWarnings("unchecked") // the casts to Gauge<Long> on the looked-up metrics are unchecked
public void testWatermarkMetrics() throws Exception {
	final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap());
	final OperatorID headOperatorId = new OperatorID();

	OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator();
	OperatorID chainedOperatorId = new OperatorID();

	testHarness.setupOperatorChain(headOperatorId, headOperator)
		.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
		.finish();

	InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	// Hand out the intercepting operator groups for the two known operator ids so that the
	// gauges registered by each operator can be inspected separately.
	InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
		@Override
		public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
			if (id.equals(headOperatorId)) {
				return headOperatorMetricGroup;
			} else if (id.equals(chainedOperatorId)) {
				return chainedOperatorMetricGroup;
			} else {
				return super.getOrAddOperator(id, name);
			}
		}
	};

	// Environment whose only customization is returning the intercepting task metric group.
	StreamMockEnvironment env = new StreamMockEnvironment(
		testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
		@Override
		public TaskMetricGroup getMetricGroup() {
			return taskMetricGroup;
		}
	};

	testHarness.invoke(env);
	testHarness.waitForTaskRunning();

	Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
	Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
	Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
	Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);

	// All seven gauges must be distinct objects; a smaller set means one gauge was reused.
	Assert.assertEquals("A metric was registered multiple times.",
		7,
		new HashSet<>(Arrays.asList(
			taskInputWatermarkGauge,
			headInput1WatermarkGauge,
			headInput2WatermarkGauge,
			headInputWatermarkGauge,
			headOutputWatermarkGauge,
			chainedInputWatermarkGauge,
			chainedOutputWatermarkGauge))
			.size());

	// Before any watermark has been processed every gauge reports Long.MIN_VALUE.
	Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

	// Watermark 1 on the first input only: just the input-1 gauge moves, nothing is forwarded yet.
	testHarness.processElement(new Watermark(1L), 0, 0);
	testHarness.waitForInputProcessing();
	Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

	// Watermark 2 on the second input: combined watermark becomes min(1, 2) = 1 and is forwarded.
	testHarness.processElement(new Watermark(2L), 1, 0);
	testHarness.waitForInputProcessing();
	Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue());

	// Watermark 3 on the first input: combined watermark becomes min(3, 2) = 2 and is forwarded.
	testHarness.processElement(new Watermark(3L), 0, 0);
	testHarness.waitForInputProcessing();
	Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());

	testHarness.endInput();
	testHarness.waitForTaskCompletion();
}
 
示例3
/**
 * Verifies that a one-input stream task with two chained {@code WatermarkMetricOperator}s
 * exposes correct current-input and current-output watermark gauges at task and operator level.
 *
 * <p>Intercepting metric groups stand in for the real task and operator groups so that each
 * registered gauge can be retrieved by its metric name. Judging by the expected values, every
 * {@code WatermarkMetricOperator} forwards a watermark of twice the value it received (the
 * operator is declared outside this method).
 *
 * @throws Exception if the test harness fails while running the task
 */
@Test
@SuppressWarnings("unchecked") // the casts to Gauge<Long> on the looked-up metrics are unchecked
public void testWatermarkMetrics() throws Exception {
	final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

	OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator();
	OperatorID headOperatorId = new OperatorID();

	OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator();
	OperatorID chainedOperatorId = new OperatorID();

	testHarness.setupOperatorChain(headOperatorId, headOperator)
		.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
		.finish();

	InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
	// Route the head and chained operator ids to their own intercepting groups; any other id
	// falls through to the default behavior of the intercepting task group.
	InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
		@Override
		public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
			if (id.equals(headOperatorId)) {
				return headOperatorMetricGroup;
			} else if (id.equals(chainedOperatorId)) {
				return chainedOperatorMetricGroup;
			} else {
				return super.getOrAddOperator(id, name);
			}
		}
	};

	// Mock environment that exposes the intercepting task metric group to the task.
	StreamMockEnvironment env = new StreamMockEnvironment(
		testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
		@Override
		public TaskMetricGroup getMetricGroup() {
			return taskMetricGroup;
		}
	};

	testHarness.invoke(env);
	testHarness.waitForTaskRunning();

	Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
	Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);

	// Collapsing the gauges into a set detects a gauge instance registered under several names.
	Assert.assertEquals("A metric was registered multiple times.",
		5,
		new HashSet<>(Arrays.asList(
			taskInputWatermarkGauge,
			headInputWatermarkGauge,
			headOutputWatermarkGauge,
			chainedInputWatermarkGauge,
			chainedOutputWatermarkGauge))
			.size());

	// Initial state: no watermark seen yet, so every gauge reports Long.MIN_VALUE.
	Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

	// First watermark (1): head is expected to output 2, the chained operator 4.
	testHarness.processElement(new Watermark(1L));
	testHarness.waitForInputProcessing();
	Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());

	// Second watermark (2): head is expected to output 4, the chained operator 8.
	testHarness.processElement(new Watermark(2L));
	testHarness.waitForInputProcessing();
	Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
	Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue());

	testHarness.endInput();
	testHarness.waitForTaskCompletion();
}
 
示例4
/**
 * Exercises the watermark gauges of a one-input stream task running a head and a chained
 * {@code WatermarkMetricOperator}.
 *
 * <p>Intercepting metric groups replace the task and operator groups, which lets the test pull
 * each gauge out by metric name and compare its value after every processed watermark. The
 * expectations imply that each operator emits double the watermark it was given.
 *
 * @throws Exception if the test harness fails while running the task
 */
@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
	final OneInputStreamTaskTestHarness<String, String> harness = new OneInputStreamTaskTestHarness<>(
		OneInputStreamTask::new,
		BasicTypeInfo.STRING_TYPE_INFO,
		BasicTypeInfo.STRING_TYPE_INFO);

	final OneInputStreamOperator<String, String> head = new WatermarkMetricOperator();
	final OperatorID headId = new OperatorID();

	final OneInputStreamOperator<String, String> chained = new WatermarkMetricOperator();
	final OperatorID chainedId = new OperatorID();

	harness
		.setupOperatorChain(headId, head)
		.chain(chainedId, chained, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
		.finish();

	final InterceptingOperatorMetricGroup headMetrics = new InterceptingOperatorMetricGroup();
	final InterceptingOperatorMetricGroup chainedMetrics = new InterceptingOperatorMetricGroup();
	// Known operator ids get their dedicated intercepting group; everything else is delegated.
	final InterceptingTaskMetricGroup taskMetrics = new InterceptingTaskMetricGroup() {
		@Override
		public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
			if (id.equals(headId)) {
				return headMetrics;
			}
			if (id.equals(chainedId)) {
				return chainedMetrics;
			}
			return super.getOrAddOperator(id, name);
		}
	};

	// Environment that serves the intercepting task metric group to the task under test.
	final StreamMockEnvironment environment = new StreamMockEnvironment(
		harness.jobConfig,
		harness.taskConfig,
		harness.memorySize,
		new MockInputSplitProvider(),
		harness.bufferSize,
		new TestTaskStateManager()) {
		@Override
		public TaskMetricGroup getMetricGroup() {
			return taskMetrics;
		}
	};

	harness.invoke(environment);
	harness.waitForTaskRunning();

	final Gauge<Long> taskIn = (Gauge<Long>) taskMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	final Gauge<Long> headIn = (Gauge<Long>) headMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	final Gauge<Long> headOut = (Gauge<Long>) headMetrics.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
	final Gauge<Long> chainedIn = (Gauge<Long>) chainedMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
	final Gauge<Long> chainedOut = (Gauge<Long>) chainedMetrics.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);

	// Five distinct gauge instances are expected; fewer means a gauge was registered twice.
	final HashSet<Gauge<Long>> distinctGauges =
		new HashSet<>(Arrays.asList(taskIn, headIn, headOut, chainedIn, chainedOut));
	Assert.assertEquals("A metric was registered multiple times.", 5, distinctGauges.size());

	// Value every gauge is expected to report before the first watermark arrives.
	final long initialWatermark = Long.MIN_VALUE;
	Assert.assertEquals(initialWatermark, taskIn.getValue().longValue());
	Assert.assertEquals(initialWatermark, headIn.getValue().longValue());
	Assert.assertEquals(initialWatermark, headOut.getValue().longValue());
	Assert.assertEquals(initialWatermark, chainedIn.getValue().longValue());
	Assert.assertEquals(initialWatermark, chainedOut.getValue().longValue());

	// Watermark 1: 1 at the task/head input, 2 between the operators, 4 at the chain output.
	harness.processElement(new Watermark(1L));
	harness.waitForInputProcessing();
	Assert.assertEquals(1L, taskIn.getValue().longValue());
	Assert.assertEquals(1L, headIn.getValue().longValue());
	Assert.assertEquals(2L, headOut.getValue().longValue());
	Assert.assertEquals(2L, chainedIn.getValue().longValue());
	Assert.assertEquals(4L, chainedOut.getValue().longValue());

	// Watermark 2: 2 at the task/head input, 4 between the operators, 8 at the chain output.
	harness.processElement(new Watermark(2L));
	harness.waitForInputProcessing();
	Assert.assertEquals(2L, taskIn.getValue().longValue());
	Assert.assertEquals(2L, headIn.getValue().longValue());
	Assert.assertEquals(4L, headOut.getValue().longValue());
	Assert.assertEquals(4L, chainedIn.getValue().longValue());
	Assert.assertEquals(8L, chainedOut.getValue().longValue());

	harness.endInput();
	harness.waitForTaskCompletion();
}
 
示例5
/**
 * Exercises the watermark gauges of a multiple-input stream task with three inputs whose head
 * operator is followed by a chained {@code WatermarkMetricOperator}.
 *
 * <p>Intercepting metric groups replace the task and operator groups so each gauge can be
 * fetched by metric name. According to the expectations, the per-input gauges track their own
 * input, while the combined input watermark and everything downstream only advance to the
 * smallest watermark across all three inputs; the chained operator is expected to emit double
 * the watermark it receives.
 *
 * @throws Exception if the test harness fails while running the task
 */
@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
	final OperatorID headId = new OperatorID();
	final OperatorID chainedId = new OperatorID();

	final InterceptingOperatorMetricGroup headMetrics = new InterceptingOperatorMetricGroup();
	final InterceptingOperatorMetricGroup chainedMetrics = new InterceptingOperatorMetricGroup();
	// Known operator ids get their dedicated intercepting group; everything else is delegated.
	final InterceptingTaskMetricGroup taskMetrics = new InterceptingTaskMetricGroup() {
		@Override
		public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
			if (id.equals(headId)) {
				return headMetrics;
			}
			if (id.equals(chainedId)) {
				return chainedMetrics;
			}
			return super.getOrAddOperator(id, name);
		}
	};

	try (StreamTaskMailboxTestHarness<String> harness =
			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
				.addInput(BasicTypeInfo.INT_TYPE_INFO)
				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
				.setupOperatorChain(headId, new MapToStringMultipleInputOperatorFactory())
				.chain(chainedId, new WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
				.finish()
				.setTaskMetricGroup(taskMetrics)
				.build()) {
		final Gauge<Long> taskIn = (Gauge<Long>) taskMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
		final Gauge<Long> headIn1 = (Gauge<Long>) headMetrics.get(MetricNames.currentInputWatermarkName(1));
		final Gauge<Long> headIn2 = (Gauge<Long>) headMetrics.get(MetricNames.currentInputWatermarkName(2));
		final Gauge<Long> headIn3 = (Gauge<Long>) headMetrics.get(MetricNames.currentInputWatermarkName(3));
		final Gauge<Long> headIn = (Gauge<Long>) headMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
		final Gauge<Long> headOut = (Gauge<Long>) headMetrics.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
		final Gauge<Long> chainedIn = (Gauge<Long>) chainedMetrics.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
		final Gauge<Long> chainedOut = (Gauge<Long>) chainedMetrics.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);

		// Value a gauge is expected to report as long as its watermark has not advanced.
		final long initialWatermark = Long.MIN_VALUE;

		// Nothing processed yet: every gauge still reports the initial value.
		assertEquals(initialWatermark, taskIn.getValue().longValue());
		assertEquals(initialWatermark, headIn.getValue().longValue());
		assertEquals(initialWatermark, headIn1.getValue().longValue());
		assertEquals(initialWatermark, headIn2.getValue().longValue());
		assertEquals(initialWatermark, headIn3.getValue().longValue());
		assertEquals(initialWatermark, headOut.getValue().longValue());
		assertEquals(initialWatermark, chainedIn.getValue().longValue());
		assertEquals(initialWatermark, chainedOut.getValue().longValue());

		// Watermark 1 on the first input: only that input's gauge moves.
		harness.processElement(new Watermark(1L), 0);
		assertEquals(initialWatermark, taskIn.getValue().longValue());
		assertEquals(initialWatermark, headIn.getValue().longValue());
		assertEquals(1L, headIn1.getValue().longValue());
		assertEquals(initialWatermark, headIn2.getValue().longValue());
		assertEquals(initialWatermark, headIn3.getValue().longValue());
		assertEquals(initialWatermark, headOut.getValue().longValue());
		assertEquals(initialWatermark, chainedIn.getValue().longValue());
		assertEquals(initialWatermark, chainedOut.getValue().longValue());

		// Watermark 2 on the second input: the third input is still unset, so nothing is forwarded.
		harness.processElement(new Watermark(2L), 1);
		assertEquals(initialWatermark, taskIn.getValue().longValue());
		assertEquals(initialWatermark, headIn.getValue().longValue());
		assertEquals(1L, headIn1.getValue().longValue());
		assertEquals(2L, headIn2.getValue().longValue());
		assertEquals(initialWatermark, headIn3.getValue().longValue());
		assertEquals(initialWatermark, headOut.getValue().longValue());
		assertEquals(initialWatermark, chainedIn.getValue().longValue());
		assertEquals(initialWatermark, chainedOut.getValue().longValue());

		// Watermark 2 on the third input: all inputs have one, combined value is min(1, 2, 2) = 1.
		harness.processElement(new Watermark(2L), 2);
		assertEquals(1L, taskIn.getValue().longValue());
		assertEquals(1L, headIn.getValue().longValue());
		assertEquals(1L, headIn1.getValue().longValue());
		assertEquals(2L, headIn2.getValue().longValue());
		assertEquals(2L, headIn3.getValue().longValue());
		assertEquals(1L, headOut.getValue().longValue());
		assertEquals(1L, chainedIn.getValue().longValue());
		assertEquals(2L, chainedOut.getValue().longValue());

		// Advance inputs one and two: combined value becomes min(4, 3, 2) = 2.
		harness.processElement(new Watermark(4L), 0);
		harness.processElement(new Watermark(3L), 1);
		assertEquals(2L, taskIn.getValue().longValue());
		assertEquals(2L, headIn.getValue().longValue());
		assertEquals(4L, headIn1.getValue().longValue());
		assertEquals(3L, headIn2.getValue().longValue());
		assertEquals(2L, headIn3.getValue().longValue());
		assertEquals(2L, headOut.getValue().longValue());
		assertEquals(2L, chainedIn.getValue().longValue());
		assertEquals(4L, chainedOut.getValue().longValue());

		harness.endInput();
		harness.waitForTaskCompletion();
	}
}