Skip to content

Commit

Permalink
optimize ListenableFutureUtils#toCompletableFuture: support cancellat…
Browse files Browse the repository at this point in the history
…ion propagation, revise doc
  • Loading branch information
linzee1 committed Jul 22, 2024
1 parent e88baa6 commit 822bf45
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,25 @@ public class ListenableFutureUtils {
* Converts input {@link ListenableFuture} to {@link CompletableFuture}.
* <p>
* Callback from ListenableFuture is executed using the given executor,
* use {{@link MoreExecutors#directExecutor()}} if you need skip executor switch.
* use {@link MoreExecutors#directExecutor()} if you need skip executor switch.
* <p>
* Cancelling the result {@link CompletableFuture} will also cancel inner {@link ListenableFuture}.
* Use param {@code mayInterruptIfRunning} to control whether to interrupt the thread of {@link ListenableFuture}.
* <p>
* Note: CompletionException caused by this CancellationException is also considered cancellation.
* <p>
* We encourage you to avoid using direct write methods in {@link CompletableFuture} so that the underlying
* {@link ListenableFuture} can benefit from cancel propagation.
*
* @param lf the wrapped ListenableFuture
* @param executor the executor
* @param mayInterruptIfRunning {@code true} if the thread of {@link ListenableFuture} should be interrupted when
* {@link CompletableFuture} canceled (if the thread is known to the implementation).
* @return the completable future
* @see CompletableFuture#cancel(boolean)
*/
@Contract(pure = true)
public static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf, Executor executor) {
public static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf, Executor executor, boolean mayInterruptIfRunning) {
requireNonNull(lf, "listenableFuture is null");

CompletableFuture<T> ret = new CompletableFuture<T>() {
Expand All @@ -50,14 +65,10 @@ public String toString() {
}
};
// propagate cancellation by CancellationException from outer adapter to LF
ret.whenComplete((result, throwable) -> {
if (ret.isCancelled()) {
lf.cancel(false);
} else {
Throwable cause = CompletableFutureUtils.unwrapCfException(throwable);
if (cause instanceof CancellationException) {
lf.cancel(false);
}
ret.whenComplete((v, ex) -> {
ex = CompletableFutureUtils.unwrapCfException(ex);
if (ex instanceof CancellationException) {
lf.cancel(mayInterruptIfRunning);
}
});

Expand All @@ -81,8 +92,8 @@ public void onFailure(Throwable ex) {
* Callback from ListenableFuture is executed using cffuFactory's default executor.
*/
@Contract(pure = true)
public static <T> Cffu<T> toCffu(ListenableFuture<T> lf, CffuFactory cffuFactory) {
return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor()));
public static <T> Cffu<T> toCffu(ListenableFuture<T> lf, CffuFactory cffuFactory, boolean mayInterruptIfRunning) {
return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), mayInterruptIfRunning));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@ class ListenableFutureUtilsTest {
@Test
void test_toCompletableFuture() throws Exception {
final ListenableFuture<Integer> lf = Futures.immediateFuture(n);
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService);
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);
assertEquals(n, cf.get());
assertTrue(cf.toString().startsWith(
"CompletableFutureAdapter@ListenableFutureUtils.toCompletableFuture of ListenableFuture(" + lf + "), ")
);

ListenableFuture<Integer> failed = Futures.immediateFailedFuture(rte);
assertSame(rte, assertThrowsExactly(ExecutionException.class,
() -> toCompletableFuture(failed, executorService).get()
() -> toCompletableFuture(failed, executorService, true).get()
).getCause());
}

@Test
void test_toCffu() throws Exception {
ListenableFuture<Integer> lf = Futures.immediateFuture(n);
assertEquals(n, toCffu(lf, cffuFactory).get());
assertEquals(n, toCffu(lf, cffuFactory, true).get());

ListenableFuture<Integer> failed = Futures.immediateFailedFuture(rte);
assertSame(rte, assertThrowsExactly(ExecutionException.class,
() -> toCffu(failed, cffuFactory).get()
() -> toCffu(failed, cffuFactory, true).get()
).getCause());
}

Expand Down Expand Up @@ -102,7 +102,7 @@ void test_toListenableFuture_exception_java9plus() {
@Test
void test_lf2cf_cancellationAndPropagation() throws Exception {
final ListenableFuture<Integer> lf = SettableFuture.create();
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService);
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);

assertTrue(cf.cancel(false));
waitForAllCfsToComplete(cf);
Expand All @@ -117,7 +117,7 @@ void test_lf2cf_cancellationAndPropagation() throws Exception {
@Test
void test_lf2cf_setCancellationExceptionToCf_cancellationAndPropagation() throws Exception {
final ListenableFuture<Integer> lf = SettableFuture.create();
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService);
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);

assertTrue(cf.completeExceptionally(new CancellationException()));
waitForAllCfsToComplete(cf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import java.util.concurrent.Executor
* Callback from ListenableFuture is executed using the given executor,
* use {[MoreExecutors.directExecutor]} if you need skip executor switch.
*/
fun <T> ListenableFuture<T>.toCompletableFuture(executor: Executor): CompletableFuture<T> =
ListenableFutureUtils.toCompletableFuture(this, executor)
fun <T> ListenableFuture<T>.toCompletableFuture(executor: Executor, mayInterruptIfRunning: Boolean): CompletableFuture<T> =
ListenableFutureUtils.toCompletableFuture(this, executor, mayInterruptIfRunning)

/**
* Converts input [ListenableFuture] to [Cffu].
*
* Callback from ListenableFuture is executed using cffuFactory's default executor.
*/
fun <T> ListenableFuture<T>.toCffu(cffuFactory: CffuFactory): Cffu<T> {
return ListenableFutureUtils.toCffu(this, cffuFactory)
fun <T> ListenableFuture<T>.toCffu(cffuFactory: CffuFactory, mayInterruptIfRunning: Boolean): Cffu<T> {
return ListenableFutureUtils.toCffu(this, cffuFactory, mayInterruptIfRunning)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import java.util.concurrent.ExecutionException
class ListenableFutureExtensionsKtTest : FunSpec({
test("toCompletableFuture") {
val lf = Futures.immediateFuture(n)
lf.toCompletableFuture(testThreadPoolExecutor).get() shouldBe n
lf.toCompletableFuture(testThreadPoolExecutor, true).get() shouldBe n

val failed = Futures.immediateFailedFuture<Int>(rte)
shouldThrowExactly<ExecutionException> { failed.toCompletableFuture(testThreadPoolExecutor).get() }.cause shouldBeSameInstanceAs rte
shouldThrowExactly<ExecutionException> { failed.toCompletableFuture(testThreadPoolExecutor, true).get() }.cause shouldBeSameInstanceAs rte
}

test("toCffu") {
val lf = Futures.immediateFuture(n)
lf.toCffu(testCffuFactory).get() shouldBe n
lf.toCffu(testCffuFactory, true).get() shouldBe n

val failed = Futures.immediateFailedFuture<Int>(rte)
shouldThrowExactly<ExecutionException> { failed.toCffu(testCffuFactory).get() }.cause shouldBeSameInstanceAs rte
shouldThrowExactly<ExecutionException> { failed.toCffu(testCffuFactory, true).get() }.cause shouldBeSameInstanceAs rte
}

test("toListenableFuture") {
Expand Down

0 comments on commit 822bf45

Please sign in to comment.