From 4fb3f0d1b867842bf30666cb1c42e3e2fbf10fa3 Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sat, 18 May 2024 22:52:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20methods=20`orTimeout`/`comp?= =?UTF-8?q?leteOnTimeout`/`hopAsyncIf`=20methods=20=E2=8F=B3=20=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtils.java | 49 +++++++++++++++++++ .../foldright/cffu/DelayExecutionHelpers.java | 17 ++++++- .../cffu/CompletableFutureUtilsTest.java | 26 +++++----- 3 files changed, 79 insertions(+), 13 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 115e7f78..4ac1f00d 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -16,6 +16,7 @@ import java.util.concurrent.*; import java.util.function.*; +import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD; import static java.util.Objects.requireNonNull; @@ -1343,6 +1344,10 @@ C exceptionallyAsync(C cf, Function fn, Executor executo /** * Exceptionally completes given CompletableFuture with a {@link TimeoutException} * if not otherwise completed before the given timeout. + *

+ * Strong recommend using method {@link #orTimeout(CompletableFuture, long, TimeUnit, Executor)}, + * unless all dependent CompletableFutures is executed async + * (aka. the dependent CompletableFutures is created by async methods). * * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter @@ -1362,13 +1367,33 @@ public static > C orTimeout(C cf, long timeout, T return cf; } + /** + * Exceptionally completes given CompletableFuture with a {@link TimeoutException} + * if not otherwise completed before the given timeout. + * + * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param executorWhenTimeout the async executor when triggered by timeout + * @return a new CompletableFuture + */ + public static > C orTimeout( + C cf, long timeout, TimeUnit unit, Executor executorWhenTimeout) { + final C f = orTimeout(cf, timeout, unit); + return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + } + /** * Completes given CompletableFuture with the given value if not otherwise completed before the given timeout. + *

+ * Strong recommend using method {@link #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor)}, + * unless all dependent CompletableFutures is executed async + * (aka. the dependent CompletableFutures is created by async methods). * * @param value the value to use upon timeout * @param timeout how long to wait before completing normally with the given value, in units of {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter * @return the given CompletableFuture + * @see #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor) */ public static > C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { @@ -1385,6 +1410,30 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { return cf; } + /** + * Completes given CompletableFuture with the given value if not otherwise completed before the given timeout. + * + * @param value the value to use upon timeout + * @param timeout how long to wait before completing normally with the given value, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param executorWhenTimeout the async executor when triggered by timeout + * @return a new CompletableFuture + */ + public static > + C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit, Executor executorWhenTimeout) { + final C f = completeOnTimeout(cf, value, timeout, unit); + return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + } + + @SuppressWarnings("unchecked") + private static > + C hopAsyncIf(C cf, BooleanSupplier condition, Executor executor) { + return (C) cf.handle((r, ex) -> condition.getAsBoolean() + ? cf.handleAsync((r1, ex1) -> cf, executor).thenCompose(x -> (CompletionStage) x) + : cf + ).thenCompose(x -> (CompletionStage) x); + } + //# Advanced methods of CompletionStage /** diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index 054e6fff..b2c53ce7 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -11,6 +11,7 @@ import java.util.concurrent.*; import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -54,16 +55,30 @@ public static ScheduledFuture delayToCompleteCf( return delay(new CfCompleter<>(cf, value), delay, unit); } + private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler"; + private static final class DaemonThreadFactory implements ThreadFactory { @Override public Thread newThread(@NonNull Runnable r) { Thread t = new Thread(r); t.setDaemon(true); - t.setName("CffuDelayScheduler"); + t.setName(THREAD_NAME_OF_CFFU_DELAY_SCHEDULER); return t; } } + /** + * Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer. + *

+ * The constant {@code "CompletableFutureDelayScheduler"} is defined + * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. + */ + @SuppressWarnings("JavadocReference") + static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> { + final String name = Thread.currentThread().getName(); + return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); + }; + private Delayer() { } } diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index ae8799bf..a28f9ed1 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -943,21 +943,23 @@ void test_exceptionallyAsync() throws Exception { @Test void test_timeout() throws Exception { - CompletableFuture cf = createIncompleteFuture(); - try { - orTimeout(cf, 1, TimeUnit.MILLISECONDS).get(); - } catch (ExecutionException expected) { - assertEquals(TimeoutException.class, expected.getCause().getClass()); - } + assertInstanceOf(TimeoutException.class, + assertThrows(ExecutionException.class, () -> + orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).get() + ).getCause()); + assertInstanceOf(TimeoutException.class, + assertThrows(ExecutionException.class, () -> + orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS, defaultExecutor()).get() + ).getCause()); - cf = createIncompleteFuture(); - assertEquals(n, completeOnTimeout(cf, n, 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, orTimeout(completedFuture(n), 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, orTimeout(completedFuture(n), 1, TimeUnit.MILLISECONDS, defaultExecutor()).get()); - cf = completedFuture(n); - assertEquals(n, completeOnTimeout(cf, anotherN, 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS, defaultExecutor()).get()); - cf = completedFuture(n); - assertEquals(n, orTimeout(cf, 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, completeOnTimeout(completedFuture(n), anotherN, 1, TimeUnit.MILLISECONDS).get()); + assertEquals(n, completeOnTimeout(completedFuture(n), anotherN, 1, TimeUnit.MILLISECONDS, defaultExecutor()).get()); } @Test