Java源码示例:org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup
示例1
@Test
public void testWatermarkMetrics() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator();
OperatorID headOperatorId = new OperatorID();
OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator();
OperatorID chainedOperatorId = new OperatorID();
testHarness.setupOperatorChain(headOperatorId, headOperator)
.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};
testHarness.invoke(env);
testHarness.waitForTaskRunning();
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Assert.assertEquals("A metric was registered multiple times.",
5,
new HashSet<>(Arrays.asList(
taskInputWatermarkGauge,
headInputWatermarkGauge,
headOutputWatermarkGauge,
chainedInputWatermarkGauge,
chainedOutputWatermarkGauge))
.size());
Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L));
testHarness.waitForInputProcessing();
Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L));
testHarness.waitForInputProcessing();
Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
示例2
@Test
public void testWatermarkMetrics() throws Exception {
final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
CoStreamMap<String, Integer, String> headOperator = new CoStreamMap<>(new IdentityMap());
final OperatorID headOperatorId = new OperatorID();
OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator = new OneInputStreamTaskTest.WatermarkMetricOperator();
OperatorID chainedOperatorId = new OperatorID();
testHarness.setupOperatorChain(headOperatorId, headOperator)
.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};
testHarness.invoke(env);
testHarness.waitForTaskRunning();
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Assert.assertEquals("A metric was registered multiple times.",
7,
new HashSet<>(Arrays.asList(
taskInputWatermarkGauge,
headInput1WatermarkGauge,
headInput2WatermarkGauge,
headInputWatermarkGauge,
headOutputWatermarkGauge,
chainedInputWatermarkGauge,
chainedOutputWatermarkGauge))
.size());
Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L), 0, 0);
testHarness.waitForInputProcessing();
Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L), 1, 0);
testHarness.waitForInputProcessing();
Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(3L), 0, 0);
testHarness.waitForInputProcessing();
Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
示例3
@Test
public void testWatermarkMetrics() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator();
OperatorID headOperatorId = new OperatorID();
OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator();
OperatorID chainedOperatorId = new OperatorID();
testHarness.setupOperatorChain(headOperatorId, headOperator)
.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};
testHarness.invoke(env);
testHarness.waitForTaskRunning();
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Assert.assertEquals("A metric was registered multiple times.",
5,
new HashSet<>(Arrays.asList(
taskInputWatermarkGauge,
headInputWatermarkGauge,
headOutputWatermarkGauge,
chainedInputWatermarkGauge,
chainedOutputWatermarkGauge))
.size());
Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L));
testHarness.waitForInputProcessing();
Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L));
testHarness.waitForInputProcessing();
Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
示例4
@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
OneInputStreamOperator<String, String> headOperator = new WatermarkMetricOperator();
OperatorID headOperatorId = new OperatorID();
OneInputStreamOperator<String, String> chainedOperator = new WatermarkMetricOperator();
OperatorID chainedOperatorId = new OperatorID();
testHarness.setupOperatorChain(headOperatorId, headOperator)
.chain(chainedOperatorId, chainedOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
StreamMockEnvironment env = new StreamMockEnvironment(
testHarness.jobConfig, testHarness.taskConfig, testHarness.memorySize, new MockInputSplitProvider(), testHarness.bufferSize, new TestTaskStateManager()) {
@Override
public TaskMetricGroup getMetricGroup() {
return taskMetricGroup;
}
};
testHarness.invoke(env);
testHarness.waitForTaskRunning();
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Assert.assertEquals("A metric was registered multiple times.",
5,
new HashSet<>(Arrays.asList(
taskInputWatermarkGauge,
headInputWatermarkGauge,
headOutputWatermarkGauge,
chainedInputWatermarkGauge,
chainedOutputWatermarkGauge))
.size());
Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L));
testHarness.waitForInputProcessing();
Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L));
testHarness.waitForInputProcessing();
Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(8L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
示例5
@Test
@SuppressWarnings("unchecked")
public void testWatermarkMetrics() throws Exception {
OperatorID headOperatorId = new OperatorID();
OperatorID chainedOperatorId = new OperatorID();
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
return headOperatorMetricGroup;
} else if (id.equals(chainedOperatorId)) {
return chainedOperatorMetricGroup;
} else {
return super.getOrAddOperator(id, name);
}
}
};
try (StreamTaskMailboxTestHarness<String> testHarness =
new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
.setupOperatorChain(headOperatorId, new MapToStringMultipleInputOperatorFactory())
.chain(
chainedOperatorId,
new WatermarkMetricOperator(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
.finish()
.setTaskMetricGroup(taskMetricGroup)
.build()) {
Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(1));
Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(2));
Gauge<Long> headInput3WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.currentInputWatermarkName(3));
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(1L), 0);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L), 1);
assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headInput3WatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(2L), 2);
assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput3WatermarkGauge.getValue().longValue());
assertEquals(1L, headOutputWatermarkGauge.getValue().longValue());
assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.processElement(new Watermark(4L), 0);
testHarness.processElement(new Watermark(3L), 1);
assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
assertEquals(4L, headInput1WatermarkGauge.getValue().longValue());
assertEquals(3L, headInput2WatermarkGauge.getValue().longValue());
assertEquals(2L, headInput3WatermarkGauge.getValue().longValue());
assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());
testHarness.endInput();
testHarness.waitForTaskCompletion();
}
}