ACCUMULO-471 document the ability to run over uncompressed data; allow the input to be split, don't send millions of duplicate metadata table entries

git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1302537 13f79535-47bb-0310-9956-ffa450edef68
Eric C. Newton committed Mar 19, 2012
1 parent 66bb45c commit 2c1666f
Showing 3 changed files with 21 additions and 15 deletions.
README: 7 changes (5 additions & 2 deletions)
@@ -11,7 +11,10 @@
1. Accumulo, Hadoop, and ZooKeeper must be installed and running
2. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
You will want to grab the files with the link name of pages-articles.xml.bz2

3. Though not strictly required, the ingest will go more quickly if the files are decompressed:

$ bunzip2 < enwiki-*-pages-articles.xml.bz2 | hadoop fs -put - /wikipedia/enwiki-pages-articles.xml


INSTRUCTIONS
------------
@@ -70,4 +73,4 @@
log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1

This needs to be propagated to all the tablet server nodes, and Accumulo needs to be restarted.


@@ -133,10 +133,4 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new AggregatingRecordReader();
}

@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}

}
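The override removed above is what had pinned each input file to a single mapper. With it gone, splittability falls back to the parent input format's default, which for Hadoop's text input formats typically means plain files are split across mappers while compressed files are handled whole; this also fits with the README now suggesting that the dumps be decompressed. As a rough illustration only (standard Hadoop mapreduce API; the class name is hypothetical and this is not code from this commit), a codec-based splittability policy looks like this:

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.compress.CompressionCodec;
  import org.apache.hadoop.io.compress.CompressionCodecFactory;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

  // Hypothetical class for illustration; not part of the wikisearch ingest code.
  public class CodecAwareInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
      // Files with no compression codec are plain text and can be split into
      // blocks handled by separate mappers; compressed files are left whole.
      CompressionCodec codec =
          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
      return codec == null;
    }
  }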
@@ -119,6 +119,8 @@ public static int getPartitionId(Article article, int numPartitions) throws Ille
return article.getId() % numPartitions;
}

static HashSet<String> metadataSent = new HashSet<String>();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
@@ -137,9 +139,13 @@ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
// Create mutations for the metadata table.
Mutation mm = new Mutation(entry.getKey());
mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
context.write(metadataTableName, mm);
String metadataKey = entry.getKey() + METADATA_EVENT_COLUMN_FAMILY + language;
if (!metadataSent.contains(metadataKey)) {
Mutation mm = new Mutation(entry.getKey());
mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
context.write(metadataTableName, mm);
metadataSent.add(metadataKey);
}
}

// Tokenize the content
@@ -182,10 +188,13 @@ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(reverseIndexTableName, grm);

// Create mutations for the metadata table.
Mutation mm = new Mutation(index.getKey());
mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
context.write(metadataTableName, mm);

String metadataKey = index.getKey() + METADATA_INDEX_COLUMN_FAMILY + language;
if (!metadataSent.contains(metadataKey)) {
Mutation mm = new Mutation(index.getKey());
mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
context.write(metadataTableName, mm);
metadataSent.add(metadataKey);
}
}
// Add the entire text to the document section of the table.
// row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
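The static metadataSent set introduced above is the fix for the flood of duplicate metadata entries: each combination of row, metadata column family, and language now produces at most one mutation per mapper instead of one per field value or indexed term. A self-contained sketch of that pattern follows; the MetadataDeduper class and Emitter interface are hypothetical stand-ins (Emitter plays the role of context.write(metadataTableName, mm)), not code from this commit:

  import java.util.HashSet;
  import java.util.Set;

  // Hypothetical illustration of the deduplication pattern; not commit code.
  public class MetadataDeduper {

    // Stand-in for context.write(metadataTableName, mm).
    public interface Emitter {
      void emit(String row, String family, String qualifier);
    }

    // Static, so the filter lasts for the life of the mapper JVM, like metadataSent.
    private static final Set<String> sent = new HashSet<String>();

    public static void writeOnce(String row, String family, String qualifier, Emitter out) {
      // Dedup on the fields that identify the metadata entry, as metadataKey does above.
      String dedupKey = row + family + qualifier;
      if (sent.add(dedupKey)) {  // add() returns false if the key was already present
        out.emit(row, family, qualifier);
      }
    }
  }

Using add()'s return value collapses the contains()/add() pair from the diff into a single call with the same effect. Because the set is static and per JVM, different map tasks can still emit the same metadata entry once each, but the total drops from millions of duplicates to at most one per task.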
