Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Jun 13, 2024
1 parent 6ffa1fc commit a5a8166
Showing 1 changed file with 361 additions and 0 deletions.
361 changes: 361 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,227 @@
* @author Jerry Lee (oldratlee at gmail dot com)
*/
public final class CompletableFutureUtils {
////////////////////////////////////////////////////////////////////////////////
//# multi-actions(M*) methods
//
// - mRun*Async(Runnable): Runnable* -> CompletableFuture<Void>
// mRunAsync / mRunFastFailAsync
// - mSupply*Async(Supplier): Supplier<T>* -> CompletableFuture<List<T>>
// mSupplyAsync / mSupplyFastFailAsync / mSupplyMostSuccessAsync
////////////////////////////////////////////////////////////////////////////////

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* after runs the given actions.
*
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see #allOf(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable)
*/
public static CompletableFuture<Void> mRunAsync(Runnable... actions) {
return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the supplied Executor after runs the given actions.
*
* @param executor the executor to use for asynchronous execution
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see #allOf(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable, Executor)
*/
public static CompletableFuture<Void> mRunAsync(Executor executor, Runnable... actions) {
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return CompletableFuture.allOf(wrapActions(executor, actions));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* after runs the given actions.
* <p>
* This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior.
*
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see #allOfFastFail(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable)
*/
public static CompletableFuture<Void> mRunFastFailAsync(Runnable... actions) {
return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the supplied Executor after runs the given actions.
* <p>
* This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior.
*
* @param executor the executor to use for asynchronous execution
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see #allOfFastFail(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable, Executor)
*/
public static CompletableFuture<Void> mRunFastFailAsync(Executor executor, Runnable... actions) {
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return allOfFastFail(wrapActions(executor, actions));
}

private static CompletableFuture<?>[] wrapActions(Executor executor, Runnable[] actions) {
CompletableFuture<?>[] cfs = new CompletableFuture[actions.length];
for (int i = 0; i < actions.length; i++) {
cfs[i] = CompletableFuture.runAsync(actions[i], executor);
}
return cfs;
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* with the values obtained by calling the given Suppliers.
*
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #allResultsOf(CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyAsync(Supplier<? extends T>... suppliers) {
return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the supplied Executor with the values obtained by calling the given Suppliers.
*
* @param executor the executor to use for asynchronous execution
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #allResultsOf(CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyAsync(Executor executor, Supplier<? extends T>... suppliers) {
requireNonNull(executor, "executor is null");
requireNonNull(suppliers, "suppliers is null");
for (int i = 0; i < suppliers.length; i++) {
requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null");
}
return allResultsOf(wrapSuppliers(executor, suppliers));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* with the values obtained by calling the given Suppliers.
* <p>
* This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior.
*
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #allResultsOfFastFail(CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyFastFailAsync(Supplier<? extends T>... suppliers) {
return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers);
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the supplied Executor with the values obtained by calling the given Suppliers.
* <p>
* This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior.
*
* @param executor the executor to use for asynchronous execution
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #allResultsOfFastFail(CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyFastFailAsync(Executor executor, Supplier<? extends T>... suppliers) {
requireNonNull(executor, "executor is null");
requireNonNull(suppliers, "suppliers is null");
for (int i = 0; i < suppliers.length; i++) {
requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null");
}
return allResultsOfFastFail(wrapSuppliers(executor, suppliers));
}

/**
* Returns a new CompletableFuture with the most results in the <strong>same order</strong> of tasks
* running in the CompletableFuture's default asynchronous execution facility
* in the given time({@code timeout}), aka as many results as possible in the given time.
* <p>
* If the given supplier is successful in time, its result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyMostSuccessAsync(
long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier<? extends T>... suppliers) {
return mSupplyMostSuccessAsync(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers);
}

/**
* Returns a new CompletableFuture with the most results in the <strong>same order</strong> of tasks
* running in the supplied Executor in the given time({@code timeout}),
* aka as many results as possible in the given time.
* <p>
* If the given supplier is successful in time, its result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param suppliers functions returning the value to be used to complete the returned CompletableFuture
* @param <T> the function's return type
* @return the new CompletableFuture
* @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier, Executor)
*/
@SafeVarargs
public static <T> CompletableFuture<List<T>> mSupplyMostSuccessAsync(
Executor executor, long timeout, TimeUnit unit,
@Nullable T valueIfNotSuccess, Supplier<? extends T>... suppliers) {
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireNonNull(suppliers, "suppliers is null");
for (int i = 0; i < suppliers.length; i++) {
requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null");
}
return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, wrapSuppliers(executor, suppliers));
}

@SuppressWarnings("unchecked")
private static <T> CompletableFuture<T>[] wrapSuppliers(Executor executor, Supplier<? extends T>[] suppliers) {
CompletableFuture<T>[] cfs = new CompletableFuture[suppliers.length];
for (int i = 0; i < suppliers.length; i++) {
cfs[i] = CompletableFuture.supplyAsync((Supplier<T>) suppliers[i], executor);
}
return cfs;
}

////////////////////////////////////////////////////////////////////////////////
//# allOf*/mostResultsOfSuccess methods
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -836,6 +1057,146 @@ private static <T> CompletableFuture<T> mostTupleOfSuccess0(
// - thenCombineFastFail*(BiFunction): (T1, T2) -> U
////////////////////////////////////////////////////////////////////////////////

public static CompletableFuture<Void> thenMRunAsync(CompletionStage<?> cf, Runnable... actions) {
return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions);
}

public static CompletableFuture<Void> thenMRunAsync(CompletionStage<?> cf, Executor executor, Runnable... actions) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(unused -> CompletableFuture.allOf(wrapActions(executor, actions)));
}

public static CompletableFuture<Void> thenMRunFastFailAsync(CompletionStage<?> cf, Runnable... actions) {
return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions);
}

