From d23405c227be29d493a927fe06d4556481e918df Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sun, 2 Jun 2024 18:20:18 +0800 Subject: [PATCH] =?UTF-8?q?fix=20the=20safe=20behavior=20of=20`timeout*`?= =?UTF-8?q?=20methods=20=E2=8F=B3=20=F0=9F=90=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/io/foldright/cffu/Cffu.java | 10 ++++ .../cffu/CompletableFutureUtils.java | 35 +++++++++---- .../foldright/cffu/DelayExecutionHelpers.java | 51 +++++++++++-------- .../kotlin/CompletableFutureExtensions.kt | 10 +++- 4 files changed, 73 insertions(+), 33 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/Cffu.java b/cffu-core/src/main/java/io/foldright/cffu/Cffu.java index 3f8637c6..ef94e3d0 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/Cffu.java +++ b/cffu-core/src/main/java/io/foldright/cffu/Cffu.java @@ -946,6 +946,11 @@ public Cffu exceptionallyAsync(Function fn, Executor * if not otherwise completed before the given timeout. *

* Uses {@link #defaultExecutor()} as {@code executorWhenTimeout}. + *

+ * CAUTION:
+ * If the wait timed out, the returned Cffu complete exceptionally with a CompletionException holding + * the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException} + * like {@link #unsafeOrTimeout(long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}. * * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter @@ -959,6 +964,11 @@ public Cffu orTimeout(long timeout, TimeUnit unit) { /** * Exceptionally completes this Cffu with a {@link TimeoutException} * if not otherwise completed before the given timeout. + *

+ * CAUTION:
+ * If the wait timed out, the returned Cffu complete exceptionally with a CompletionException holding + * the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException} + * like {@link #unsafeOrTimeout(long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}. * * @param executorWhenTimeout the async executor when triggered by timeout * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 2c1258ec..fe7d6a6a 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -18,7 +18,7 @@ import java.util.concurrent.*; import java.util.function.*; -import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD; +import static io.foldright.cffu.Delayer.atCfDelayerThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -1487,6 +1487,11 @@ C exceptionallyAsync(C cf, Function fn, Executor executo * if not otherwise completed before the given timeout. *

* Uses CompletableFuture's default asynchronous execution facility as {@code executorWhenTimeout}. + *

+ * CAUTION:
+ * If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException holding + * the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException} + * like {@link #orTimeout(CompletableFuture, long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}. * * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter @@ -1500,6 +1505,11 @@ public static > C cffuOrTimeout(C cf, long timeou /** * Exceptionally completes given CompletableFuture with a {@link TimeoutException} * if not otherwise completed before the given timeout. + *

+ * CAUTION:
+ * If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException holding + * the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException} + * like {@link #orTimeout(CompletableFuture, long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}. * * @param executorWhenTimeout the async executor when triggered by timeout * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit} @@ -1513,7 +1523,7 @@ public static > C cffuOrTimeout( requireNonNull(unit, "unit is null"); final C f = orTimeout(cf, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1589,7 +1599,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l requireNonNull(unit, "unit is null"); final C f = completeOnTimeout(cf, value, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1635,12 +1645,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { } @SuppressWarnings("unchecked") - private static > - C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) { - return (C) cf.handle((r, ex) -> condition.getAsBoolean() - ? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage) x) - : cf - ).thenCompose(x -> x); + public static > + C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) { + final CompletionStage f = (CompletionStage) cf; + + return (C) f.handle((v, ex) -> null).thenCompose(unused -> { + if (!atCfDelayerThread()) return f; + + CompletableFuture signal = new CompletableFuture<>(); + delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor) + .execute(() -> signal.complete(null)); + + return signal.thenCompose(v -> f); + }); } //# Advanced methods of CompletionStage diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index b2c53ce7..08f3b43d 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -11,7 +11,6 @@ import java.util.concurrent.*; import java.util.function.BiConsumer; -import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -22,20 +21,13 @@ */ @SuppressWarnings("JavadocReference") final class Delayer { - private static final ScheduledThreadPoolExecutor delayer; - - static { - delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); - delayer.setRemoveOnCancelPolicy(true); - } - /** * @return a Future that can be used to cancel the delayed task * @see FutureCanceller * @see DelayedExecutor#execute(Runnable) */ static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { - return delayer.schedule(command, delay, unit); + return DelayerHolder.delayer.schedule(command, delay, unit); } /** @@ -55,7 +47,34 @@ public static ScheduledFuture delayToCompleteCf( return delay(new CfCompleter<>(cf, value), delay, unit); } - private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler"; + /** + * Checks whether execution is at the thread of CompletableFuture/Cffu delayer. + *

+ * The constant {@code "CompletableFutureDelayScheduler"} is defined + * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. + */ + @SuppressWarnings("JavadocReference") + static boolean atCfDelayerThread() { + final String name = Thread.currentThread().getName(); + return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); + } + + private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuBuiltinDelayScheduler"; + + /** + * Holds {@link #delayer} scheduler as field of static inner class for lazy loading(init only when needed). + *

+ * The lazy loading is need because {@link #atCfDelayerThread()} method of + * class {@link Delayer} is used on {@code Java 9+}. + */ + private static class DelayerHolder { + static final ScheduledThreadPoolExecutor delayer; + + static { + delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + } private static final class DaemonThreadFactory implements ThreadFactory { @Override @@ -67,18 +86,6 @@ public Thread newThread(@NonNull Runnable r) { } } - /** - * Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer. - *

- * The constant {@code "CompletableFutureDelayScheduler"} is defined - * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. - */ - @SuppressWarnings("JavadocReference") - static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> { - final String name = Thread.currentThread().getName(); - return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); - }; - private Delayer() { } } diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt index 8a6f1fd1..cafaad70 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt @@ -771,6 +771,9 @@ fun > C.exceptionallyAsync(fn: Function> C.cffuOrTimeout(timeout: Long, unit: TimeU * Exceptionally completes this CompletableFuture with a TimeoutException * if not otherwise completed before the given timeout. * + * **CAUTION:** If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException + * holding the [TimeoutException] as its cause; NOT a direct [TimeoutException] like [orTimeout]/[CompletableFuture.orTimeout]. + * * @param executorWhenTimeout the async executor when triggered by timeout * @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of `unit` * @param unit a `TimeUnit` determining how to interpret the `timeout` parameter @@ -930,8 +936,8 @@ fun > C.exceptionallyComposeAsync( * .join(); * ``` * - * **CAUTION:** if the wait timed out, this method throws an (unchecked) CompletionException with the TimeoutException as its cause; - * NOT throws a (checked) TimeoutException like [CompletableFuture.get]. + * **CAUTION:** If the wait timed out, this method throws an (unchecked) CompletionException + * with the TimeoutException as its cause; NOT throws a (checked) TimeoutException like [CompletableFuture.get]. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument