From 7969f99ba54318fa264d5f8a7643362ae1e7253a Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Wed, 12 Jun 2024 12:56:37 +0800 Subject: [PATCH] WIP --- .../cffu/CompletableFutureUtils.java | 371 ++++++++++++++++++ 1 file changed, 371 insertions(+) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 655239e0..4a4c0990 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -27,6 +27,227 @@ * @author Jerry Lee (oldratlee at gmail dot com) */ public final class CompletableFutureUtils { + //////////////////////////////////////////////////////////////////////////////// + //# multi-actions(M*) methods + // + // - mRun*Async(Runnable): Runnable* -> CompletableFuture + // mRunAsync / mRunFastFailAsync + // - mSupply*Async(Supplier): Supplier* -> CompletableFuture> + // mSupplyAsync / mSupplyFastFailAsync / mSupplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunAsync(Runnable... actions) { + return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor after runs the given actions. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return CompletableFuture.allOf(wrapActions(executor, actions)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunFastFailAsync(Runnable... actions) { + return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunFastFailAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return allOfFastFail(wrapActions(executor, actions)); + } + + private static CompletableFuture[] wrapActions(Executor executor, Runnable[] actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Supplier... suppliers) { + return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor with the values obtained by calling the given Suppliers. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOf(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Supplier... suppliers) { + return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOfFastFail(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the CompletableFuture's default asynchronous execution facility + * in the given time({@code timeout}), aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier... suppliers) { + return mSupplyMostSuccessAsync(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the supplied Executor in the given time({@code timeout}), + * aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + Executor executor, long timeout, TimeUnit unit, + @Nullable T valueIfNotSuccess, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, wrapSuppliers(executor, suppliers)); + } + + @SuppressWarnings("unchecked") + private static CompletableFuture[] wrapSuppliers(Executor executor, Supplier[] suppliers) { + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync((Supplier) suppliers[i], executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# allOf*/mostResultsOfSuccess methods //////////////////////////////////////////////////////////////////////////////// @@ -836,6 +1057,156 @@ private static CompletableFuture mostTupleOfSuccess0( // - thenCombineFastFail*(BiFunction): (T1, T2) -> U //////////////////////////////////////////////////////////////////////////////// + public static CompletableFuture thenMRunAsync(CompletionStage cf, Runnable... actions) { + return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMRunAsync(CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> CompletableFuture.allOf(wrapActions(executor, actions))); + } + + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Runnable... actions) { + return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> allOfFastFail(wrapActions(executor, actions))); + } + + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> CompletableFuture.allOf(wrapConsumers(executor, v, actions))); + } + + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allOfFastFail(wrapConsumers(executor, v, actions))); + } + + private static CompletableFuture[] wrapConsumers(Executor executor, T v, Consumer[] actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor); + } + return cfs; + } + + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Function... fns) { + return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOf(wrapFunctions(executor, v, fns))); + } + + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Function... fns) { + return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOfFastFail(wrapFunctions(executor, v, fns))); + } + + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns); + } + + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, Executor executor, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> mostResultsOfSuccess( + executor, timeout, unit, valueIfNotSuccess, wrapFunctions(executor, v, fns) + )); + } + + private static CompletableFuture[] wrapFunctions( + Executor executor, T v, Function[] fns) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor); + } + return cfs; + } + + //////////////////////////////////////////////////////////////////////////////// + //# `then both(binary input)` methods with fast-fail support: + // + // - runAfterBothFastFail*(Runnable): Void, Void -> Void + // - thenAcceptBothFastFail*(BiConsumer): (T1, T2) -> Void + // - thenCombineFastFail*(BiFunction): (T1, T2) -> U + //////////////////////////////////////////////////////////////////////////////// + /** * Returns a new CompletableFuture that, when two given stages both complete normally, executes the given action. * if any of the given stage complete exceptionally, then the returned CompletableFuture also does so