Skip to content

Commit

Permalink
Resolve throttled pool from ShardSearchRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
Govind Balaji S committed Jun 25, 2021
1 parent c17cc68 commit 6d4efc5
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.elasticsearch.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

public class TransportChannelResponseRunnable extends AbstractRunnable {
private CheckedSupplier<TransportResponse, Exception> command;
private TransportChannel channel;
private String action;
static private final Logger logger = LogManager.getLogger(TransportChannelResponseRunnable.class);

public TransportChannelResponseRunnable(String action, CheckedSupplier<TransportResponse, Exception> command, TransportChannel channel) {
this.action = action;
this.command = command;
this.channel = channel;
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"Failed to send error message back to client for action [{}]", action), inner);
}
}

@Override
protected void doRun() throws Exception {
channel.sendResponse(command.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,23 +313,20 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);

transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request, (SearchTask)task);
channel.sendResponse(result);

searchService.executeDfsPhaseAndSendResponse(DFS_ACTION_NAME, request, (SearchTask) task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
searchService.executeQueryPhaseAndSendResponse(QUERY_ACTION_NAME, request, (SearchTask)task, channel);
}
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.TransportChannelResponseRunnable;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -86,6 +89,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -94,6 +98,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
Expand Down Expand Up @@ -217,6 +222,12 @@ protected void doClose() {
keepAliveReaper.cancel();
}

public void executeDfsPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException {
getExecutor(request).execute(
new TransportChannelResponseRunnable(action, ()->executeDfsPhase(request, (SearchTask)task), channel)
);
}

public DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
context.incRef();
Expand Down Expand Up @@ -248,6 +259,12 @@ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final Sea
}
}

public void executeQueryPhaseAndSendResponse(String action, ShardSearchRequest request, SearchTask task, TransportChannel channel) throws IOException {
getExecutor(request).execute(
new TransportChannelResponseRunnable(action, ()->executeQueryPhase(request, (SearchTask)task), channel)
);
}

public SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
Expand Down Expand Up @@ -784,6 +801,11 @@ private void processScroll(InternalScrollSearchRequest request, SearchContext co
}
}

private Executor getExecutor(ShardSearchRequest request) {
String executorName = request.isThrottled() ? Names.SEARCH_THROTTLED : Names.SEARCH;
return threadPool.executor(executorName);
}

/**
* Returns the number of active contexts in this
* SearchService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
private float indexBoost;
private SearchSourceBuilder source;
private Boolean requestCache;
private Boolean isThrottled;
private long nowInMillis;

private boolean profile;
Expand All @@ -78,7 +79,7 @@ public class ShardSearchLocalRequest implements ShardSearchRequest {
ShardSearchLocalRequest(SearchRequest searchRequest, ShardId shardId, int numberOfShards,
AliasFilter aliasFilter, float indexBoost, long nowInMillis) {
this(shardId, numberOfShards, searchRequest.searchType(),
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), aliasFilter, indexBoost);
searchRequest.source(), searchRequest.types(), searchRequest.requestCache(), searchRequest.isThrottled(), aliasFilter, indexBoost);
this.scroll = searchRequest.scroll();
this.nowInMillis = nowInMillis;
}
Expand All @@ -92,13 +93,14 @@ public ShardSearchLocalRequest(ShardId shardId, String[] types, long nowInMillis
}

public ShardSearchLocalRequest(ShardId shardId, int numberOfShards, SearchType searchType, SearchSourceBuilder source, String[] types,
Boolean requestCache, AliasFilter aliasFilter, float indexBoost) {
Boolean requestCache, Boolean isThrottled, AliasFilter aliasFilter, float indexBoost) {
this.shardId = shardId;
this.numberOfShards = numberOfShards;
this.searchType = searchType;
this.source = source;
this.types = types;
this.requestCache = requestCache;
this.isThrottled = isThrottled;
this.aliasFilter = aliasFilter;
this.indexBoost = indexBoost;
}
Expand Down Expand Up @@ -154,6 +156,11 @@ public Boolean requestCache() {
return requestCache;
}

@Override
public Boolean isThrottled() {
return isThrottled;
}

@Override
public Scroll scroll() {
return scroll;
Expand Down Expand Up @@ -193,6 +200,7 @@ protected void innerReadFrom(StreamInput in) throws IOException {
}
nowInMillis = in.readVLong();
requestCache = in.readOptionalBoolean();
isThrottled = in.readOptionalBoolean();
}

protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException {
Expand All @@ -212,6 +220,7 @@ protected void innerWriteTo(StreamOutput out, boolean asKey) throws IOException
out.writeVLong(nowInMillis);
}
out.writeOptionalBoolean(requestCache);
out.writeOptionalBoolean(isThrottled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public interface ShardSearchRequest {

Boolean requestCache();

Boolean isThrottled();

Scroll scroll();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public Boolean requestCache() {
return shardSearchLocalRequest.requestCache();
}

@Override
public Boolean isThrottled() {
return shardSearchLocalRequest.isThrottled();
}

@Override
public Scroll scroll() {
return shardSearchLocalRequest.scroll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 200)); // now that we reuse bulk for index/delete ops
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100));
builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, searchThreadPoolSize(availableProcessors) / 2, 100));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
Expand Down

0 comments on commit 6d4efc5

Please sign in to comment.