Java源码示例:org.apache.flink.graph.examples.IncrementalSSSP
示例1
/**
 * Runs the {@code IncrementalSSSP} example through its {@code main} entry point and records
 * the vertex set the run is expected to produce.
 *
 * <p>The argument array is, in order: vertices path, edges path, SSSP-edges path, the source,
 * target and value of the edge to remove, the result path, and the vertex count as a string.
 * NOTE(review): {@code expected} is a fixture field declared outside this method; presumably
 * the fixture compares it with the output at {@code resultPath} afterwards -- confirm.
 *
 * @throws Exception if the example run fails
 */
@Test
public void testIncrementalSSSP() throws Exception {
    final String vertexCount = String.valueOf(IncrementalSSSPData.NUM_VERTICES);

    final String[] programArgs = {
        verticesPath,
        edgesPath,
        edgesInSSSPPath,
        IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,
        resultPath,
        vertexCount
    };

    IncrementalSSSP.main(programArgs);

    expected = IncrementalSSSPData.RESULTED_VERTICES;
}
示例2
/**
 * Exercises the incremental SSSP logic for an edge that is not part of the shortest-path
 * tree and records the unmodified default vertex set as the expected output.
 *
 * <p>The test builds the full graph and the SSSP graph from the default data sets, removes
 * the edge (3 -> 5, weight 5.0) from the full graph, and then either runs the scatter-gather
 * invalidation on the SSSP graph (when the edge is reported as being in the SSSP) or writes
 * the input vertices unchanged.
 * NOTE(review): {@code expected} and {@code resultPath} are fixture fields declared outside
 * this method; presumably the fixture compares them after the test -- confirm.
 *
 * @throws Exception if building or executing the Flink job fails
 */
@Test
public void testIncrementalSSSPNonSPEdge() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
    DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
    DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);

    // the edge to be removed is a non-SP edge
    Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);

    Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

    // Assumption: all minimum weight paths are kept
    Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);

    // Remove the edge. Per the Gelly API, removeEdge returns a new graph instead of mutating
    // the receiver, so the result has to be captured; the previous bare call had no effect.
    // The reduced graph is not read again below: only the SSSP graph drives the iteration.
    graph = graph.removeEdge(edgeToBeRemoved);

    // Pick the vertex set to emit; the CSV sink and job execution are shared by both paths.
    DataSet<Vertex<Long, Double>> verticesToWrite;
    if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
        // configure the iteration (only needed when the iteration actually runs)
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.setDirection(EdgeDirection.IN);
        parameters.setOptDegrees(true);

        // run the scatter gather iteration to propagate info
        Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
            new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
            new IncrementalSSSP.VertexDistanceUpdater(),
            IncrementalSSSPData.NUM_VERTICES, parameters);

        verticesToWrite = result.getVertices();
    } else {
        // the removed edge is not in the SSSP: emit the input vertices as they are
        verticesToWrite = vertices;
    }

    verticesToWrite.writeAsCsv(resultPath, "\n", ",");
    env.execute();

    expected = IncrementalSSSPData.VERTICES;
}
示例3
/**
 * Runs the {@code IncrementalSSSP} example through its {@code main} entry point and records
 * the vertex set the run is expected to produce.
 *
 * <p>The argument array is, in order: vertices path, edges path, SSSP-edges path, the source,
 * target and value of the edge to remove, the result path, and the vertex count as a string.
 * NOTE(review): {@code expected} is a fixture field declared outside this method; presumably
 * the fixture compares it with the output at {@code resultPath} afterwards -- confirm.
 *
 * @throws Exception if the example run fails
 */
@Test
public void testIncrementalSSSP() throws Exception {
    final String vertexCount = String.valueOf(IncrementalSSSPData.NUM_VERTICES);

    final String[] programArgs = {
        verticesPath,
        edgesPath,
        edgesInSSSPPath,
        IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,
        resultPath,
        vertexCount
    };

    IncrementalSSSP.main(programArgs);

    expected = IncrementalSSSPData.RESULTED_VERTICES;
}
示例4
/**
 * Exercises the incremental SSSP logic for an edge that is not part of the shortest-path
 * tree and records the unmodified default vertex set as the expected output.
 *
 * <p>The test builds the full graph and the SSSP graph from the default data sets, removes
 * the edge (3 -> 5, weight 5.0) from the full graph, and then either runs the scatter-gather
 * invalidation on the SSSP graph (when the edge is reported as being in the SSSP) or writes
 * the input vertices unchanged.
 * NOTE(review): {@code expected} and {@code resultPath} are fixture fields declared outside
 * this method; presumably the fixture compares them after the test -- confirm.
 *
 * @throws Exception if building or executing the Flink job fails
 */
