Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tooktime #3

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
long startTime = System.nanoTime();
if (searchContext.hasOnlySuggest()) {
suggestProcessor.process(searchContext);
searchContext.queryResult()
.topDocs(
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]
);
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
return;
}

Expand Down Expand Up @@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
);
searchContext.queryResult().profileResults(shardResults);
}
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
}

// making public for testing
Expand Down Expand Up @@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
private int nodeQueueSize = -1;

private final boolean isNull;
private long tookTimeNanos;

public QuerySearchResult() {
this(false);
Expand Down Expand Up @@ -364,6 +365,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc
nodeQueueSize = in.readInt();
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
setRescoreDocIds(new RescoreDocIds(in));
tookTimeNanos = in.readVLong();
}

@Override
Expand Down Expand Up @@ -406,6 +408,7 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeInt(nodeQueueSize);
out.writeOptionalWriteable(getShardSearchRequest());
getRescoreDocIds().writeTo(out);
out.writeVLong(tookTimeNanos); // VLong as took time should always be positive
}

public TotalHits getTotalHits() {
Expand All @@ -415,4 +418,12 @@ public TotalHits getTotalHits() {
public float getMaxScore() {
return maxScore;
}

public long getTookTimeNanos() {
return tookTimeNanos;
}

public void setTookTimeNanos(long tookTime) {
tookTimeNanos = tookTime;
}
}
116 changes: 116 additions & 0 deletions server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
Expand All @@ -103,9 +108,11 @@
import org.opensearch.lucene.queries.MinDocQuery;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.collapse.CollapseBuilder;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.ScrollContext;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -115,6 +122,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -1145,6 +1153,114 @@ public void testQueryTimeoutChecker() throws Exception {
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
}

public void testQuerySearchResultTookTime() throws IOException {
int sleepMillis = 3000;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

U can consider to randomize this value.
Using randomInBetween

DelayedQueryPhaseSearcher delayedQueryPhaseSearcher = new DelayedQueryPhaseSearcher(sleepMillis);

// we need to test queryPhase.execute(), not executeInternal(), since that's what the timer wraps around
// for that we must set up a searchContext with more functionality than the TestSearchContext,
// which requires a bit of complexity with test classes

Directory dir = newDirectory();
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
IndexWriterConfig iwc = newIndexWriterConfig().setIndexSort(sort);
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
Document doc = new Document();
for (int i = 0; i < 10; i++) {
doc.add(new StringField("foo", Integer.toString(i), Store.NO));
}
w.addDocument(doc);
w.close();
IndexReader reader = DirectoryReader.open(dir);

QueryShardContext queryShardContext = mock(QueryShardContext.class);
when(queryShardContext.fieldMapper("user")).thenReturn(
new NumberFieldType("user", NumberType.INTEGER, true, false, true, false, null, Collections.emptyMap())
);

Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
long nowInMillis = System.currentTimeMillis();
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(randomBoolean());
ShardSearchRequest request = new ShardSearchRequest(
OriginalIndices.NONE,
searchRequest,
shardId,
1,
AliasFilter.EMPTY,
1f,
nowInMillis,
clusterAlias,
Strings.EMPTY_ARRAY
);
TestSearchContextWithRequest searchContext = new TestSearchContextWithRequest(
queryShardContext,
indexShard,
newEarlyTerminationContextSearcher(reader, 0, executor),
request
);
Comment on lines +1164 to +1203
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think u need this change now? Remove it?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute(), which is what the timer wraps, needs to be passed a SearchContext with an actual request in it. I tried passing a dummy request without functionality but it didn't work. So I think we need all of this to set up the request, plus the reader and queryShardContext.


QueryPhase queryPhase = new QueryPhase(delayedQueryPhaseSearcher);
queryPhase.execute(searchContext);
long tookTime = searchContext.queryResult().getTookTimeNanos();
assertTrue(tookTime >= (long) sleepMillis * 1000000);
reader.close();
dir.close();
}

private class TestSearchContextWithRequest extends TestSearchContext {
ShardSearchRequest request;
Query query;

public TestSearchContextWithRequest(
QueryShardContext queryShardContext,
IndexShard indexShard,
ContextIndexSearcher searcher,
ShardSearchRequest request
) {
super(queryShardContext, indexShard, searcher);
this.request = request;
this.query = new TermQuery(new Term("foo", "bar"));
}

@Override
public ShardSearchRequest request() {
return request;
}

@Override
public Query query() {
return this.query;
}
}

private class DelayedQueryPhaseSearcher extends QueryPhase.DefaultQueryPhaseSearcher implements QueryPhaseSearcher {
// add delay into searchWith
private final int sleepMillis;

public DelayedQueryPhaseSearcher(int sleepMillis) {
super();
this.sleepMillis = sleepMillis;
}

@Override
public boolean searchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
try {
Thread.sleep(sleepMillis);
} catch (Exception ignored) {}
return super.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}

private void createTimeoutCheckerThenWaitThenRun(
long timeout,
long sleepAfterCreation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private static QuerySearchResult createTestInstance() throws Exception {
if (randomBoolean()) {
result.aggregations(InternalAggregationsTests.createTestInstance());
}
assertEquals(0, result.getTookTimeNanos());
return result;
}

Expand All @@ -118,6 +119,7 @@ public void testSerialization() throws Exception {
assertEquals(aggs.asList(), deserializedAggs.asList());
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
assertEquals(querySearchResult.getTookTimeNanos(), deserialized.getTookTimeNanos());
}

public void testNullResponse() throws Exception {
Expand Down