Java源码示例:cascading.pipe.Each

示例1
public static void main(String[] args) {

		if (args.length < 2) {
			throw new IllegalArgumentException("Please specify input and ouput paths as arguments.");
		}

		Fields token = new Fields( "token", String.class );
		Fields text = new Fields( "text" );
		RegexSplitGenerator splitter = new RegexSplitGenerator( token, "\\s+" );
		// only returns "token"
		Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

		Pipe wcPipe = new Pipe( "wc", docPipe );
		wcPipe = new AggregateBy( wcPipe, token, new CountBy(new Fields("count")));

		Tap inTap = new Hfs(new TextDelimited(text, "\n" ), args[0]);
		Tap outTap = new Hfs(new TextDelimited(false, "\n"), args[1], SinkMode.REPLACE);

		FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
				.addSource( docPipe, inTap )
				.addTailSink( wcPipe, outTap );

		FlowConnector flowConnector = new FlinkConnector();

		Flow wcFlow = flowConnector.connect( flowDef );

		wcFlow.complete();
	}
 
示例2
public static void main(String[] args) {
  String fooInputPath = args[0];
  String barInputPath = args[1];
  String outputPath = args[2];
  int fooValMax = Integer.parseInt(args[3]);
  int joinValMax = Integer.parseInt(args[4]);
  int numberOfReducers = Integer.parseInt(args[5]);

  Properties properties = new Properties();
  AppProps.setApplicationJarClass(properties,
      JoinFilterExampleCascading.class);
  properties.setProperty("mapred.reduce.tasks", Integer.toString(numberOfReducers));
  properties.setProperty("mapreduce.job.reduces", Integer.toString(numberOfReducers));
  
  SpillableProps props = SpillableProps.spillableProps()
      .setCompressSpill( true )
      .setMapSpillThreshold( 50 * 1000 );
      

  
  HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

  // create source and sink taps
  Fields fooFields = new Fields("fooId", "fooVal", "foobarId");
  Tap fooTap = new Hfs(new TextDelimited(fooFields, "|"), fooInputPath);
  Fields barFields = new Fields("barId", "barVal");
  Tap barTap = new Hfs(new TextDelimited(barFields, "|"), barInputPath);

  Tap outputTap = new Hfs(new TextDelimited(false, "|"), outputPath);

  Fields joinFooFields = new Fields("foobarId");
  Fields joinBarFields = new Fields("barId");

  Pipe fooPipe = new Pipe("fooPipe");
  Pipe barPipe = new Pipe("barPipe");

  Pipe fooFiltered = new Each(fooPipe, fooFields, new FooFilter(fooValMax));

  Pipe joinedPipe = new HashJoin(fooFiltered, joinFooFields, barPipe,
      joinBarFields);
  props.setProperties( joinedPipe.getConfigDef(), Mode.REPLACE );
  
  
  Fields joinFields = new Fields("fooId", "fooVal", "foobarId", "barVal");
  Pipe joinedFilteredPipe = new Each(joinedPipe, joinFields,
      new JoinedFilter(joinValMax));

  FlowDef flowDef = FlowDef.flowDef().setName("wc")
      .addSource(fooPipe, fooTap).addSource(barPipe, barTap)
      .addTailSink(joinedFilteredPipe, outputTap);

  Flow wcFlow = flowConnector.connect(flowDef);
  wcFlow.writeDOT("dot/wc.dot");
  wcFlow.complete();
}