
ACCUMULO-381 added a bulk ingest option for wikisearch ingest
git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1243506 13f79535-47bb-0310-9956-ffa450edef68
Adam Fuchs committed Feb 13, 2012
1 parent 72b7221 commit b4f3087
Showing 5 changed files with 314 additions and 12 deletions.
16 changes: 16 additions & 0 deletions ingest/conf/wikipedia_parallel.xml.example
@@ -56,4 +56,20 @@
<name>wikipedia.run.ingest</name>
<value><!--whether to run the ingest step --></value>
</property>
<property>
<name>wikipedia.bulk.ingest</name>
<value><!--whether to use bulk ingest instead of streaming ingest --></value>
</property>
<property>
<name>wikipedia.bulk.ingest.dir</name>
<value><!--the directory to store rfiles for bulk ingest --></value>
</property>
<property>
<name>wikipedia.bulk.ingest.failure.dir</name>
<value><!--the directory to store failed rfiles after bulk ingest --></value>
</property>
<property>
<name>wikipedia.bulk.ingest.buffer.size</name>
<value><!--the amount of memory to use for buffering and sorting key/value pairs in each mapper before writing rfiles --></value>
</property>
</configuration>
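For reference, the same bulk ingest settings can also be applied to a Hadoop Configuration in code. The sketch below mirrors the properties above; the HDFS paths are hypothetical placeholders and the buffer is set to roughly 256 MB.

import org.apache.hadoop.conf.Configuration;

public class BulkIngestConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Write RFiles for bulk import instead of streaming mutations to tablet servers.
    conf.setBoolean("wikipedia.bulk.ingest", true);
    // Hypothetical HDFS paths; substitute locations in your own namespace.
    conf.set("wikipedia.bulk.ingest.dir", "/wikisearch/bulk");
    conf.set("wikipedia.bulk.ingest.failure.dir", "/wikisearch/bulk-failures");
    // Buffer roughly 256 MB of key/value pairs per mapper before each RFile is written.
    conf.setLong("wikipedia.bulk.ingest.buffer.size", 1L << 28);
  }
}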
ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaConfiguration.java
@@ -52,6 +52,10 @@ public class WikipediaConfiguration {

public final static String RUN_PARTITIONER = "wikipedia.run.partitioner";
public final static String RUN_INGEST = "wikipedia.run.ingest";
public final static String BULK_INGEST = "wikipedia.bulk.ingest";
public final static String BULK_INGEST_DIR = "wikipedia.bulk.ingest.dir";
public final static String BULK_INGEST_FAILURE_DIR = "wikipedia.bulk.ingest.failure.dir";
public final static String BULK_INGEST_BUFFER_SIZE = "wikipedia.bulk.ingest.buffer.size";


public static String getUser(Configuration conf) {
@@ -134,6 +138,22 @@ public static boolean runIngest(Configuration conf) {
return conf.getBoolean(RUN_INGEST, true);
}

public static boolean bulkIngest(Configuration conf) {
return conf.getBoolean(BULK_INGEST, true);
}

public static String bulkIngestDir(Configuration conf) {
return conf.get(BULK_INGEST_DIR);
}

public static String bulkIngestFailureDir(Configuration conf) {
return conf.get(BULK_INGEST_FAILURE_DIR);
}

public static long bulkIngestBufferSize(Configuration conf) {
return conf.getLong(BULK_INGEST_BUFFER_SIZE, 1L << 28);
}

/**
* Helper method to get properties from Hadoop configuration
*
@@ -169,5 +189,5 @@ else if (resultClass.equals(Double.class))
throw new IllegalArgumentException(resultClass.getSimpleName() + " is unhandled.");

}

}
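A quick check of the new accessors' defaults (a sketch; note that bulkIngest() currently defaults to true, and the buffer size defaults to 1L << 28, i.e. 268,435,456 bytes or 256 MB):

import org.apache.hadoop.conf.Configuration;
import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration;

public class WikipediaConfigurationDefaults {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    System.out.println(WikipediaConfiguration.bulkIngest(conf));           // true
    System.out.println(WikipediaConfiguration.bulkIngestBufferSize(conf)); // 268435456
    System.out.println(WikipediaConfiguration.bulkIngestDir(conf));        // null until configured
  }
}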
ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
@@ -42,6 +42,7 @@
import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;
import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
import org.apache.accumulo.examples.wikisearch.reader.AggregatingRecordReader;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -53,7 +54,6 @@
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -140,7 +140,13 @@ public int run(String[] args) throws Exception {
return result;
}
if(WikipediaConfiguration.runIngest(conf))
return runIngestJob();
{
int result = runIngestJob();
if(result != 0)
return result;
if(WikipediaConfiguration.bulkIngest(conf))
return loadBulkFiles();
}
return 0;
}

@@ -195,11 +201,6 @@ public int runIngestJob() throws Exception

String tablename = WikipediaConfiguration.getTableName(ingestConf);

String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);

String user = WikipediaConfiguration.getUser(ingestConf);
byte[] password = WikipediaConfiguration.getPassword(ingestConf);
Connector connector = WikipediaConfiguration.getConnector(ingestConf);

TableOperations tops = connector.tableOperations();
@@ -217,13 +218,47 @@ public int runIngestJob() throws Exception
// setup output format
ingestJob.setMapOutputKeyClass(Text.class);
ingestJob.setMapOutputValueClass(Mutation.class);
ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);

if(WikipediaConfiguration.bulkIngest(ingestConf))
{
ingestJob.setOutputFormatClass(SortingRFileOutputFormat.class);
SortingRFileOutputFormat.setMaxBufferSize(ingestJob.getConfiguration(), WikipediaConfiguration.bulkIngestBufferSize(ingestConf));
SortingRFileOutputFormat.setPathName(ingestJob.getConfiguration(), WikipediaConfiguration.bulkIngestDir(ingestConf));
} else {
ingestJob.setOutputFormatClass(AccumuloOutputFormat.class);
String zookeepers = WikipediaConfiguration.getZookeepers(ingestConf);
String instanceName = WikipediaConfiguration.getInstanceName(ingestConf);
String user = WikipediaConfiguration.getUser(ingestConf);
byte[] password = WikipediaConfiguration.getPassword(ingestConf);
AccumuloOutputFormat.setOutputInfo(ingestJob.getConfiguration(), user, password, true, tablename);
AccumuloOutputFormat.setZooKeeperInstance(ingestJob.getConfiguration(), instanceName, zookeepers);
}

return ingestJob.waitForCompletion(true) ? 0 : 1;
}

public int loadBulkFiles() throws IOException, AccumuloException, AccumuloSecurityException, TableNotFoundException
{
Configuration conf = getConf();

Connector connector = WikipediaConfiguration.getConnector(conf);

FileSystem fs = FileSystem.get(conf);
String directory = WikipediaConfiguration.bulkIngestDir(conf);

String failureDirectory = WikipediaConfiguration.bulkIngestFailureDir(conf);

for(FileStatus status: fs.listStatus(new Path(directory)))
{
if(!status.isDir())
continue;
Path dir = status.getPath();
connector.tableOperations().importDirectory(dir.getName(), dir.toString(), failureDirectory+"/"+dir.getName(), true);
}

return 0;
}

public final static PathFilter partFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
@@ -241,7 +276,6 @@ protected void configurePartitionerJob(Job job) {

protected void configureIngestJob(Job job) {
job.setJarByClass(WikipediaPartitionedIngester.class);
job.setInputFormatClass(WikipediaInputFormat.class);
}

protected static final Pattern filePattern = Pattern.compile("([a-z_]+).*.xml(.bz2)?");
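The loadBulkFiles() step above imports one subdirectory of wikipedia.bulk.ingest.dir per table; those subdirectories are produced by the BufferingRFileRecordWriter added below, which writes files as <dir>/<tablename>/<taskID>_<n>.rf. If the Accumulo version in use requires the bulk-import failure directory to exist (and be empty) beforehand, the per-table failure directories could be pre-created with a sketch like the following, reusing the hypothetical paths from the earlier configuration example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrepareBulkFailureDirs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path bulkDir = new Path("/wikisearch/bulk");              // wikipedia.bulk.ingest.dir
    Path failureDir = new Path("/wikisearch/bulk-failures");  // wikipedia.bulk.ingest.failure.dir
    // One failure directory per table, matching the per-table layout under the bulk directory.
    for (FileStatus status : fs.listStatus(bulkDir)) {
      if (status.isDir())
        fs.mkdirs(new Path(failureDir, status.getPath().getName()));
    }
  }
}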
ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/BufferingRFileRecordWriter.java
@@ -0,0 +1,129 @@
package org.apache.accumulo.examples.wikisearch.output;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.file.rfile.RFileOperations;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

final class BufferingRFileRecordWriter extends RecordWriter<Text,Mutation> {
private final long maxSize;
private final AccumuloConfiguration acuconf;
private final Configuration conf;
private final String filenamePrefix;
private final String taskID;
private final FileSystem fs;
private int fileCount = 0;
private long size;

private Map<Text,TreeMap<Key,Value>> buffers = new HashMap<Text,TreeMap<Key,Value>>();
private Map<Text,Long> bufferSizes = new HashMap<Text,Long>();

private TreeMap<Key,Value> getBuffer(Text tablename)
{
TreeMap<Key,Value> buffer = buffers.get(tablename);
if(buffer == null)
{
buffer = new TreeMap<Key,Value>();
buffers.put(tablename, buffer);
bufferSizes.put(tablename, 0l);
}
return buffer;
}

private Text getLargestTablename()
{
long max = 0;
Text table = null;
for(Entry<Text,Long> e:bufferSizes.entrySet())
{
if(e.getValue() > max)
{
max = e.getValue();
table = e.getKey();
}
}
return table;
}

private void flushLargestTable() throws IOException
{
Text tablename = getLargestTablename();
if(tablename == null)
return;
long bufferSize = bufferSizes.get(tablename);
TreeMap<Key,Value> buffer = buffers.get(tablename);
if (buffer.size() == 0)
return;

// TODO fix the filename
String file = filenamePrefix + "/" + tablename + "/" + taskID + "_" + (fileCount++) + ".rf";
FileSKVWriter writer = RFileOperations.getInstance().openWriter(file, fs, conf, acuconf);

// forget locality groups for now, just write everything to the default
writer.startDefaultLocalityGroup();

for (Entry<Key,Value> e : buffer.entrySet()) {
writer.append(e.getKey(), e.getValue());
}

writer.close();

size -= bufferSize;
buffer.clear();
bufferSizes.put(tablename, 0l);
}

BufferingRFileRecordWriter(long maxSize, AccumuloConfiguration acuconf, Configuration conf, String filenamePrefix, String taskID, FileSystem fs) {
this.maxSize = maxSize;
this.acuconf = acuconf;
this.conf = conf;
this.filenamePrefix = filenamePrefix;
this.taskID = taskID;
this.fs = fs;
}

@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
while(size > 0)
flushLargestTable();
}

@Override
public void write(Text table, Mutation mutation) throws IOException, InterruptedException {
TreeMap<Key,Value> buffer = getBuffer(table);
int mutationSize = 0;
for(ColumnUpdate update: mutation.getUpdates())
{
Key k = new Key(mutation.getRow(),update.getColumnFamily(),update.getColumnQualifier(),update.getColumnVisibility(),update.getTimestamp(),update.isDeleted());
Value v = new Value(update.getValue());
mutationSize += k.getSize();
mutationSize += v.getSize();
buffer.put(k, v);
}
size += mutationSize;
long bufferSize = bufferSizes.get(table);
bufferSize += mutationSize;
bufferSizes.put(table, bufferSize);

// TODO add object overhead size

while (size >= maxSize) {
flushLargestTable();
}
}

}
ingest/src/main/java/org/apache/accumulo/examples/wikisearch/output/SortingRFileOutputFormat.java
@@ -0,0 +1,103 @@
package org.apache.accumulo.examples.wikisearch.output;

import java.io.IOException;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;

public class SortingRFileOutputFormat extends OutputFormat<Text,Mutation> {

public static final String PATH_NAME = "sortingrfileoutputformat.path";
public static final String MAX_BUFFER_SIZE = "sortingrfileoutputformat.max.buffer.size";

public static void setPathName(Configuration conf, String path) {
conf.set(PATH_NAME, path);
}

public static String getPathName(Configuration conf) {
return conf.get(PATH_NAME);
}

public static void setMaxBufferSize(Configuration conf, long maxBufferSize) {
conf.setLong(MAX_BUFFER_SIZE, maxBufferSize);
}

public static long getMaxBufferSize(Configuration conf) {
return conf.getLong(MAX_BUFFER_SIZE, -1);
}

@Override
public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
// TODO make sure the path is writable?
// TODO make sure the max buffer size is set and is reasonable
}

@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException {
return new OutputCommitter() {

@Override
public void setupTask(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void setupJob(JobContext arg0) throws IOException {
// TODO Auto-generated method stub

}

@Override
public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub
return false;
}

@Override
public void commitTask(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void cleanupJob(JobContext arg0) throws IOException {
// TODO Auto-generated method stub

}

@Override
public void abortTask(TaskAttemptContext arg0) throws IOException {
// TODO Auto-generated method stub

}
};
}

@Override
public RecordWriter<Text,Mutation> getRecordWriter(TaskAttemptContext attempt) throws IOException, InterruptedException {

// grab the configuration
final Configuration conf = attempt.getConfiguration();
// create a filename
final String filenamePrefix = getPathName(conf);
final String taskID = attempt.getTaskAttemptID().toString();
// grab the max size
final long maxSize = getMaxBufferSize(conf);
// grab the FileSystem
final FileSystem fs = FileSystem.get(conf);
// create a default AccumuloConfiguration
final AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();

return new BufferingRFileRecordWriter(maxSize, acuconf, conf, filenamePrefix, taskID, fs);
}

}
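Outside of WikipediaPartitionedIngester, the output format would be wired into a job roughly as below (a sketch against the old Hadoop mapreduce API; the path and buffer size are placeholders, and mapper/input setup is omitted):

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.examples.wikisearch.output.SortingRFileOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class BulkOutputJobSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    SortingRFileOutputFormat.setPathName(conf, "/wikisearch/bulk"); // where RFiles land
    SortingRFileOutputFormat.setMaxBufferSize(conf, 1L << 28);      // ~256 MB per mapper

    Job job = new Job(conf, "wikisearch bulk output example");
    job.setMapOutputKeyClass(Text.class);       // key is the destination table name
    job.setMapOutputValueClass(Mutation.class);
    job.setOutputFormatClass(SortingRFileOutputFormat.class);
    // ... configure the mapper and input format, then job.waitForCompletion(true)
  }
}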
