This repository has been archived by the owner on Aug 19, 2020. It is now read-only.
forked from thinkaurelius/faunus
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor FaunusCompiler to allow for getting job status
This patch splits FaunusCompiler into two classes. The first factors out the management of running the multistage job into a FaunusJobControl class. The second class is a FaunusTool that implements the hadoop Tool interface and uses the FaunusJobControl to run the jobs. This makes FaunusCompiler a bit more orthogonal. This closes thinkaurelius#164.
- Loading branch information
Erick Tryzelaar
committed
Dec 10, 2013
1 parent
3b3fad2
commit 0226d4e
Showing
4 changed files
with
234 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
138 changes: 138 additions & 0 deletions
138
src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusJobControl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package com.thinkaurelius.faunus.mapreduce; | ||
|
||
import com.thinkaurelius.faunus.FaunusGraph; | ||
import com.thinkaurelius.faunus.Tokens; | ||
import com.thinkaurelius.faunus.formats.FormatTools; | ||
import com.thinkaurelius.faunus.formats.JobConfigurationFormat; | ||
import org.apache.hadoop.fs.FileStatus; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.fs.Path; | ||
import org.apache.hadoop.mapreduce.Job; | ||
import org.apache.hadoop.util.StringUtils; | ||
import org.apache.log4j.Logger; | ||
|
||
import java.io.IOException; | ||
import java.util.*; | ||
|
||
public class FaunusJobControl implements Runnable { | ||
|
||
public static final Logger logger = Logger.getLogger(FaunusJobControl.class); | ||
|
||
private final FaunusGraph graph; | ||
private final LinkedList<Job> jobsInProgress = new LinkedList<>(); | ||
private Job runningJob = null; | ||
private final LinkedList<Job> successfulJobs = new LinkedList<>(); | ||
private final LinkedList<Job> failedJobs = new LinkedList<>(); | ||
|
||
public FaunusJobControl(FaunusGraph graph, Collection<Job> jobs) { | ||
this.graph = graph; | ||
jobsInProgress.addAll(jobs); | ||
} | ||
|
||
private List<Job> toList(LinkedList<Job> jobs) { | ||
ArrayList<Job> retv = new ArrayList<>(); | ||
synchronized (jobs) { | ||
retv.addAll(jobs); | ||
} | ||
return retv; | ||
} | ||
|
||
public List<Job> getJobsInProgress() { | ||
return toList(jobsInProgress); | ||
} | ||
|
||
synchronized public Job getRunningJob() { | ||
return runningJob; | ||
} | ||
|
||
public List<Job> getSuccessfulJobs() { | ||
return toList(successfulJobs); | ||
} | ||
|
||
public List<Job> getFailedJobs() { | ||
return toList(failedJobs); | ||
} | ||
|
||
synchronized public boolean allFinished() { | ||
return jobsInProgress.isEmpty(); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
try { | ||
final FileSystem hdfs = FileSystem.get(this.graph.getConf()); | ||
if (this.graph.getOutputLocationOverwrite() && hdfs.exists(this.graph.getOutputLocation())) { | ||
hdfs.delete(this.graph.getOutputLocation(), true); | ||
} | ||
|
||
int jobCount = this.jobsInProgress.size(); | ||
final String jobPath = this.graph.getOutputLocation().toString() + "/" + Tokens.JOB; | ||
|
||
Iterator<Job> it = jobsInProgress.iterator(); | ||
for (int i = 0; i < jobCount; ++i) { | ||
synchronized (this) { | ||
runningJob = it.next(); | ||
|
||
try { | ||
((JobConfigurationFormat) (FormatTools.getBaseOutputFormatClass(runningJob).newInstance())).updateJob(runningJob); | ||
} catch (final Exception e) { | ||
} | ||
|
||
logger.info("Executing job " + (i + 1) + " out of " + jobCount + ": " + runningJob.getJobName()); | ||
logger.info("Job data location: " + jobPath + "-" + i); | ||
|
||
runningJob.submit(); | ||
} | ||
|
||
boolean success = runningJob.waitForCompletion(true); | ||
|
||
synchronized (this) { | ||
if (success) { | ||
successfulJobs.add(runningJob); | ||
} else { | ||
failedJobs.add(runningJob); | ||
} | ||
|
||
it.remove(); | ||
runningJob = null; | ||
} | ||
|
||
if (i > 0) { | ||
final Path path = new Path(jobPath + "-" + (i - 1)); | ||
// delete previous intermediate graph data | ||
for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.GRAPH + "*"))) { | ||
hdfs.delete(temp.getPath(), true); | ||
} | ||
// delete previous intermediate graph data | ||
for (final FileStatus temp : hdfs.globStatus(new Path(path.toString() + "/" + Tokens.PART + "*"))) { | ||
hdfs.delete(temp.getPath(), true); | ||
} | ||
} | ||
if (!success) { | ||
logger.error("Faunus job error -- remaining MapReduce jobs have been canceled"); | ||
break; | ||
} | ||
} | ||
|
||
} catch (Throwable t) { | ||
logger.error("Error while trying to run jobs.", t); | ||
// Mark all the jobs as failed because we got something bad. | ||
failAllJobs(t); | ||
} | ||
} | ||
|
||
synchronized private void failAllJobs(Throwable t) { | ||
String message = "Unexpected System Error Occurred " + StringUtils.stringifyException(t); | ||
|
||
if (runningJob != null) { | ||
try { | ||
runningJob.killJob(); | ||
} catch (IOException e) { | ||
logger.error("Error trying to clean up " + runningJob.getJobName(), e); | ||
} finally { | ||
failedJobs.add(runningJob); | ||
runningJob = null; | ||
} | ||
} | ||
} | ||
} |
64 changes: 64 additions & 0 deletions
64
src/main/java/com/thinkaurelius/faunus/mapreduce/FaunusTool.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package com.thinkaurelius.faunus.mapreduce; | ||
|
||
import com.thinkaurelius.faunus.FaunusGraph; | ||
import org.apache.hadoop.conf.Configured; | ||
import org.apache.hadoop.mapreduce.Job; | ||
import org.apache.hadoop.util.Tool; | ||
import org.apache.log4j.Logger; | ||
|
||
import java.util.List; | ||
|
||
public class FaunusTool extends Configured implements Tool { | ||
public static final Logger logger = Logger.getLogger(FaunusTool.class); | ||
|
||
private final FaunusGraph graph; | ||
private final List<Job> jobs; | ||
|
||
public FaunusTool(FaunusGraph graph, List<Job> jobs) { | ||
this.graph = graph; | ||
this.jobs = jobs; | ||
} | ||
|
||
@Override | ||
public int run(String[] args) throws Exception { | ||
|
||
String script = null; | ||
boolean showHeader = true; | ||
|
||
if (args.length == 2) { | ||
script = args[0]; | ||
showHeader = Boolean.valueOf(args[1]); | ||
} | ||
|
||
if (showHeader) { | ||
logger.info("Faunus: Graph Analytics Engine"); | ||
logger.info(" ,"); | ||
logger.info(" , |\\ ,__"); | ||
logger.info(" |\\ \\/ `\\"); | ||
logger.info(" \\ `-.:. `\\"); | ||
logger.info(" `-.__ `\\/\\/\\|"); | ||
logger.info(" / `'/ () \\"); | ||
logger.info(" .' /\\ )"); | ||
logger.info(" .-' .'| \\ \\__"); | ||
logger.info(" .' __( \\ '`(()"); | ||
logger.info("/_.'` `. | )("); | ||
logger.info(" \\ |"); | ||
logger.info(" |/"); | ||
} | ||
|
||
if (null != script && !script.isEmpty()) { | ||
logger.info("Generating job chain: " + script); | ||
} | ||
|
||
logger.info("Compiled to " + this.jobs.size() + " MapReduce job(s)"); | ||
|
||
FaunusJobControl faunusJobControl = new FaunusJobControl(graph, jobs); | ||
faunusJobControl.run(); | ||
|
||
if (faunusJobControl.getFailedJobs().size() == 0) { | ||
return 0; | ||
} else { | ||
return -1; | ||
} | ||
} | ||
} |