From aeef03c08b28b213f3f8fcd6d4dfc6532ac2093b Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Mon, 10 Jun 2024 19:31:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20the=20`multi-actions`=20met?= =?UTF-8?q?hods=20in=20`CompletableFutureUtils`=20=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtils.java | 525 ++++++++++++++++++ 1 file changed, 525 insertions(+) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 655239e0..79d1970f 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -27,6 +27,235 @@ * @author Jerry Lee (oldratlee at gmail dot com) */ public final class CompletableFutureUtils { + //////////////////////////////////////////////////////////////////////////////// + //# multi-actions(M*) methods + // + // - mRun*Async(Runnable): Runnable* -> CompletableFuture + // mRunAsync / mRunFastFailAsync + // - mSupply*Async(Supplier): Supplier* -> CompletableFuture> + // mSupplyAsync / mSupplyFastFailAsync / mSupplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunAsync(Runnable... actions) { + return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return CompletableFuture.allOf(wrapActions(executor, actions)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunFastFailAsync(Runnable... actions) { + return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunFastFailAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return allOfFastFail(wrapActions(executor, actions)); + } + + private static CompletableFuture[] wrapActions(Executor executor, Runnable[] actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + * + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Supplier... suppliers) { + return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOf(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior. + * + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Supplier... suppliers) { + return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOfFastFail(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the CompletableFuture's default asynchronous execution facility + * in the given time({@code timeout}), aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier... suppliers) { + return mSupplyMostSuccessAsync(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the given Executor in the given time({@code timeout}), + * aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + Executor executor, long timeout, TimeUnit unit, + @Nullable T valueIfNotSuccess, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, wrapSuppliers(executor, suppliers)); + } + + private static CompletableFuture[] wrapSuppliers( + Executor executor, Supplier[] suppliers) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync(suppliers[i], executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# allOf*/mostResultsOfSuccess methods //////////////////////////////////////////////////////////////////////////////// @@ -828,6 +1057,302 @@ private static CompletableFuture mostTupleOfSuccess0( .handle((unused, ex) -> tupleOf0(MGetSuccessNow0(null, cfArray))); } + //////////////////////////////////////////////////////////////////////////////// + //# then-multi-actions(M*) methods + // + // - thenMRun*Async(Runnable): Runnable* -> CompletableFuture + // thenMRunAsync / thenMRunFastFailAsync + // - thenMAccept*Async(Supplier): Consumer* -> CompletableFuture + // thenMAcceptAsync / thenMAcceptFastFailAsync + // - thenMApply*Async(Supplier): Function* -> CompletableFuture> + // thenMApplyAsync / thenMApplyFastFailAsync / thenMApplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Runnable... actions) { + return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> CompletableFuture.allOf(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Runnable... actions) { + return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + *

+ * This method is the same as {@link #thenMRunAsync(CompletionStage, Executor, Runnable...)} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync( + CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> allOfFastFail(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> CompletableFuture.allOf(wrapConsumers(executor, v, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Executor, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allOfFastFail(wrapConsumers(executor, v, actions))); + } + + private static CompletableFuture[] wrapConsumers(Executor executor, T v, Consumer[] actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that, when this stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given functions. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Function... fns) { + return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when this stage completes normally, is executed using the given Executor, + * with the given stage's result as the argument to the given functions. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOf(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture that, when this stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given functions. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Function... fns) { + return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when this stage completes normally, is executed using the given Executor, + * with the given stage's result as the argument to the given functions. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Executor, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOfFastFail(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * executing in the CompletableFuture's default asynchronous execution facility + * with the given stage's result as the argument to the given functions + * in the given time({@code timeout}), aka as many results as possible in the given time. + *

+ * If the given fn is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * executing in the given Executor with the given stage's result as the argument to the given functions + * in the given time({@code timeout}), aka as many results as possible in the given time. + *

+ * If the given fn is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, Executor executor, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> mostResultsOfSuccess( + executor, timeout, unit, valueIfNotSuccess, wrapFunctions(executor, v, fns) + )); + } + + private static CompletableFuture[] wrapFunctions( + Executor executor, T v, Function[] fns) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# `then both(binary input)` methods with fast-fail support: //