diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 20832561..e5a2fca7 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -18,7 +18,7 @@ import java.util.concurrent.*; import java.util.function.*; -import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD; +import static io.foldright.cffu.Delayer.atCfDelayerThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -1513,7 +1513,7 @@ public static > C cffuOrTimeout( requireNonNull(unit, "unit is null"); final C f = orTimeout(cf, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1589,7 +1589,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l requireNonNull(unit, "unit is null"); final C f = completeOnTimeout(cf, value, timeout, unit); - return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); + return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout); } /** @@ -1635,12 +1635,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { } @SuppressWarnings("unchecked") - private static > - C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) { - return (C) cf.handle((r, ex) -> condition.getAsBoolean() - ? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage) x) - : cf - ).thenCompose(x -> x); + public static > + C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) { + final CompletionStage f = (CompletionStage) cf; + + return (C) f.handle((v, ex) -> null).thenCompose(unused -> { + if (!atCfDelayerThread()) return f; + + CompletableFuture signal = new CompletableFuture<>(); + delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor) + .execute(() -> signal.complete(null)); + + return signal.thenCompose(v -> f); + }); } //# Advanced methods of CompletionStage @@ -2119,7 +2126,7 @@ private static class AsyncPoolHolder { /** * Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism. */ - private static final Executor ASYNC_POOL = _asyncPool0(); + static final Executor ASYNC_POOL = _asyncPool0(); private static Executor _asyncPool0() { if (IS_JAVA9_PLUS) return completedFuture(null).defaultExecutor(); @@ -2153,7 +2160,7 @@ private static Executor _asyncPool0() { final CompletableFuture cf = completedFuture(42); try { // `exceptionallyCompose` is the new method of CompletableFuture since java 12 - cf.exceptionallyCompose(v -> cf); + cf.exceptionallyCompose(ex -> cf); b = true; } catch (NoSuchMethodError e) { b = false; diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index b2c53ce7..08f3b43d 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -11,7 +11,6 @@ import java.util.concurrent.*; import java.util.function.BiConsumer; -import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -22,20 +21,13 @@ */ @SuppressWarnings("JavadocReference") final class Delayer { - private static final ScheduledThreadPoolExecutor delayer; - - static { - delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); - delayer.setRemoveOnCancelPolicy(true); - } - /** * @return a Future that can be used to cancel the delayed task * @see FutureCanceller * @see DelayedExecutor#execute(Runnable) */ static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { - return delayer.schedule(command, delay, unit); + return DelayerHolder.delayer.schedule(command, delay, unit); } /** @@ -55,7 +47,34 @@ public static ScheduledFuture delayToCompleteCf( return delay(new CfCompleter<>(cf, value), delay, unit); } - private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler"; + /** + * Checks whether execution is at the thread of CompletableFuture/Cffu delayer. + *

+ * The constant {@code "CompletableFutureDelayScheduler"} is defined + * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. + */ + @SuppressWarnings("JavadocReference") + static boolean atCfDelayerThread() { + final String name = Thread.currentThread().getName(); + return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); + } + + private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuBuiltinDelayScheduler"; + + /** + * Holds {@link #delayer} scheduler as field of static inner class for lazy loading(init only when needed). + *

+ * The lazy loading is need because {@link #atCfDelayerThread()} method of + * class {@link Delayer} is used on {@code Java 9+}. + */ + private static class DelayerHolder { + static final ScheduledThreadPoolExecutor delayer; + + static { + delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + } private static final class DaemonThreadFactory implements ThreadFactory { @Override @@ -67,18 +86,6 @@ public Thread newThread(@NonNull Runnable r) { } } - /** - * Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer. - *

- * The constant {@code "CompletableFutureDelayScheduler"} is defined - * at {@link CompletableFuture.Delayer.DaemonThreadFactory}. - */ - @SuppressWarnings("JavadocReference") - static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> { - final String name = Thread.currentThread().getName(); - return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name); - }; - private Delayer() { } } diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 72edd636..b6eee44f 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -18,6 +18,7 @@ import static io.foldright.cffu.CompletableFutureUtils.*; import static io.foldright.test_utils.TestUtils.*; +import static java.lang.Thread.currentThread; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.ForkJoinPool.commonPool; import static java.util.function.Function.identity; @@ -1031,6 +1032,52 @@ void test_timeout() throws Exception { assertEquals(n, cffuCompleteOnTimeout(completedFuture(n), anotherN, defaultExecutor(), 1, TimeUnit.MILLISECONDS).get()); } + @Test + void test_safeBehavior_orTimeout() { + final Thread testThread = currentThread(); + + assertEquals(n, orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertInstanceOf(TimeoutException.class, ex); + assertEquals("CompletableFutureDelayScheduler", currentThread().getName()); + return n; + }).join()); + assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + // FIXME: Not TimeoutException, not compatible with orTimeout method + assertInstanceOf(CompletionException.class, ex); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertNotSame(testThread, currentThread()); + return n; + }).join()); + assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + // FIXME: Not TimeoutException, not compatible with orTimeout method + assertInstanceOf(CompletionException.class, ex); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return n; + }).join()); + } + + @Test + void test_safeBehavior_completeOnTimeout() { + final Thread testThread = currentThread(); + + assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertEquals("CompletableFutureDelayScheduler", currentThread().getName()); + return r; + }).join()); + assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertNotSame(testThread, currentThread()); + return r; + }).join()); + assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return r; + }).join()); + } + @Test void test_exceptionallyCompose() throws Exception { CompletableFuture completed = completedFuture(n); diff --git a/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt b/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt index a2d0a5b2..2e76cf18 100644 --- a/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt +++ b/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt @@ -15,37 +15,39 @@ import kotlin.random.Random import kotlin.random.nextULong -const val THREAD_COUNT_OF_POOL = 5 +val THREAD_COUNT_OF_POOL = 4.coerceAtLeast(Runtime.getRuntime().availableProcessors() * 2) @JvmOverloads fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService { val counter = AtomicLong() - val prefix = "${threadNamePrefix}_${Random.nextULong()}-" + val prefix = "${threadNamePrefix}_${Random.nextULong()}" val executorService = if (!isForkJoin) ThreadPoolExecutor( - THREAD_COUNT_OF_POOL, THREAD_COUNT_OF_POOL, 1, TimeUnit.DAYS, - ArrayBlockingQueue(5000) + /* corePoolSize = */ THREAD_COUNT_OF_POOL, /* maximumPoolSize = */ THREAD_COUNT_OF_POOL, + /* keepAliveTime = */ 1, /* unit = */ TimeUnit.DAYS, + /* workQueue = */ ArrayBlockingQueue(5000) ) { r -> Thread(r).apply { - name = "$prefix${counter.getAndIncrement()}" + name = "${prefix}_${counter.getAndIncrement()}" isDaemon = true } } else ForkJoinPool( - THREAD_COUNT_OF_POOL, { fjPool -> + /* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool -> ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply { - name = "$prefix${counter.getAndIncrement()}" + name = "${prefix}_${counter.getAndIncrement()}" } - }, - null, false + }, /* handler = */ null, /* asyncMode = */ false ) return object : ExecutorService by executorService, ThreadPoolAcquaintance { override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix) override fun unwrap(): ExecutorService = executorService + + override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix" } } @@ -57,8 +59,11 @@ private interface ThreadPoolAcquaintance { fun unwrap(): ExecutorService } +fun isRunInExecutor(executor: Executor): Boolean = + executor.doesOwnThread(currentThread()) + fun assertRunInExecutor(executor: Executor) { - executor.doesOwnThread(currentThread()).shouldBeTrue() + isRunInExecutor(executor).shouldBeTrue() } fun assertNotRunInExecutor(executor: Executor) { diff --git a/cffu-core/src/test/resources/kotest.properties b/cffu-core/src/test/resources/kotest.properties index fcf0167b..cf1838ba 100644 --- a/cffu-core/src/test/resources/kotest.properties +++ b/cffu-core/src/test/resources/kotest.properties @@ -2,5 +2,5 @@ kotest.framework.classpath.scanning.config.disable=true kotest.framework.classpath.scanning.autoscan.disable=true # https://kotest.io/docs/framework/project-config.html#runtime-detection -kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig +#kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig