diff --git a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java
index 71779d47..0d925713 100644
--- a/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java
+++ b/cffu-core/src/main/java/io/foldright/cffu/ListenableFutureUtils.java
@@ -28,10 +28,25 @@ public class ListenableFutureUtils {
* Converts input {@link ListenableFuture} to {@link CompletableFuture}.
*
* Callback from ListenableFuture is executed using the given executor,
- * use {{@link MoreExecutors#directExecutor()}} if you need skip executor switch.
+ * use {@link MoreExecutors#directExecutor()} if you need skip executor switch.
+ *
+ * Cancelling the result {@link CompletableFuture} will also cancel inner {@link ListenableFuture}.
+ * Use param {@code mayInterruptIfRunning} to control whether to interrupt the thread of {@link ListenableFuture}.
+ *
+ * Note: CompletionException caused by this CancellationException is also considered cancellation.
+ *
+ * We encourage you to avoid using direct write methods in {@link CompletableFuture} so that the underlying
+ * {@link ListenableFuture} can benefit from cancel propagation.
+ *
+ * @param lf the wrapped ListenableFuture
+ * @param executor the executor
+ * @param mayInterruptIfRunning {@code true} if the thread of {@link ListenableFuture} should be interrupted when
+ * {@link CompletableFuture} canceled (if the thread is known to the implementation).
+ * @return the completable future
+ * @see CompletableFuture#cancel(boolean)
*/
@Contract(pure = true)
- public static CompletableFuture toCompletableFuture(ListenableFuture lf, Executor executor) {
+ public static CompletableFuture toCompletableFuture(ListenableFuture lf, Executor executor, boolean mayInterruptIfRunning) {
requireNonNull(lf, "listenableFuture is null");
CompletableFuture ret = new CompletableFuture() {
@@ -50,7 +65,12 @@ public String toString() {
}
};
// propagate cancellation by CancellationException from outer adapter to LF
- CompletableFutureUtils.peek(ret, (v, ex) -> lf.cancel(false));
+ CompletableFutureUtils.peek(ret, (v, ex) -> {
+ ex = CompletableFutureUtils.unwrapCfException(ex);
+ if (ex instanceof CancellationException) {
+ lf.cancel(mayInterruptIfRunning);
+ }
+ });
Futures.addCallback(lf, new FutureCallback() {
@Override
@@ -72,8 +92,8 @@ public void onFailure(Throwable ex) {
* Callback from ListenableFuture is executed using cffuFactory's default executor.
*/
@Contract(pure = true)
- public static Cffu toCffu(ListenableFuture lf, CffuFactory cffuFactory) {
- return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor()));
+ public static Cffu toCffu(ListenableFuture lf, CffuFactory cffuFactory, boolean mayInterruptIfRunning) {
+ return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), mayInterruptIfRunning));
}
/**
diff --git a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java
index 1a326d00..5ec0c364 100644
--- a/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java
+++ b/cffu-core/src/test/java/io/foldright/cffu/ListenableFutureUtilsTest.java
@@ -23,7 +23,7 @@ class ListenableFutureUtilsTest {
@Test
void test_toCompletableFuture() throws Exception {
final ListenableFuture lf = Futures.immediateFuture(n);
- final CompletableFuture cf = toCompletableFuture(lf, executorService);
+ final CompletableFuture cf = toCompletableFuture(lf, executorService, true);
assertEquals(n, cf.get());
assertTrue(cf.toString().startsWith(
"CompletableFutureAdapter@ListenableFutureUtils.toCompletableFuture of ListenableFuture(" + lf + "), ")
@@ -31,18 +31,18 @@ void test_toCompletableFuture() throws Exception {
ListenableFuture failed = Futures.immediateFailedFuture(rte);
assertSame(rte, assertThrowsExactly(ExecutionException.class,
- () -> toCompletableFuture(failed, executorService).get()
+ () -> toCompletableFuture(failed, executorService, true).get()
).getCause());
}
@Test
void test_toCffu() throws Exception {
ListenableFuture lf = Futures.immediateFuture(n);
- assertEquals(n, toCffu(lf, cffuFactory).get());
+ assertEquals(n, toCffu(lf, cffuFactory, true).get());
ListenableFuture failed = Futures.immediateFailedFuture(rte);
assertSame(rte, assertThrowsExactly(ExecutionException.class,
- () -> toCffu(failed, cffuFactory).get()
+ () -> toCffu(failed, cffuFactory, true).get()
).getCause());
}
@@ -102,7 +102,7 @@ void test_toListenableFuture_exception_java9plus() {
@Test
void test_lf2cf_cancellationAndPropagation() throws Exception {
final ListenableFuture lf = SettableFuture.create();
- final CompletableFuture cf = toCompletableFuture(lf, executorService);
+ final CompletableFuture cf = toCompletableFuture(lf, executorService, true);
assertTrue(cf.cancel(false));
waitForAllCfsToComplete(cf);
@@ -117,7 +117,7 @@ void test_lf2cf_cancellationAndPropagation() throws Exception {
@Test
void test_lf2cf_setCancellationExceptionToCf_cancellationAndPropagation() throws Exception {
final ListenableFuture lf = SettableFuture.create();
- final CompletableFuture cf = toCompletableFuture(lf, executorService);
+ final CompletableFuture cf = toCompletableFuture(lf, executorService, true);
assertTrue(cf.completeExceptionally(new CancellationException()));
waitForAllCfsToComplete(cf);
diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt
index 821dd19c..93c20b80 100644
--- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt
+++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/ListenableFutureExtensions.kt
@@ -26,16 +26,16 @@ import java.util.concurrent.Executor
* Callback from ListenableFuture is executed using the given executor,
* use {[MoreExecutors.directExecutor]} if you need skip executor switch.
*/
-fun ListenableFuture.toCompletableFuture(executor: Executor): CompletableFuture =
- ListenableFutureUtils.toCompletableFuture(this, executor)
+fun ListenableFuture.toCompletableFuture(executor: Executor, mayInterruptIfRunning: Boolean): CompletableFuture =
+ ListenableFutureUtils.toCompletableFuture(this, executor, mayInterruptIfRunning)
/**
* Converts input [ListenableFuture] to [Cffu].
*
* Callback from ListenableFuture is executed using cffuFactory's default executor.
*/
-fun ListenableFuture.toCffu(cffuFactory: CffuFactory): Cffu {
- return ListenableFutureUtils.toCffu(this, cffuFactory)
+fun ListenableFuture.toCffu(cffuFactory: CffuFactory, mayInterruptIfRunning: Boolean): Cffu {
+ return ListenableFutureUtils.toCffu(this, cffuFactory, mayInterruptIfRunning)
}
/**
diff --git a/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt b/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt
index ed7e76ee..ce99b448 100644
--- a/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt
+++ b/cffu-kotlin/src/test/java/io/foldright/cffu/test/ListenableFutureExtensionsKtTest.kt
@@ -17,18 +17,18 @@ import java.util.concurrent.ExecutionException
class ListenableFutureExtensionsKtTest : FunSpec({
test("toCompletableFuture") {
val lf = Futures.immediateFuture(n)
- lf.toCompletableFuture(testThreadPoolExecutor).get() shouldBe n
+ lf.toCompletableFuture(testThreadPoolExecutor, true).get() shouldBe n
val failed = Futures.immediateFailedFuture(rte)
- shouldThrowExactly { failed.toCompletableFuture(testThreadPoolExecutor).get() }.cause shouldBeSameInstanceAs rte
+ shouldThrowExactly { failed.toCompletableFuture(testThreadPoolExecutor, true).get() }.cause shouldBeSameInstanceAs rte
}
test("toCffu") {
val lf = Futures.immediateFuture(n)
- lf.toCffu(testCffuFactory).get() shouldBe n
+ lf.toCffu(testCffuFactory, true).get() shouldBe n
val failed = Futures.immediateFailedFuture(rte)
- shouldThrowExactly { failed.toCffu(testCffuFactory).get() }.cause shouldBeSameInstanceAs rte
+ shouldThrowExactly { failed.toCffu(testCffuFactory, true).get() }.cause shouldBeSameInstanceAs rte
}
test("toListenableFuture") {