From 5dcd812e4d103316dc5d5a1a92f184e0a2d0d163 Mon Sep 17 00:00:00 2001 From: EricLin Date: Sun, 21 Jul 2024 13:53:04 +0800 Subject: [PATCH 1/2] fix ListenableFutureUtils#toCompletableFuture cancellation propagation --- .../main/java/io/foldright/cffu/ListenableFutureUtils.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java index 71779d47..332db4f2 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java @@ -50,7 +50,12 @@ public String toString() { } }; // propagate cancellation by CancellationException from outer adapter to LF - CompletableFutureUtils.peek(ret, (v, ex) -> lf.cancel(false)); + CompletableFutureUtils.peek(ret, (v, ex) -> { + ex = CompletableFutureUtils.unwrapCfException(ex); + if (ex instanceof CancellationException) { + lf.cancel(false); + } + }); Futures.addCallback(lf, new FutureCallback() { @Override From d5cdb11401dcb630899f298f591c5b5f7bd4ca03 Mon Sep 17 00:00:00 2001 From: EricLin Date: Mon, 22 Jul 2024 19:07:27 +0800 Subject: [PATCH 2/2] optimize ListenableFutureUtils#toCompletableFuture: support cancellation propagation, revise doc --- .../foldright/cffu/ListenableFutureUtils.java | 25 +++++++++++++++---- .../cffu/ListenableFutureUtilsTest.java | 12 ++++----- .../cffu/kotlin/ListenableFutureExtensions.kt | 8 +++--- .../test/ListenableFutureExtensionsKtTest.kt | 8 +++--- 4 files changed, 34 insertions(+), 19 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java index 332db4f2..0d925713 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java @@ -28,10 +28,25 @@ public class ListenableFutureUtils { * Converts input {@link ListenableFuture} to {@link CompletableFuture}. *

