diff --git a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java index 0d925713..328bf25c 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java @@ -30,24 +30,31 @@ public class ListenableFutureUtils { * Callback from ListenableFuture is executed using the given executor, * use {@link MoreExecutors#directExecutor()} if you need skip executor switch. *

- * Cancelling the result {@link CompletableFuture} will also cancel inner {@link ListenableFuture}. - * Use param {@code mayInterruptIfRunning} to control whether to interrupt the thread of {@link ListenableFuture}. + * Cancelling({@link Future#cancel(boolean)}) the returned CompletableFuture + * will also cancel underlying ListenableFuture. *

- * Note: CompletionException caused by this CancellationException is also considered cancellation. + * Use param {@code interruptLfWhenCancellationException} to control whether to cancel ListenableFuture with + * interruption when CancellationException occurred (including CompletionException/ExecutionException with + * CancellationException cause, more info see {@link CompletableFutureUtils#unwrapCfException(Throwable)}). *

- * We encourage you to avoid using direct write methods in {@link CompletableFuture} so that the underlying - * {@link ListenableFuture} can benefit from cancel propagation. + * It's recommended to avoid using direct write methods(e.g. {@link CompletableFuture#complete(Object)}, + * {@link CompletableFuture#completeExceptionally(Throwable)}) of the returned CompletableFuture: + *

* - * @param lf the wrapped ListenableFuture - * @param executor the executor - * @param mayInterruptIfRunning {@code true} if the thread of {@link ListenableFuture} should be interrupted when - * {@link CompletableFuture} canceled (if the thread is known to the implementation). - * @return the completable future + * @param lf the underlying ListenableFuture + * @param executor the executor to use for ListenableFuture callback execution + * @param interruptLfWhenCancellationException whether to cancel ListenableFuture with interruption when CancellationException occurred + * @return the CompletableFuture adapter * @see CompletableFuture#cancel(boolean) */ @Contract(pure = true) - public static CompletableFuture toCompletableFuture(ListenableFuture lf, Executor executor, boolean mayInterruptIfRunning) { + public static CompletableFuture toCompletableFuture( + ListenableFuture lf, Executor executor, boolean interruptLfWhenCancellationException) { requireNonNull(lf, "listenableFuture is null"); + requireNonNull(executor, "executor is null"); CompletableFuture ret = new CompletableFuture() { @Override @@ -68,7 +75,7 @@ public String toString() { CompletableFutureUtils.peek(ret, (v, ex) -> { ex = CompletableFutureUtils.unwrapCfException(ex); if (ex instanceof CancellationException) { - lf.cancel(mayInterruptIfRunning); + lf.cancel(interruptLfWhenCancellationException); } }); @@ -90,10 +97,13 @@ public void onFailure(Throwable ex) { * Converts input {@link ListenableFuture} to {@link Cffu}. *

* Callback from ListenableFuture is executed using cffuFactory's default executor. + *

+ * More info see {@link #toCompletableFuture(ListenableFuture, Executor, boolean)}. */ @Contract(pure = true) - public static Cffu toCffu(ListenableFuture lf, CffuFactory cffuFactory, boolean mayInterruptIfRunning) { - return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), mayInterruptIfRunning)); + public static Cffu toCffu( + ListenableFuture lf, CffuFactory cffuFactory, boolean interruptLfWhenCancellationException) { + return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), interruptLfWhenCancellationException)); } /** diff --git a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java index 5ec0c364..ca61b384 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java @@ -2,6 +2,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.foldright.test_utils.TestThreadPoolManager; import org.junit.jupiter.api.AfterAll; @@ -116,17 +117,59 @@ void test_lf2cf_cancellationAndPropagation() throws Exception { @Test void test_lf2cf_setCancellationExceptionToCf_cancellationAndPropagation() throws Exception { - final ListenableFuture lf = SettableFuture.create(); - final CompletableFuture cf = toCompletableFuture(lf, executorService, true); + { + final ListenableFuture lf = SettableFuture.create(); + final CompletableFuture cf = toCompletableFuture(lf, executorService, true); + + assertTrue(cf.completeExceptionally(new CancellationException())); + waitForAllCfsToComplete(cf); + waitForAllLfsToComplete(lf); + + assertTrue(lf.isCancelled()); + assertThrowsExactly(CancellationException.class, lf::get); + assertTrue(cf.isCancelled()); + assertThrowsExactly(CancellationException.class, cf::get); + } + { + final ListenableFuture lf = SettableFuture.create(); + final CompletableFuture cf = toCompletableFuture(lf, executorService, true); - assertTrue(cf.completeExceptionally(new CancellationException())); - waitForAllCfsToComplete(cf); - waitForAllLfsToComplete(lf); + assertTrue(cf.completeExceptionally(new IllegalArgumentException())); + sleep(100); + waitForAllCfsToComplete(cf); - assertTrue(lf.isCancelled()); - assertThrowsExactly(CancellationException.class, lf::get); - assertTrue(cf.isCancelled()); - assertThrowsExactly(CancellationException.class, cf::get); + assertFalse(lf.isDone()); + } + } + + @Test + void test_lf2cf_dependentLf() throws Exception { + { + CompletableFuture cf = CompletableFuture.supplyAsync(() -> n, executorService); + CompletableFuture cfWrapperOfLf = cf.thenCompose(v -> { + ListenableFuture lf = Futures.submit(() -> v + 1, executorService); + return toCompletableFuture(lf, MoreExecutors.directExecutor(), true); + }); + assertEquals(n + 1, cfWrapperOfLf.join()); + } + { + final ListenableFuture lf = SettableFuture.create(); + CompletableFuture cf = new CompletableFuture<>(); + + CompletableFuture cfWrapperOfLf = cf.thenCompose(v -> + toCompletableFuture(lf, MoreExecutors.directExecutor(), true)); + + cf.cancel(false); + sleep(20); + + assertTrue(cf.isCancelled()); + assertThrowsExactly(CancellationException.class, cf::get); + assertFalse(cfWrapperOfLf.isCancelled()); + final ExecutionException ee = assertThrowsExactly(ExecutionException.class, cfWrapperOfLf::get); + assertInstanceOf(CancellationException.class, ee.getCause()); + + assertFalse(lf.isDone()); + } } @Test @@ -174,8 +217,8 @@ void showCase_CompletableFuture_cancellationAndPropagation() throws Exception { // 🚫 CompletableFuture does NOT SUPPORT the propagation of cancellation // the CancellationException is wrapped by CompletionException or ExecutionException assertFalse(transform.isCancelled()); - final ExecutionException ce = assertThrowsExactly(ExecutionException.class, transform::get); - assertInstanceOf(CancellationException.class, ce.getCause()); + final ExecutionException ee = assertThrowsExactly(ExecutionException.class, transform::get); + assertInstanceOf(CancellationException.class, ee.getCause()); } // endregion diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt index 93c20b80..14398f8e 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt @@ -15,9 +15,9 @@ import java.util.concurrent.Executor --------------------- Implementation Note: --------------------- - the methods of this file MUST NOT be defined in CompletableFutureExtensions or CffuExtensions; + The methods of this file MUST NOT be defined in `CompletableFutureExtensions` or `CffuExtensions`; Otherwise `NoClassDefFoundError` when loading CompletableFutureExtensions/CffuExtensions - if ListenableFuture class(`ClassNotFoundException` aka. guava dependency) absent. + if `ListenableFuture` class(`ClassNotFoundException` aka. `Guava` dependency) is absent. */ /** @@ -26,17 +26,18 @@ import java.util.concurrent.Executor * Callback from ListenableFuture is executed using the given executor, * use {[MoreExecutors.directExecutor]} if you need skip executor switch. */ -fun ListenableFuture.toCompletableFuture(executor: Executor, mayInterruptIfRunning: Boolean): CompletableFuture = - ListenableFutureUtils.toCompletableFuture(this, executor, mayInterruptIfRunning) +fun ListenableFuture.toCompletableFuture( + executor: Executor, interruptLfWhenCancellationException: Boolean +): CompletableFuture = + ListenableFutureUtils.toCompletableFuture(this, executor, interruptLfWhenCancellationException) /** * Converts input [ListenableFuture] to [Cffu]. * * Callback from ListenableFuture is executed using cffuFactory's default executor. */ -fun ListenableFuture.toCffu(cffuFactory: CffuFactory, mayInterruptIfRunning: Boolean): Cffu { - return ListenableFutureUtils.toCffu(this, cffuFactory, mayInterruptIfRunning) -} +fun ListenableFuture.toCffu(cffuFactory: CffuFactory, interruptLfWhenCancellationException: Boolean): Cffu = + ListenableFutureUtils.toCffu(this, cffuFactory, interruptLfWhenCancellationException) /** * Converts input [CompletableFuture] to [ListenableFuture].