Skip to content

Commit

Permalink
refactor: use cf.handle method(instead of whenComplete) and retur…
Browse files Browse the repository at this point in the history
…n null, in order to prevent reporting the handled argument exception; improve test cases of safe `timeout*` methods 💥
  • Loading branch information
oldratlee committed Jun 12, 2024
1 parent 11d7c87 commit 6d2ebbd
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1665,10 +1665,13 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
private static <C extends CompletableFuture<?>> C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) {
CompletableFuture<Object> ret = newIncompleteFuture(cf);

cf.whenComplete((v, ex) -> {
cf.handle((v, ex) -> {
if (!atCfDelayerThread()) completeCf(ret, v, ex);
else delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor)
.execute(() -> completeCf(ret, v, ex));
// use `cf.handle` method(instead of `whenComplete`) and return null,
// in order to prevent below `exceptionally` reporting the handled argument exception in this action
return null;
}).exceptionally(ex -> reportException("Exception occurred in the input cf whenComplete of hop executor:", ex));

return (C) ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.foldright.cffu.CompletableFutureUtils.*;
import static io.foldright.test_utils.TestUtils.*;
Expand Down Expand Up @@ -964,49 +967,63 @@ void test_timeout() throws Exception {
@Test
void test_safeBehavior_orTimeout() {
final Thread testThread = currentThread();

assertEquals(n, orTimeout(createIncompleteFuture(), 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertTrue(Delayer.atCfDelayerThread());
return n;
}).join());

assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return n;
}).join());
assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return n;
}).join());
final List<Integer> results = IntStream.range(0, 10).boxed().collect(Collectors.toList());

assertEquals(results, results.stream().map(i ->
orTimeout(createIncompleteFuture(), 100, TimeUnit.MILLISECONDS).handle((r1, ex1) -> {
assertInstanceOf(TimeoutException.class, ex1);
assertTrue(Delayer.atCfDelayerThread());
return i;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));

assertEquals(results, results.stream().map(i ->
cffuOrTimeout(createIncompleteFuture(), 100, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return i;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
assertEquals(results, results.stream().map(i ->
cffuOrTimeout(createIncompleteFuture(), executorService, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return i;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
}

@Test
void test_safeBehavior_completeOnTimeout() {
final Thread testThread = currentThread();

assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertTrue(Delayer.atCfDelayerThread());
return r;
}).join());

assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return r;
}).join());
assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 5, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return r;
}).join());
final List<Integer> results = IntStream.range(0, 10).boxed().collect(Collectors.toList());

assertEquals(results, results.stream().map(i ->
completeOnTimeout(createIncompleteFuture(), i, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertTrue(Delayer.atCfDelayerThread());
return r;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));

assertEquals(results, results.stream().map(i ->
cffuCompleteOnTimeout(createIncompleteFuture(), i, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return r;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
assertEquals(results, results.stream().map(i ->
cffuCompleteOnTimeout(createIncompleteFuture(), i, executorService, 100, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return r;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
}

@Test
Expand Down

0 comments on commit 6d2ebbd

Please sign in to comment.