Java源码示例:org.apache.flink.optimizer.testfunctions.IdentityGroupReducerCombinable
示例1
/**
 * Compiles a plan that custom-partitions a tuple data set on field 1, groups on field 1 and
 * applies a combinable group reducer, then inspects the resulting ship strategies.
 *
 * <p>Asserted: the reducer's input is shipped FORWARD, and the input of the node feeding the
 * reducer is shipped with PARTITION_CUSTOM.
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testPartitionCustomOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionCustom(new Partitioner<Long>() {
                @Override
                public int partition(Long key, int numPartitions) {
                    return key.intValue();
                }
            }, 1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- partitioner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
}
示例2
/**
 * Compiles a plan that range-partitions a tuple data set on field 1, groups on field 1 and
 * applies a combinable group reducer, then inspects the resulting plan.
 *
 * <p>Asserted: the reducer's input and the input of the node before it are shipped FORWARD;
 * the input of the node two steps before the reducer is shipped with PARTITION_CUSTOM; the
 * source has exactly two outgoing channels, both FORWARD, the first PIPELINED and the second
 * BATCH.
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testRangePartitionOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionByRange(1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- partitionNode <- partitionIDRemover.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitionNode = (SingleInputPlanNode) reducer.getInput().getSource();
    SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

    // The source is expected to fan out into two channels with different exchange modes.
    SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
    List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
    assertEquals(2, sourceOutgoingChannels.size());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
    assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
    assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
}
示例3
/**
 * Compiles a plan that groups a rebalanced {@code Tuple2} data set (parallelism 4) on field 0
 * with a custom partitioner and applies a combinable group reducer.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例4
/**
 * Compiles a plan that groups a rebalanced {@code Tuple3} data set (parallelism 4) on field 0
 * with a custom partitioner, sorts each group ascending on field 1 and applies a combinable
 * group reducer.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例5
/**
 * Compiles a plan that groups a rebalanced {@code Tuple4} data set (parallelism 4) on field 0
 * with a custom partitioner, sorts each group ascending on field 1 and descending on field 2,
 * and applies a combinable group reducer.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple4<Integer, Integer, Integer, Integer>> data = env.fromElements(new Tuple4<Integer, Integer, Integer, Integer>(0, 0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .sortGroup(2, Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer, Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例6
/**
 * Compiles a plan that groups a rebalanced {@code Pojo2} data set (parallelism 4) by the field
 * expression {@code "a"} with a custom partitioner and applies a combinable group reducer.
 * Despite the "Tuple" in the method name, this variant operates on a POJO type.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo2> data = env.fromElements(new Pojo2())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
            .output(new DiscardingOutputFormat<Pojo2>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例7
/**
 * Compiles a plan that groups a rebalanced {@code Pojo3} data set (parallelism 4) by the field
 * expression {@code "a"} with a custom partitioner, sorts each group ascending on {@code "b"}
 * and applies a combinable group reducer. Despite the "Tuple" in the method name, this variant
 * operates on a POJO type.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo3> data = env.fromElements(new Pojo3())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
            .output(new DiscardingOutputFormat<Pojo3>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例8
/**
 * Compiles a plan that groups a rebalanced {@code Pojo4} data set (parallelism 4) by the field
 * expression {@code "a"} with a custom partitioner, sorts each group ascending on {@code "b"}
 * and descending on {@code "c"}, and applies a combinable group reducer. Despite the "Tuple"
 * in the method name, this variant operates on a POJO type.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo4> data = env.fromElements(new Pojo4())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .sortGroup("c", Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
            .output(new DiscardingOutputFormat<Pojo4>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例9
/**
 * Compiles a plan that groups a rebalanced {@code Tuple2} data set (parallelism 4) by a
 * {@code TestKeySelector} with a custom partitioner and applies a combinable group reducer.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple2<Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例10
/**
 * Compiles a plan that groups a rebalanced {@code Tuple3} data set (parallelism 4) by a
 * {@code TestKeySelector} with a custom partitioner, sorts each group ascending by a second
 * {@code TestKeySelector} and applies a combinable group reducer.
 *
 * <p>Asserted ship strategies: FORWARD into the sink, PARTITION_CUSTOM into the reducer, and
 * FORWARD into the node feeding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; propagated unchanged so the
 *     test runner reports the original exception with its stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple3<Integer, Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Walk backwards from the single sink: sink <- reducer <- combiner.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例11
/**
 * Builds a plan with a custom partitioning on field 1 followed by a grouping on field 1 and a
 * combinable group reducer, compiles it, and checks the ship strategies.
 *
 * <p>Asserted: FORWARD on the reducer's input, PARTITION_CUSTOM on the input of the node that
 * precedes the reducer.
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testPartitionCustomOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionCustom(new Partitioner<Long>() {
                @Override
                public int partition(Long key, int numPartitions) {
                    return key.intValue();
                }
            }, 1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
}
示例12
/**
 * Builds a plan with a range partitioning on field 1 followed by a grouping on field 1 and a
 * combinable group reducer, compiles it, and checks ship strategies and exchange modes.
 *
 * <p>Asserted: FORWARD on the inputs of the reducer and of the node before it;
 * PARTITION_CUSTOM on the input of the node two steps before the reducer; exactly two
 * outgoing channels on the source, both FORWARD, with exchange modes PIPELINED (index 0) and
 * BATCH (index 1).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testRangePartitionOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionByRange(1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink through three single-input nodes.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitionNode = (SingleInputPlanNode) reducer.getInput().getSource();
    SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

    // Inspect both channels leaving the source.
    SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
    List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
    assertEquals(2, sourceOutgoingChannels.size());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
    assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
    assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
}
示例13
/**
 * Builds a plan that groups a rebalanced {@code Tuple2} data set (parallelism 4) on field 0
 * using a custom partitioner, applies a combinable group reducer, and checks the ship
 * strategies of the compiled plan.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例14
/**
 * Builds a plan that groups a rebalanced {@code Tuple3} data set (parallelism 4) on field 0
 * using a custom partitioner, sorts groups ascending on field 1, applies a combinable group
 * reducer, and checks the ship strategies of the compiled plan.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例15
/**
 * Builds a plan that groups a rebalanced {@code Tuple4} data set (parallelism 4) on field 0
 * using a custom partitioner, sorts groups ascending on field 1 and descending on field 2,
 * applies a combinable group reducer, and checks the ship strategies of the compiled plan.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple4<Integer, Integer, Integer, Integer>> data = env.fromElements(new Tuple4<Integer, Integer, Integer, Integer>(0, 0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .sortGroup(2, Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer, Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例16
/**
 * Builds a plan that groups a rebalanced {@code Pojo2} data set (parallelism 4) by the field
 * expression {@code "a"} using a custom partitioner, applies a combinable group reducer, and
 * checks the ship strategies of the compiled plan. The method name says "Tuple", but the data
 * type here is a POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo2> data = env.fromElements(new Pojo2())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
            .output(new DiscardingOutputFormat<Pojo2>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例17
/**
 * Builds a plan that groups a rebalanced {@code Pojo3} data set (parallelism 4) by the field
 * expression {@code "a"} using a custom partitioner, sorts groups ascending on {@code "b"},
 * applies a combinable group reducer, and checks the ship strategies of the compiled plan.
 * The method name says "Tuple", but the data type here is a POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo3> data = env.fromElements(new Pojo3())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
            .output(new DiscardingOutputFormat<Pojo3>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例18
/**
 * Builds a plan that groups a rebalanced {@code Pojo4} data set (parallelism 4) by the field
 * expression {@code "a"} using a custom partitioner, sorts groups ascending on {@code "b"} and
 * descending on {@code "c"}, applies a combinable group reducer, and checks the ship
 * strategies of the compiled plan. The method name says "Tuple", but the data type here is a
 * POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo4> data = env.fromElements(new Pojo4())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .sortGroup("c", Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
            .output(new DiscardingOutputFormat<Pojo4>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例19
/**
 * Builds a plan that groups a rebalanced {@code Tuple2} data set (parallelism 4) by a
 * {@code TestKeySelector} using a custom partitioner, applies a combinable group reducer, and
 * checks the ship strategies of the compiled plan.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple2<Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例20
/**
 * Builds a plan that groups a rebalanced {@code Tuple3} data set (parallelism 4) by a
 * {@code TestKeySelector} using a custom partitioner, sorts groups ascending by another
 * {@code TestKeySelector}, applies a combinable group reducer, and checks the ship strategies
 * of the compiled plan.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * preceding the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; rethrown as-is so the failure
 *     report keeps the original exception and stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple3<Integer, Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Navigate upstream from the only sink to the reducer and the node before it.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例21
/**
 * Custom-partitions a single-element tuple data set on field 1, groups on field 1, runs a
 * combinable group reducer, compiles the plan and verifies two ship strategies.
 *
 * <p>Asserted: the reducer receives its input via FORWARD; the node upstream of the reducer
 * receives its input via PARTITION_CUSTOM.
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testPartitionCustomOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionCustom(new Partitioner<Long>() {
                @Override
                public int partition(Long key, int numPartitions) {
                    return key.intValue();
                }
            }, 1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitioner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
}
示例22
/**
 * Range-partitions a single-element tuple data set on field 1, groups on field 1, runs a
 * combinable group reducer, compiles the plan and verifies ship strategies and exchange modes.
 *
 * <p>Asserted: FORWARD into the reducer and into the node upstream of it; PARTITION_CUSTOM
 * into the node two steps upstream of the reducer; the source has two outgoing channels, both
 * FORWARD, the first with exchange mode PIPELINED and the second with BATCH.
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testRangePartitionOperatorPreservesFields() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Long, Long>> data = env.fromCollection(Collections.singleton(new Tuple2<>(0L, 0L)));

    data.partitionByRange(1)
            .groupBy(1)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Long, Long>>())
            .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode partitionNode = (SingleInputPlanNode) reducer.getInput().getSource();
    SingleInputPlanNode partitionIDRemover = (SingleInputPlanNode) partitionNode.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, partitionNode.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitionIDRemover.getInput().getShipStrategy());

    // Check the two channels that leave the source.
    SourcePlanNode sourcePlanNode = op.getDataSources().iterator().next();
    List<Channel> sourceOutgoingChannels = sourcePlanNode.getOutgoingChannels();
    assertEquals(2, sourceOutgoingChannels.size());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(0).getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, sourceOutgoingChannels.get(1).getShipStrategy());
    assertEquals(DataExchangeMode.PIPELINED, sourceOutgoingChannels.get(0).getDataExchangeMode());
    assertEquals(DataExchangeMode.BATCH, sourceOutgoingChannels.get(1).getDataExchangeMode());
}
示例23
/**
 * Groups a rebalanced {@code Tuple2} data set (parallelism 4) on field 0 with a custom
 * partitioner, runs a combinable group reducer, compiles the plan and verifies ship
 * strategies.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例24
/**
 * Groups a rebalanced {@code Tuple3} data set (parallelism 4) on field 0 with a custom
 * partitioner, sorts groups ascending on field 1, runs a combinable group reducer, compiles
 * the plan and verifies ship strategies.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例25
/**
 * Groups a rebalanced {@code Tuple4} data set (parallelism 4) on field 0 with a custom
 * partitioner, sorts groups ascending on field 1 and descending on field 2, runs a combinable
 * group reducer, compiles the plan and verifies ship strategies.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple4<Integer, Integer, Integer, Integer>> data = env.fromElements(new Tuple4<Integer, Integer, Integer, Integer>(0, 0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(0).withPartitioner(new TestPartitionerInt())
            .sortGroup(1, Order.ASCENDING)
            .sortGroup(2, Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple4<Integer, Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple4<Integer, Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例26
/**
 * Groups a rebalanced {@code Pojo2} data set (parallelism 4) by the field expression
 * {@code "a"} with a custom partitioner, runs a combinable group reducer, compiles the plan
 * and verifies ship strategies. Note that the method name mentions "Tuple" although the
 * element type is a POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo2> data = env.fromElements(new Pojo2())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo2>())
            .output(new DiscardingOutputFormat<Pojo2>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例27
/**
 * Groups a rebalanced {@code Pojo3} data set (parallelism 4) by the field expression
 * {@code "a"} with a custom partitioner, sorts groups ascending on {@code "b"}, runs a
 * combinable group reducer, compiles the plan and verifies ship strategies. Note that the
 * method name mentions "Tuple" although the element type is a POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo3> data = env.fromElements(new Pojo3())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo3>())
            .output(new DiscardingOutputFormat<Pojo3>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例28
/**
 * Groups a rebalanced {@code Pojo4} data set (parallelism 4) by the field expression
 * {@code "a"} with a custom partitioner, sorts groups ascending on {@code "b"} and descending
 * on {@code "c"}, runs a combinable group reducer, compiles the plan and verifies ship
 * strategies. Note that the method name mentions "Tuple" although the element type is a POJO.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningTupleGroupReduceSorted2() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Pojo4> data = env.fromElements(new Pojo4())
            .rebalance().setParallelism(4);

    data.groupBy("a").withPartitioner(new TestPartitionerInt())
            .sortGroup("b", Order.ASCENDING)
            .sortGroup("c", Order.DESCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Pojo4>())
            .output(new DiscardingOutputFormat<Pojo4>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例29
/**
 * Groups a rebalanced {@code Tuple2} data set (parallelism 4) by a {@code TestKeySelector}
 * with a custom partitioner, runs a combinable group reducer, compiles the plan and verifies
 * ship strategies.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduce() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer, Integer>(0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple2<Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple2<Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}
示例30
/**
 * Groups a rebalanced {@code Tuple3} data set (parallelism 4) by a {@code TestKeySelector}
 * with a custom partitioner, sorts groups ascending by another {@code TestKeySelector}, runs
 * a combinable group reducer, compiles the plan and verifies ship strategies.
 *
 * <p>Asserted: FORWARD into the sink, PARTITION_CUSTOM into the reducer, FORWARD into the node
 * upstream of the reducer (held in {@code combiner}).
 *
 * @throws Exception if building or compiling the plan throws; left uncaught so the runner
 *     shows the original exception and stack trace
 */
@Test
public void testCustomPartitioningKeySelectorGroupReduceSorted() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple3<Integer, Integer, Integer>> data = env.fromElements(new Tuple3<Integer, Integer, Integer>(0, 0, 0))
            .rebalance().setParallelism(4);

    data.groupBy(new TestKeySelector<Tuple3<Integer, Integer, Integer>>())
            .withPartitioner(new TestPartitionerInt())
            .sortGroup(new TestKeySelector<Tuple3<Integer, Integer, Integer>>(), Order.ASCENDING)
            .reduceGroup(new IdentityGroupReducerCombinable<Tuple3<Integer, Integer, Integer>>())
            .output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());

    Plan p = env.createProgramPlan();
    OptimizedPlan op = compileNoStats(p);

    // Follow the input channels upstream, starting at the only sink.
    SinkPlanNode sink = op.getDataSinks().iterator().next();
    SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
    SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getInput().getSource();

    assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.PARTITION_CUSTOM, reducer.getInput().getShipStrategy());
    assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
}