Enhance empty queue conditional in slicing logic (elastic#114911)
With recent changes in Lucene 9.12 around not forking execution when not necessary
(see apache/lucene#13472), we removed the search worker thread pool in elastic#111099.
The worker thread pool had an unlimited queue, and we feared that we could see much
more queueing on the search thread pool if segment-level searches were executed on the
same thread pool as shard-level searches, because every shard search would take up to
a thread per slice when executing the query phase.

We then introduced an additional conditional to stop parallelizing when there is a
queue. That is perhaps a bit extreme, as it is a decision made when creating the
searcher, while the queue may no longer be there once the search actually executes.
This has caused some benchmark regressions, given that a queue may be a transient
scenario, especially with short-lived segment searches being queued up. We may end
up disabling inter-segment concurrency more aggressively than we would want,
penalizing requests that do benefit from concurrency. At the same time, we do want
some protection against rejections of shard searches caused by excessive slicing.
When the queue grows beyond a certain size, we can turn off slicing and effectively
disable inter-segment concurrency. With this commit we set that threshold to the
number of threads in the search pool.
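
To make the new rule concrete, here is a minimal plain-JDK sketch of the decision
(the class and method names are illustrative only, not part of this change):

    import java.util.concurrent.ThreadPoolExecutor;

    class SlicingThresholdSketch {
        // Parallelize only while the queue is no deeper than the pool size;
        // otherwise fall back to a single slice for the shard search.
        static int maxSlices(ThreadPoolExecutor searchPool) {
            return searchPool.getQueue().size() <= searchPool.getMaximumPoolSize()
                ? searchPool.getMaximumPoolSize()
                : 1;
        }
    }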
javanna authored and georgewallace committed Oct 25, 2024
1 parent cffae1f commit 297bd5b
Showing 2 changed files with 148 additions and 63 deletions.
DefaultSearchContext.java
@@ -291,7 +291,7 @@ static int determineMaximumNumberOfSlices(
         ToLongFunction<String> fieldCardinality
     ) {
         return executor instanceof ThreadPoolExecutor tpe
-            && tpe.getQueue().isEmpty()
+            && tpe.getQueue().size() <= tpe.getMaximumPoolSize()
             && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection)
             ? tpe.getMaximumPoolSize()
             : 1;
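
As a hedged illustration of the boundary behavior of the changed conditional, the
following self-contained demo uses a plain ThreadPoolExecutor rather than the
Elasticsearch search pool (QueueThresholdDemo, maxSlices, and poolSize are invented
names for the example). It parks tasks directly on the queue, the same trick the
new testDetermineMaximumNumberOfSlicesWithQueue test below relies on:

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class QueueThresholdDemo {
        // Mirrors the patched conditional in determineMaximumNumberOfSlices.
        static int maxSlices(ThreadPoolExecutor tpe) {
            return tpe.getQueue().size() <= tpe.getMaximumPoolSize() ? tpe.getMaximumPoolSize() : 1;
        }

        public static void main(String[] args) {
            int poolSize = 4;
            ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
            // Core threads start lazily, so offering straight to the queue just parks tasks.
            for (int i = 0; i < poolSize; i++) {
                tpe.getQueue().offer(() -> {});
            }
            System.out.println(maxSlices(tpe)); // 4: queue depth == pool size, still parallel
            tpe.getQueue().offer(() -> {});
            System.out.println(maxSlices(tpe)); // 1: queue deeper than pool size, single slice
            tpe.shutdownNow();
        }
    }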
DefaultSearchContextTests.java
@@ -81,6 +81,7 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
@@ -507,10 +508,10 @@ public void testNewIdLoaderWithTsdbAndRoutingPathMatch() throws Exception {
         }
     }
 
-    public void testDetermineMaximumNumberOfSlices() {
+    private static ShardSearchRequest createParallelRequest() {
         IndexShard indexShard = mock(IndexShard.class);
         when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
-        ShardSearchRequest parallelReq = new ShardSearchRequest(
+        return new ShardSearchRequest(
             OriginalIndices.NONE,
             new SearchRequest().allowPartialSearchResults(randomBoolean()),
             indexShard.shardId(),
@@ -521,69 +522,74 @@ public void testDetermineMaximumNumberOfSlices() {
             System.currentTimeMillis(),
             null
         );
-        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
-            OriginalIndices.NONE,
-            new SearchRequest().allowPartialSearchResults(randomBoolean())
-                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
-            indexShard.shardId(),
-            0,
-            1,
-            AliasFilter.EMPTY,
-            1f,
-            System.currentTimeMillis(),
-            null
-        );
-        int executorPoolSize = randomIntBetween(1, 100);
-        ExecutorService threadPoolExecutor = EsExecutors.newFixed(
-            "test",
-            executorPoolSize,
-            0,
-            Thread::new,
-            new ThreadContext(Settings.EMPTY),
-            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
-        );
-        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
-        ToLongFunction<String> fieldCardinality = name -> -1;
+    }
 
+    public void testDetermineMaximumNumberOfSlicesNoExecutor() {
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                null,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                singleSliceReq,
-                SearchService.ResultsType.DFS,
-                true,
+                null,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesNotThreadPoolExecutor() {
+        ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool();
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
         assertEquals(
             1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, true, fieldCardinality)
+            DefaultSearchContext.determineMaximumNumberOfSlices(
+                notThreadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.DFS,
+                randomBoolean(),
+                fieldCardinality
+            )
         );
         assertEquals(
-            executorPoolSize,
+            1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
+                notThreadPoolExecutor,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
-                true,
+                randomBoolean(),
                 fieldCardinality
             )
         );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesEnableQueryPhaseParallelCollection() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> -1;
         assertEquals(
-            1,
+            executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                singleSliceReq,
+                createParallelRequest(),
                 SearchService.ResultsType.QUERY,
                 true,
                 fieldCardinality
@@ -592,54 +598,133 @@ public void testDetermineMaximumNumberOfSlices() {
         assertEquals(
             1,
             DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                true,
+                threadPoolExecutor,
+                createParallelRequest(),
+                SearchService.ResultsType.QUERY,
+                false,
                 fieldCardinality
             )
         );
 
         assertEquals(
             executorPoolSize,
             DefaultSearchContext.determineMaximumNumberOfSlices(
                 threadPoolExecutor,
-                parallelReq,
+                createParallelRequest(),
                 SearchService.ResultsType.DFS,
-                false,
+                randomBoolean(),
                 fieldCardinality
             )
         );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, false, fieldCardinality)
-        );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(
-                threadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.QUERY,
-                false,
-                fieldCardinality
-            )
-        );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.QUERY, false, fieldCardinality)
-        );
-        assertEquals(
-            1,
-            DefaultSearchContext.determineMaximumNumberOfSlices(
-                notThreadPoolExecutor,
-                parallelReq,
-                SearchService.ResultsType.DFS,
-                false,
-                fieldCardinality
-            )
-        );
+    }
+
+    public void testDetermineMaximumNumberOfSlicesSingleSortByField() {
+        IndexShard indexShard = mock(IndexShard.class);
+        when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0));
+        ShardSearchRequest singleSliceReq = new ShardSearchRequest(
+            OriginalIndices.NONE,
+            new SearchRequest().allowPartialSearchResults(randomBoolean())
+                .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))),
+            indexShard.shardId(),
+            0,
+            1,
+            AliasFilter.EMPTY,
+            1f,
+            System.currentTimeMillis(),
+            null
+        );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            0,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        // DFS concurrency does not rely on slices, hence it kicks in regardless of the request (supportsParallelCollection is not called)
+        assertEquals(
+            executorPoolSize,
+            DefaultSearchContext.determineMaximumNumberOfSlices(
+                threadPoolExecutor,
+                singleSliceReq,
+                SearchService.ResultsType.DFS,
+                true,
+                fieldCardinality
+            )
+        );
+        assertEquals(
+            1,
+            DefaultSearchContext.determineMaximumNumberOfSlices(
+                threadPoolExecutor,
+                singleSliceReq,
+                SearchService.ResultsType.QUERY,
+                true,
+                fieldCardinality
+            )
+        );
     }
+
+    public void testDetermineMaximumNumberOfSlicesWithQueue() {
+        int executorPoolSize = randomIntBetween(1, 100);
+        ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed(
+            "test",
+            executorPoolSize,
+            1000,
+            Thread::new,
+            new ThreadContext(Settings.EMPTY),
+            EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+        );
+        ToLongFunction<String> fieldCardinality = name -> { throw new UnsupportedOperationException(); };
+
+        for (int i = 0; i < executorPoolSize; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                executorPoolSize,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+        for (int i = 0; i < 100; i++) {
+            assertTrue(threadPoolExecutor.getQueue().offer(() -> {}));
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.DFS,
+                    true,
+                    fieldCardinality
+                )
+            );
+            assertEquals(
+                1,
+                DefaultSearchContext.determineMaximumNumberOfSlices(
+                    threadPoolExecutor,
+                    createParallelRequest(),
+                    SearchService.ResultsType.QUERY,
+                    true,
+                    fieldCardinality
+                )
+            );
+        }
+    }
 
     public void testIsParallelCollectionSupportedForResults() {
         SearchSourceBuilder searchSourceBuilderOrNull = randomBoolean() ? null : new SearchSourceBuilder();
         ToLongFunction<String> fieldCardinality = name -> -1;
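
One caveat the commit message calls out is worth restating: the slice count is
computed once, when the searcher is created, so a queue that drains right after the
decision does not restore concurrency for that request. A purely illustrative
continuation of the QueueThresholdDemo sketch above (plain JDK, hypothetical names):

    // Reusing maxSlices(...) and tpe from the sketch above.
    for (int i = 0; i <= poolSize; i++) {
        tpe.getQueue().offer(() -> {});     // push the queue depth past the pool size
    }
    int decidedAtCreation = maxSlices(tpe); // 1: concurrency disabled for this search
    tpe.getQueue().clear();                 // the backlog drains moments later
    int ifDecidedNow = maxSlices(tpe);      // 4: deciding now would parallelize again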
