Skip to content

Commit

Permalink
refactor: rename parameter and revise javadoc of `ListenableFutureUti…
Browse files Browse the repository at this point in the history
…ls.toCompletableFuture/toCffu`
  • Loading branch information
oldratlee committed Jul 27, 2024
1 parent 1ea8888 commit 4f20735
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,31 @@ public class ListenableFutureUtils {
* Callback from ListenableFuture is executed using the given executor,
* use {@link MoreExecutors#directExecutor()} if you need skip executor switch.
* <p>
* Cancelling the result {@link CompletableFuture} will also cancel inner {@link ListenableFuture}.
* Use param {@code mayInterruptIfRunning} to control whether to interrupt the thread of {@link ListenableFuture}.
* Cancelling({@link Future#cancel(boolean)}) the returned CompletableFuture
* will also cancel underlying ListenableFuture.
* <p>
* Note: CompletionException caused by this CancellationException is also considered cancellation.
* Use param {@code interruptLfWhenCancellationException} to control whether to cancel ListenableFuture with
* interruption when CancellationException occurred (including CompletionException/ExecutionException with
* CancellationException cause, more info see {@link CompletableFutureUtils#unwrapCfException(Throwable)}).
* <p>
* We encourage you to avoid using direct write methods in {@link CompletableFuture} so that the underlying
* {@link ListenableFuture} can benefit from cancel propagation.
* It's recommended to avoid using direct write methods(e.g. {@link CompletableFuture#complete(Object)},
* {@link CompletableFuture#completeExceptionally(Throwable)}) of the returned CompletableFuture:
* <ul>
* <li>the underlying ListenableFuture can benefit from cancellation propagation.
* <li>the writing of the returned CompletableFuture won't affect the underlying ListenableFuture.
* </ul>
*
* @param lf the wrapped ListenableFuture
* @param executor the executor
* @param mayInterruptIfRunning {@code true} if the thread of {@link ListenableFuture} should be interrupted when
* {@link CompletableFuture} canceled (if the thread is known to the implementation).
* @return the completable future
* @param lf the underlying ListenableFuture
* @param executor the executor to use for ListenableFuture callback execution
* @param interruptLfWhenCancellationException whether to cancel ListenableFuture with interruption when CancellationException occurred
* @return the CompletableFuture adapter
* @see CompletableFuture#cancel(boolean)
*/
@Contract(pure = true)
public static <T> CompletableFuture<T> toCompletableFuture(ListenableFuture<T> lf, Executor executor, boolean mayInterruptIfRunning) {
public static <T> CompletableFuture<T> toCompletableFuture(
ListenableFuture<T> lf, Executor executor, boolean interruptLfWhenCancellationException) {
requireNonNull(lf, "listenableFuture is null");
requireNonNull(executor, "executor is null");

CompletableFuture<T> ret = new CompletableFuture<T>() {
@Override
Expand All @@ -68,7 +75,7 @@ public String toString() {
CompletableFutureUtils.peek(ret, (v, ex) -> {
ex = CompletableFutureUtils.unwrapCfException(ex);
if (ex instanceof CancellationException) {
lf.cancel(mayInterruptIfRunning);
lf.cancel(interruptLfWhenCancellationException);
}
});

Expand All @@ -90,10 +97,13 @@ public void onFailure(Throwable ex) {
* Converts input {@link ListenableFuture} to {@link Cffu}.
* <p>
* Callback from ListenableFuture is executed using cffuFactory's default executor.
* <p>
* More info see {@link #toCompletableFuture(ListenableFuture, Executor, boolean)}.
*/
@Contract(pure = true)
public static <T> Cffu<T> toCffu(ListenableFuture<T> lf, CffuFactory cffuFactory, boolean mayInterruptIfRunning) {
return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), mayInterruptIfRunning));
public static <T> Cffu<T> toCffu(
ListenableFuture<T> lf, CffuFactory cffuFactory, boolean interruptLfWhenCancellationException) {
return cffuFactory.toCffu(toCompletableFuture(lf, cffuFactory.defaultExecutor(), interruptLfWhenCancellationException));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.foldright.test_utils.TestThreadPoolManager;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -116,17 +117,59 @@ void test_lf2cf_cancellationAndPropagation() throws Exception {

@Test
void test_lf2cf_setCancellationExceptionToCf_cancellationAndPropagation() throws Exception {
final ListenableFuture<Integer> lf = SettableFuture.create();
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);
{
final ListenableFuture<Integer> lf = SettableFuture.create();
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);

assertTrue(cf.completeExceptionally(new CancellationException()));
waitForAllCfsToComplete(cf);
waitForAllLfsToComplete(lf);

assertTrue(lf.isCancelled());
assertThrowsExactly(CancellationException.class, lf::get);
assertTrue(cf.isCancelled());
assertThrowsExactly(CancellationException.class, cf::get);
}
{
final ListenableFuture<Integer> lf = SettableFuture.create();
final CompletableFuture<Integer> cf = toCompletableFuture(lf, executorService, true);

assertTrue(cf.completeExceptionally(new CancellationException()));
waitForAllCfsToComplete(cf);
waitForAllLfsToComplete(lf);
assertTrue(cf.completeExceptionally(new IllegalArgumentException()));
sleep(100);
waitForAllCfsToComplete(cf);

assertTrue(lf.isCancelled());
assertThrowsExactly(CancellationException.class, lf::get);
assertTrue(cf.isCancelled());
assertThrowsExactly(CancellationException.class, cf::get);
assertFalse(lf.isDone());
}
}

@Test
void test_lf2cf_dependentLf() throws Exception {
{
CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> n, executorService);
CompletableFuture<Integer> cfWrapperOfLf = cf.thenCompose(v -> {
ListenableFuture<Integer> lf = Futures.submit(() -> v + 1, executorService);
return toCompletableFuture(lf, MoreExecutors.directExecutor(), true);
});
assertEquals(n + 1, cfWrapperOfLf.join());
}
{
final ListenableFuture<Integer> lf = SettableFuture.create();
CompletableFuture<Integer> cf = new CompletableFuture<>();

CompletableFuture<Integer> cfWrapperOfLf = cf.thenCompose(v ->
toCompletableFuture(lf, MoreExecutors.directExecutor(), true));

cf.cancel(false);
sleep(20);

assertTrue(cf.isCancelled());
assertThrowsExactly(CancellationException.class, cf::get);
assertFalse(cfWrapperOfLf.isCancelled());
final ExecutionException ee = assertThrowsExactly(ExecutionException.class, cfWrapperOfLf::get);
assertInstanceOf(CancellationException.class, ee.getCause());

assertFalse(lf.isDone());
}
}

@Test
Expand Down Expand Up @@ -174,8 +217,8 @@ void showCase_CompletableFuture_cancellationAndPropagation() throws Exception {
// 🚫 CompletableFuture does NOT SUPPORT the propagation of cancellation
// the CancellationException is wrapped by CompletionException or ExecutionException
assertFalse(transform.isCancelled());
final ExecutionException ce = assertThrowsExactly(ExecutionException.class, transform::get);
assertInstanceOf(CancellationException.class, ce.getCause());
final ExecutionException ee = assertThrowsExactly(ExecutionException.class, transform::get);
assertInstanceOf(CancellationException.class, ee.getCause());
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import java.util.concurrent.Executor
---------------------
Implementation Note:
---------------------
the methods of this file MUST NOT be defined in CompletableFutureExtensions or CffuExtensions;
The methods of this file MUST NOT be defined in `CompletableFutureExtensions` or `CffuExtensions`;
Otherwise `NoClassDefFoundError` when loading CompletableFutureExtensions/CffuExtensions
if ListenableFuture class(`ClassNotFoundException` aka. guava dependency) absent.
if `ListenableFuture` class(`ClassNotFoundException` aka. `Guava` dependency) is absent.
*/

/**
Expand All @@ -26,17 +26,18 @@ import java.util.concurrent.Executor
* Callback from ListenableFuture is executed using the given executor,
* use {[MoreExecutors.directExecutor]} if you need skip executor switch.
*/
fun <T> ListenableFuture<T>.toCompletableFuture(executor: Executor, mayInterruptIfRunning: Boolean): CompletableFuture<T> =
ListenableFutureUtils.toCompletableFuture(this, executor, mayInterruptIfRunning)
fun <T> ListenableFuture<T>.toCompletableFuture(
executor: Executor, interruptLfWhenCancellationException: Boolean
): CompletableFuture<T> =
ListenableFutureUtils.toCompletableFuture(this, executor, interruptLfWhenCancellationException)

/**
* Converts input [ListenableFuture] to [Cffu].
*
* Callback from ListenableFuture is executed using cffuFactory's default executor.
*/
fun <T> ListenableFuture<T>.toCffu(cffuFactory: CffuFactory, mayInterruptIfRunning: Boolean): Cffu<T> {
return ListenableFutureUtils.toCffu(this, cffuFactory, mayInterruptIfRunning)
}
fun <T> ListenableFuture<T>.toCffu(cffuFactory: CffuFactory, interruptLfWhenCancellationException: Boolean): Cffu<T> =
ListenableFutureUtils.toCffu(this, cffuFactory, interruptLfWhenCancellationException)

/**
* Converts input [CompletableFuture] to [ListenableFuture].
Expand Down

0 comments on commit 4f20735

Please sign in to comment.