
Commit 9cc6373

Lucene query improvements (#311)
* Rework lucene collectors, add executor service. Reworks collectors to use an executor service and removes unnecessary collectors depending on the provided params.
* Simplify stats collector logic
* PR feedback - add index sorting
* Revert "Remove overrides for ramBuffer, compound file". This reverts commit e82bddd.
* PR feedback
* Fix count discrepancy
* PR feedback
1 parent bfea814 commit 9cc6373
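
The first bullet above mentions adding an executor service alongside the collector rework; the diffs below show the CollectorManager changes but not the executor wiring itself. Below is a minimal, hypothetical sketch of how a searcher could be given an executor so that CollectorManager-based searches run across index slices concurrently. The class name, pool sizing, and SearcherFactory hookup are assumptions for illustration, not the actual KalDB code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherFactory;

// Hypothetical sketch: a SearcherFactory that builds IndexSearchers backed by an
// executor, so IndexSearcher.search(query, collectorManager) can collect each
// index slice on a separate thread and reduce the per-slice results at the end.
public final class ConcurrentSearcherFactory extends SearcherFactory {
  // Pool sizing is illustrative only.
  private final ExecutorService searchExecutor =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

  @Override
  public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) {
    return new IndexSearcher(reader, searchExecutor);
  }
}
```

A SearcherManager constructed with such a factory (for example `new SearcherManager(indexWriter, new ConcurrentSearcherFactory())`) would then hand out executor-backed searchers to the search path changed below.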

File tree

3 files changed: +82 -36 lines changed

kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java

+10
```diff
@@ -21,6 +21,8 @@
 import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
 import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.MMapDirectory;
 import org.slf4j.Logger;
@@ -139,6 +141,14 @@ private IndexWriterConfig buildIndexWriterConfig(
         new IndexWriterConfig(analyzer)
             .setOpenMode(IndexWriterConfig.OpenMode.CREATE)
             .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry))
+            // we sort by timestamp descending, as that is the order we expect to return results the
+            // majority of the time
+            .setIndexSort(
+                new Sort(
+                    new SortField(
+                        LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
+                        SortField.Type.LONG,
+                        true)))
             .setIndexDeletionPolicy(snapshotDeletionPolicy);
 
     if (config.enableTracing) {
```
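
For context on the index sort added above: when the search-time sort matches the index sort, Lucene's top-field collectors can stop scanning a segment early once they have enough competitive hits, provided the totalHitsThreshold allows it. Here is a small sketch of that pairing using the same field and the createSharedManager call this commit adopts; the helper class and method names are illustrative only.

```java
import com.slack.kaldb.logstore.LogMessage;
import java.io.IOException;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;

// Illustrative helper: fetch the newest N documents with a sort that mirrors the
// index sort configured above; with totalHitsThreshold equal to howMany, each
// segment can be terminated early instead of being scanned in full.
final class NewestFirstSearch {
  static TopFieldDocs newestN(IndexSearcher searcher, Query query, int howMany) throws IOException {
    Sort byTimeDesc =
        new Sort(
            new SortField(
                LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, SortField.Type.LONG, true));
    CollectorManager<TopFieldCollector, TopFieldDocs> manager =
        TopFieldCollector.createSharedManager(byTimeDesc, howMany, null, howMany);
    return searcher.search(query, manager);
  }
}
```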

kaldb/src/main/java/com/slack/kaldb/logstore/search/LogIndexSearcherImpl.java

+56 -15
```diff
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -29,16 +30,17 @@
 import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.lucene.search.BooleanClause.Occur;
 import org.apache.lucene.search.BooleanQuery.Builder;
-import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.CollectorManager;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.MultiCollector;
+import org.apache.lucene.search.MultiCollectorManager;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortField.Type;
 import org.apache.lucene.search.TopFieldCollector;
+import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.store.MMapDirectory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,30 +113,41 @@ public SearchResult<LogMessage> search(
     // This is a useful optimization for indexes that are static.
     IndexSearcher searcher = searcherManager.acquire();
     try {
-      TopFieldCollector topFieldCollector = buildTopFieldCollector(howMany);
-      StatsCollector statsCollector =
+      List<LogMessage> results;
+      Histogram histogram = new NoOpHistogramImpl();
+
+      CollectorManager<StatsCollector, Histogram> statsCollector =
           buildStatsCollector(bucketCount, startTimeMsEpoch, endTimeMsEpoch);
-      Collector collectorChain = MultiCollector.wrap(topFieldCollector, statsCollector);
 
-      searcher.search(query, collectorChain);
-      List<LogMessage> results;
       if (howMany > 0) {
-        ScoreDoc[] hits = topFieldCollector.topDocs().scoreDocs;
+        CollectorManager<TopFieldCollector, TopFieldDocs> topFieldCollector =
+            buildTopFieldCollector(howMany, bucketCount > 0 ? Integer.MAX_VALUE : howMany);
+        MultiCollectorManager collectorManager;
+        if (bucketCount > 0) {
+          collectorManager = new MultiCollectorManager(topFieldCollector, statsCollector);
+        } else {
+          collectorManager = new MultiCollectorManager(topFieldCollector);
+        }
+        Object[] collector = searcher.search(query, collectorManager);
+
+        ScoreDoc[] hits = ((TopFieldDocs) collector[0]).scoreDocs;
         results = new ArrayList<>(hits.length);
         for (ScoreDoc hit : hits) {
           results.add(buildLogMessage(searcher, hit));
         }
+        if (bucketCount > 0) {
+          histogram = ((Histogram) collector[1]);
+        }
       } else {
         results = Collections.emptyList();
+        histogram = searcher.search(query, statsCollector);
       }
 
-      Histogram histogram = statsCollector.histogram;
-
       elapsedTime.stop();
       return new SearchResult<>(
           results,
           elapsedTime.elapsed(TimeUnit.MICROSECONDS),
-          histogram.count(),
+          bucketCount > 0 ? histogram.count() : results.size(),
           histogram.getBuckets(),
           0,
           0,
@@ -167,22 +180,50 @@ private LogMessage buildLogMessage(IndexSearcher searcher, ScoreDoc hit) {
     }
   }
 
-  private TopFieldCollector buildTopFieldCollector(int howMany) {
+  /**
+   * Builds a top field collector for the requested amount of results, with the option to set the
+   * totalHitsThreshold. If the totalHitsThreshold is set to Integer.MAX_VALUE it will force a
+   * ScoreMode.COMPLETE, iterating over all documents at the expense of a longer query time. This
+   * value can be set to equal howMany to allow early exiting (ScoreMode.TOP_SCORES), but should
+   * only be done when all collectors are tolerant of an early exit.
+   */
+  private CollectorManager<TopFieldCollector, TopFieldDocs> buildTopFieldCollector(
+      int howMany, int totalHitsThreshold) {
     if (howMany > 0) {
       SortField sortField = new SortField(SystemField.TIME_SINCE_EPOCH.fieldName, Type.LONG, true);
-      return TopFieldCollector.create(new Sort(sortField), howMany, howMany);
+      return TopFieldCollector.createSharedManager(
+          new Sort(sortField), howMany, null, totalHitsThreshold);
     } else {
       return null;
     }
   }
 
-  private StatsCollector buildStatsCollector(
+  private CollectorManager<StatsCollector, Histogram> buildStatsCollector(
      int bucketCount, long startTimeMsEpoch, long endTimeMsEpoch) {
     Histogram histogram =
         bucketCount > 0
             ? new FixedIntervalHistogramImpl(startTimeMsEpoch, endTimeMsEpoch, bucketCount)
             : new NoOpHistogramImpl();
-    return new StatsCollector(histogram);
+
+    return new CollectorManager<>() {
+      @Override
+      public StatsCollector newCollector() {
+        return new StatsCollector(histogram);
+      }
+
+      @Override
+      public Histogram reduce(Collection<StatsCollector> collectors) {
+        Histogram histogram = null;
+        for (StatsCollector collector : collectors) {
+          if (histogram == null) {
+            histogram = collector.getHistogram();
+          } else {
+            histogram.mergeHistogram(collector.getHistogram().getBuckets());
+          }
+        }
+        return histogram;
+      }
+    };
   }
 
   private Query buildQuery(
```
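
One detail behind the hit-count change above (`bucketCount > 0 ? histogram.count() : results.size()`): when totalHitsThreshold equals howMany, the top-field collector may terminate early and report only a lower bound on the total hits, which appears to be the count discrepancy the commit message refers to. The following is a hedged sketch of how that bound can be distinguished from an exact count; the helper name is illustrative.

```java
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;

// Illustrative check: with early termination, totalHits.relation can be
// GREATER_THAN_OR_EQUAL_TO rather than EQUAL_TO, so the reported value is a
// lower bound and not safe to surface as an exact hit count.
final class HitCounts {
  static boolean isExactCount(TopFieldDocs docs) {
    return docs.totalHits.relation == TotalHits.Relation.EQUAL_TO;
  }
}
```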

kaldb/src/main/java/com/slack/kaldb/logstore/search/StatsCollector.java

+16 -21
```diff
@@ -3,15 +3,12 @@
 import com.slack.kaldb.histogram.Histogram;
 import com.slack.kaldb.logstore.LogMessage.SystemField;
 import java.io.IOException;
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
-import org.apache.lucene.search.Collector;
-import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.SimpleCollector;
 
-public class StatsCollector implements Collector {
+public class StatsCollector extends SimpleCollector {
 
   public final Histogram histogram;
   private NumericDocValues docValues;
@@ -23,26 +20,24 @@ public StatsCollector(Histogram histogram) {
   }
 
   @Override
-  public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
-    LeafReader reader = context.reader();
-    docValues = reader.getNumericDocValues(SystemField.TIME_SINCE_EPOCH.fieldName);
-
-    return new LeafCollector() {
-      @Override
-      public void setScorer(Scorable scorer) {}
-
-      @Override
-      public void collect(int doc) throws IOException {
-        if (docValues != null && docValues.advanceExact(doc)) {
-          long timestamp = docValues.longValue();
-          histogram.add(timestamp);
-        }
-      }
-    };
+  protected void doSetNextReader(final LeafReaderContext context) throws IOException {
+    docValues = context.reader().getNumericDocValues(SystemField.TIME_SINCE_EPOCH.fieldName);
+  }
+
+  public Histogram getHistogram() {
+    return histogram;
   }
 
   @Override
   public ScoreMode scoreMode() {
     return ScoreMode.COMPLETE_NO_SCORES;
   }
+
+  @Override
+  public void collect(int doc) throws IOException {
+    if (docValues != null && docValues.advanceExact(doc)) {
+      long timestamp = docValues.longValue();
+      histogram.add(timestamp);
+    }
+  }
 }
```
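
The reduce step in buildStatsCollector (previous file) folds each per-segment StatsCollector's histogram into the first one via mergeHistogram. Below is a small sketch of that merge in isolation, using only methods visible in this commit; the FixedIntervalHistogramImpl package path and constructor arguments are assumed from the diff context.

```java
import com.slack.kaldb.histogram.FixedIntervalHistogramImpl;
import com.slack.kaldb.histogram.Histogram;

// Sketch of the reduce() merge: fold one collector's histogram into another so the
// combined buckets cover documents collected from different index segments.
// FixedIntervalHistogramImpl's package is assumed to match Histogram's.
final class HistogramMergeSketch {
  static Histogram mergeTwo() {
    Histogram first = new FixedIntervalHistogramImpl(0, 1_000, 10);  // start ms, end ms, bucket count
    Histogram second = new FixedIntervalHistogramImpl(0, 1_000, 10);
    first.add(5);     // e.g. a timestamp collected from segment A
    second.add(995);  // e.g. a timestamp collected from segment B
    first.mergeHistogram(second.getBuckets());
    return first;     // now reflects counts from both segments
  }
}
```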
