From fa26f51c123474cc5552fd8814a1b52994d38a5b Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 10:54:16 +0530 Subject: [PATCH 1/8] Create new threadpool for throttled search --- .../src/main/java/org/elasticsearch/threadpool/ThreadPool.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 128c78a4fd6ca..8bb2336718d35 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -72,6 +72,7 @@ public static class Names { public static final String INDEX = "index"; public static final String BULK = "bulk"; public static final String SEARCH = "search"; + public static final String SEARCH_THROTTLED = "search_throttled"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; public static final String REFRESH = "refresh"; @@ -127,6 +128,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.INDEX, ThreadPoolType.FIXED); map.put(Names.BULK, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.FIXED); + map.put(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); @@ -172,6 +174,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); + builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side From c17cc68603421ca291cbf2ebc872c3d3d46f2aaf Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 11:06:43 +0530 Subject: [PATCH 2/8] Add Boolean isThrottled to SearchRequest --- .../action/search/SearchRequest.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 8c1d8b3766830..cc200081be7ce 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -79,6 +79,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; + private Boolean isThrottled; + public SearchRequest() { } @@ -303,6 +305,17 @@ public boolean isSuggestOnly() { return source != null && source.isSuggestOnly(); } + /** + * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. + */ + public void isThrottled(Boolean isThrottled) { + this.isThrottled = isThrottled; + } + + public Boolean isThrottled() { + return isThrottled; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { // generating description in a lazy way since source can be quite big @@ -342,6 +355,7 @@ public void readFrom(StreamInput in) throws IOException { types = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); requestCache = in.readOptionalBoolean(); + isThrottled = in.readOptionalBoolean(); if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { batchedReduceSize = in.readVInt(); } @@ -362,6 +376,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(types); indicesOptions.writeIndicesOptions(out); out.writeOptionalBoolean(requestCache); + out.writeOptionalBoolean(isThrottled); if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { out.writeVInt(batchedReduceSize); } @@ -382,6 +397,7 @@ public boolean equals(Object o) { Objects.equals(preference, that.preference) && Objects.equals(source, that.source) && Objects.equals(requestCache, that.requestCache) && + Objects.equals(isThrottled, that.isThrottled) && Objects.equals(scroll, that.scroll) && Arrays.equals(types, that.types) && Objects.equals(indicesOptions, that.indicesOptions); @@ -390,7 +406,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - scroll, Arrays.hashCode(types), indicesOptions); + isThrottled, scroll, Arrays.hashCode(types), indicesOptions); } @Override @@ -403,6 +419,7 @@ public String toString() { ", routing='" + routing + '\'' + ", preference='" + preference + '\'' + ", requestCache=" + requestCache + + ", isThrottled=" + isThrottled + ", scroll=" + scroll + ", source=" + source + '}'; } From 6d4efc50e698c34e50aa046d9f3b01febedc61d6 Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 14:44:34 +0530 Subject: [PATCH 3/8] Resolve throttled pool from ShardSearchRequest --- .../TransportChannelResponseRunnable.java | 43 +++++++++++++++++++ .../action/search/SearchTransportService.java | 11 ++--- .../elasticsearch/search/SearchService.java | 22 ++++++++++ .../internal/ShardSearchLocalRequest.java | 13 +++++- .../search/internal/ShardSearchRequest.java | 2 + .../internal/ShardSearchTransportRequest.java | 5 +++ .../elasticsearch/threadpool/ThreadPool.java | 2 +- 7 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java diff --git a/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java new file mode 100644 index 0000000000000..80f81886b8690 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/TransportChannelResponseRunnable.java @@ -0,0 +1,43 @@ +package org.elasticsearch.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; + +public class TransportChannelResponseRunnable extends AbstractRunnable { + private CheckedSupplier command; + private TransportChannel channel; + private String action; + static private final Logger logger = LogManager.getLogger(TransportChannelResponseRunnable.class); + + public TransportChannelResponseRunnable(String action, CheckedSupplier command, TransportChannel channel) { + this.action = action; + this.command = command; + this.channel = channel; + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "Failed to send error message back to client for action [{}]", action), inner); + } + } + + @Override + protected void doRun() throws Exception { + channel.sendResponse(command.get()); + } +} diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 8751e303d0dd9..723812faab3be 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -313,23 +313,20 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportResponse.Empty.INSTANCE); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task); - channel.sendResponse(result); - + searchService.executeDfsPhaseAndSendResponse(DFS_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { - SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_ACTION_NAME, request, (SearchTask)task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index b1192c59e4cc7..051ec68b698e7 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,10 +24,13 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.TransportChannelResponseRunnable; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lucene.Lucene; @@ -86,6 +89,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportChannel; import java.io.IOException; import java.util.Collections; @@ -94,6 +98,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; @@ -217,6 +222,12 @@ protected void doClose() { keepAliveReaper.cancel(); } + public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, (SearchTask)task), channel) + ); + } + public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); context.incRef(); @@ -248,6 +259,12 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea } } + public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + getExecutor(request).execute( + new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, (SearchTask)task), channel) + ); + } + public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException { final SearchContext context = createAndPutContext(request); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -784,6 +801,11 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co } } + private Executor getExecutor(ShardSearchRequest request) { + String executorName = request.isThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH; + return threadPool.executor(executorName); + } + /** * Returns the number of active contexts in this * SearchService diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java index 1f3868c0dbac3..96295a8d5290b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java @@ -68,6 +68,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { private float indexBoost; private SearchSourceBuilder source; private Boolean requestCache; + private Boolean isThrottled; private long nowInMillis; private boolean profile; @@ -78,7 +79,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis) { this(shardId, numberOfShards, searchRequest.searchType(), - searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost); + searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.isThrottled(), aliasFilter, indexBoost); this.scroll = searchRequest.scroll(); this.nowInMillis = nowInMillis; } @@ -92,13 +93,14 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis } public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types, - Boolean requestCache, AliasFilter aliasFilter, float indexBoost) { + Boolean requestCache, Boolean isThrottled, AliasFilter aliasFilter, float indexBoost) { this.shardId = shardId; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; this.types = types; this.requestCache = requestCache; + this.isThrottled = isThrottled; this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; } @@ -154,6 +156,11 @@ public Boolean requestCache() { return requestCache; } + @Override + public Boolean isThrottled() { + return isThrottled; + } + @Override public Scroll scroll() { return scroll; @@ -193,6 +200,7 @@ protected void innerReadFrom(StreamInput in) throws IOException { } nowInMillis = in.readVLong(); requestCache = in.readOptionalBoolean(); + isThrottled = in.readOptionalBoolean(); } protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { @@ -212,6 +220,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException out.writeVLong(nowInMillis); } out.writeOptionalBoolean(requestCache); + out.writeOptionalBoolean(isThrottled); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 5dcc36cc74299..6d5f6b650285b 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -66,6 +66,8 @@ public interface ShardSearchRequest { Boolean requestCache(); + Boolean isThrottled(); + Scroll scroll(); /** diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index b9b78fca54aec..df268c6e74e4d 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -126,6 +126,11 @@ public Boolean requestCache() { return shardSearchLocalRequest.requestCache(); } + @Override + public Boolean isThrottled() { + return shardSearchLocalRequest.isThrottled(); + } + @Override public Scroll scroll() { return shardSearchLocalRequest.scroll(); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 8bb2336718d35..1ba3fd186cb44 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -174,7 +174,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000)); - builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100)); + builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, searchThreadPoolSize(availableProcessors) / 2, 100)); builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5))); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side From b0381b6a6f4b11a1f819622d7eafbb1b507159ab Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 16:58:59 +0530 Subject: [PATCH 4/8] Replace all ThreadPool.Names.SEARCH with throttle resolution --- .../search/RemoteClusterConnection.java | 2 +- .../action/search/SearchTransportService.java | 30 +++++++---------- .../action/search/TransportSearchAction.java | 6 +++- .../elasticsearch/search/SearchService.java | 32 +++++++++++++++++-- 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java index a3f3f3a9612b5..abadcaa455556 100644 --- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java @@ -202,7 +202,7 @@ public void handleException(TransportException e) { @Override public String executor() { - return ThreadPool.Names.SEARCH; + return TransportSearchAction.getExecutorName(searchRequest); } }); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 723812faab3be..37f27465d76e6 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -331,65 +331,59 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne }); TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception { - QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_ID_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new); // this is for BWC with pre 5.3 indices in 5.3 we will only execute a `indices:data/read/search[phase/query+fetch]` // if the node is pre 5.3 - transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception { assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards(); - SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeQueryPhaseAndSendResponse(QUERY_FETCH_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(QUERY_FETCH_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(FETCH_ID_SCROLL_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler() { @Override public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception { - FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task); - channel.sendResponse(result); + searchService.executeFetchPhaseAndSendResponse(FETCH_ID_ACTION_NAME, request, (SearchTask) task, channel); } }); TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 6f7cc26e59e58..8ddd6b98d46e7 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -301,7 +301,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { - Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); + Executor executor = threadPool.executor(getExecutorName(searchRequest)); AbstractSearchAsyncAction searchAsyncAction; switch(searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: @@ -320,6 +320,10 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque return searchAsyncAction; } + public static String getExecutorName(SearchRequest searchRequest) { + return searchRequest.isThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; + } + private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING); if (shardCount > shardCountLimit) { diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 051ec68b698e7..212ad9966d239 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -224,7 +224,7 @@ protected void doClose() { public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { getExecutor(request).execute( - new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, (SearchTask)task), channel) + new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, task), channel) ); } @@ -261,7 +261,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { getExecutor(request).execute( - new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, (SearchTask)task), channel) + new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) ); } @@ -322,6 +322,13 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); } + public void executeQueryPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) + ); + } + public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); @@ -346,6 +353,13 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req } } + public void executeQueryPhaseAndSendResponse(String action, QuerySearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) + ); + } + public QuerySearchResult executeQueryPhase(QuerySearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.setTask(task); @@ -387,6 +401,13 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { } } + public void executeFetchPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, ()->executeFetchPhase(request, task), channel) + ); + } + public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); context.incRef(); @@ -418,6 +439,13 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques } } + public void executeFetchPhaseAndSendResponse(String action, ShardFetchRequest request, SearchTask task, TransportChannel channel) throws IOException { + ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); + getExecutor(shardSearchRequest).execute( + new TransportChannelResponseRunnable(action, ()->executeFetchPhase(request, task), channel) + ); + } + public FetchSearchResult executeFetchPhase(ShardFetchRequest request, SearchTask task) { final SearchContext context = findContext(request.id()); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener(); From 972eed7d50e2403cf00d6c34f2a7623fdcfe904e Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 17:06:35 +0530 Subject: [PATCH 5/8] Use null as false for isThrottled --- .../org/elasticsearch/action/search/TransportSearchAction.java | 3 ++- core/src/main/java/org/elasticsearch/search/SearchService.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 8ddd6b98d46e7..fd27a1b166e1e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -321,7 +321,8 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque } public static String getExecutorName(SearchRequest searchRequest) { - return searchRequest.isThrottled() ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; + Boolean isThrottled = searchRequest.isThrottled(); + return isThrottled != null && isThrottled ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; } private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 212ad9966d239..19d3bdd998562 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -830,7 +830,8 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co } private Executor getExecutor(ShardSearchRequest request) { - String executorName = request.isThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH; + Boolean isThrottled = request.isThrottled(); + String executorName = isThrottled != null && isThrottled ? Names.SEARCH_THROTTLED : Names.SEARCH; return threadPool.executor(executorName); } From e93a11a39234c7d787ed8873e2382218c79a1e3f Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Fri, 25 Jun 2021 18:31:29 +0530 Subject: [PATCH 6/8] Add "throttle" param to search API --- .../bulk/byscroll/BulkByScrollParallelizationHelper.java | 1 + .../org/elasticsearch/action/search/SearchRequest.java | 3 ++- .../elasticsearch/action/search/SearchRequestBuilder.java | 8 ++++++++ .../rest/action/document/RestCountAction.java | 1 + .../rest/action/search/RestMultiSearchAction.java | 6 +++--- .../rest/action/search/RestSearchAction.java | 1 + .../rest/action/search/RestSuggestAction.java | 1 + 7 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java index f2bd62c233501..396130fc9bd33 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java @@ -75,6 +75,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field, .routing(request.routing()) .preference(request.preference()) .requestCache(request.requestCache()) + .isThrottled(request.isThrottled()) .scroll(request.scroll()) .indicesOptions(request.indicesOptions()); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index cc200081be7ce..35ffe2876b621 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -308,8 +308,9 @@ public boolean isSuggestOnly() { /** * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. */ - public void isThrottled(Boolean isThrottled) { + public SearchRequest isThrottled(Boolean isThrottled) { this.isThrottled = isThrottled; + return this; } public Boolean isThrottled() { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index ffe2c1b20c516..ca1c166de0bda 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -496,6 +496,14 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { return this; } + /** + * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. + */ + public SearchRequestBuilder setIsThrottled(Boolean isThrottled) { + request.isThrottled(isThrottled); + return this; + } + /** * Should the query be profiled. Defaults to false */ diff --git a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java index 5de436b38c319..fcf1e3949ec9f 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java @@ -71,6 +71,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } }); countRequest.routing(request.param("routing")); + countRequest.isThrottled(request.paramAsBoolean("throttle", null)); float minScore = request.paramAsFloat("min_score", -1f); if (minScore != -1f) { searchSourceBuilder.minScore(minScore); diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index fd843730da23d..dca176f1999fb 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -44,9 +44,7 @@ import java.util.Set; import java.util.function.BiConsumer; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.lenientNodeBooleanValue; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue; -import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.*; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -165,6 +163,8 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind searchRequest.searchType(nodeStringValue(value, null)); } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { searchRequest.requestCache(lenientNodeBooleanValue(value, entry.getKey())); + } else if ("throttle".equals(entry.getKey())) { + searchRequest.isThrottled(lenientNodeBooleanValue(value, entry.getKey())); } else if ("preference".equals(entry.getKey())) { searchRequest.preference(nodeStringValue(value, null)); } else if ("routing".equals(entry.getKey())) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 7c575dbb05038..5c15f9c201e46 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -111,6 +111,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r } parseSearchSource(searchRequest.source(), request); searchRequest.requestCache(request.paramAsBoolean("request_cache", null)); + searchRequest.isThrottled(request.paramAsBoolean("throttle", null)); String scroll = request.param("scroll"); if (scroll != null) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java index 8ccc00c5c1e6b..527f26acfbc4d 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java @@ -67,6 +67,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); + searchRequest.isThrottled(request.paramAsBoolean("throttle", null)); return channel -> client.search(searchRequest, new RestBuilderListener(channel) { @Override public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { From 50e7ece6ae25992c3ced79eb4533b8346f711b9c Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Mon, 28 Jun 2021 11:25:23 +0530 Subject: [PATCH 7/8] Refactor throttle_search parameter in REST handlers --- .../BulkByScrollParallelizationHelper.java | 1 - .../action/search/SearchRequest.java | 20 +---------- .../action/search/SearchRequestBuilder.java | 4 +-- .../action/search/TransportSearchAction.java | 4 +-- .../util/concurrent/EsThreadPoolExecutor.java | 5 +++ .../rest/action/document/RestCountAction.java | 32 +++++++++-------- .../action/search/RestMultiSearchAction.java | 2 -- .../rest/action/search/RestSearchAction.java | 10 +++--- .../rest/action/search/RestSuggestAction.java | 36 ++++++++++--------- .../elasticsearch/search/SearchService.java | 6 ++-- .../search/builder/SearchSourceBuilder.java | 23 +++++++++++- .../internal/ShardSearchLocalRequest.java | 16 ++++----- .../search/internal/ShardSearchRequest.java | 2 +- .../internal/ShardSearchTransportRequest.java | 4 +-- 14 files changed, 89 insertions(+), 76 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java index 396130fc9bd33..f2bd62c233501 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/byscroll/BulkByScrollParallelizationHelper.java @@ -75,7 +75,6 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field, .routing(request.routing()) .preference(request.preference()) .requestCache(request.requestCache()) - .isThrottled(request.isThrottled()) .scroll(request.scroll()) .indicesOptions(request.indicesOptions()); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 35ffe2876b621..8c1d8b3766830 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -79,8 +79,6 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; - private Boolean isThrottled; - public SearchRequest() { } @@ -305,18 +303,6 @@ public boolean isSuggestOnly() { return source != null && source.isSuggestOnly(); } - /** - * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. - */ - public SearchRequest isThrottled(Boolean isThrottled) { - this.isThrottled = isThrottled; - return this; - } - - public Boolean isThrottled() { - return isThrottled; - } - @Override public Task createTask(long id, String type, String action, TaskId parentTaskId) { // generating description in a lazy way since source can be quite big @@ -356,7 +342,6 @@ public void readFrom(StreamInput in) throws IOException { types = in.readStringArray(); indicesOptions = IndicesOptions.readIndicesOptions(in); requestCache = in.readOptionalBoolean(); - isThrottled = in.readOptionalBoolean(); if (in.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { batchedReduceSize = in.readVInt(); } @@ -377,7 +362,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(types); indicesOptions.writeIndicesOptions(out); out.writeOptionalBoolean(requestCache); - out.writeOptionalBoolean(isThrottled); if (out.getVersion().onOrAfter(Version.V_5_4_0_UNRELEASED)) { out.writeVInt(batchedReduceSize); } @@ -398,7 +382,6 @@ public boolean equals(Object o) { Objects.equals(preference, that.preference) && Objects.equals(source, that.source) && Objects.equals(requestCache, that.requestCache) && - Objects.equals(isThrottled, that.isThrottled) && Objects.equals(scroll, that.scroll) && Arrays.equals(types, that.types) && Objects.equals(indicesOptions, that.indicesOptions); @@ -407,7 +390,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, - isThrottled, scroll, Arrays.hashCode(types), indicesOptions); + scroll, Arrays.hashCode(types), indicesOptions); } @Override @@ -420,7 +403,6 @@ public String toString() { ", routing='" + routing + '\'' + ", preference='" + preference + '\'' + ", requestCache=" + requestCache + - ", isThrottled=" + isThrottled + ", scroll=" + scroll + ", source=" + source + '}'; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index ca1c166de0bda..4e6733ee286cd 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -499,8 +499,8 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) { /** * Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool. */ - public SearchRequestBuilder setIsThrottled(Boolean isThrottled) { - request.isThrottled(isThrottled); + public SearchRequestBuilder setThrottleSearch(Boolean throttleSearch) { + request.source().setThrottleSearch(throttleSearch); return this; } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index fd27a1b166e1e..e270e627bce1b 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -321,8 +321,8 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque } public static String getExecutorName(SearchRequest searchRequest) { - Boolean isThrottled = searchRequest.isThrottled(); - return isThrottled != null && isThrottled ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; + Boolean throttleSearch = searchRequest.source().getThrottleSearch(); + return throttleSearch != null && throttleSearch ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH; } private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) { diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java index 9662292cf69f6..09ee0f4dde09c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java @@ -19,6 +19,9 @@ package org.elasticsearch.common.util.concurrent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -34,6 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class); /** * Name used in error reporting. */ @@ -92,6 +96,7 @@ public void execute(final Runnable command) { protected void doExecute(final Runnable command) { try { super.execute(command); + logger.info(command.toString() + " executed on thread pool = " + name); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection diff --git a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java index fcf1e3949ec9f..6d10c01d32e8c 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/document/RestCountAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -71,7 +72,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } }); countRequest.routing(request.param("routing")); - countRequest.isThrottled(request.paramAsBoolean("throttle", null)); float minScore = request.paramAsFloat("min_score", -1f); if (minScore != -1f) { searchSourceBuilder.minScore(minScore); @@ -85,21 +85,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } else if (terminateAfter > 0) { searchSourceBuilder.terminateAfter(terminateAfter); } - return channel -> client.search(countRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - if (terminateAfter != DEFAULT_TERMINATE_AFTER) { - builder.field("terminated_early", response.isTerminatedEarly()); - } - builder.field("count", response.getHits().totalHits()); - buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(), + return channel -> { + String throttleSearch = request.header("throttle_search"); + countRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(countRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + if (terminateAfter != DEFAULT_TERMINATE_AFTER) { + builder.field("terminated_early", response.isTerminatedEarly()); + } + builder.field("count", response.getHits().totalHits()); + buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); - builder.endObject(); - return new BytesRestResponse(response.status(), builder); - } - }); + builder.endObject(); + return new BytesRestResponse(response.status(), builder); + } + }); + }; } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index dca176f1999fb..f3778dd7538dd 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -163,8 +163,6 @@ public static void parseMultiLineRequest(RestRequest request, IndicesOptions ind searchRequest.searchType(nodeStringValue(value, null)); } else if ("request_cache".equals(entry.getKey()) || "requestCache".equals(entry.getKey())) { searchRequest.requestCache(lenientNodeBooleanValue(value, entry.getKey())); - } else if ("throttle".equals(entry.getKey())) { - searchRequest.isThrottled(lenientNodeBooleanValue(value, entry.getKey())); } else if ("preference".equals(entry.getKey())) { searchRequest.preference(nodeStringValue(value, null)); } else if ("routing".equals(entry.getKey())) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 5c15f9c201e46..e1c20a67337e2 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; @@ -75,7 +76,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC request.withContentOrSourceParamParserOrNull(parser -> parseSearchRequest(searchRequest, request, parser)); - return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel)); + return channel -> { + String throttleSearch = request.header("throttle_search"); + searchRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(searchRequest, new RestStatusToXContentListener<>(channel)); + }; } /** @@ -86,7 +91,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC */ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request, XContentParser requestContentParser) throws IOException { - if (searchRequest.source() == null) { searchRequest.source(new SearchSourceBuilder()); } @@ -111,7 +115,6 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r } parseSearchSource(searchRequest.source(), request); searchRequest.requestCache(request.paramAsBoolean("request_cache", null)); - searchRequest.isThrottled(request.paramAsBoolean("throttle", null)); String scroll = request.param("scroll"); if (scroll != null) { @@ -169,7 +172,6 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil "if the field is not stored"); } - StoredFieldsContext storedFieldsContext = StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request); if (storedFieldsContext != null) { diff --git a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java index 527f26acfbc4d..97a86918bacf5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/search/RestSuggestAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -67,22 +68,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC } searchRequest.routing(request.param("routing")); searchRequest.preference(request.param("preference")); - searchRequest.isThrottled(request.paramAsBoolean("throttle", null)); - return channel -> client.search(searchRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { - RestStatus restStatus = RestStatus.status(response.getSuccessfulShards(), - response.getTotalShards(), response.getShardFailures()); - builder.startObject(); - buildBroadcastShardsHeader(builder, request, response.getTotalShards(), - response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); - Suggest suggest = response.getSuggest(); - if (suggest != null) { - suggest.toInnerXContent(builder, request); + return channel -> { + String throttleSearch = request.header("throttle_search"); + searchRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null)); + client.search(searchRequest, new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception { + RestStatus restStatus = RestStatus.status(response.getSuccessfulShards(), + response.getTotalShards(), response.getShardFailures()); + builder.startObject(); + buildBroadcastShardsHeader(builder, request, response.getTotalShards(), + response.getSuccessfulShards(), response.getFailedShards(), response.getShardFailures()); + Suggest suggest = response.getSuggest(); + if (suggest != null) { + suggest.toInnerXContent(builder, request); + } + builder.endObject(); + return new BytesRestResponse(restStatus, builder); } - builder.endObject(); - return new BytesRestResponse(restStatus, builder); - } - }); + }); + }; } } diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index 19d3bdd998562..fa99b1b04e6bd 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -24,13 +24,11 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.TransportChannelResponseRunnable; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lucene.Lucene; @@ -830,8 +828,8 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co } private Executor getExecutor(ShardSearchRequest request) { - Boolean isThrottled = request.isThrottled(); - String executorName = isThrottled != null && isThrottled ? Names.SEARCH_THROTTLED : Names.SEARCH; + Boolean throttleSearch = request.getThrottleSearch(); + String executorName = throttleSearch != null && throttleSearch ? Names.SEARCH_THROTTLED : Names.SEARCH; return threadPool.executor(executorName); } diff --git a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java index f5bd046eec8aa..1da9dad5b7753 100644 --- a/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java @@ -135,6 +135,8 @@ public static HighlightBuilder highlight() { private Boolean explain; + private Boolean throttleSearch; + private Boolean version; private List> sorts; @@ -185,6 +187,7 @@ public SearchSourceBuilder() { public SearchSourceBuilder(StreamInput in) throws IOException { aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new); explain = in.readOptionalBoolean(); + throttleSearch = in.readOptionalBoolean(); fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); docValueFields = (List) in.readGenericValue(); storedFieldsContext = in.readOptionalWriteable(StoredFieldsContext::new); @@ -229,6 +232,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(aggregations); out.writeOptionalBoolean(explain); + out.writeOptionalBoolean(throttleSearch); out.writeOptionalWriteable(fetchSourceContext); out.writeGenericValue(docValueFields); out.writeOptionalWriteable(storedFieldsContext); @@ -375,6 +379,21 @@ public Boolean explain() { return explain; } + /** + * Should the request be executed on throttled thread pool + */ + public SearchSourceBuilder setThrottleSearch(Boolean throttleSearch) { + this.throttleSearch = throttleSearch; + return this; + } + + /** + * Indicates whether this request will be executed on throttled thread pool + */ + public Boolean getThrottleSearch() { + return throttleSearch; + } + /** * Should each {@link org.elasticsearch.search.SearchHit} be returned with a * version associated with it. @@ -903,6 +922,7 @@ private SearchSourceBuilder shallowCopy(QueryBuilder queryBuilder, QueryBuilder SearchSourceBuilder rewrittenBuilder = new SearchSourceBuilder(); rewrittenBuilder.aggregations = aggregations; rewrittenBuilder.explain = explain; + rewrittenBuilder.throttleSearch = throttleSearch; rewrittenBuilder.extBuilders = extBuilders; rewrittenBuilder.fetchSourceContext = fetchSourceContext; rewrittenBuilder.docValueFields = docValueFields; @@ -1433,7 +1453,7 @@ public boolean equals(Object obj) { @Override public int hashCode() { - return Objects.hash(aggregations, explain, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder, + return Objects.hash(aggregations, explain, throttleSearch, fetchSourceContext, docValueFields, storedFieldsContext, from, highlightBuilder, indexBoosts, minScore, postQueryBuilder, queryBuilder, rescoreBuilders, scriptFields, size, sorts, searchAfterBuilder, sliceBuilder, stats, suggestBuilder, terminateAfter, timeout, trackScores, version, profile, extBuilders, collapse); @@ -1450,6 +1470,7 @@ public boolean equals(Object obj) { SearchSourceBuilder other = (SearchSourceBuilder) obj; return Objects.equals(aggregations, other.aggregations) && Objects.equals(explain, other.explain) + && Objects.equals(throttleSearch, other.throttleSearch) && Objects.equals(fetchSourceContext, other.fetchSourceContext) && Objects.equals(docValueFields, other.docValueFields) && Objects.equals(storedFieldsContext, other.storedFieldsContext) diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java index 96295a8d5290b..1f29e4e7fadf8 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchLocalRequest.java @@ -68,7 +68,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { private float indexBoost; private SearchSourceBuilder source; private Boolean requestCache; - private Boolean isThrottled; + private Boolean throttleSearch; private long nowInMillis; private boolean profile; @@ -79,7 +79,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest { ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis) { this(shardId, numberOfShards, searchRequest.searchType(), - searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.isThrottled(), aliasFilter, indexBoost); + searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.source().getThrottleSearch(), aliasFilter, indexBoost); this.scroll = searchRequest.scroll(); this.nowInMillis = nowInMillis; } @@ -93,14 +93,14 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis } public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types, - Boolean requestCache, Boolean isThrottled, AliasFilter aliasFilter, float indexBoost) { + Boolean requestCache, Boolean throttleSearch, AliasFilter aliasFilter, float indexBoost) { this.shardId = shardId; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; this.types = types; this.requestCache = requestCache; - this.isThrottled = isThrottled; + this.throttleSearch = throttleSearch; this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; } @@ -157,8 +157,8 @@ public Boolean requestCache() { } @Override - public Boolean isThrottled() { - return isThrottled; + public Boolean getThrottleSearch() { + return throttleSearch; } @Override @@ -200,7 +200,7 @@ protected void innerReadFrom(StreamInput in) throws IOException { } nowInMillis = in.readVLong(); requestCache = in.readOptionalBoolean(); - isThrottled = in.readOptionalBoolean(); + throttleSearch = in.readOptionalBoolean(); } protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { @@ -220,7 +220,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException out.writeVLong(nowInMillis); } out.writeOptionalBoolean(requestCache); - out.writeOptionalBoolean(isThrottled); + out.writeOptionalBoolean(throttleSearch); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 6d5f6b650285b..cc5dc322ae731 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -66,7 +66,7 @@ public interface ShardSearchRequest { Boolean requestCache(); - Boolean isThrottled(); + Boolean getThrottleSearch(); Scroll scroll(); diff --git a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java index df268c6e74e4d..504b9c1038590 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java +++ b/core/src/main/java/org/elasticsearch/search/internal/ShardSearchTransportRequest.java @@ -127,8 +127,8 @@ public Boolean requestCache() { } @Override - public Boolean isThrottled() { - return shardSearchLocalRequest.isThrottled(); + public Boolean getThrottleSearch() { + return shardSearchLocalRequest.getThrottleSearch(); } @Override From 809dcbb6c9bbdfb721761233d9944903bf80637c Mon Sep 17 00:00:00 2001 From: Govind Balaji S Date: Mon, 28 Jun 2021 11:40:14 +0530 Subject: [PATCH 8/8] Formatting --- .../java/org/elasticsearch/search/SearchService.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index fa99b1b04e6bd..c0aa4f864c504 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -222,7 +222,7 @@ protected void doClose() { public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { getExecutor(request).execute( - new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeDfsPhase(request, task), channel) ); } @@ -259,7 +259,7 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { getExecutor(request).execute( - new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) ); } @@ -323,7 +323,7 @@ private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOp public void executeQueryPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); getExecutor(shardSearchRequest).execute( - new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) ); } @@ -354,7 +354,7 @@ public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest req public void executeQueryPhaseAndSendResponse(String action, QuerySearchRequest request, SearchTask task, TransportChannel channel) throws IOException { ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); getExecutor(shardSearchRequest).execute( - new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeQueryPhase(request, task), channel) ); } @@ -402,7 +402,7 @@ private boolean fetchPhaseShouldFreeContext(SearchContext context) { public void executeFetchPhaseAndSendResponse(String action, InternalScrollSearchRequest request, SearchTask task, TransportChannel channel) throws IOException { ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); getExecutor(shardSearchRequest).execute( - new TransportChannelResponseRunnable(action, ()->executeFetchPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeFetchPhase(request, task), channel) ); } @@ -440,7 +440,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques public void executeFetchPhaseAndSendResponse(String action, ShardFetchRequest request, SearchTask task, TransportChannel channel) throws IOException { ShardSearchRequest shardSearchRequest = findContext(request.id()).request(); getExecutor(shardSearchRequest).execute( - new TransportChannelResponseRunnable(action, ()->executeFetchPhase(request, task), channel) + new TransportChannelResponseRunnable(action, () -> executeFetchPhase(request, task), channel) ); }