Skip to content

Commit

Permalink
feat: implement new methods cffuOrTimeout/cffuCompleteOnTimeout m…
Browse files Browse the repository at this point in the history
…ethods ⏳ ☘️ add internal helper method `hopAsyncIf` ✨
  • Loading branch information
oldratlee committed May 25, 2024
1 parent 99a0320 commit 335b90c
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 28 deletions.
103 changes: 95 additions & 8 deletions cffu-core/src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -1124,32 +1124,119 @@ public Cffu<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor
/**
* Exceptionally completes this Cffu with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* Uses {@link #defaultExecutor()} as {@code executorWhenTimeout}.
*
* @param timeout how long to wait before completing exceptionally
* with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return this Cffu
* @see CffuFactory#delayedExecutor(long, TimeUnit)
* @return the new Cffu
* @see #orTimeout(Executor, long, TimeUnit)
*/
public Cffu<T> orTimeout(long timeout, TimeUnit unit) {
return orTimeout(fac.defaultExecutor(), timeout, unit);
}

/**
* Exceptionally completes this Cffu with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
*
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing exceptionally
* with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the new Cffu
*/
public Cffu<T> orTimeout(Executor executorWhenTimeout, long timeout, TimeUnit unit) {
checkMinimalStage();
CompletableFutureUtils.orTimeout(cf, timeout, unit);
return this;
return reset0(CompletableFutureUtils.cffuOrTimeout(cf, executorWhenTimeout, timeout, unit));
}

/**
* Exceptionally completes given Cffu with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong> This method is <strong>UNSAFE</strong>!
* <p>
* When triggered by timeout, the subsequent non-async actions of the dependent cfs
* are performed in the <strong>SINGLE thread builtin executor</strong>
* of CompletableFuture for delay executions (including timeout function).
* So the long-running subsequent non-async actions lead to the CompletableFuture dysfunction
* (including delay execution and timeout).
* <p>
* <strong>Strong recommend</strong> using the safe methods {@link #orTimeout(long, TimeUnit)}
* instead of this method.
* <p>
* Unless all subsequent actions of dependent cfs is ensured executing async
* (aka. the dependent cfs is created by async methods), using this method
* is one less thread switch of task execution when triggered by timeout.
*
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return this Cffu
* @see #orTimeout(long, TimeUnit)
*/
public Cffu<T> unsafeOrTimeout(long timeout, TimeUnit unit) {
checkMinimalStage();
return reset0(CompletableFutureUtils.orTimeout(cf, timeout, unit));
}

/**
* Completes this Cffu with the given value if not otherwise completed before the given timeout.
* <p>
* Uses {@link #defaultExecutor()} as {@code executorWhenTimeout}.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return this Cffu
* @see CffuFactory#delayedExecutor(long, TimeUnit)
* @return the new Cffu
* @see #completeOnTimeout(Object, Executor, long, TimeUnit)
*/
public Cffu<T> completeOnTimeout(@Nullable T value, long timeout, TimeUnit unit) {
return completeOnTimeout(value, fac.defaultExecutor(), timeout, unit);
}

/**
* Completes this Cffu with the given value if not otherwise completed before the given timeout.
*
* @param value the value to use upon timeout
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the new Cffu
*/
public Cffu<T> completeOnTimeout(@Nullable T value, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
checkMinimalStage();
CompletableFutureUtils.completeOnTimeout(cf, value, timeout, unit);
return this;
return reset0(CompletableFutureUtils.cffuCompleteOnTimeout(cf, value, executorWhenTimeout, timeout, unit));
}

