/**
 * Runs a word-count flow on the Flink connector.
 *
 * <p>Each input line is split on runs of whitespace into tokens. Tokens are grouped and
 * counted, and the (token, count) tuples are written to the output path. An existing output
 * path is replaced ({@link SinkMode#REPLACE}).
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
 * @throws IllegalArgumentException if fewer than two arguments are supplied
 */
public static void main(String[] args) {
  if (args.length < 2) {
    throw new IllegalArgumentException("Please specify input and output paths as arguments.");
  }

  Fields token = new Fields("token", String.class);
  Fields text = new Fields("text");

  // Emits one tuple per whitespace-separated piece of the "text" field.
  RegexSplitGenerator splitter = new RegexSplitGenerator(token, "\\s+");
  // Fields.RESULTS keeps only the generator's output, i.e. the "token" field.
  Pipe docPipe = new Each("token", text, splitter, Fields.RESULTS);

  // Group by token and count occurrences into a "count" field.
  Pipe wcPipe = new Pipe("wc", docPipe);
  wcPipe = new AggregateBy(wcPipe, token, new CountBy(new Fields("count")));

  // One record per input line, held in the single "text" field.
  Tap inTap = new Hfs(new TextDelimited(text, "\n"), args[0]);
  // NOTE(review): "\n" is the *field* delimiter here, so token and count are written on
  // separate lines. A tab may have been intended. Confirm before relying on the format.
  Tap outTap = new Hfs(new TextDelimited(false, "\n"), args[1], SinkMode.REPLACE);

  FlowDef flowDef = FlowDef.flowDef().setName("wc")
      .addSource(docPipe, inTap)
      .addTailSink(wcPipe, outTap);

  FlowConnector flowConnector = new FlinkConnector();
  Flow wcFlow = flowConnector.connect(flowDef);
  // Runs the planned flow to completion.
  wcFlow.complete();
}
/**
 * Runs a Hadoop flow that filters "foo" records, hash-joins them with "bar" records on
 * {@code foobarId == barId}, filters the joined tuples, and writes the result.
 *
 * <p>Both inputs and the output are {@code '|'}-delimited text. Foo records carry
 * {@code fooId, fooVal, foobarId}; bar records carry {@code barId, barVal}. The flow's DOT
 * plan is written to {@code dot/wc.dot} before the flow is run.
 *
 * @param args six arguments, in order: foo input path, bar input path, output path,
 *     {@code fooValMax} (passed to {@code FooFilter}), {@code joinValMax} (passed to
 *     {@code JoinedFilter}), and the number of reduce tasks
 * @throws IllegalArgumentException if fewer than six arguments are supplied
 * @throws NumberFormatException if any of the last three arguments is not an integer
 */
public static void main(String[] args) {
  // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
  if (args.length < 6) {
    throw new IllegalArgumentException(
        "Please specify arguments: <fooInputPath> <barInputPath> <outputPath>"
            + " <fooValMax> <joinValMax> <numberOfReducers>");
  }
  String fooInputPath = args[0];
  String barInputPath = args[1];
  String outputPath = args[2];
  int fooValMax = Integer.parseInt(args[3]);
  int joinValMax = Integer.parseInt(args[4]);
  int numberOfReducers = Integer.parseInt(args[5]);

  Properties properties = new Properties();
  AppProps.setApplicationJarClass(properties, JoinFilterExampleCascading.class);
  // Reducer count under both the old (mapred.*) and new (mapreduce.*) Hadoop property names.
  // NOTE(review): HashJoin is a map-side join, so these may have no effect on this flow.
  properties.setProperty("mapred.reduce.tasks", Integer.toString(numberOfReducers));
  properties.setProperty("mapreduce.job.reduces", Integer.toString(numberOfReducers));

  // Spill settings applied to the HashJoin below: compressed spills, map spill threshold
  // of 50 * 1000.
  SpillableProps props = SpillableProps.spillableProps()
      .setCompressSpill(true)
      .setMapSpillThreshold(50 * 1000);

  HadoopFlowConnector flowConnector = new HadoopFlowConnector(properties);

  // Create source and sink taps.
  Fields fooFields = new Fields("fooId", "fooVal", "foobarId");
  Tap fooTap = new Hfs(new TextDelimited(fooFields, "|"), fooInputPath);
  Fields barFields = new Fields("barId", "barVal");
  Tap barTap = new Hfs(new TextDelimited(barFields, "|"), barInputPath);
  // NOTE(review): no SinkMode is passed, so Hfs's default applies. Unlike a REPLACE sink,
  // a rerun against an existing output path may fail. Confirm this is intended.
  Tap outputTap = new Hfs(new TextDelimited(false, "|"), outputPath);

  Fields joinFooFields = new Fields("foobarId");
  Fields joinBarFields = new Fields("barId");

  Pipe fooPipe = new Pipe("fooPipe");
  Pipe barPipe = new Pipe("barPipe");
  // Filter foo records before the join so fewer tuples reach it.
  Pipe fooFiltered = new Each(fooPipe, fooFields, new FooFilter(fooValMax));
  // Foo is the left (streamed) side; bar is the right-hand side that HashJoin accumulates.
  Pipe joinedPipe = new HashJoin(fooFiltered, joinFooFields, barPipe, joinBarFields);
  props.setProperties(joinedPipe.getConfigDef(), Mode.REPLACE);

  // Argument selector for JoinedFilter. barId is omitted because it equals foobarId.
  Fields joinFields = new Fields("fooId", "fooVal", "foobarId", "barVal");
  Pipe joinedFilteredPipe = new Each(joinedPipe, joinFields, new JoinedFilter(joinValMax));

  // NOTE(review): the name "wc" looks carried over from a word-count example.
  FlowDef flowDef = FlowDef.flowDef().setName("wc")
      .addSource(fooPipe, fooTap)
      .addSource(barPipe, barTap)
      .addTailSink(joinedFilteredPipe, outputTap);

  Flow wcFlow = flowConnector.connect(flowDef);
  wcFlow.writeDOT("dot/wc.dot");
  wcFlow.complete();
}