diff --git a/README b/README
index daec8e4..43077a7 100644
--- a/README
+++ b/README
@@ -11,7 +11,10 @@
 	1. Accumulo, Hadoop, and ZooKeeper must be installed and running
 	2. One or more wikipedia dump files (http://dumps.wikimedia.org/backup-index.html) placed in an HDFS directory.
 	     You will want to grab the files with the link name of pages-articles.xml.bz2
- 
+	3. Though not strictly required, the ingest will go more quickly if the files are decompressed:
+
+	     $ bunzip2 < enwiki-*-pages-articles.xml.bz2 | hadoop fs -put - /wikipedia/enwiki-pages-articles.xml
+ 
 
 INSTRUCTIONS
 ------------
@@ -70,4 +73,4 @@ log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1
 
 This needs to be propagated to all the tablet server nodes, and accumulo needs to be restarted.
 
- 
\ No newline at end of file
+ 
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
index e682f2f..c582cbf 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaInputFormat.java
@@ -133,10 +133,4 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
   public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
     return new AggregatingRecordReader();
   }
-  
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    return false;
-  }
-  
 }
diff --git a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
index fc328cc..8565b09 100644
--- a/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
+++ b/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaMapper.java
@@ -119,6 +119,8 @@ public static int getPartitionId(Article article, int numPartitions) throws Ille
     return article.getId() % numPartitions;
   }
   
+  static HashSet<String> metadataSent = new HashSet<String>();
+  
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     Article article = extractor.extract(new InputStreamReader(new ByteArrayInputStream(value.getBytes()), UTF8));
@@ -137,9 +139,13 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
       for (Entry<String,Object> entry : article.getFieldValues().entrySet()) {
         m.put(colfPrefix + article.getId(), entry.getKey() + NULL_BYTE + entry.getValue().toString(), cv, article.getTimestamp(), NULL_VALUE);
         // Create mutations for the metadata table.
-        Mutation mm = new Mutation(entry.getKey());
-        mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
+        String metadataKey = entry.getKey() + METADATA_EVENT_COLUMN_FAMILY + language;
+        if (!metadataSent.contains(metadataKey)) {
+          Mutation mm = new Mutation(entry.getKey());
+          mm.put(METADATA_EVENT_COLUMN_FAMILY, language, cv, article.getTimestamp(), NULL_VALUE);
+          context.write(metadataTableName, mm);
+          metadataSent.add(metadataKey);
+        }
       }
       
       // Tokenize the content
@@ -182,10 +188,13 @@ protected void map(LongWritable key, Text value, Context context) throws IOExcep
        context.write(reverseIndexTableName, grm);
        // Create mutations for the metadata table.
-        Mutation mm = new Mutation(index.getKey());
-        mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
-        context.write(metadataTableName, mm);
-        
+        String metadataKey = index.getKey() + METADATA_INDEX_COLUMN_FAMILY + language;
+        if (!metadataSent.contains(metadataKey)) {
+          Mutation mm = new Mutation(index.getKey());
+          mm.put(METADATA_INDEX_COLUMN_FAMILY, language + NULL_BYTE + LcNoDiacriticsNormalizer.class.getName(), cv, article.getTimestamp(), NULL_VALUE);
+          context.write(metadataTableName, mm);
+          metadataSent.add(metadataKey);
+        }
       }
       
       // Add the entire text to the document section of the table.
       // row is the partition, colf is 'd', colq is language\0articleid, value is Base64 encoded GZIP'd document
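Note on the WikipediaMapper change above: the patch suppresses duplicate metadata-table mutations by remembering, per mapper JVM, which metadata keys have already been written. A minimal standalone sketch of that pattern follows; the class and method names are illustrative only and are not part of the patch.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch, not part of the patch: write each metadata entry at most
// once per JVM by tracking the keys already emitted, mirroring the metadataSent
// set added to WikipediaMapper.
public class MetadataDedupSketch {

  // Static, so it is shared by all map() calls in the same task. Other tasks keep
  // their own sets, so downstream readers must still tolerate occasional duplicates.
  private static final Set<String> metadataSent = new HashSet<String>();

  // Returns true only the first time a given (field, column family, language)
  // combination is seen; callers skip the metadata mutation when it returns false.
  public static boolean shouldWrite(String fieldName, String columnFamily, String language) {
    String metadataKey = fieldName + columnFamily + language;
    if (metadataSent.contains(metadataKey)) {
      return false;
    }
    metadataSent.add(metadataKey);
    return true;
  }

  public static void main(String[] args) {
    System.out.println(shouldWrite("TITLE", "e", "enwiki"));  // true  -> write the mutation
    System.out.println(shouldWrite("TITLE", "e", "enwiki"));  // false -> duplicate, skip it
  }
}

The trade-off is a small amount of mapper memory in exchange for far fewer redundant mutations against the metadata table; because the cache is per-task, it reduces duplicates rather than eliminating them entirely.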