From e1dfeb6ebbb147baad576dee055ef462a61a9fff Mon Sep 17 00:00:00 2001 From: "Eric C. Newton" Date: Mon, 12 Mar 2012 19:06:21 +0000 Subject: [PATCH] ACCUMULO-446 ACCUMULO-447 fix "or" conditions, partition ranges, jump's and document debug logging git-svn-id: https://svn.apache.org/repos/asf/incubator/accumulo/branches/1.4@1299791 13f79535-47bb-0310-9956-ffa450edef68 --- README | 8 +++ .../wikisearch/iterator/AndIterator.java | 41 +++++-------- .../iterator/BooleanLogicIterator.java | 58 +++---------------- .../iterator/FieldIndexIterator.java | 45 +++++++++----- .../wikisearch/iterator/OrIterator.java | 7 +-- .../wikisearch/logic/AbstractQueryLogic.java | 16 ++--- .../examples/wikisearch/logic/QueryLogic.java | 9 +-- .../wikisearch/parser/RangeCalculator.java | 32 +++++----- .../examples/wikisearch/query/Query.java | 4 -- .../wikisearch/logic/TestQueryLogic.java | 4 -- 10 files changed, 83 insertions(+), 141 deletions(-) diff --git a/README b/README index 4844fe6..daec8e4 100644 --- a/README +++ b/README @@ -63,3 +63,11 @@ There are two parameters to the REST service, query and auths. The query parameter is the same string that you would type into the search box at ui.jsp, and the auths parameter is a comma-separated list of wikis that you want to search (i.e. enwiki,frwiki,dewiki, etc. Or you can use all) + + 10. Optional. Add the following line to the $ACCUMULO_HOME/conf/log4j.properties file to turn off debug messages in the specialized + iterators, which will dramatically increase performance: + + log4j.logger.org.apache.accumulo.examples.wikisearch.iterator=INFO,A1 + + This needs to be propagated to all the tablet server nodes, and accumulo needs to be restarted. + \ No newline at end of file diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/AndIterator.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/AndIterator.java index 74fbc0c..5ace7c8 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/AndIterator.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/AndIterator.java @@ -759,18 +759,19 @@ public void seek(Range range, Collection seekColumnFamilies, boole log.debug("In AndIterator.seek()"); log.debug("AndIterator.seek Given range => " + range); } - // if (firstSeek) { + currentRow = new Text(); + currentDocID.set(emptyByteArray); + doSeek(range, seekColumnFamilies, inclusive); + } + + private void doSeek(Range range, Collection seekColumnFamilies, boolean inclusive) throws IOException { + overallRange = new Range(range); - // firstSeek = false; - // } + if (range.getEndKey() != null && range.getEndKey().getRow() != null) { this.parentEndRow = range.getEndKey().getRow(); } - // overallRange = new Range(range); - currentRow = new Text(); - currentDocID.set(emptyByteArray); - this.seekColumnFamilies = seekColumnFamilies; this.inclusive = inclusive; @@ -801,7 +802,7 @@ public void seek(Range range, Collection seekColumnFamilies, boole if (overallRange != null && !overallRange.contains(topKey)) { topKey = null; if (log.isDebugEnabled()) { - log.debug("seek, topKey is outside of overall range: " + overallRange); + log.debug("doSeek, topKey is outside of overall range: " + overallRange); } } } @@ -853,16 +854,7 @@ public boolean jump(Key jumpKey) throws IOException { if (log.isDebugEnabled()) { log.debug("jump called, but topKey is null, must need to move to next row"); } - // call seek with the jumpKey - - Key endKey = null; - if (parentEndRow != null) { - endKey = new Key(parentEndRow); - } - Range newRange = new Range(jumpKey, true, endKey, false); - this.seek(newRange, seekColumnFamilies, false); - // the parent seek should account for the endKey range check. - return hasTop(); + return false; } else { int comp = this.topKey.getRow().compareTo(jumpKey.getRow()); @@ -909,16 +901,13 @@ public boolean jump(Key jumpKey) throws IOException { if (log.isDebugEnabled()) { log.debug("jump, uid jump"); } - // move one, and then advanceToIntersection will move the rest. Text row = jumpKey.getRow(); - String cq = topKey.getColumnQualifier().toString(); - cq = cq.replaceAll(myUid, jumpUid); - - Key startKey = buildKey(row, topKey.getColumnFamily(), new Text(cq)); - Range range = new Range(startKey, true, null, false); - sources[0].iter.seek(range, seekColumnFamilies, true); - advanceToIntersection(); + Range range = new Range(row); + this.currentRow = row; + this.currentDocID = new Text(this.getUID(jumpKey)); + doSeek(range, seekColumnFamilies, false); + // make sure it is in the range if we have one. if (hasTop() && parentEndRow != null && topKey.getRow().compareTo(parentEndRow) > 0) { topKey = null; diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/BooleanLogicIterator.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/BooleanLogicIterator.java index e2d8d89..09ad8d3 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/BooleanLogicIterator.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/BooleanLogicIterator.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; @@ -41,9 +40,9 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.examples.wikisearch.parser.JexlOperatorConstants; import org.apache.accumulo.examples.wikisearch.parser.QueryParser; -import org.apache.accumulo.examples.wikisearch.parser.TreeNode; import org.apache.accumulo.examples.wikisearch.parser.QueryParser.QueryTerm; import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator.RangeBounds; +import org.apache.accumulo.examples.wikisearch.parser.TreeNode; import org.apache.accumulo.examples.wikisearch.util.FieldIndexKeyParser; import org.apache.commons.jexl2.parser.ASTAndNode; import org.apache.commons.jexl2.parser.ASTEQNode; @@ -63,7 +62,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; - import com.google.common.collect.Multimap; public class BooleanLogicIterator implements SortedKeyValueIterator, OptionDescriber { @@ -593,7 +591,6 @@ private void handleOR(BooleanLogicTreeNode me) { // 3 cases for child: SEL, AND, OR // and negation BooleanLogicTreeNode child = (BooleanLogicTreeNode) children.nextElement(); - // if (child.getType() == BooleanLogicTreeNode.NodeType.SEL || child.getType() == BooleanLogicTreeNode.NodeType.AND) { if (child.getType() == ParserTreeConstants.JJTEQNODE || child.getType() == ParserTreeConstants.JJTNENODE || child.getType() == ParserTreeConstants.JJTANDNODE || child.getType() == ParserTreeConstants.JJTERNODE || child.getType() == ParserTreeConstants.JJTNRNODE || child.getType() == ParserTreeConstants.JJTLENODE @@ -1504,48 +1501,14 @@ private boolean jump(Key jumpKey) throws IOException { if (log.isDebugEnabled()) { log.debug("jump, All leaves need to advance to: " + jumpKey); } - - Key sKeyRow = new Key(jumpKey.getRow()); - Key eKeyRow = new Key(jumpKey.followingKey(PartialKey.ROW)); - Range rowRange = new Range(sKeyRow, true, eKeyRow, false); - - if (log.isDebugEnabled()) { - log.debug("jump, RowRange: " + rowRange); - } - + String advanceUid = getIndexKeyUid(jumpKey); if (log.isDebugEnabled()) { log.debug("advanceUid => " + advanceUid); } boolean ok = true; for (BooleanLogicTreeNode leaf : positives) { - if (leaf.hasTop() && leaf.getTopKey().getRow().toString().compareTo(jumpKey.getRow().toString()) < 0) { - // seek - if (log.isDebugEnabled()) { - log.debug("row Jump on leaf: " + leaf); - } - ok = leaf.jump(jumpKey); - // leaf.seek(rowRange, EMPTY_COL_FAMS, true); - - } else if (leaf.hasTop() && leaf.getTopKey().getRow().toString().compareTo(jumpKey.getRow().toString()) == 0) { - // compare the uid's - if (log.isDebugEnabled()) { - log.debug("leaf topKey: " + leaf.getTopKey()); - log.debug("advanceUid: " + advanceUid + " leafUid: " + getEventKeyUid(leaf.getTopKey())); - } - - if (getEventKeyUid(leaf.getTopKey()).compareTo(advanceUid) < 0) { - if (log.isDebugEnabled()) { - log.debug("uid Jump on leaf: " + leaf); - } - ok = leaf.jump(jumpKey); - } - } else { - if (log.isDebugEnabled()) { - log.debug("this leaf no jump: " + leaf); - } - continue; - } + leaf.jump(jumpKey); } return ok; } @@ -1842,23 +1805,20 @@ public void seek(Range range, Collection columnFamilies, boolean i resetNegatives(); // test Tree, if it's not valid, call next - if (testTreeState()) { + if (testTreeState() && overallRange.contains(root.getTopKey())) { if (!negatives.isEmpty()) { // now advance negatives advanceNegatives(this.root.getTopKey()); if (!testTreeState()) { - if (overallRange.contains(root.getTopKey())) { - next(); - } else { - setTopKey(null); - return; - } + next(); } } - log.debug("overallRange " + overallRange + " topKey " + this.root.getTopKey() + " contains " + overallRange.contains(this.root.getTopKey())); + if (log.isDebugEnabled()) { + log.debug("overallRange " + overallRange + " topKey " + this.root.getTopKey() + " contains " + overallRange.contains(this.root.getTopKey())); + } - if (overallRange.contains(this.root.getTopKey())) { + if (overallRange.contains(this.root.getTopKey()) && this.root.isValid()) { setTopKey(this.root.getTopKey()); } else { setTopKey(null); diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/FieldIndexIterator.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/FieldIndexIterator.java index d3d285f..ad39ab3 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/FieldIndexIterator.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/FieldIndexIterator.java @@ -22,15 +22,6 @@ import java.util.HashMap; import java.util.Map; -import org.apache.commons.jexl2.Expression; -import org.apache.commons.jexl2.JexlContext; -import org.apache.commons.jexl2.JexlEngine; -import org.apache.commons.jexl2.MapContext; -import org.apache.commons.jexl2.parser.ParserTreeConstants; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -40,6 +31,14 @@ import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.examples.wikisearch.function.QueryFunctions; import org.apache.accumulo.examples.wikisearch.util.FieldIndexKeyParser; +import org.apache.commons.jexl2.Expression; +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.MapContext; +import org.apache.commons.jexl2.parser.ParserTreeConstants; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; /** * This iterator should only return keys from the fi\0{fieldName}:{fieldValue} part of the shard table. Expect topKey to be CF, {datatype}\0{UID} @@ -474,13 +473,15 @@ public boolean jump(Key jumpKey) throws IOException { } } else if (comp < 0) { // a row behind jump key, need to move forward - String myRow = ""; - if (hasTop()) { - myRow = topKey.getRow().toString(); - } else if (currentRow != null) { - myRow = currentRow.toString(); + if (log.isDebugEnabled()) { + String myRow = ""; + if (hasTop()) { + myRow = topKey.getRow().toString(); + } else if (currentRow != null) { + myRow = currentRow.toString(); + } + log.debug("My row " + myRow + " is less than jump row: " + jumpKey.getRow() + " seeking"); } - log.debug("My row " + myRow + " is less than jump row: " + jumpKey.getRow() + " seeking"); range = buildRange(jumpKey.getRow()); // this.seek(range, EMPTY_COL_FAMS, false); @@ -521,8 +522,20 @@ public boolean jump(Key jumpKey) throws IOException { } if (ucomp < 0) { // need to move up log.debug("my uid is less than jumpUid, topUid: " + myUid + " jumpUid: " + jumpUid); + + Text cq = jumpKey.getColumnQualifier(); + int index = cq.find(NULL_BYTE); + if (0 <= index) { + cq.set(cq.getBytes(), index + 1, cq.getLength() - index - 1); + } else { + log.error("Expected a NULL separator in the column qualifier"); + this.topKey = null; + this.topValue = null; + return false; + } + // note my internal range stays the same, I just need to move forward - Key startKey = new Key(topKey.getRow(), fName, new Text(fValue + NULL_BYTE + jumpKey.getColumnQualifier())); + Key startKey = new Key(topKey.getRow(), fName, new Text(fValue + NULL_BYTE + cq)); Key endKey = new Key(topKey.getRow(), fName, new Text(fValue + ONE_BYTE)); range = new Range(startKey, true, endKey, false); log.debug("Using range: " + range + " to seek"); diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/OrIterator.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/OrIterator.java index a217701..78c8576 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/OrIterator.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/iterator/OrIterator.java @@ -342,10 +342,6 @@ public void seek(Range range, Collection columnFamilies, boolean i log.debug("seek, overallRange: " + overallRange); } - // if (range.getStartKey() != null && range.getStartKey().getRow() != null) { - // this.parentStartRow = range.getStartKey().getRow(); - // } - if (range.getEndKey() != null && range.getEndKey().getRow() != null) { this.parentEndRow = range.getEndKey().getRow(); } @@ -688,11 +684,12 @@ public boolean jump(Key jumpKey) throws IOException { if (log.isDebugEnabled()) { log.debug("jump called, but ts.topKey is null, this one needs to move to next row."); } + Key startKey = new Key(jumpKey.getRow(), ts.dataLocation, new Text(ts.term + "\0" + jumpKey.getColumnFamily())); Key endKey = null; if (parentEndRow != null) { endKey = new Key(parentEndRow); } - Range newRange = new Range(jumpKey, true, endKey, false); + Range newRange = new Range(startKey, true, endKey, false); ts.iter.seek(newRange, columnFamilies, inclusive); ts.setNew(); advanceToMatch(ts); diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java index cb90e92..5c7c20c 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/AbstractQueryLogic.java @@ -47,12 +47,12 @@ import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.normalizer.Normalizer; import org.apache.accumulo.examples.wikisearch.parser.EventFields; +import org.apache.accumulo.examples.wikisearch.parser.EventFields.FieldValue; import org.apache.accumulo.examples.wikisearch.parser.FieldIndexQueryReWriter; import org.apache.accumulo.examples.wikisearch.parser.JexlOperatorConstants; import org.apache.accumulo.examples.wikisearch.parser.QueryParser; -import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; -import org.apache.accumulo.examples.wikisearch.parser.EventFields.FieldValue; import org.apache.accumulo.examples.wikisearch.parser.QueryParser.QueryTerm; +import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.sample.Document; import org.apache.accumulo.examples.wikisearch.sample.Field; import org.apache.accumulo.examples.wikisearch.sample.Results; @@ -207,7 +207,6 @@ public void add(String term, Range r) { private Kryo kryo = new Kryo(); private EventFields eventFields = new EventFields(); private List unevaluatedFields = null; - private int numPartitions = 0; private Map,Normalizer> normalizerCacheMap = new HashMap,Normalizer>(); private static final String NULL_BYTE = "\u0000"; @@ -395,20 +394,13 @@ public void setUnevaluatedFields(String unevaluatedFieldList) { this.unevaluatedFields.add(field); } - public int getNumPartitions() { - return numPartitions; - } - - public void setNumPartitions(int numPartitions) { - this.numPartitions = numPartitions; - } - public Document createDocument(Key key, Value value) { + Document doc = new Document(); + eventFields.clear(); ByteBuffer buf = ByteBuffer.wrap(value.get()); eventFields.readObjectData(kryo, buf); - Document doc = new Document(); // Set the id to the document id which is located in the colf String row = key.getRow().toString(); String colf = key.getColumnFamily().toString(); diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java index 7d4adc0..bcfeb70 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/logic/QueryLogic.java @@ -33,8 +33,8 @@ import org.apache.accumulo.examples.wikisearch.iterator.EvaluatingIterator; import org.apache.accumulo.examples.wikisearch.normalizer.LcNoDiacriticsNormalizer; import org.apache.accumulo.examples.wikisearch.normalizer.Normalizer; -import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.parser.QueryParser.QueryTerm; +import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; import org.apache.accumulo.examples.wikisearch.protobuf.Uid; import org.apache.accumulo.examples.wikisearch.util.TextUtil; import org.apache.hadoop.io.Text; @@ -90,8 +90,6 @@ public class QueryLogic extends AbstractQueryLogic { protected static Logger log = Logger.getLogger(QueryLogic.class); - private static String startPartition = "0"; - public QueryLogic() { super(); } @@ -106,10 +104,7 @@ protected RangeCalculator getTermIndexInformation(Connector c, Authorizations au } protected Collection getFullScanRange(Date begin, Date end, Multimap terms) { - String startKey = startPartition; - String endKey = Integer.toString(this.getNumPartitions()); - Range r = new Range(startKey, true, endKey, false); - return Collections.singletonList(r); + return Collections.singletonList(new Range()); } @Override diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/parser/RangeCalculator.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/parser/RangeCalculator.java index d416f60..8a5474b 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/parser/RangeCalculator.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/parser/RangeCalculator.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -249,7 +250,6 @@ static class EvaluationContext { protected static Logger log = Logger.getLogger(RangeCalculator.class); private static String WILDCARD = ".*"; private static String SINGLE_WILDCARD = "\\."; - protected static String START_ROW = "0"; protected Connector c; protected Authorizations auths; @@ -258,7 +258,6 @@ static class EvaluationContext { protected String indexTableName; protected String reverseIndexTableName; protected int queryThreads = 8; - protected String END_ROW = null; /* final results of index lookups, ranges for the shard table */ protected Set result = null; @@ -294,7 +293,6 @@ public void execute(Connector c, Authorizations auths, Multimap> indexRanges = new HashMap>(); Map> trailingWildcardRanges = new HashMap>(); @@ -340,9 +338,8 @@ public void execute(Connector c, Authorizations auths, Multimap> ranges = new HashMap>(); MapKey key = new MapKey(entry.getKey().toString(), entry.getValue().getLower().toString()); diff --git a/query/src/main/java/org/apache/accumulo/examples/wikisearch/query/Query.java b/query/src/main/java/org/apache/accumulo/examples/wikisearch/query/Query.java index bffe8ad..d7dab3a 100644 --- a/query/src/main/java/org/apache/accumulo/examples/wikisearch/query/Query.java +++ b/query/src/main/java/org/apache/accumulo/examples/wikisearch/query/Query.java @@ -70,9 +70,6 @@ public class Query implements IQuery { @Resource(name = "tableName") private String tableName; - @Resource(name = "partitions") - private int partitions; - @Resource(name = "threads") private int threads; @@ -235,7 +232,6 @@ public Results query(String query, String auths) { table.setReverseIndexTableName(tableName + "ReverseIndex"); table.setQueryThreads(threads); table.setUnevaluatedFields("TEXT"); - table.setNumPartitions(partitions); table.setUseReadAheadIterator(false); return table.runQuery(connector, authorizations, query, null, null, null); } diff --git a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java index 7276360..8400fb5 100644 --- a/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java +++ b/query/src/test/java/org/apache/accumulo/examples/wikisearch/logic/TestQueryLogic.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaConfiguration; -import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaInputFormat.WikipediaInputSplit; import org.apache.accumulo.examples.wikisearch.ingest.WikipediaMapper; import org.apache.accumulo.examples.wikisearch.parser.RangeCalculator; @@ -50,7 +49,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordWriter; @@ -162,8 +160,6 @@ public void setup() throws Exception { table.setIndexTableName(INDEX_TABLE_NAME); table.setReverseIndexTableName(RINDEX_TABLE_NAME); table.setUseReadAheadIterator(false); - table.setNumPartitions(1); - } void debugQuery(String tableName) throws Exception {