diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 5bee67ecd..93391fd93 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -1665,10 +1665,13 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { private static > C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) { CompletableFuture ret = newIncompleteFuture(cf); - cf.whenComplete((v, ex) -> { + cf.handle((v, ex) -> { if (!atCfDelayerThread()) completeCf(ret, v, ex); else delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor) .execute(() -> completeCf(ret, v, ex)); + // use `cf.handle` method(instead of `whenComplete`) and return null, + // in order to prevent below `exceptionally` reporting the handled argument exception in this action + return null; }).exceptionally(ex -> reportException("Exception occurred in the input cf whenComplete of hop executor:", ex)); return (C) ret; diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 9622865f1..2a35e59c6 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -11,10 +11,13 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static io.foldright.cffu.CompletableFutureUtils.*; import static io.foldright.test_utils.TestUtils.*; @@ -964,49 +967,63 @@ void test_timeout() throws Exception { @Test void test_safeBehavior_orTimeout() { final Thread testThread = currentThread(); - - assertEquals(n, orTimeout(createIncompleteFuture(), 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertInstanceOf(TimeoutException.class, ex); - assertTrue(Delayer.atCfDelayerThread()); - return n; - }).join()); - - assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertInstanceOf(TimeoutException.class, ex); - assertFalse(Delayer.atCfDelayerThread()); - assertNotSame(testThread, currentThread()); - return n; - }).join()); - assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertInstanceOf(TimeoutException.class, ex); - assertFalse(Delayer.atCfDelayerThread()); - assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); - return n; - }).join()); + final List results = IntStream.range(0, 10).boxed().collect(Collectors.toList()); + + assertEquals(results, results.stream().map(i -> + orTimeout(createIncompleteFuture(), 100, TimeUnit.MILLISECONDS).handle((r1, ex1) -> { + assertInstanceOf(TimeoutException.class, ex1); + assertTrue(Delayer.atCfDelayerThread()); + return i; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); + + assertEquals(results, results.stream().map(i -> + cffuOrTimeout(createIncompleteFuture(), 100, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertInstanceOf(TimeoutException.class, ex); + assertFalse(Delayer.atCfDelayerThread()); + assertNotSame(testThread, currentThread()); + return i; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); + assertEquals(results, results.stream().map(i -> + cffuOrTimeout(createIncompleteFuture(), executorService, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertInstanceOf(TimeoutException.class, ex); + assertFalse(Delayer.atCfDelayerThread()); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return i; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); } @Test void test_safeBehavior_completeOnTimeout() { final Thread testThread = currentThread(); - - assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertNull(ex); - assertTrue(Delayer.atCfDelayerThread()); - return r; - }).join()); - - assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertNull(ex); - assertFalse(Delayer.atCfDelayerThread()); - assertNotSame(testThread, currentThread()); - return r; - }).join()); - assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> { - assertNull(ex); - assertFalse(Delayer.atCfDelayerThread()); - assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); - return r; - }).join()); + final List results = IntStream.range(0, 10).boxed().collect(Collectors.toList()); + + assertEquals(results, results.stream().map(i -> + completeOnTimeout(createIncompleteFuture(), i, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertTrue(Delayer.atCfDelayerThread()); + return r; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); + + assertEquals(results, results.stream().map(i -> + cffuCompleteOnTimeout(createIncompleteFuture(), i, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertFalse(Delayer.atCfDelayerThread()); + assertNotSame(testThread, currentThread()); + return r; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); + assertEquals(results, results.stream().map(i -> + cffuCompleteOnTimeout(createIncompleteFuture(), i, executorService, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> { + assertNull(ex); + assertFalse(Delayer.atCfDelayerThread()); + assertTrue(TestThreadPoolManager.isRunInExecutor(executorService)); + return r; + }) + ).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList())); } @Test