From b5cce74c65d4b3ecd0914cebde06b6dc3ea94547 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 16 Oct 2024 10:30:39 +0200 Subject: [PATCH] Improve empty queue conditional from slicing logic With recent changes in Lucene 9.12 around not forking execution when not necessary (see https://github.com/apache/lucene/pull/13472), we have removed the search worker thread pool in #111099. The worker thread pool had an unlimited queue, and we feared that we could have much more queueing on the search thread pool if we execute segment level searches on the same thread pool as the shard level searches, because every shard search would take up to a thread per slice when executing the query phase. We have then introduced an additional conditional to stop parallelizing when there is a queue. That is perhaps a bit extreme, as it's a decision made when creating the searcher, while a queue may no longer be there once the search is executing. This has caused some benchmark regressions, given that having a queue may be a transient scenario, especially with short-lived segment searches being queued up. We may end up disabling inter-segment concurrency more aggressively than we would want, penalizing requests that do benefit from concurrency. At the same time, we do want to have some kind of protection against rejections of shard searches that would be caused by excessive slicing. When the queue is above a certain size, we can turn off the slicing and effectively disable inter-segment concurrency. With this commit we set that threshold to be the number of threads in the search pool. 
--- .../search/DefaultSearchContext.java | 2 +- .../search/DefaultSearchContextTests.java | 209 ++++++++++++------ 2 files changed, 148 insertions(+), 63 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 1521b17a81766..8ac35f7c40caa 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -291,7 +291,7 @@ static int determineMaximumNumberOfSlices( ToLongFunction fieldCardinality ) { return executor instanceof ThreadPoolExecutor tpe - && tpe.getQueue().isEmpty() + && tpe.getQueue().size() <= tpe.getMaximumPoolSize() && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection) ? tpe.getMaximumPoolSize() : 1; diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 0e4945e8bb8d1..a474c1dc38c50 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -81,6 +81,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToLongFunction; @@ -507,10 +508,10 @@ public void testNewIdLoaderWithTsdbAndRoutingPathMatch() throws Exception { } } - public void testDetermineMaximumNumberOfSlices() { + private static ShardSearchRequest createParallelRequest() { IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0)); - ShardSearchRequest parallelReq = new ShardSearchRequest( + return new 
ShardSearchRequest( OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(randomBoolean()), indexShard.shardId(), @@ -521,69 +522,74 @@ public void testDetermineMaximumNumberOfSlices() { System.currentTimeMillis(), null ); - ShardSearchRequest singleSliceReq = new ShardSearchRequest( - OriginalIndices.NONE, - new SearchRequest().allowPartialSearchResults(randomBoolean()) - .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))), - indexShard.shardId(), - 0, - 1, - AliasFilter.EMPTY, - 1f, - System.currentTimeMillis(), - null - ); - int executorPoolSize = randomIntBetween(1, 100); - ExecutorService threadPoolExecutor = EsExecutors.newFixed( - "test", - executorPoolSize, - 0, - Thread::new, - new ThreadContext(Settings.EMPTY), - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ); - ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool(); - ToLongFunction fieldCardinality = name -> -1; + } + + public void testDetermineMaximumNumberOfSlicesNoExecutor() { + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - parallelReq, + null, + createParallelRequest(), SearchService.ResultsType.DFS, - true, + randomBoolean(), fieldCardinality ) ); assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - singleSliceReq, - SearchService.ResultsType.DFS, - true, + null, + createParallelRequest(), + SearchService.ResultsType.QUERY, + randomBoolean(), fieldCardinality ) ); + } + + public void testDetermineMaximumNumberOfSlicesNotThreadPoolExecutor() { + ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool(); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; assertEquals( 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, 
SearchService.ResultsType.DFS, true, fieldCardinality) + DefaultSearchContext.determineMaximumNumberOfSlices( + notThreadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + randomBoolean(), + fieldCardinality + ) ); assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - parallelReq, + notThreadPoolExecutor, + createParallelRequest(), SearchService.ResultsType.QUERY, - true, + randomBoolean(), fieldCardinality ) ); + } + + public void testDetermineMaximumNumberOfSlicesEnableQueryPhaseParallelCollection() { + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 0, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ToLongFunction fieldCardinality = name -> -1; assertEquals( - 1, + executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - singleSliceReq, + createParallelRequest(), SearchService.ResultsType.QUERY, true, fieldCardinality @@ -592,54 +598,133 @@ public void testDetermineMaximumNumberOfSlices() { assertEquals( 1, DefaultSearchContext.determineMaximumNumberOfSlices( - notThreadPoolExecutor, - parallelReq, - SearchService.ResultsType.DFS, - true, + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + false, fieldCardinality ) ); - assertEquals( executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - parallelReq, + createParallelRequest(), SearchService.ResultsType.DFS, - false, + randomBoolean(), fieldCardinality ) ); - assertEquals( + } + + public void testDetermineMaximumNumberOfSlicesSingleSortByField() { + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0)); + ShardSearchRequest singleSliceReq = new ShardSearchRequest( + OriginalIndices.NONE, + new 
SearchRequest().allowPartialSearchResults(randomBoolean()) + .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))), + indexShard.shardId(), + 0, 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, false, fieldCardinality) + AliasFilter.EMPTY, + 1f, + System.currentTimeMillis(), + null ); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 0, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + // DFS concurrency does not rely on slices, hence it kicks in regardless of the request (supportsParallelCollection is not called) assertEquals( - 1, + executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - parallelReq, - SearchService.ResultsType.QUERY, - false, + singleSliceReq, + SearchService.ResultsType.DFS, + true, fieldCardinality ) ); - assertEquals( - 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.QUERY, false, fieldCardinality) - ); assertEquals( 1, DefaultSearchContext.determineMaximumNumberOfSlices( - notThreadPoolExecutor, - parallelReq, - SearchService.ResultsType.DFS, - false, + threadPoolExecutor, + singleSliceReq, + SearchService.ResultsType.QUERY, + true, fieldCardinality ) ); } + public void testDetermineMaximumNumberOfSlicesWithQueue() { + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 1000, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; + + for (int i = 0; i < executorPoolSize; i++) { + 
assertTrue(threadPoolExecutor.getQueue().offer(() -> {})); + assertEquals( + executorPoolSize, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + true, + fieldCardinality + ) + ); + assertEquals( + executorPoolSize, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + true, + fieldCardinality + ) + ); + } + for (int i = 0; i < 100; i++) { + assertTrue(threadPoolExecutor.getQueue().offer(() -> {})); + assertEquals( + 1, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + true, + fieldCardinality + ) + ); + assertEquals( + 1, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + true, + fieldCardinality + ) + ); + } + } + public void testIsParallelCollectionSupportedForResults() { SearchSourceBuilder searchSourceBuilderOrNull = randomBoolean() ? null : new SearchSourceBuilder(); ToLongFunction fieldCardinality = name -> -1;