TaskExecutor should not fork unnecessarily (#13472)
When an executor is provided to the IndexSearcher constructor, the searcher now executes tasks on the thread that invoked a search as well as its configured executor. Users should reduce the executor's thread-count by 1 to retain the previous level of parallelism. Moreover, it is now possible to start searches from the same executor that is configured in the IndexSearcher without risk of deadlocking. A separate executor for starting searches is no longer required.
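
The sizing advice above can be illustrated with a small JDK-only sketch. It does not use Lucene at all: `PoolSizingSketch`, `executorThreadsFor` and `runsConcurrently` are hypothetical names introduced here, and the barrier is only a way to observe how many tasks are in flight at once, assuming the caller thread runs one task itself as the commit describes.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Lucene.
final class PoolSizingSketch {

  /** Pool size that keeps overall parallelism at the target once the caller also runs tasks. */
  static int executorThreadsFor(int targetParallelism) {
    if (targetParallelism < 2) {
      throw new IllegalArgumentException("targetParallelism must be at least 2");
    }
    return targetParallelism - 1;
  }

  /** Returns true if targetParallelism tasks were all in flight at the same time. */
  static boolean runsConcurrently(int targetParallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(executorThreadsFor(targetParallelism));
    // The barrier only trips once targetParallelism threads wait on it simultaneously.
    CyclicBarrier barrier = new CyclicBarrier(targetParallelism);
    Runnable task =
        () -> {
          try {
            barrier.await(10, TimeUnit.SECONDS);
          } catch (Exception e) {
            throw new IllegalStateException(e);
          }
        };
    try {
      // Fork all but one task to the pool, mirroring the behaviour described above.
      for (int i = 0; i < targetParallelism - 1; i++) {
        pool.execute(task);
      }
      // The calling thread runs the remaining task and is the last party at the barrier.
      task.run();
      return true;
    } catch (IllegalStateException e) {
      return false;
    } finally {
      pool.shutdown();
    }
  }
}
```

With a target of 4, the pool gets 3 threads and the caller supplies the fourth, so the barrier of 4 parties can trip.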

Previously, a separate executor was required to prevent deadlock, and all tasks were offloaded to it unconditionally. This wasted resources in some scenarios through unnecessary forking, while the caller thread had to wait for all tasks to complete anyway. The caller thread can now actively contribute to the execution as well.
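
As a rough, simplified sketch of the idea (not the actual TaskExecutor code, which also handles cancellation and exception aggregation), the pattern can be reproduced with JDK classes only. `CallerRunsSketch`, its `invokeAll` and `demo` are hypothetical names; the example starts the work from the single thread of the very executor it forks to, which is the situation that would deadlock if the caller merely waited.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration; names are hypothetical and error handling is minimal.
final class CallerRunsSketch {

  static <T> List<T> invokeAll(Executor executor, List<Callable<T>> callables) {
    final int count = callables.size();
    final List<FutureTask<T>> futures = new ArrayList<>(count);
    for (Callable<T> callable : callables) {
      futures.add(new FutureTask<>(callable));
    }
    // Index of the next task nobody has claimed yet.
    final AtomicInteger next = new AtomicInteger(0);
    final Runnable work =
        () -> {
          int id = next.getAndIncrement();
          if (id < count) {
            futures.get(id).run();
          }
        };
    // Fork count - 1 work items; each claims whatever task is still unclaimed when it runs.
    for (int j = 0; j < count - 1; j++) {
      executor.execute(work);
    }
    // The caller claims and runs tasks too, so progress never depends on a free pool thread.
    int id;
    while ((id = next.getAndIncrement()) < count) {
      futures.get(id).run();
    }
    List<T> results = new ArrayList<>(count);
    for (FutureTask<T> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
      }
    }
    return results;
  }

  /** Invokes the tasks from the only thread of the executor they are forked to. */
  static List<Integer> demo() {
    ExecutorService single = Executors.newSingleThreadExecutor();
    try {
      List<Callable<Integer>> tasks = new ArrayList<>();
      for (int i = 0; i < 5; i++) {
        final int n = i;
        tasks.add(() -> n * n);
      }
      Callable<List<Integer>> outer = () -> invokeAll(single, tasks);
      return single.submit(outer).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      single.shutdown();
    }
  }
}
```

In `demo()` the forked work items sit in the queue behind the calling task, so the caller ends up running all five tasks itself; the queued items later find nothing left to claim and return immediately.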

Co-authored-by: Armin Braun <[email protected]>
javanna and original-brownbear authored Jul 4, 2024
1 parent 48816b4 commit e78f237
Showing 4 changed files with 90 additions and 64 deletions.
9 changes: 9 additions & 0 deletions lucene/CHANGES.txt
@@ -37,6 +37,15 @@ Optimizations

* GITHUB#12941: Don't preserve auxiliary buffer contents in LSBRadixSorter if it grows. (Stefan Vodita)

Changes in runtime behavior
---------------------

* GITHUB#13472: When an executor is provided to the IndexSearcher constructor, the searcher now executes tasks on the
thread that invoked a search as well as its configured executor. Users should reduce the executor's thread-count by 1
to retain the previous level of parallelism. Moreover, it is now possible to start searches from the same executor
that is configured in the IndexSearcher without risk of deadlocking. A separate executor for starting searches is no
longer required. (Armin Braun)

Bug Fixes
---------------------

85 changes: 41 additions & 44 deletions lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java
@@ -30,27 +30,21 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;

/**
* Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
* across segments as well as query rewrite in some cases. Exposes a single {@link
* #invokeAll(Collection)} method that takes a collection of {@link Callable}s and executes them
* concurrently/ Once all tasks are submitted to the executor, it blocks and wait for all tasks to
* be completed, and then returns a list with the obtained results. Ensures that the underlying
* executor is only used for top-level {@link #invokeAll(Collection)} calls, and not for potential
* {@link #invokeAll(Collection)} calls made from one of the tasks. This is to prevent deadlock with
* certain types of pool based executors (e.g. {@link java.util.concurrent.ThreadPoolExecutor}).
* concurrently. Once all but one task have been submitted to the executor, it tries to run as many
* tasks as possible on the calling thread, then waits for all tasks that have been executed in
* parallel on the executor to be completed and then returns a list with the obtained results.
*
* @lucene.experimental
*/
public final class TaskExecutor {
// a static thread local is ok as long as we use a counter, which accounts for multiple
// searchers holding a different TaskExecutor all backed by the same executor
private static final ThreadLocal<Integer> numberOfRunningTasksInCurrentThread =
ThreadLocal.withInitial(() -> 0);

private final Executor executor;

/**
@@ -84,26 +78,21 @@ public String toString() {
/**
* Holds all the sub-tasks that a certain operation gets split into as it gets parallelized and
* exposes the ability to invoke such tasks and wait for them all to complete their execution and
* provide their results. Ensures that each task does not get parallelized further: this is
* important to avoid a deadlock in situations where one executor thread waits on other executor
* threads to complete before it can progress. This happens in situations where for instance
* {@link Query#createWeight(IndexSearcher, ScoreMode, float)} is called as part of searching each
* slice, like {@link TopFieldCollector#populateScores(ScoreDoc[], IndexSearcher, Query)} does.
* Additionally, if one task throws an exception, all other tasks from the same group are
* cancelled, to avoid needless computation as their results would not be exposed anyways. Creates
* one {@link FutureTask} for each {@link Callable} provided
* provide their results. Additionally, if one task throws an exception, all other tasks from the
* same group are cancelled, to avoid needless computation as their results would not be exposed
* anyways. Creates one {@link FutureTask} for each {@link Callable} provided
*
* @param <T> the return type of all the callables
*/
private static final class TaskGroup<T> {
private final Collection<RunnableFuture<T>> futures;
private final List<RunnableFuture<T>> futures;

TaskGroup(Collection<Callable<T>> callables) {
List<RunnableFuture<T>> tasks = new ArrayList<>(callables.size());
for (Callable<T> callable : callables) {
tasks.add(createTask(callable));
}
this.futures = Collections.unmodifiableCollection(tasks);
this.futures = Collections.unmodifiableList(tasks);
}

RunnableFuture<T> createTask(Callable<T> callable) {
@@ -112,15 +101,10 @@ RunnableFuture<T> createTask(Callable<T> callable) {
() -> {
if (startedOrCancelled.compareAndSet(false, true)) {
try {
Integer counter = numberOfRunningTasksInCurrentThread.get();
numberOfRunningTasksInCurrentThread.set(counter + 1);
return callable.call();
} catch (Throwable t) {
cancelAll();
throw t;
} finally {
Integer counter = numberOfRunningTasksInCurrentThread.get();
numberOfRunningTasksInCurrentThread.set(counter - 1);
}
}
// task is cancelled hence it has no results to return. That's fine: they would be
@@ -144,32 +128,45 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

List<T> invokeAll(Executor executor) throws IOException {
boolean runOnCallerThread = numberOfRunningTasksInCurrentThread.get() > 0;
for (Runnable runnable : futures) {
if (runOnCallerThread) {
runnable.run();
} else {
executor.execute(runnable);
final int count = futures.size();
// taskId provides the first index of an un-executed task in #futures
final AtomicInteger taskId = new AtomicInteger(0);
// we fork execution count - 1 tasks to execute at least one task on the current thread to
// minimize needless forking and blocking of the current thread
if (count > 1) {
final Runnable work =
() -> {
int id = taskId.getAndIncrement();
if (id < count) {
futures.get(id).run();
}
};
for (int j = 0; j < count - 1; j++) {
executor.execute(work);
}
}
// try to execute as many tasks as possible on the current thread to minimize context
// switching in case of long running concurrent
// tasks as well as dead-locking if the current thread is part of #executor for executors that
// have limited or no parallelism
int id;
while ((id = taskId.getAndIncrement()) < count) {
futures.get(id).run();
if (id >= count - 1) {
// save redundant CAS in case this was the last task
break;
}
}
Throwable exc = null;
List<T> results = new ArrayList<>(futures.size());
for (Future<T> future : futures) {
List<T> results = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Future<T> future = futures.get(i);
try {
results.add(future.get());
} catch (InterruptedException e) {
var newException = new ThreadInterruptedException(e);
if (exc == null) {
exc = newException;
} else {
exc.addSuppressed(newException);
}
exc = IOUtils.useOrSuppress(exc, new ThreadInterruptedException(e));
} catch (ExecutionException e) {
if (exc == null) {
exc = e.getCause();
} else {
exc.addSuppressed(e.getCause());
}
exc = IOUtils.useOrSuppress(exc, e.getCause());
}
}
assert assertAllFuturesCompleted() : "Some tasks are still running?";
@@ -266,7 +266,7 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
IOUtils.close(r, dir);
}

public void testSlicesAllOffloadedToTheExecutor() throws IOException {
public void testSlicesOffloadedToTheExecutor() throws IOException {
List<LeafReaderContext> leaves = reader.leaves();
AtomicInteger numExecutions = new AtomicInteger(0);
IndexSearcher searcher =
@@ -286,7 +286,7 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
}
};
searcher.search(new MatchAllDocsQuery(), 10);
assertEquals(leaves.size(), numExecutions.get());
assertEquals(leaves.size() - 1, numExecutions.get());
}

public void testNullExecutorNonNullTaskExecutor() {
56 changes: 38 additions & 18 deletions lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java
@@ -21,10 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@@ -111,15 +108,28 @@ public void testUnwrappedExceptions() {
assertEquals("exc", runtimeException.getCause().getMessage());
}

public void testInvokeAllFromTaskDoesNotDeadlockSameSearcher() throws IOException {
public void testInvokeAllFromTaskDoesNotDeadlockSameSearcher() throws Exception {
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(executorService);
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(Runnable::run);
executorService
.submit(
() -> {
doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(executorService);
return null;
})
.get();
}

private static void doTestInvokeAllFromTaskDoesNotDeadlockSameSearcher(Executor executor)
throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 500; i++) {
iw.addDocument(new Document());
}
try (DirectoryReader reader = iw.getReader()) {
IndexSearcher searcher =
new IndexSearcher(reader, executorService) {
new IndexSearcher(reader, executor) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
return slices(leaves, 1, 1);
@@ -172,15 +182,28 @@ public Void reduce(Collection<Collector> collectors) {
}
}

public void testInvokeAllFromTaskDoesNotDeadlockMultipleSearchers() throws IOException {
public void testInvokeAllFromTaskDoesNotDeadlockMultipleSearchers() throws Exception {
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(executorService);
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(Runnable::run);
executorService
.submit(
() -> {
doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(executorService);
return null;
})
.get();
}

private static void doTestInvokeAllFromTaskDoesNotDeadlockMultipleSearchers(Executor executor)
throws IOException {
try (Directory dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir)) {
for (int i = 0; i < 500; i++) {
iw.addDocument(new Document());
}
try (DirectoryReader reader = iw.getReader()) {
IndexSearcher searcher =
new IndexSearcher(reader, executorService) {
new IndexSearcher(reader, executor) {
@Override
protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
return slices(leaves, 1, 1);
@@ -202,7 +225,7 @@ public void setScorer(Scorable scorer) throws IOException {
// searcher has its own
// TaskExecutor, the safeguard is shared among all the searchers that get
// the same executor
IndexSearcher indexSearcher = new IndexSearcher(reader, executorService);
IndexSearcher indexSearcher = new IndexSearcher(reader, executor);
indexSearcher
.getTaskExecutor()
.invokeAll(Collections.singletonList(() -> null));
@@ -234,11 +257,8 @@ public void testInvokeAllDoesNotLeaveTasksBehind() {
TaskExecutor taskExecutor =
new TaskExecutor(
command -> {
executorService.execute(
() -> {
tasksStarted.incrementAndGet();
command.run();
});
tasksStarted.incrementAndGet();
command.run();
});
AtomicInteger tasksExecuted = new AtomicInteger(0);
List<Callable<Void>> callables = new ArrayList<>();
@@ -251,14 +271,14 @@ public void testInvokeAllDoesNotLeaveTasksBehind() {
for (int i = 0; i < tasksWithNormalExit; i++) {
callables.add(
() -> {
tasksExecuted.incrementAndGet();
return null;
throw new AssertionError(
"must not be called since the first task failing cancels all subsequent tasks");
});
}
expectThrows(RuntimeException.class, () -> taskExecutor.invokeAll(callables));
assertEquals(1, tasksExecuted.get());
// the callables are technically all run, but the cancelled ones will be no-op
assertEquals(100, tasksStarted.get());
assertEquals(tasksWithNormalExit, tasksStarted.get());
}

/**
@@ -308,7 +328,7 @@ public void testInvokeAllCatchesMultipleExceptions() {
}

public void testCancelTasksOnException() {
TaskExecutor taskExecutor = new TaskExecutor(executorService);
TaskExecutor taskExecutor = new TaskExecutor(Runnable::run);
final int numTasks = TestUtil.nextInt(random(), 10, 50);
final int throwingTask = random().nextInt(numTasks);
boolean error = random().nextBoolean();