diff --git a/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java new file mode 100644 index 0000000000000..80f81886b8690 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java @@ -0,0 +1,43 @@ +package org.elasticsearch.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; + +public class TransportChannelResponseRunnable extends AbstractRunnable { + private CheckedSupplier command; + private TransportChannel channel; + private String action; + static private final Logger logger = LogManager.getLogger(TransportChannelResponseRunnable.class); + + public TransportChannelResponseRunnable(String action, CheckedSupplier command, TransportChannel channel) { + this.action = action; + this.command = command; + this.channel = channel; + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "Failed to send error message back to client for action [{}]", action), inner); + } + } + + @Override + protected void doRun() throws Exception { + channel.sendResponse(command.get()); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8751e303d0dd9..723812faab3be 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -313,23 +313,20 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportResponse.Empty.INSTANCE); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task); - channel.sendResponse(result); - + searchService.executeDfsPhaseAndSendResponse(DFS_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_ACTION_NAME, request, (SearchTask)task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index b1192c59e4cc7..051ec68b698e7 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,10 +24,13 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.TransportChannelResponseRunnable; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lucene.Lucene; @@ -86,6 +89,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; import java.io.IOException; import java.util.Collections; @@ -94,6 +98,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -217,6 +222,12 @@ protected void doClose() { keepAliveReaper.cancel(); } + public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, (SearchTask)task), channel) + ); + } + public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); @@ -248,6 +259,12 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } + public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, (SearchTask)task), channel) + ); + } + public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -784,6 +801,11 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co } } + private Executor getExecutor(ShardSearchRequest request) { + String executorName = request.isThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH; + return threadPool.executor(executorName); + } + /** * Returns the number of active contexts in this * SearchService diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java index 1f3868c0dbac3..96295a8d5290b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java @@ -68,6 +68,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { private float indexBoost; private SearchSourceBuilder source; private Boolean requestCache; + private Boolean isThrottled; private long nowInMillis; private boolean profile; @@ -78,7 +79,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis) { this(shardId, numberOfShards, searchRequest.searchType(), - searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost); + searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.isThrottled(), aliasFilter, indexBoost); this.scroll = searchRequest.scroll(); this.nowInMillis = nowInMillis; } @@ -92,13 +93,14 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis } public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types, - Boolean requestCache, AliasFilter aliasFilter, float indexBoost) { + Boolean requestCache, Boolean isThrottled, AliasFilter aliasFilter, float indexBoost) { this.shardId = shardId; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; this.types = types; this.requestCache = requestCache; + this.isThrottled = isThrottled; this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; } @@ -154,6 +156,11 @@ public Boolean requestCache() { return requestCache; } + @Override + public Boolean isThrottled() { + return isThrottled; + } + @Override public Scroll scroll() { return scroll; @@ -193,6 +200,7 @@ protected void innerReadFrom(StreamInput in) throws IOException { } nowInMillis = in.readVLong(); requestCache = in.readOptionalBoolean(); + isThrottled = in.readOptionalBoolean(); } protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { @@ -212,6 +220,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException out.writeVLong(nowInMillis); } out.writeOptionalBoolean(requestCache); + out.writeOptionalBoolean(isThrottled); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 5dcc36cc74299..6d5f6b650285b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -66,6 +66,8 @@ public interface ShardSearchRequest { Boolean requestCache(); + Boolean isThrottled(); + Scroll scroll(); /** diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index b9b78fca54aec..df268c6e74e4d 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -126,6 +126,11 @@ public Boolean requestCache() { return shardSearchLocalRequest.requestCache(); } + @Override + public Boolean isThrottled() { + return shardSearchLocalRequest.isThrottled(); + } + @Override public Scroll scroll() { return shardSearchLocalRequest.scroll(); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 8bb2336718d35..1ba3fd186cb44 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -174,7 +174,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); - builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100)); + builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, searchThreadPoolSize(availableProcessors) / 2, 100)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side