ACCUMULO-375
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243961 13f79535-47bb-0310-9956-ffa450edef68
Adam Fuchs committed Feb 14, 2012
1 parent b4f3087 commit 0e1e67d
Showing 2 changed files with 26 additions and 10 deletions.
WikipediaPartitionedIngester.java
@@ -39,6 +39,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
@@ -58,9 +59,12 @@
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class WikipediaPartitionedIngester extends Configured implements Tool {


private static final Logger log = Logger.getLogger(WikipediaPartitionedIngester.class);

public final static String INGEST_LANGUAGE = "wikipedia.ingest_language";
public final static String SPLIT_FILE = "wikipedia.split_file";
public final static String TABLE_NAME = "wikipedia.table";
@@ -150,7 +154,7 @@ public int run(String[] args) throws Exception {
return 0;
}

public int runPartitionerJob() throws Exception
private int runPartitionerJob() throws Exception
{
Job partitionerJob = new Job(getConf(), "Partition Wikipedia");
Configuration partitionerConf = partitionerJob.getConfiguration();
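
For orientation: the class extends Configured and implements Tool (see the class declaration and the Tool/ToolRunner imports above), so run(String[]) receives a Configuration already populated with generic options. Below is a minimal sketch of how such a Tool is typically launched; the IngestDriver wrapper class is hypothetical and not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class IngestDriver {
      public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (e.g. -D key=value) into the
        // Configuration before invoking WikipediaPartitionedIngester.run(args).
        int rc = ToolRunner.run(new Configuration(), new WikipediaPartitionedIngester(), args);
        System.exit(rc);
      }
    }
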
@@ -191,7 +195,7 @@ public int runPartitionerJob() throws Exception
return partitionerJob.waitForCompletion(true) ? 0 : 1;
}

public int runIngestJob() throws Exception
private int runIngestJob() throws Exception
{
Job ingestJob = new Job(getConf(), "Ingest Partitioned Wikipedia");
Configuration ingestConf = ingestJob.getConfiguration();
@@ -221,23 +225,29 @@ public int runIngestJob() throws Exception

if(WikipediaConfiguration.bulkIngest(ingestConf))
{
ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
String bulkIngestDir = WikipediaConfiguration.bulkIngestDir(ingestConf);
if(bulkIngestDir == null)
{
log.error("Bulk ingest dir not set");
return 1;
}
SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
} else {
ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
String user = WikipediaConfiguration.getUser(ingestConf);
byte[] password = WikipediaConfiguration.getPassword(ingestConf);
AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
} else {
ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
SortingRFileOutputFormat.setMaxBufferSize(ingestConf, WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
SortingRFileOutputFormat.setPathName(ingestConf, WikipediaConfiguration.bulkIngestDir(ingestConf));
}

return ingestJob.waitForCompletion(true) ? 0 : 1;
}

public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
private int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
{
Configuration conf = getConf();

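Two output paths are configured in the hunk above. With bulk ingest enabled, the job writes sorted RFiles through SortingRFileOutputFormat for a later bulk import; otherwise AccumuloOutputFormat sends Mutations directly to the tablet servers, which is why only that branch needs the ZooKeeper, instance, user, and password settings. The new guard fails fast when no bulk ingest directory is configured. Below is a plausible sketch (not the actual WikipediaConfiguration source) of why bulkIngestDir() can return null, assuming the class is a thin wrapper over Hadoop's Configuration, whose get(key) returns null for unset properties. The key name is invented for illustration.

    import org.apache.hadoop.conf.Configuration;

    public class WikipediaConfigurationSketch {
      // Hypothetical property name; the real key is defined in
      // WikipediaConfiguration and is not visible in this diff.
      private static final String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";

      // Returns null when the property was never set, which is exactly
      // the case the new log.error guard catches before job submission.
      public static String bulkIngestDir(Configuration conf) {
        return conf.get(BULK_INGEST_DIR);
      }
    }
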
@@ -253,7 +263,9 @@ public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecuri
if(status.isDir() == false)
continue;
Path dir = status.getPath();
connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
Path failPath = new Path(failureDirectory+"/"+dir.getName());
fs.mkdirs(failPath);
connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failPath.toString(), true);
}

return 0;
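
The loadBulkFiles change pre-creates a per-table failure directory before calling importDirectory: bulk import moves any files it cannot load into that directory, and the call expects the directory to already exist. A self-contained sketch of the corrected loop follows; partitionedFilesDir and failureDirectory are stand-ins for values the surrounding (elided) code derives from the configuration.

    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BulkLoaderSketch {
      static void loadAll(Connector connector, Configuration conf,
          String partitionedFilesDir, String failureDirectory) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        // One subdirectory of RFiles per table, named after the table.
        for (FileStatus status : fs.listStatus(new Path(partitionedFilesDir))) {
          if (status.isDir() == false) // isDir() is the Hadoop 0.20-era API used here
            continue;
          Path dir = status.getPath();
          // Create the failure directory up front; importDirectory expects it to exist.
          Path failPath = new Path(failureDirectory + "/" + dir.getName());
          fs.mkdirs(failPath);
          connector.tableOperations().importDirectory(dir.getName(), dir.toString(),
              failPath.toString(), true);
        }
      }
    }
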
SortingRFileOutputFormat.java
@@ -4,6 +4,7 @@

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -12,9 +13,12 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;

public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {


private static final Logger log = Logger.getLogger(SortingRFileOutputFormat.class);

public static final String PATH_NAME = "sortingrfileoutputformat.path";
public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

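PATH_NAME and MAX_BUFFER_SIZE are Hadoop Configuration keys, which is how the static setters invoked from WikipediaPartitionedIngester (setPathName, setMaxBufferSize) presumably hand values from the client to the tasks. A plausible sketch follows, assuming the setters simply store values under these keys; the real method bodies are outside the region shown in this diff.

    import org.apache.hadoop.conf.Configuration;

    public class SortingRFileOutputFormatSketch {
      public static final String PATH_NAME = "sortingrfileoutputformat.path";
      public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

      // Record the output path for the RFiles in the job configuration.
      public static void setPathName(Configuration conf, String path) {
        conf.set(PATH_NAME, path);
      }

      // Record the maximum in-memory buffer size used while sorting.
      public static void setMaxBufferSize(Configuration conf, long maxBufferSize) {
        conf.setLong(MAX_BUFFER_SIZE, maxBufferSize);
      }
    }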