diff --git a/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java new file mode 100644 index 0000000000000..80f81886b8690 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java @@ -0,0 +1,43 @@ +package org.elasticsearch.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; + +public class TransportChannelResponseRunnable extends AbstractRunnable { + private CheckedSupplier command; + private TransportChannel channel; + private String action; + static private final Logger logger = LogManager.getLogger(TransportChannelResponseRunnable.class); + + public TransportChannelResponseRunnable(String action, CheckedSupplier command, TransportChannel channel) { + this.action = action; + this.command = command; + this.channel = channel; + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "Failed to send error message back to client for action [{}]", action), inner); + } + } + + @Override + protected void doRun() throws Exception { + channel.sendResponse(command.get()); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index a3f3f3a9612b5..abadcaa455556 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -202,7 +202,7 @@ public void handleException(TransportException e) { @Override public String executor() { - return ThreadPool.Names.SEARCH; + return TransportSearchAction.getExecutorName(searchRequest); } }); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index ffe2c1b20c516..4e6733ee286cd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -496,6 +496,14 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { return this; } + /** + * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. + */ + public SearchRequestBuilder setThrottleSearch(Boolean throttleSearch) { + request.source().setThrottleSearch(throttleSearch); + return this; + } + /** * Should the query be profiled. Defaults to false */ diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8751e303d0dd9..37f27465d76e6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -313,86 +313,77 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportResponse.Empty.INSTANCE); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task); - channel.sendResponse(result); - + searchService.executeDfsPhaseAndSendResponse(DFS_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_ACTION_NAME, request, (SearchTask)task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { - QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_ID_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); // this is for BWC with pre 5.3 indices in 5.3 we will only execute a `indices:data/read/search[phase/query+fetch]` // if the node is pre 5.3 - transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards(); - SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_FETCH_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(QUERY_FETCH_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(FETCH_ID_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(FETCH_ID_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 6f7cc26e59e58..e270e627bce1b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -301,7 +301,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { - Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); + Executor executor = threadPool.executor(getExecutorName(searchRequest)); AbstractSearchAsyncAction searchAsyncAction; switch(searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: @@ -320,6 +320,11 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque return searchAsyncAction; } + public static String getExecutorName(SearchRequest searchRequest) { + Boolean throttleSearch = searchRequest.source().getThrottleSearch(); + return throttleSearch != null && throttleSearch ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; + } + private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING); if (shardCount > shardCountLimit) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9662292cf69f6..09ee0f4dde09c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -19,6 +19,9 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -34,6 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); /** * Name used in error reporting. */ @@ -92,6 +96,7 @@ public void execute(final Runnable command) { protected void doExecute(final Runnable command) { try { super.execute(command); + logger.info(command.toString() + " executed on thread pool = " + name); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection diff --git a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java index 5de436b38c319..6d10c01d32e8c 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -84,21 +85,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } else if (terminateAfter > 0) { searchSourceBuilder.terminateAfter(terminateAfter); } - return channel -> client.search(countRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - if (terminateAfter != DEFAULT_TERMINATE_AFTER) { - builder.field("terminated_early", response.isTerminatedEarly()); - } - builder.field("count", response.getHits().totalHits()); - buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(), + return channel -> { + String throttleSearch = request.header("throttle_search"); + countRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(countRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + if (terminateAfter != DEFAULT_TERMINATE_AFTER) { + builder.field("terminated_early", response.isTerminatedEarly()); + } + builder.field("count", response.getHits().totalHits()); + buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); - builder.endObject(); - return new BytesRestResponse(response.status(), builder); - } - }); + builder.endObject(); + return new BytesRestResponse(response.status(), builder); + } + }); + }; } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index fd843730da23d..f3778dd7538dd 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -44,9 +44,7 @@ import java.util.Set; import java.util.function.BiConsumer; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.lenientNodeBooleanValue; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.*; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 7c575dbb05038..e1c20a67337e2 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; @@ -75,7 +76,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.withContentOrSourceParamParserOrNull(parser -> parseSearchRequest(searchRequest, request, parser)); - return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel)); + return channel -> { + String throttleSearch = request.header("throttle_search"); + searchRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(searchRequest, new RestStatusToXContentListener<>(channel)); + }; } /** @@ -86,7 +91,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC */ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request, XContentParser requestContentParser) throws IOException { - if (searchRequest.source() == null) { searchRequest.source(new SearchSourceBuilder()); } @@ -168,7 +172,6 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil "if the field is not stored"); } - StoredFieldsContext storedFieldsContext = StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request); if (storedFieldsContext != null) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java index 8ccc00c5c1e6b..97a86918bacf5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -67,21 +68,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); - return channel -> client.search(searchRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { - RestStatus restStatus = RestStatus.status(response.getSuccessfulShards(), - response.getTotalShards(), response.getShardFailures()); - builder.startObject(); - buildBroadcastShardsHeader(builder, request, response.getTotalShards(), - response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); - Suggest suggest = response.getSuggest(); - if (suggest != null) { - suggest.toInnerXContent(builder, request); + return channel -> { + String throttleSearch = request.header("throttle_search"); + searchRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(searchRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { + RestStatus restStatus = RestStatus.status(response.getSuccessfulShards(), + response.getTotalShards(), response.getShardFailures()); + builder.startObject(); + buildBroadcastShardsHeader(builder, request, response.getTotalShards(), + response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); + Suggest suggest = response.getSuggest(); + if (suggest != null) { + suggest.toInnerXContent(builder, request); + } + builder.endObject(); + return new BytesRestResponse(restStatus, builder); } - builder.endObject(); - return new BytesRestResponse(restStatus, builder); - } - }); + }); + }; } } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index b1192c59e4cc7..c0aa4f864c504 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.TransportChannelResponseRunnable; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; @@ -86,6 +87,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; import java.io.IOException; import java.util.Collections; @@ -94,6 +96,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -217,6 +220,12 @@ protected void doClose() { keepAliveReaper.cancel(); } + public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, () -> executeDfsPhase(request, task), channel) + ); + } + public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); @@ -248,6 +257,12 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } + public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) + ); + } + public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -305,6 +320,13 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } + public void executeQueryPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) + ); + } + public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -329,6 +351,13 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req } } + public void executeQueryPhaseAndSendResponse(String action, QuerySearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) + ); + } + public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.setTask(task); @@ -370,6 +399,13 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } + public void executeFetchPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, () -> executeFetchPhase(request, task), channel) + ); + } + public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.incRef(); @@ -401,6 +437,13 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques } } + public void executeFetchPhaseAndSendResponse(String action, ShardFetchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, () -> executeFetchPhase(request, task), channel) + ); + } + public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -784,6 +827,12 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co } } + private Executor getExecutor(ShardSearchRequest request) { + Boolean throttleSearch = request.getThrottleSearch(); + String executorName = throttleSearch != null && throttleSearch ? Names.SEARCH_THROTTLED : Names.SEARCH; + return threadPool.executor(executorName); + } + /** * Returns the number of active contexts in this * SearchService diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index f5bd046eec8aa..1da9dad5b7753 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -135,6 +135,8 @@ public static HighlightBuilder highlight() { private Boolean explain; + private Boolean throttleSearch; + private Boolean version; private List> sorts; @@ -185,6 +187,7 @@ public SearchSourceBuilder() { public SearchSourceBuilder(StreamInput in) throws IOException { aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new); explain = in.readOptionalBoolean(); + throttleSearch = in.readOptionalBoolean(); fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); docValueFields = (List) in.readGenericValue(); storedFieldsContext = in.readOptionalWriteable(StoredFieldsContext::new); @@ -229,6 +232,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(aggregations); out.writeOptionalBoolean(explain); + out.writeOptionalBoolean(throttleSearch); out.writeOptionalWriteable(fetchSourceContext); out.writeGenericValue(docValueFields); out.writeOptionalWriteable(storedFieldsContext); @@ -375,6 +379,21 @@ public Boolean explain() { return explain; } + /** + * Should the request be executed on throttled thread pool + */ + public SearchSourceBuilder setThrottleSearch(Boolean throttleSearch) { + this.throttleSearch = throttleSearch; + return this; + } + + /** + * Indicates whether this request will be executed on throttled thread pool + */ + public Boolean getThrottleSearch() { + return throttleSearch; + } + /** * Should each {@link org.elasticsearch.search.SearchHit} be returned with a * version associated with it. @@ -903,6 +922,7 @@ private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder SearchSourceBuilder rewrittenBuilder = new SearchSourceBuilder(); rewrittenBuilder.aggregations = aggregations; rewrittenBuilder.explain = explain; + rewrittenBuilder.throttleSearch = throttleSearch; rewrittenBuilder.extBuilders = extBuilders; rewrittenBuilder.fetchSourceContext = fetchSourceContext; rewrittenBuilder.docValueFields = docValueFields; @@ -1433,7 +1453,7 @@ public boolean equals(Object obj) { @Override public int hashCode() { - return Objects.hash(aggregations, explain, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder, + return Objects.hash(aggregations, explain, throttleSearch, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder, indexBoosts, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields, size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeout, trackScores, version, profile, extBuilders, collapse); @@ -1450,6 +1470,7 @@ public boolean equals(Object obj) { SearchSourceBuilder other = (SearchSourceBuilder) obj; return Objects.equals(aggregations, other.aggregations) && Objects.equals(explain, other.explain) + && Objects.equals(throttleSearch, other.throttleSearch) && Objects.equals(fetchSourceContext, other.fetchSourceContext) && Objects.equals(docValueFields, other.docValueFields) && Objects.equals(storedFieldsContext, other.storedFieldsContext) diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java index 1f3868c0dbac3..1f29e4e7fadf8 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java @@ -68,6 +68,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { private float indexBoost; private SearchSourceBuilder source; private Boolean requestCache; + private Boolean throttleSearch; private long nowInMillis; private boolean profile; @@ -78,7 +79,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis) { this(shardId, numberOfShards, searchRequest.searchType(), - searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost); + searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.source().getThrottleSearch(), aliasFilter, indexBoost); this.scroll = searchRequest.scroll(); this.nowInMillis = nowInMillis; } @@ -92,13 +93,14 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis } public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types, - Boolean requestCache, AliasFilter aliasFilter, float indexBoost) { + Boolean requestCache, Boolean throttleSearch, AliasFilter aliasFilter, float indexBoost) { this.shardId = shardId; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; this.types = types; this.requestCache = requestCache; + this.throttleSearch = throttleSearch; this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; } @@ -154,6 +156,11 @@ public Boolean requestCache() { return requestCache; } + @Override + public Boolean getThrottleSearch() { + return throttleSearch; + } + @Override public Scroll scroll() { return scroll; @@ -193,6 +200,7 @@ protected void innerReadFrom(StreamInput in) throws IOException { } nowInMillis = in.readVLong(); requestCache = in.readOptionalBoolean(); + throttleSearch = in.readOptionalBoolean(); } protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { @@ -212,6 +220,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException out.writeVLong(nowInMillis); } out.writeOptionalBoolean(requestCache); + out.writeOptionalBoolean(throttleSearch); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 5dcc36cc74299..cc5dc322ae731 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -66,6 +66,8 @@ public interface ShardSearchRequest { Boolean requestCache(); + Boolean getThrottleSearch(); + Scroll scroll(); /** diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index b9b78fca54aec..504b9c1038590 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -126,6 +126,11 @@ public Boolean requestCache() { return shardSearchLocalRequest.requestCache(); } + @Override + public Boolean getThrottleSearch() { + return shardSearchLocalRequest.getThrottleSearch(); + } + @Override public Scroll scroll() { return shardSearchLocalRequest.scroll(); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 128c78a4fd6ca..1ba3fd186cb44 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -72,6 +72,7 @@ public static class Names { public static final String INDEX = "index"; public static final String BULK = "bulk"; public static final String SEARCH = "search"; + public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; public static final String REFRESH = "refresh"; @@ -127,6 +128,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.INDEX, ThreadPoolType.FIXED); map.put(Names.BULK, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.FIXED); + map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -172,6 +174,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); + builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, searchThreadPoolSize(availableProcessors) / 2, 100)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side