Skip to content

Commit

Permalink
feat: implement new methods orTimeout/completeOnTimeout methods a…
Browse files Browse the repository at this point in the history
…nd `hopAsyncIf` help method ⏳ ✨
  • Loading branch information
oldratlee committed May 24, 2024
1 parent a476fdb commit 9e68e0b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD;
import static java.util.Objects.requireNonNull;


Expand Down Expand Up @@ -1343,6 +1344,10 @@ C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executo
/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>Strong recommend</strong> using method {@link #orTimeout(CompletableFuture, long, TimeUnit, Executor)},
* unless all dependent CompletableFutures is executed async
* (aka. the dependent CompletableFutures is created by async methods).
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
Expand All @@ -1362,13 +1367,33 @@ public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, T
return cf;
}

/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param executorWhenTimeout the async executor when triggered by timeout
* @return a new CompletableFuture
*/
public static <C extends CompletableFuture<?>> C orTimeout(
C cf, long timeout, TimeUnit unit, Executor executorWhenTimeout) {
final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
* <p>
* <strong>Strong recommend</strong> using method {@link #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor)},
* unless all dependent CompletableFutures is executed async
* (aka. the dependent CompletableFutures is created by async methods).
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the given CompletableFuture
* @see #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor)
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
Expand All @@ -1385,6 +1410,30 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
return cf;
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param executorWhenTimeout the async executor when triggered by timeout
* @return a new CompletableFuture
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit, Executor executorWhenTimeout) {
final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

@SuppressWarnings("unchecked")
private static <T, C extends CompletionStage<? extends T>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor executor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, executor).thenCompose(x -> (CompletionStage<T>) x)
: cf
).thenCompose(x -> (CompletionStage<T>) x);
}

//# Advanced methods of CompletionStage

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;


Expand Down Expand Up @@ -54,16 +55,30 @@ public static <T> ScheduledFuture<?> delayToCompleteCf(
return delay(new CfCompleter<>(cf, value), delay, unit);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler";

private static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(@NonNull Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CffuDelayScheduler");
t.setName(THREAD_NAME_OF_CFFU_DELAY_SCHEDULER);
return t;
}
}

/**
* Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
};

private Delayer() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,21 +952,23 @@ void test_exceptionallyAsync() throws Exception {

@Test
void test_timeout() throws Exception {
CompletableFuture<Integer> cf = createIncompleteFuture();
try {
orTimeout(cf, 1, TimeUnit.MILLISECONDS).get();
} catch (ExecutionException expected) {
assertEquals(TimeoutException.class, expected.getCause().getClass());
}
assertInstanceOf(TimeoutException.class,
assertThrows(ExecutionException.class, () ->
orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).get()
).getCause());
assertInstanceOf(TimeoutException.class,
assertThrows(ExecutionException.class, () ->
orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS, defaultExecutor()).get()
).getCause());

cf = createIncompleteFuture();
assertEquals(n, completeOnTimeout(cf, n, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, orTimeout(completedFuture(n), 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, orTimeout(completedFuture(n), 1, TimeUnit.MILLISECONDS, defaultExecutor()).get());

cf = completedFuture(n);
assertEquals(n, completeOnTimeout(cf, anotherN, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS, defaultExecutor()).get());

cf = completedFuture(n);
assertEquals(n, orTimeout(cf, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, completeOnTimeout(completedFuture(n), anotherN, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, completeOnTimeout(completedFuture(n), anotherN, 1, TimeUnit.MILLISECONDS, defaultExecutor()).get());
}

@Test
Expand Down

0 comments on commit 9e68e0b

Please sign in to comment.