diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 1521b17a81766..8ac35f7c40caa 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -291,7 +291,7 @@ static int determineMaximumNumberOfSlices( ToLongFunction fieldCardinality ) { return executor instanceof ThreadPoolExecutor tpe - && tpe.getQueue().isEmpty() + && tpe.getQueue().size() <= tpe.getMaximumPoolSize() && isParallelCollectionSupportedForResults(resultsType, request.source(), fieldCardinality, enableQueryPhaseParallelCollection) ? tpe.getMaximumPoolSize() : 1; diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 0e4945e8bb8d1..a474c1dc38c50 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -81,6 +81,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.ToLongFunction; @@ -507,10 +508,10 @@ public void testNewIdLoaderWithTsdbAndRoutingPathMatch() throws Exception { } } - public void testDetermineMaximumNumberOfSlices() { + private static ShardSearchRequest createParallelRequest() { IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0)); - ShardSearchRequest parallelReq = new ShardSearchRequest( + return new ShardSearchRequest( OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(randomBoolean()), indexShard.shardId(), @@ -521,69 +522,74 @@ public void testDetermineMaximumNumberOfSlices() { System.currentTimeMillis(), null ); - ShardSearchRequest singleSliceReq = new ShardSearchRequest( - OriginalIndices.NONE, - new SearchRequest().allowPartialSearchResults(randomBoolean()) - .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))), - indexShard.shardId(), - 0, - 1, - AliasFilter.EMPTY, - 1f, - System.currentTimeMillis(), - null - ); - int executorPoolSize = randomIntBetween(1, 100); - ExecutorService threadPoolExecutor = EsExecutors.newFixed( - "test", - executorPoolSize, - 0, - Thread::new, - new ThreadContext(Settings.EMPTY), - EsExecutors.TaskTrackingConfig.DO_NOT_TRACK - ); - ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool(); - ToLongFunction fieldCardinality = name -> -1; + } + + public void testDetermineMaximumNumberOfSlicesNoExecutor() { + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - parallelReq, + null, + createParallelRequest(), SearchService.ResultsType.DFS, - true, + randomBoolean(), fieldCardinality ) ); assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - singleSliceReq, - SearchService.ResultsType.DFS, - true, + null, + createParallelRequest(), + SearchService.ResultsType.QUERY, + randomBoolean(), fieldCardinality ) ); + } + + public void testDetermineMaximumNumberOfSlicesNotThreadPoolExecutor() { + ExecutorService notThreadPoolExecutor = Executors.newWorkStealingPool(); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; assertEquals( 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, true, fieldCardinality) + DefaultSearchContext.determineMaximumNumberOfSlices( + notThreadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + randomBoolean(), + fieldCardinality + ) ); assertEquals( - executorPoolSize, + 1, DefaultSearchContext.determineMaximumNumberOfSlices( - threadPoolExecutor, - parallelReq, + notThreadPoolExecutor, + createParallelRequest(), SearchService.ResultsType.QUERY, - true, + randomBoolean(), fieldCardinality ) ); + } + + public void testDetermineMaximumNumberOfSlicesEnableQueryPhaseParallelCollection() { + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 0, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ToLongFunction fieldCardinality = name -> -1; assertEquals( - 1, + executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - singleSliceReq, + createParallelRequest(), SearchService.ResultsType.QUERY, true, fieldCardinality @@ -592,54 +598,133 @@ public void testDetermineMaximumNumberOfSlices() { assertEquals( 1, DefaultSearchContext.determineMaximumNumberOfSlices( - notThreadPoolExecutor, - parallelReq, - SearchService.ResultsType.DFS, - true, + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + false, fieldCardinality ) ); - assertEquals( executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - parallelReq, + createParallelRequest(), SearchService.ResultsType.DFS, - false, + randomBoolean(), fieldCardinality ) ); - assertEquals( + } + + public void testDetermineMaximumNumberOfSlicesSingleSortByField() { + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.shardId()).thenReturn(new ShardId("index", "uuid", 0)); + ShardSearchRequest singleSliceReq = new ShardSearchRequest( + OriginalIndices.NONE, + new SearchRequest().allowPartialSearchResults(randomBoolean()) + .source(new SearchSourceBuilder().sort(SortBuilders.fieldSort(FieldSortBuilder.DOC_FIELD_NAME))), + indexShard.shardId(), + 0, 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.DFS, false, fieldCardinality) + AliasFilter.EMPTY, + 1f, + System.currentTimeMillis(), + null ); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 0, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + // DFS concurrency does not rely on slices, hence it kicks in regardless of the request (supportsParallelCollection is not called) assertEquals( - 1, + executorPoolSize, DefaultSearchContext.determineMaximumNumberOfSlices( threadPoolExecutor, - parallelReq, - SearchService.ResultsType.QUERY, - false, + singleSliceReq, + SearchService.ResultsType.DFS, + true, fieldCardinality ) ); - assertEquals( - 1, - DefaultSearchContext.determineMaximumNumberOfSlices(null, parallelReq, SearchService.ResultsType.QUERY, false, fieldCardinality) - ); assertEquals( 1, DefaultSearchContext.determineMaximumNumberOfSlices( - notThreadPoolExecutor, - parallelReq, - SearchService.ResultsType.DFS, - false, + threadPoolExecutor, + singleSliceReq, + SearchService.ResultsType.QUERY, + true, fieldCardinality ) ); } + public void testDetermineMaximumNumberOfSlicesWithQueue() { + int executorPoolSize = randomIntBetween(1, 100); + ThreadPoolExecutor threadPoolExecutor = EsExecutors.newFixed( + "test", + executorPoolSize, + 1000, + Thread::new, + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + ToLongFunction fieldCardinality = name -> { throw new UnsupportedOperationException(); }; + + for (int i = 0; i < executorPoolSize; i++) { + assertTrue(threadPoolExecutor.getQueue().offer(() -> {})); + assertEquals( + executorPoolSize, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + true, + fieldCardinality + ) + ); + assertEquals( + executorPoolSize, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + true, + fieldCardinality + ) + ); + } + for (int i = 0; i < 100; i++) { + assertTrue(threadPoolExecutor.getQueue().offer(() -> {})); + assertEquals( + 1, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.DFS, + true, + fieldCardinality + ) + ); + assertEquals( + 1, + DefaultSearchContext.determineMaximumNumberOfSlices( + threadPoolExecutor, + createParallelRequest(), + SearchService.ResultsType.QUERY, + true, + fieldCardinality + ) + ); + } + } + public void testIsParallelCollectionSupportedForResults() { SearchSourceBuilder searchSourceBuilderOrNull = randomBoolean() ? null : new SearchSourceBuilder(); ToLongFunction fieldCardinality = name -> -1;