@Test
public void testIncrementalSSSPNonSPEdge() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
    DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
    DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);

    // the edge to be removed is a non-SP edge
    Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);

    Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

    // Assumption: all minimum weight paths are kept
    Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);

    // Remove the edge. Per the Gelly API, removeEdge returns a new graph instead of mutating
    // the receiver, so the result has to be captured; the previous bare call had no effect.
    // The reduced graph is not read again below: only the SSSP graph drives the iteration.
    graph = graph.removeEdge(edgeToBeRemoved);

    // Pick the vertex set to emit; the CSV sink and job execution are shared by both paths.
    DataSet<Vertex<Long, Double>> verticesToWrite;
    if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
        // configure the iteration (only needed when the iteration actually runs)
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.setDirection(EdgeDirection.IN);
        parameters.setOptDegrees(true);

        // run the scatter gather iteration to propagate info
        Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
            new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
            new IncrementalSSSP.VertexDistanceUpdater(),
            IncrementalSSSPData.NUM_VERTICES, parameters);

        verticesToWrite = result.getVertices();
    } else {
        // the removed edge is not in the SSSP: emit the input vertices as they are
        verticesToWrite = vertices;
    }

    verticesToWrite.writeAsCsv(resultPath, "\n", ",");
    env.execute();

    expected = IncrementalSSSPData.VERTICES;
}
示例5
/**
 * Runs the {@code IncrementalSSSP} example through its {@code main} entry point and records
 * the vertex set the run is expected to produce.
 *
 * <p>The argument array is, in order: vertices path, edges path, SSSP-edges path, the source,
 * target and value of the edge to remove, the result path, and the vertex count as a string.
 * NOTE(review): {@code expected} is a fixture field declared outside this method; presumably
 * the fixture compares it with the output at {@code resultPath} afterwards -- confirm.
 *
 * @throws Exception if the example run fails
 */
@Test
public void testIncrementalSSSP() throws Exception {
    final String vertexCount = String.valueOf(IncrementalSSSPData.NUM_VERTICES);

    final String[] programArgs = {
        verticesPath,
        edgesPath,
        edgesInSSSPPath,
        IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
        IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,
        resultPath,
        vertexCount
    };

    IncrementalSSSP.main(programArgs);

    expected = IncrementalSSSPData.RESULTED_VERTICES;
}
示例6
/**
 * Exercises the incremental SSSP logic for an edge that is not part of the shortest-path
 * tree and records the unmodified default vertex set as the expected output.
 *
 * <p>The test builds the full graph and the SSSP graph from the default data sets, removes
 * the edge (3 -> 5, weight 5.0) from the full graph, and then either runs the scatter-gather
 * invalidation on the SSSP graph (when the edge is reported as being in the SSSP) or writes
 * the input vertices unchanged.
 * NOTE(review): {@code expected} and {@code resultPath} are fixture fields declared outside
 * this method; presumably the fixture compares them after the test -- confirm.
 *
 * @throws Exception if building or executing the Flink job fails
 */
@Test
public void testIncrementalSSSPNonSPEdge() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
    DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
    DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);

    // the edge to be removed is a non-SP edge
    Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);

    Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);

    // Assumption: all minimum weight paths are kept
    Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);

    // Remove the edge. Per the Gelly API, removeEdge returns a new graph instead of mutating
    // the receiver, so the result has to be captured; the previous bare call had no effect.
    // The reduced graph is not read again below: only the SSSP graph drives the iteration.
    graph = graph.removeEdge(edgeToBeRemoved);

    // Pick the vertex set to emit; the CSV sink and job execution are shared by both paths.
    DataSet<Vertex<Long, Double>> verticesToWrite;
    if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
        // configure the iteration (only needed when the iteration actually runs)
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.setDirection(EdgeDirection.IN);
        parameters.setOptDegrees(true);

        // run the scatter gather iteration to propagate info
        Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
            new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
            new IncrementalSSSP.VertexDistanceUpdater(),
            IncrementalSSSPData.NUM_VERTICES, parameters);

        verticesToWrite = result.getVertices();
    } else {
        // the removed edge is not in the SSSP: emit the input vertices as they are
        verticesToWrite = vertices;
    }

    verticesToWrite.writeAsCsv(resultPath, "\n", ",");
    env.execute();

    expected = IncrementalSSSPData.VERTICES;
}