/**
* Completes given Cffu with the given value if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong> This method is <strong>UNSAFE</strong>!
* <p>
* When triggered by timeout, the subsequent non-async actions of the dependent cfs
* are performed in the <strong>SINGLE thread builtin executor</strong>
* of CompletableFuture for delay executions (including timeout function).
* So the long-running subsequent non-async actions lead to the CompletableFuture dysfunction
* (including delay execution and timeout).
* <p>
* <strong>Strong recommend</strong> using the safe methods {@link #completeOnTimeout(Object, long, TimeUnit)}
* instead of this method.
* <p>
* Unless all subsequent actions of dependent cfs is ensured executing async
* (aka. the dependent cfs is created by async methods), using this method
* is one less thread switch of task execution when triggered by timeout.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return this Cffu
* @see #completeOnTimeout(Object, long, TimeUnit)
*/
public Cffu<T> unsafeCompleteOnTimeout(@Nullable T value, long timeout, TimeUnit unit) {
checkMinimalStage();
return reset0(CompletableFutureUtils.completeOnTimeout(cf, value, timeout, unit));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
106 changes: 106 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD;
import static java.util.Objects.requireNonNull;


Expand Down Expand Up @@ -1356,10 +1357,58 @@ C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executo
/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* Uses CompletableFuture's default asynchronous execution facility as {@code executorWhenTimeout}.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return a new CompletableFuture
* @see #cffuOrTimeout(CompletableFuture, Executor, long, TimeUnit)
*/
public static <C extends CompletableFuture<?>> C cffuOrTimeout(
C cf, long timeout, TimeUnit unit) {
return cffuOrTimeout(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit);
}

/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
*
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return a new CompletableFuture
*/
public static <C extends CompletableFuture<?>> C cffuOrTimeout(
C cf, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong> This method and {@link CompletableFuture#orTimeout(long, TimeUnit)}
* is <strong>UNSAFE</strong>!
* <p>
* When triggered by timeout, the subsequent non-async actions of the dependent CompletableFutures
* are performed in the <strong>SINGLE thread builtin executor</strong>
* of CompletableFuture for delay executions (including timeout function).
* So the long-running subsequent non-async actions lead to the CompletableFuture dysfunction
* (including delay execution and timeout).
* <p>
* <strong>Strong recommend</strong> using the safe methods {@link #cffuOrTimeout(CompletableFuture, long, TimeUnit)}
* instead of this method and {@link CompletableFuture#orTimeout(long, TimeUnit)}.
* <p>
* Unless all subsequent actions of dependent CompletableFutures is ensured executing async
* (aka. the dependent CompletableFutures is created by async methods), using this method and {@link CompletableFuture#orTimeout(long, TimeUnit)}
* is one less thread switch of task execution when triggered by timeout.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the given CompletableFuture
* @see #cffuOrTimeout(CompletableFuture, long, TimeUnit)
*/
public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
Expand All @@ -1375,13 +1424,61 @@ public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, T
return cf;
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
* <p>
* Uses CompletableFuture's default asynchronous execution facility as {@code executorWhenTimeout}.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return a new CompletableFuture
* @see #cffuCompleteOnTimeout(CompletableFuture, Object, Executor, long, TimeUnit)
*/
public static <T, C extends CompletableFuture<? super T>>
C cffuCompleteOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
return cffuCompleteOnTimeout(cf, value, AsyncPoolHolder.ASYNC_POOL, timeout, unit);
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
*
* @param value the value to use upon timeout
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return a new CompletableFuture
*/
public static <T, C extends CompletableFuture<? super T>>
C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong> This method and {@link CompletableFuture#completeOnTimeout(Object, long, TimeUnit)}
* is <strong>UNSAFE</strong>!
* <p>
* When triggered by timeout, the subsequent non-async actions of the dependent CompletableFutures
* are performed in the <strong>SINGLE thread builtin executor</strong>
* of CompletableFuture for delay executions (including timeout function).
* So the long-running subsequent non-async actions lead to the CompletableFuture dysfunction
* (including delay execution and timeout).
* <p>
* <strong>Strong recommend</strong> using the safe methods {@link #cffuCompleteOnTimeout(CompletableFuture, Object, long, TimeUnit)}
* instead of this method and {@link CompletableFuture#completeOnTimeout(Object, long, TimeUnit)}.
* <p>
* Unless all subsequent actions of dependent CompletableFutures is ensured executing async
* (aka. the dependent CompletableFutures is created by async methods), using this method and {@link CompletableFuture#completeOnTimeout(Object, long, TimeUnit)}
* is one less thread switch of task execution when triggered by timeout.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the given CompletableFuture
* @see #cffuCompleteOnTimeout(CompletableFuture, Object, long, TimeUnit)
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
Expand All @@ -1398,6 +1495,15 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
return cf;
}

@SuppressWarnings("unchecked")
private static <T, C extends CompletionStage<? extends T>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor ayncExecutor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, ayncExecutor).thenCompose(x -> (CompletionStage<T>) x)
: cf
).thenCompose(x -> (CompletionStage<T>) x);
}

//# Advanced methods of CompletionStage

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;


Expand Down Expand Up @@ -54,16 +55,30 @@ public static <T> ScheduledFuture<?> delayToCompleteCf(
return delay(new CfCompleter<>(cf, value), delay, unit);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler";

private static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(@NonNull Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CffuDelayScheduler");
t.setName(THREAD_NAME_OF_CFFU_DELAY_SCHEDULER);
return t;
}
}

/**
* Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
};

private Delayer() {
}
}
Expand Down
27 changes: 26 additions & 1 deletion cffu-core/src/test/java/io/foldright/cffu/CffuTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,34 @@ void test_either_success() throws Exception {
////////////////////////////////////////
// timeout control
//
// tested in CffuApiCompatibilityTest
// also tested in CffuApiCompatibilityTest
////////////////////////////////////////

@Test
void test_timeout() throws Exception {
assertInstanceOf(TimeoutException.class,
assertThrows(ExecutionException.class, () ->
cffuFactory.newIncompleteCffu().orTimeout(1, TimeUnit.MILLISECONDS).get()
).getCause());
assertInstanceOf(TimeoutException.class,
assertThrows(ExecutionException.class, () ->
cffuFactory.newIncompleteCffu().orTimeout(executorService, 1, TimeUnit.MILLISECONDS).get()
).getCause());

assertEquals(n, cffuFactory.completedFuture(n).orTimeout(1, TimeUnit.MILLISECONDS).get());
assertEquals(n, cffuFactory.completedFuture(n).orTimeout(executorService, 1, TimeUnit.MILLISECONDS).get());

assertEquals(n, cffuFactory.newIncompleteCffu().completeOnTimeout(
n, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, cffuFactory.newIncompleteCffu().completeOnTimeout(
n, executorService, 1, TimeUnit.MILLISECONDS).get());

assertEquals(n, cffuFactory.completedFuture(n).completeOnTimeout(
anotherN, 1, TimeUnit.MILLISECONDS).get());
assertEquals(n, cffuFactory.completedFuture(n).completeOnTimeout(
anotherN, executorService, 1, TimeUnit.MILLISECONDS).get());
}

////////////////////////////////////////////////////////////////////////////////
//# Read(explicitly) methods
//
Expand Down
Loading

0 comments on commit 335b90c

Please sign in to comment.