From 8b6b5abcc33aecf6249868b88210c3ef58ae45f6 Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Mon, 10 Jun 2024 19:31:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20the=20`multi-actions`=20met?= =?UTF-8?q?hods=20in=20`CompletableFutureUtils`=20=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/foldright/cffu/CffuFactory.java | 10 +- .../cffu/CompletableFutureUtils.java | 570 +++++++++++++++++- .../foldright/cffu/kotlin/CffuExtensions.kt | 8 +- .../kotlin/CompletableFutureExtensions.kt | 9 +- 4 files changed, 568 insertions(+), 29 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java b/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java index 0f6279df..8c18d35c 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java @@ -414,7 +414,7 @@ public final Cffu> allResultsOfFastFail(CompletionStage /** * Returns a new Cffu with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -629,7 +629,7 @@ public Cffu> allTupleOfFastFail( /** * Returns a new Cffu with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -647,7 +647,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -666,7 +666,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -686,7 +686,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 655239e0..dbc951d4 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -27,6 +27,243 @@ * @author Jerry Lee (oldratlee at gmail dot com) */ public final class CompletableFutureUtils { + //////////////////////////////////////////////////////////////////////////////// + //# multi-actions(M*) methods + // + // - mRun*Async(Runnable): Runnable* -> CompletableFuture + // mRunAsync / mRunFastFailAsync + // - mSupply*Async(Supplier): Supplier* -> CompletableFuture> + // mSupplyAsync / mSupplyFastFailAsync / mSupplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunAsync(Runnable... actions) { + return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return CompletableFuture.allOf(wrapActions(executor, actions)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunFastFailAsync(Runnable... actions) { + return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunFastFailAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return allOfFastFail(wrapActions(executor, actions)); + } + + private static CompletableFuture[] wrapActions(Executor executor, Runnable[] actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + * + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Supplier... suppliers) { + return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOf(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + *

+ * This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior. + * + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Supplier... suppliers) { + return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + *

+ * This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync( + Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOfFastFail(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the most values obtained by calling the given Suppliers + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Suppliers arguments. + *

+ * If the given supplier is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier... suppliers) { + return mSupplyMostSuccessAsync(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the most values obtained by calling the given Suppliers + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Suppliers arguments. + *

+ * If the given supplier is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + Executor executor, long timeout, TimeUnit unit, + @Nullable T valueIfNotSuccess, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, wrapSuppliers(executor, suppliers)); + } + + private static CompletableFuture[] wrapSuppliers( + Executor executor, Supplier[] suppliers) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync(suppliers[i], executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# allOf*/mostResultsOfSuccess methods //////////////////////////////////////////////////////////////////////////////// @@ -84,7 +321,7 @@ public static CompletableFuture allOf(CompletionStage... cfs) { } /** - * Returns a new CompletableFuture with the results in the same order of all the given stages, + * Returns a new CompletableFuture with the results in the same order of the given stages arguments, * the new CompletableFuture is completed when all the given stages complete; * If any of the given stages complete exceptionally, then the returned CompletableFuture also does so, * with a CompletionException holding this exception as its cause. @@ -184,7 +421,7 @@ public static CompletableFuture allOfFastFail(CompletionStage... cfs) { /** * Returns a new CompletableFuture that is successful with the results in the same order - * of all the given stages when all the given stages success; + * of the given stages arguments when all the given stages success; * If any of the given stages complete exceptionally, then the returned CompletableFuture also does so * *without* waiting other incomplete given stages, with a CompletionException holding this exception as its cause. * If no stages are provided, returns a CompletableFuture completed with the value empty list. @@ -226,7 +463,7 @@ public static CompletableFuture> allResultsOfFastFail(CompletionStag /** * Returns a new CompletableFuture with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -245,7 +482,7 @@ public static CompletableFuture> mostResultsOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -283,7 +520,7 @@ public static CompletableFuture> mostResultsOfSuccess( } /** - * Multi-Gets(MGet) the results in the same order of the given cfs, + * Multi-Gets(MGet) the results in the same order of the given cfs arguments, * use the result value if the given stage is completed successfully, else use the given valueIfNotSuccess * * @param cfs MUST be *Non-Minimal* CF instances in order to read results(`getSuccessNow`), @@ -660,7 +897,7 @@ private static T tupleOf0(Object... elements) { /** * Returns a new CompletableFuture with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -678,7 +915,7 @@ public static CompletableFuture> mostTupleOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -698,7 +935,7 @@ public static CompletableFuture> mostTupleOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -717,7 +954,7 @@ public static CompletableFuture> mostTupleOfSucc /** * Returns a new CompletableFuture with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -737,7 +974,7 @@ public static CompletableFuture> mostTupleOfSucc /** * Returns a new CompletableFuture with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -757,7 +994,7 @@ public static CompletableFuture> mostTup /** * Returns a new CompletableFuture with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -778,7 +1015,7 @@ public static CompletableFuture> mostTup /** * Returns a new CompletableFuture with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -798,7 +1035,7 @@ public static CompletableFuture> /** * Returns a new CompletableFuture with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -828,6 +1065,311 @@ private static CompletableFuture mostTupleOfSuccess0( .handle((unused, ex) -> tupleOf0(MGetSuccessNow0(null, cfArray))); } + //////////////////////////////////////////////////////////////////////////////// + //# then-multi-actions(M*) methods + // + // - thenMRun*Async(Runnable): Runnable* -> CompletableFuture + // thenMRunAsync / thenMRunFastFailAsync + // - thenMAccept*Async(Supplier): Consumer* -> CompletableFuture + // thenMAcceptAsync / thenMAcceptFastFailAsync + // - thenMApply*Async(Supplier): Function* -> CompletableFuture> + // thenMApplyAsync / thenMApplyFastFailAsync / thenMApplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Runnable... actions) { + return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> CompletableFuture.allOf(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + *

+ * This method is the same as {@link #thenMRunAsync(CompletionStage, Runnable...)} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Runnable... actions) { + return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + *

+ * This method is the same as {@link #thenMRunAsync(CompletionStage, Executor, Runnable...)} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync( + CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> allOfFastFail(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> CompletableFuture.allOf(wrapConsumers(executor, v, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Executor, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allOfFastFail(wrapConsumers(executor, v, actions))); + } + + private static CompletableFuture[] wrapConsumers(Executor executor, T v, Consumer[] actions) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Function... fns) { + return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOf(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Function... fns) { + return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Executor, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOfFastFail(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * executing in the CompletableFuture's default asynchronous execution facility + * with the given stage's result as the argument to the given functions + * in the given time({@code timeout}, aka as many results as possible in the given time). + *

+ * If the given fn is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * executing in the given Executor with the given stage's result as the argument to the given functions + * in the given time({@code timeout}, aka as many results as possible in the given time). + *

+ * If the given fn is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, Executor executor, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> mostResultsOfSuccess( + executor, timeout, unit, valueIfNotSuccess, wrapFunctions(executor, v, fns) + )); + } + + private static CompletableFuture[] wrapFunctions( + Executor executor, T v, Function[] fns) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# `then both(binary input)` methods with fast-fail support: // @@ -944,7 +1486,6 @@ public static CompletableFuture thenAcceptBothFastFail( * @return the new CompletableFuture * @see CompletionStage#thenAcceptBothAsync(CompletionStage, BiConsumer) */ - @SuppressWarnings("unchecked") public static CompletableFuture thenAcceptBothFastFailAsync( CompletionStage cf1, CompletionStage cf2, BiConsumer action) { @@ -1028,7 +1569,6 @@ public static CompletableFuture thenCombineFastFail( * @return the new CompletableFuture * @see CompletionStage#thenCombineAsync(CompletionStage, BiFunction) */ - @SuppressWarnings("unchecked") public static CompletableFuture thenCombineFastFailAsync( CompletionStage cf1, CompletionStage cf2, BiFunction fn) { diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt index d43f83fa..ddf96157 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt @@ -392,7 +392,7 @@ fun Array>.allOfFastFailCffu(cffuFactory: CffuFactory): C /** * Returns a new Cffu with the most results in the **same order** of - * the given Cffus in the given time(`timeout`), aka as many results as possible in the given time. + * the given Cffus in the given time(`timeout`, aka as many results as possible in the given time). * * If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -413,7 +413,7 @@ fun Collection>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given Cffus in the given time(`timeout`), aka as many results as possible in the given time. + * the given Cffus in the given time(`timeout`, aka as many results as possible in the given time). * * If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -434,7 +434,7 @@ fun Array>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -453,7 +453,7 @@ fun Collection>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * (aka the result extraction logic is [Cffu.getSuccessNow]). diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt index 8db439e1..5a226d0a 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt @@ -5,7 +5,6 @@ package io.foldright.cffu.kotlin import io.foldright.cffu.Cffu import io.foldright.cffu.CffuState import io.foldright.cffu.CompletableFutureUtils -import java.util.* import java.util.concurrent.* import java.util.function.* import java.util.function.Function @@ -181,7 +180,7 @@ fun Array>.allResultsOfFastFailCompletableFuture( /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -197,7 +196,7 @@ fun Collection>.mostResultsOfSuccessCompletableFuture /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -213,7 +212,7 @@ fun Array>.mostResultsOfSuccessCompletableFuture( /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -232,7 +231,7 @@ fun Collection>.mostResultsOfSuccessCompletableFuture /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. *