Fix faunus edge degrees service
It turns out we can't do the edge counting in one pass. Instead,
we need one pass to count the edges and another to filter them
out. Unfortunately, we can't use binary sequence files as our
intermediate format because of this bug:

thinkaurelius/faunus#170
Erick Tryzelaar committed Jan 28, 2014
1 parent 77c2d4f commit cbf8add
Showing 3 changed files with 55 additions and 26 deletions.
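
In outline, the new two-pass runFaunus flow looks like this (a condensed sketch of the patch below, using the same classes and calls as the diff; variable names are shortened here, and imports, logging, and error handling are omitted):

    // Pass 1: annotate each vertex with its degree counts, then export the
    // whole graph to a GraphSON intermediate under tmpDir/export.
    // (A binary sequence file would be cheaper, but see thinkaurelius/faunus#170.)
    FaunusGraph exportGraph = new FaunusGraph();
    faunusPipelineService.configureGraph(exportGraph, new Path(tmpDir, "export"), graph);
    exportGraph.setGraphInputFormat(TitanHBaseInputFormat.class);
    exportGraph.setGraphOutputFormat(GraphSONOutputFormat.class);

    FaunusPipeline countPass = new FaunusPipeline(exportGraph);
    countPass.V().sideEffect(
            "{ it ->\n" +
            "it.in_degrees = it.inE().count()\n" +
            "it.out_degrees = it.outE().count()\n" +
            "it.degrees = it.in_degrees + it.out_degrees\n" +
            "}");
    countPass.done();
    new FaunusJob(metaGraphService.getMetaGraph(), jobId, countPass).call();

    // Pass 2: read the GraphSON back with a vertex query filter that drops
    // every edge on input, so only the vertices and their new degree
    // properties are written back to Titan/HBase.
    FaunusGraph importGraph = exportGraph.getNextGraph();
    faunusPipelineService.configureGraph(importGraph, new Path(tmpDir, "import"), graph);
    importGraph.setGraphOutputFormat(TitanHBaseOutputFormat.class);
    importGraph.getConf().set("faunus.graph.input.vertex-query-filter", "v.query().limit(0)");

    FaunusPipeline stripPass = new FaunusPipeline(importGraph);
    stripPass.V();
    stripPass.done();
    new FaunusJob(metaGraphService.getMetaGraph(), jobId, stripPass).call();
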
EdgeDegreesService.java

@@ -2,6 +2,7 @@

 import com.thinkaurelius.faunus.FaunusGraph;
 import com.thinkaurelius.faunus.FaunusPipeline;
+import com.thinkaurelius.faunus.formats.graphson.GraphSONOutputFormat;
 import com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat;
 import com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseOutputFormat;
 import com.thinkaurelius.faunus.mapreduce.FaunusCompiler;
@@ -16,6 +17,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.lab41.dendrite.jobs.FaunusJob;
 import org.lab41.dendrite.metagraph.DendriteGraph;
@@ -24,6 +26,7 @@
 import org.lab41.dendrite.services.analysis.FaunusPipelineService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;

@@ -35,6 +38,9 @@ public class EdgeDegreesService extends AnalysisService {

     Logger logger = LoggerFactory.getLogger(EdgeDegreesService.class);

+    @Autowired
+    FaunusPipelineService faunusPipelineService;

     @Async
     public void titanCountDegrees(DendriteGraph graph, String jobId) throws Exception {

@@ -129,6 +135,10 @@ private void createIndices(DendriteGraph graph) {
     }

     private void runFaunus(DendriteGraph graph, String jobId) throws Exception {
+        // We do the edge counting in two passes. First, we count all the edges and write the graph to a sequence file.
+        // Second, we load the graph back in filtering out all the edges. We do this because I haven't figured out a
+        // way to count edges and filter them out at the same time.
+
         FileSystem fs = FileSystem.get(new Configuration());

         // Create the temporary directory.
@@ -137,34 +147,54 @@ private void runFaunus(DendriteGraph graph, String jobId) throws Exception {
         fs.deleteOnExit(tmpDir);

         try {
-            FaunusGraph faunusGraph = new FaunusGraph();
+            FaunusGraph faunusExportGraph = new FaunusGraph();
+            faunusPipelineService.configureGraph(faunusExportGraph, new Path(tmpDir, "export"), graph);

+            faunusExportGraph.setGraphInputFormat(TitanHBaseInputFormat.class);

+            // FIXME: https://github.com/thinkaurelius/faunus/issues/170. The sequence file would be more efficient,
+            // but it doesn't seem to support vertex query filters.
+            //faunusExportGraph.setGraphOutputFormat(SequenceFileOutputFormat.class);
+            faunusExportGraph.setGraphOutputFormat(GraphSONOutputFormat.class);

+            String sideEffect = "{ it ->\n" +
+                    "it.in_degrees = it.inE().count()\n" +
+                    "it.out_degrees = it.outE().count()\n" +
+                    "it.degrees = it.in_degrees + it.out_degrees\n" +
+                    "}";

+            FaunusPipeline pipeline = new FaunusPipeline(faunusExportGraph);
+            pipeline.V().sideEffect(sideEffect);
+            pipeline.done();

+            logger.debug("starting export of '" + graph.getId() + "'");

-            faunusGraph.setGraphInputFormat(TitanHBaseInputFormat.class);
-            faunusGraph.setGraphOutputFormat(TitanHBaseOutputFormat.class);
+            FaunusJob faunusJob = new FaunusJob(metaGraphService.getMetaGraph(), jobId, pipeline);
+            faunusJob.call();

+            logger.debug("finished export of '" + graph.getId() + "'");

-            // Filter out all the edges
-            faunusGraph.getConf().set("faunus.graph.input.vertex-query-filter", "v.query().limit(0)");

-            String sideEffect =
-                    "{ it ->\n" +
-                    "it.in_degrees = it.inE().count()\n" +
-                    "it.out_degrees = it.outE().count()\n" +
-                    "it.degrees = it.in_degrees + it.out_degrees\n" +
-                    "}";
-            FaunusPipelineService faunusPipelineService = new FaunusPipelineService();
-            FaunusPipeline exportPipeline = faunusPipelineService.graphPipeline(faunusGraph, tmpDir, graph);
-            exportPipeline.V().sideEffect(sideEffect);
-            exportPipeline.done();

-            logger.debug("starting export/import of '" + graph.getId() + "'");

-            FaunusJob faunusJob = new FaunusJob(metaGraphService.getMetaGraph(), jobId, exportPipeline);
+            FaunusGraph faunusImportGraph = faunusExportGraph.getNextGraph();
+            faunusPipelineService.configureGraph(faunusImportGraph, new Path(tmpDir, "import"), graph);

+            faunusImportGraph.setGraphOutputFormat(TitanHBaseOutputFormat.class);
+            faunusImportGraph.getConf().set("faunus.graph.input.vertex-query-filter", "v.query().limit(0)");

+            pipeline = new FaunusPipeline(faunusImportGraph);
+            pipeline.V();
+            pipeline.done();

+            logger.debug("starting import of '" + graph.getId() + "'");

+            faunusJob = new FaunusJob(metaGraphService.getMetaGraph(), jobId, pipeline);
             faunusJob.call();

-            logger.debug("finished export/import of '" + graph.getId() + "'");
+            logger.debug("finished import of '" + graph.getId() + "'");

         } finally {
             // Clean up after ourselves.
-            fs.delete(tmpDir, true);
+            //fs.delete(tmpDir, true);
         }
     }
 }
FaunusPipelineService.java

@@ -17,9 +17,9 @@
 @Service
 public class FaunusPipelineService extends AnalysisService {

-    Logger logger = LoggerFactory.getLogger(FaunusPipelineService.class);
+    static Logger logger = LoggerFactory.getLogger(FaunusPipelineService.class);

-    public FaunusPipeline graphPipeline(FaunusGraph faunusGraph, Path tmpDir, DendriteGraph graph) {
+    public void configureGraph(FaunusGraph faunusGraph, Path tmpDir, DendriteGraph graph) {
         org.apache.commons.configuration.Configuration config = graph.getConfiguration();

         faunusGraph.getConf().set("mapred.jar", "../faunus/target/faunus-0.4.1-Lab41-SNAPSHOT-job.jar");
@@ -62,8 +62,6 @@ public FaunusPipeline graphPipeline(FaunusGraph faunusGraph, Path tmpDir, DendriteGraph graph) {
         setProp(faunusConfig, "faunus.graph.output.titan.storage.index.search.local-mode", config.getString("storage.index.search.local-mode", null));

         faunusGraph.getConf().set("faunus.graph.output.blueprints.script-file", "dendrite/dendrite-import.groovy");
-
-        return new FaunusPipeline(faunusGraph);
     }

     private void setProp(Configuration config, String key, String value) {
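
Because configureGraph() now returns void instead of a ready-made pipeline, callers configure the graph first and construct the FaunusPipeline themselves, as the third file below shows; the call-site pattern is:

    faunusPipelineService.configureGraph(faunusGraph, tmpDir, graph); // apply Faunus/Titan settings only
    FaunusPipeline pipeline = new FaunusPipeline(faunusGraph);        // caller owns pipeline construction

This split is what lets runFaunus() above configure two FaunusGraph instances (export and import) against different output paths through the same injected service.
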
(third changed file)

@@ -151,7 +151,8 @@ private void runExport(DendriteGraph graph, String jobId, Path exportDir) throws
         faunusGraph.setGraphInputFormat(TitanHBaseInputFormat.class);
         faunusGraph.setGraphOutputFormat(AdjacencyFileOutputFormat.class);

-        FaunusPipeline exportPipeline = faunusPipelineService.graphPipeline(faunusGraph, exportDir, graph);
+        faunusPipelineService.configureGraph(faunusGraph, exportDir, graph);
+        FaunusPipeline exportPipeline = new FaunusPipeline(faunusGraph);
         exportPipeline._();

         exportPipeline.done();