* Callback from ListenableFuture is executed using the given executor, - * use {{@link MoreExecutors#directExecutor()}} if you need skip executor switch. + * use {@link MoreExecutors#directExecutor()} if you need skip executor switch. + *

+ * Cancelling the result {@link CompletableFuture} will also cancel inner {@link ListenableFuture}. + * Use param {@code mayInterruptIfRunning} to control whether to interrupt the thread of {@link ListenableFuture}. + *

+ * Note: CompletionException caused by this CancellationException is also considered cancellation. + *

+ * We encourage you to avoid using direct write methods in {@link CompletableFuture} so that the underlying + * {@link ListenableFuture} can benefit from cancel propagation. + * + * @param lf the wrapped ListenableFuture + * @param executor the executor + * @param mayInterruptIfRunning {@code true} if the thread of {@link ListenableFuture} should be interrupted when + * {@link CompletableFuture} canceled (if the thread is known to the implementation). + * @return the completable future + * @see CompletableFuture#cancel(boolean) */ @Contract(pure = true) - public static CompletableFuture toCompletableFuture(ListenableFuture lf, Executor executor) { + public static CompletableFuture toCompletableFuture(ListenableFuture lf, Executor executor, boolean mayInterruptIfRunning) { requireNonNull(lf, "listenableFuture is null"); CompletableFuture ret = new CompletableFuture() { @@ -53,7 +68,7 @@ public String toString() { CompletableFutureUtils.peek(ret, (v, ex) -> { ex = CompletableFutureUtils.unwrapCfException(ex); if (ex instanceof CancellationException) { - lf.cancel(false); + lf.cancel(mayInterruptIfRunning); } }); @@ -77,8 +92,8 @@ public void onFailure(Throwable ex) { * Callback from ListenableFuture is executed using cffuFactory's default executor. */ @Contract(pure = true) - public static Cffu toCffu(ListenableFuture lf, CffuFactory cffuFactory) { - return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor())); + public static Cffu toCffu(ListenableFuture lf, CffuFactory cffuFactory, boolean mayInterruptIfRunning) { + return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), mayInterruptIfRunning)); } /** diff --git a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java index 1a326d00..5ec0c364 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java @@ -23,7 +23,7 @@ class ListenableFutureUtilsTest { @Test void test_toCompletableFuture() throws Exception { final ListenableFuture lf = Futures.immediateFuture(n); - final CompletableFuture cf = toCompletableFuture(lf, executorService); + final CompletableFuture cf = toCompletableFuture(lf, executorService, true); assertEquals(n, cf.get()); assertTrue(cf.toString().startsWith( "CompletableFutureAdapter@ListenableFutureUtils.toCompletableFuture of ListenableFuture(" + lf + "), ") @@ -31,18 +31,18 @@ void test_toCompletableFuture() throws Exception { ListenableFuture failed = Futures.immediateFailedFuture(rte); assertSame(rte, assertThrowsExactly(ExecutionException.class, - () -> toCompletableFuture(failed, executorService).get() + () -> toCompletableFuture(failed, executorService, true).get() ).getCause()); } @Test void test_toCffu() throws Exception { ListenableFuture lf = Futures.immediateFuture(n); - assertEquals(n, toCffu(lf, cffuFactory).get()); + assertEquals(n, toCffu(lf, cffuFactory, true).get()); ListenableFuture failed = Futures.immediateFailedFuture(rte); assertSame(rte, assertThrowsExactly(ExecutionException.class, - () -> toCffu(failed, cffuFactory).get() + () -> toCffu(failed, cffuFactory, true).get() ).getCause()); } @@ -102,7 +102,7 @@ void test_toListenableFuture_exception_java9plus() { @Test void test_lf2cf_cancellationAndPropagation() throws Exception { final ListenableFuture lf = SettableFuture.create(); - final CompletableFuture cf = toCompletableFuture(lf, executorService); + final CompletableFuture cf = toCompletableFuture(lf, executorService, true); assertTrue(cf.cancel(false)); waitForAllCfsToComplete(cf); @@ -117,7 +117,7 @@ void test_lf2cf_cancellationAndPropagation() throws Exception { @Test void test_lf2cf_setCancellationExceptionToCf_cancellationAndPropagation() throws Exception { final ListenableFuture lf = SettableFuture.create(); - final CompletableFuture cf = toCompletableFuture(lf, executorService); + final CompletableFuture cf = toCompletableFuture(lf, executorService, true); assertTrue(cf.completeExceptionally(new CancellationException())); waitForAllCfsToComplete(cf); diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt index 821dd19c..93c20b80 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt @@ -26,16 +26,16 @@ import java.util.concurrent.Executor * Callback from ListenableFuture is executed using the given executor, * use {[MoreExecutors.directExecutor]} if you need skip executor switch. */ -fun ListenableFuture.toCompletableFuture(executor: Executor): CompletableFuture = - ListenableFutureUtils.toCompletableFuture(this, executor) +fun ListenableFuture.toCompletableFuture(executor: Executor, mayInterruptIfRunning: Boolean): CompletableFuture = + ListenableFutureUtils.toCompletableFuture(this, executor, mayInterruptIfRunning) /** * Converts input [ListenableFuture] to [Cffu]. * * Callback from ListenableFuture is executed using cffuFactory's default executor. */ -fun ListenableFuture.toCffu(cffuFactory: CffuFactory): Cffu { - return ListenableFutureUtils.toCffu(this, cffuFactory) +fun ListenableFuture.toCffu(cffuFactory: CffuFactory, mayInterruptIfRunning: Boolean): Cffu { + return ListenableFutureUtils.toCffu(this, cffuFactory, mayInterruptIfRunning) } /** diff --git a/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt b/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt index ed7e76ee..ce99b448 100644 --- a/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt +++ b/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt @@ -17,18 +17,18 @@ import java.util.concurrent.ExecutionException class ListenableFutureExtensionsKtTest : FunSpec({ test("toCompletableFuture") { val lf = Futures.immediateFuture(n) - lf.toCompletableFuture(testThreadPoolExecutor).get() shouldBe n + lf.toCompletableFuture(testThreadPoolExecutor, true).get() shouldBe n val failed = Futures.immediateFailedFuture(rte) - shouldThrowExactly { failed.toCompletableFuture(testThreadPoolExecutor).get() }.cause shouldBeSameInstanceAs rte + shouldThrowExactly { failed.toCompletableFuture(testThreadPoolExecutor, true).get() }.cause shouldBeSameInstanceAs rte } test("toCffu") { val lf = Futures.immediateFuture(n) - lf.toCffu(testCffuFactory).get() shouldBe n + lf.toCffu(testCffuFactory, true).get() shouldBe n val failed = Futures.immediateFailedFuture(rte) - shouldThrowExactly { failed.toCffu(testCffuFactory).get() }.cause shouldBeSameInstanceAs rte + shouldThrowExactly { failed.toCffu(testCffuFactory, true).get() }.cause shouldBeSameInstanceAs rte } test("toListenableFuture") {