Skip to content

Commit

Permalink
feat: implement methods orTimeout/completeOnTimeout/hopAsyncIf
Browse files Browse the repository at this point in the history
…methods ⏳
  • Loading branch information
oldratlee committed May 23, 2024
1 parent 71fb89f commit 56c46a8
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD;
import static java.util.Objects.requireNonNull;


Expand Down Expand Up @@ -1342,6 +1343,10 @@ C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executo
/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>Strong recommend</strong> using method {@link #orTimeout(CompletableFuture, long, TimeUnit, Executor)},
* unless all dependent CompletableFutures is executed async
* (aka. the dependent CompletableFutures is created by async methods).
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
Expand All @@ -1361,13 +1366,33 @@ public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, T
return cf;
}

/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param executorWhenTimeout the async executor when triggered by timeout
* @return a new CompletableFuture
*/
public static <C extends CompletableFuture<?>> C orTimeout(
C cf, long timeout, TimeUnit unit, Executor executorWhenTimeout) {
final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
* <p>
* <strong>Strong recommend</strong> using method {@link #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor)},
* unless all dependent CompletableFutures is executed async
* (aka. the dependent CompletableFutures is created by async methods).
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return the given CompletableFuture
* @see #completeOnTimeout(CompletableFuture, Object, long, TimeUnit, Executor)
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
Expand All @@ -1384,6 +1409,30 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
return cf;
}

/**
* Completes given CompletableFuture with the given value if not otherwise completed before the given timeout.
*
* @param value the value to use upon timeout
* @param timeout how long to wait before completing normally with the given value, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param executorWhenTimeout the async executor when triggered by timeout
* @return a new CompletableFuture
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit, Executor executorWhenTimeout) {
final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}

@SuppressWarnings("unchecked")
private static <T, C extends CompletionStage<? extends T>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor executor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, executor).thenCompose(x -> (CompletionStage<T>) x)
: cf
).thenCompose(x -> (CompletionStage<T>) x);
}

//# Advanced methods of CompletionStage

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;


Expand Down Expand Up @@ -54,16 +55,30 @@ public static <T> ScheduledFuture<?> delayToCompleteCf(
return delay(new CfCompleter<>(cf, value), delay, unit);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler";

private static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(@NonNull Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CffuDelayScheduler");
t.setName(THREAD_NAME_OF_CFFU_DELAY_SCHEDULER);
return t;
}
}

/**
* Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
};

private Delayer() {
}
}
Expand Down

0 comments on commit 56c46a8

Please sign in to comment.