From 5b8ed95a81e3104617df9ac20ca8040de74ebc7f Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sun, 2 Jun 2024 18:20:18 +0800 Subject: [PATCH] =?UTF-8?q?fix=20the=20safe=20behavior=20of=20`timeout*`?= =?UTF-8?q?=20methods=20=E2=8F=B3=20=F0=9F=90=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtils.java | 25 +++++---- .../foldright/cffu/DelayExecutionHelpers.java | 51 +++++++++++-------- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 2c1258ec..faf24c38 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -18,7 +18,7 @@ import java.util.concurrent.*; import java.util.function.*; -import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD; +import static io.foldright.cffu.Delayer.atCfDelayerThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -1513,7 +1513,7 @@ public static > C cffuOrTimeout( requireNonNull(unit, "unit is null"); final C f = orTimeout(cf, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1589,7 +1589,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l requireNonNull(unit, "unit is null"); final C f = completeOnTimeout(cf, value, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1635,12 +1635,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { } @SuppressWarnings("unchecked") - private static > - C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) { - return (C) cf.handle((r, ex) -> condition.getAsBoolean() - ? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage) x) - : cf - ).thenCompose(x -> x); + public static > + C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) { + final CompletionStage f = (CompletionStage) cf; + + return (C) f.handle((v, ex) -> null).thenCompose(unused -> { + if (!atCfDelayerThread()) return f; + + CompletableFuture signal = new CompletableFuture<>(); + delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor) + .execute(() -> signal.complete(null)); + + return signal.thenCompose(v -> f); + }); } //# Advanced methods of CompletionStage diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index b2c53ce7..08f3b43d 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -11,7 +11,6 @@ import java.util.concurrent.*; import java.util.function.BiConsumer; -import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -22,20 +21,13 @@ */ @SuppressWarnings("JavadocReference") final class Delayer { - private static final ScheduledThreadPoolExecutor delayer; - - static { - delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); - delayer.setRemoveOnCancelPolicy(true); - } - /** * @return a Future that can be used to cancel the delayed task * @see FutureCanceller * @see DelayedExecutor#execute(Runnable) */ static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { - return delayer.schedule(command, delay, unit); + return DelayerHolder.delayer.schedule(command, delay, unit); } /** @@ -55,7 +47,34 @@ public static ScheduledFuture delayToCompleteCf( return delay(new CfCompleter<>(cf, value), delay, unit); } - private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler"; + /** + * Checks whether execution is at the thread of CompletableFuture/Cffu delayer. + *

+ * The constant {@code "CompletableFutureDelayScheduler"} is defined + * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. + */ + @SuppressWarnings("JavadocReference") + static boolean atCfDelayerThread() { + final String name = Thread.currentThread().getName(); + return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); + } + + private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuBuiltinDelayScheduler"; + + /** + * Holds {@link #delayer} scheduler as field of static inner class for lazy loading(init only when needed). + *

+ * The lazy loading is need because {@link #atCfDelayerThread()} method of + * class {@link Delayer} is used on {@code Java 9+}. + */ + private static class DelayerHolder { + static final ScheduledThreadPoolExecutor delayer; + + static { + delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + } private static final class DaemonThreadFactory implements ThreadFactory { @Override @@ -67,18 +86,6 @@ public Thread newThread(@NonNull Runnable r) { } } - /** - * Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer. - *

- * The constant {@code "CompletableFutureDelayScheduler"} is defined - * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. - */ - @SuppressWarnings("JavadocReference") - static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> { - final String name = Thread.currentThread().getName(); - return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); - }; - private Delayer() { } }