Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttled Search #2

Open
wants to merge 8 commits into
base: branchv5.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.elasticsearch.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

public class TransportChannelResponseRunnable extends AbstractRunnable {
private CheckedSupplier<TransportResponse, Exception> command;
private TransportChannel channel;
private String action;
static private final Logger logger = LogManager.getLogger(TransportChannelResponseRunnable.class);

public TransportChannelResponseRunnable(String action, CheckedSupplier<TransportResponse, Exception> command, TransportChannel channel) {
this.action = action;
this.command = command;
this.channel = channel;
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"Failed to send error message back to client for action [{}]", action), inner);
}
}

@Override
protected void doRun() throws Exception {
channel.sendResponse(command.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void handleException(TransportException e) {

@Override
public String executor() {
return ThreadPool.Names.SEARCH;
return TransportSearchAction.getExecutorName(searchRequest);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,14 @@ public SearchRequestBuilder setRequestCache(Boolean requestCache) {
return this;
}

/**
* Sets if this request will be executed on SEARCH_THROTTLED thread pool, rather than the SEARCH thread pool.
*/
public SearchRequestBuilder setThrottleSearch(Boolean throttleSearch) {
request.source().setThrottleSearch(throttleSearch);
return this;
}

/**
* Should the query be profiled. Defaults to <code>false</code>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,86 +313,77 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);

transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task);
channel.sendResponse(result);

searchService.executeDfsPhaseAndSendResponse(DFS_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhaseAndSendResponse(QUERY_ACTION_NAME, request, (SearchTask)task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhaseAndSendResponse(QUERY_ID_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhaseAndSendResponse(QUERY_SCROLL_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

// this is for BWC with pre 5.3 indices in 5.3 we will only execute a `indices:data/read/search[phase/query+fetch]`
// if the node is pre 5.3
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
assert request.numberOfShards() == 1 : "expected single shard request but got: " + request.numberOfShards();
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhaseAndSendResponse(QUERY_FETCH_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_ACTION_NAME, QueryFetchSearchResult::new);

transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhaseAndSendResponse(QUERY_FETCH_SCROLL_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhaseAndSendResponse(FETCH_ID_SCROLL_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeFetchPhaseAndSendResponse(FETCH_ID_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
Executor executor = threadPool.executor(ThreadPool.Names.SEARCH);
Executor executor = threadPool.executor(getExecutorName(searchRequest));
AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
Expand All @@ -320,6 +320,11 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
return searchAsyncAction;
}

public static String getExecutorName(SearchRequest searchRequest) {
Boolean throttleSearch = searchRequest.source().getThrottleSearch();
return throttleSearch != null && throttleSearch ? ThreadPool.Names.SEARCH_THROTTLED : ThreadPool.Names.SEARCH;
}

private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
final long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
if (shardCount > shardCountLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -34,6 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private volatile ShutdownListener listener;

private final Object monitor = new Object();
private static final Logger logger = LogManager.getLogger(EsThreadPoolExecutor.class);
/**
* Name used in error reporting.
*/
Expand Down Expand Up @@ -92,6 +96,7 @@ public void execute(final Runnable command) {
protected void doExecute(final Runnable command) {
try {
super.execute(command);
logger.info(command.toString() + " executed on thread pool = " + name);
} catch (EsRejectedExecutionException ex) {
if (command instanceof AbstractRunnable) {
// If we are an abstract runnable we can handle the rejection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -84,21 +85,25 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
} else if (terminateAfter > 0) {
searchSourceBuilder.terminateAfter(terminateAfter);
}
return channel -> client.search(countRequest, new RestBuilderListener<SearchResponse>(channel) {
@Override
public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
if (terminateAfter != DEFAULT_TERMINATE_AFTER) {
builder.field("terminated_early", response.isTerminatedEarly());
}
builder.field("count", response.getHits().totalHits());
buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(),
return channel -> {
String throttleSearch = request.header("throttle_search");
countRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null));
client.search(countRequest, new RestBuilderListener<SearchResponse>(channel) {
@Override
public RestResponse buildResponse(SearchResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
if (terminateAfter != DEFAULT_TERMINATE_AFTER) {
builder.field("terminated_early", response.isTerminatedEarly());
}
builder.field("count", response.getHits().totalHits());
buildBroadcastShardsHeader(builder, request, response.getTotalShards(), response.getSuccessfulShards(),
response.getFailedShards(), response.getShardFailures());

builder.endObject();
return new BytesRestResponse(response.status(), builder);
}
});
builder.endObject();
return new BytesRestResponse(response.status(), builder);
}
});
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
import java.util.Set;
import java.util.function.BiConsumer;

import static org.elasticsearch.common.xcontent.support.XContentMapValues.lenientNodeBooleanValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.*;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -75,7 +76,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser));

return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
return channel -> {
String throttleSearch = request.header("throttle_search");
searchRequest.source().setThrottleSearch(Booleans.parseBoolean(throttleSearch, null));
client.search(searchRequest, new RestStatusToXContentListener<>(channel));
};
}

/**
Expand All @@ -86,7 +91,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
*/
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser) throws IOException {

if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
}
Expand Down Expand Up @@ -168,7 +172,6 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
"if the field is not stored");
}


StoredFieldsContext storedFieldsContext =
StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
if (storedFieldsContext != null) {
Expand Down
Loading