public static CompletableFuture<Void> thenMRunFastFailAsync(CompletionStage<?> cf, Executor executor, Runnable... actions) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(unused -> allOfFastFail(wrapActions(executor, actions)));
}

public static <T> CompletableFuture<Void> thenMAcceptAsync(
CompletionStage<? extends T> cf, Consumer<? super T>... actions) {
return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions);
}

public static <T> CompletableFuture<Void> thenMAcceptAsync(
CompletionStage<? extends T> cf, Executor executor, Consumer<? super T>... actions) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(v -> CompletableFuture.allOf(wrapConsumers(executor, v, actions)));
}

public static <T> CompletableFuture<Void> thenMAcceptFastFailAsync(
CompletionStage<? extends T> cf, Consumer<? super T>... actions) {
return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions);
}

public static <T> CompletableFuture<Void> thenMAcceptFastFailAsync(
CompletionStage<? extends T> cf, Executor executor, Consumer<? super T>... actions) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(actions, "actions is null");
for (int i = 0; i < actions.length; i++) {
requireNonNull(actions[i], "action" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(v -> allOfFastFail(wrapConsumers(executor, v, actions)));
}

private static <T> CompletableFuture<?>[] wrapConsumers(Executor executor, T v, Consumer<? super T>[] actions) {
CompletableFuture<?>[] cfs = new CompletableFuture[actions.length];
for (int i = 0; i < actions.length; i++) {
final int idx = i;
cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor);
}
return cfs;
}

public <T, U> CompletableFuture<List<U>> thenMApplyAsync(
CompletionStage<? extends T> cf, Function<? super T, ? extends U>... fns) {
return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns);
}

public <T, U> CompletableFuture<List<U>> thenMApplyAsync(
CompletionStage<? extends T> cf, Executor executor, Function<? super T, ? extends U>... fns) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(fns, "fns is null");
for (int i = 0; i < fns.length; i++) {
requireNonNull(fns[i], "fn" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(v -> allResultsOf(wrapFunctions(executor, v, fns)));
}

public <T, U> CompletableFuture<List<U>> thenMApplyFastFailAsync(
CompletionStage<? extends T> cf, Function<? super T, ? extends U>... fns) {
return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns);
}

public <T, U> CompletableFuture<List<U>> thenMApplyFastFailAsync(
CompletionStage<? extends T> cf, Executor executor, Function<? super T, ? extends U>... fns) {
requireNonNull(cf, "cf is null");
requireNonNull(executor, "executor is null");
requireNonNull(fns, "fns is null");
for (int i = 0; i < fns.length; i++) {
requireNonNull(fns[i], "fn" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(v -> allResultsOfFastFail(wrapFunctions(executor, v, fns)));
}

public <T, U> CompletableFuture<List<U>> thenMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, long timeout, TimeUnit unit,
@Nullable U valueIfNotSuccess, Function<? super T, ? extends U>... fns) {
return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns);
}

public <T, U> CompletableFuture<List<U>> thenMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, Executor executor, long timeout, TimeUnit unit,
@Nullable U valueIfNotSuccess, Function<? super T, ? extends U>... fns) {
requireNonNull(fns, "fns is null");
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireNonNull(fns, "fns is null");
for (int i = 0; i < fns.length; i++) {
requireNonNull(fns[i], "fn" + (i + 1) + " is null");
}
return toNonMinCf(cf).thenCompose(v ->
mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, wrapFunctions(executor, v, fns)));
}

private static <T, U> CompletableFuture<U>[] wrapFunctions(
Executor executor, T v, Function<? super T, ? extends U>[] fns) {
@SuppressWarnings("unchecked")
CompletableFuture<U>[] cfs = new CompletableFuture[fns.length];
for (int i = 0; i < fns.length; i++) {
final int idx = i;
cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor);
}
return cfs;
}

////////////////////////////////////////////////////////////////////////////////
//# `then both(binary input)` methods with fast-fail support:
//
// - runAfterBothFastFail*(Runnable): Void, Void -> Void
// - thenAcceptBothFastFail*(BiConsumer): (T1, T2) -> Void
// - thenCombineFastFail*(BiFunction): (T1, T2) -> U
////////////////////////////////////////////////////////////////////////////////

/**
* Returns a new CompletableFuture that, when two given stages both complete normally, executes the given action.
* if any of the given stage complete exceptionally, then the returned CompletableFuture also does so
Expand Down

0 comments on commit a5a8166

Please sign in to comment.