From 3fe44786b39044dc12935a77d985b9392b7c5613 Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sat, 25 May 2024 20:22:54 +0800 Subject: [PATCH] =?UTF-8?q?test:=20add=20test=20cases=20for=20the=20safe?= =?UTF-8?q?=20behavior=20of=20`timeout*`=20methods=20=E2=8F=B3=20test=20sh?= =?UTF-8?q?ould=20FAIL=20=F0=9F=92=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtilsTest.java | 53 +++++++++++++++++++ .../test_utils/TestThreadPoolManager.kt | 25 +++++---- 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 72edd636..61f41405 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -18,6 +18,7 @@ import static io.foldright.cffu.CompletableFutureUtils.*; import static io.foldright.test_utils.TestUtils.*; +import static java.lang.Thread.currentThread; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.ForkJoinPool.commonPool; import static java.util.function.Function.identity; @@ -1031,6 +1032,58 @@ void test_timeout() throws Exception { assertEquals(n, cffuCompleteOnTimeout(completedFuture(n), anotherN, defaultExecutor(), 1, TimeUnit.MILLISECONDS).get()); } + @Test + void test_safeBehavior_orTimeout() { + final Thread testThread = currentThread(); + + assertEquals(n, orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertInstanceOf(TimeoutException.class, ex); + assertTrue(Delayer.atCfDelayerThread()); + return n; + }).join()); + + assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + // FIXME: Not TimeoutException, not compatible with orTimeout method + assertInstanceOf(CompletionException.class, ex); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertFalse(Delayer.atCfDelayerThread()); + assertNotSame(testThread, currentThread()); + return n; + }).join()); + assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + // FIXME: Not TimeoutException, not compatible with orTimeout method + assertInstanceOf(CompletionException.class, ex); + assertInstanceOf(TimeoutException.class, ex.getCause()); + assertFalse(Delayer.atCfDelayerThread()); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return n; + }).join()); + } + + @Test + void test_safeBehavior_completeOnTimeout() { + final Thread testThread = currentThread(); + + assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertTrue(Delayer.atCfDelayerThread()); + return r; + }).join()); + + assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertFalse(Delayer.atCfDelayerThread()); + assertNotSame(testThread, currentThread()); + return r; + }).join()); + assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertFalse(Delayer.atCfDelayerThread()); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return r; + }).join()); + } + @Test void test_exceptionallyCompose() throws Exception { CompletableFuture completed = completedFuture(n); diff --git a/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt b/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt index a2d0a5b2..4c49c13c 100644 --- a/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt +++ b/cffu-core/src/test/java/io/foldright/test_utils/TestThreadPoolManager.kt @@ -15,37 +15,39 @@ import kotlin.random.Random import kotlin.random.nextULong -const val THREAD_COUNT_OF_POOL = 5 +val THREAD_COUNT_OF_POOL: Int = (Runtime.getRuntime().availableProcessors() * 2).coerceAtLeast(4).coerceAtMost(15) @JvmOverloads fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService { val counter = AtomicLong() - val prefix = "${threadNamePrefix}_${Random.nextULong()}-" + val prefix = "${threadNamePrefix}_${Random.nextULong()}" val executorService = if (!isForkJoin) ThreadPoolExecutor( - THREAD_COUNT_OF_POOL, THREAD_COUNT_OF_POOL, 1, TimeUnit.DAYS, - ArrayBlockingQueue(5000) + /* corePoolSize = */ THREAD_COUNT_OF_POOL, /* maximumPoolSize = */ THREAD_COUNT_OF_POOL, + /* keepAliveTime = */ 1, /* unit = */ TimeUnit.DAYS, + /* workQueue = */ ArrayBlockingQueue(5000) ) { r -> Thread(r).apply { - name = "$prefix${counter.getAndIncrement()}" + name = "${prefix}_${counter.getAndIncrement()}" isDaemon = true } } else ForkJoinPool( - THREAD_COUNT_OF_POOL, { fjPool -> + /* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool -> ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply { - name = "$prefix${counter.getAndIncrement()}" + name = "${prefix}_${counter.getAndIncrement()}" } - }, - null, false + }, /* handler = */ null, /* asyncMode = */ false ) return object : ExecutorService by executorService, ThreadPoolAcquaintance { override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix) override fun unwrap(): ExecutorService = executorService + + override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix" } } @@ -57,8 +59,11 @@ private interface ThreadPoolAcquaintance { fun unwrap(): ExecutorService } +fun isRunInExecutor(executor: Executor): Boolean = + executor.doesOwnThread(currentThread()) + fun assertRunInExecutor(executor: Executor) { - executor.doesOwnThread(currentThread()).shouldBeTrue() + isRunInExecutor(executor).shouldBeTrue() } fun assertNotRunInExecutor(executor: Executor) {