From e495734bd5e4a45f77897b979432d5d4af89e7fc Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Mon, 10 Jun 2024 19:31:13 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20the=20`multi-actions`=20met?= =?UTF-8?q?hods=20in=20`CompletableFutureUtils`=20=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/io/foldright/cffu/CffuFactory.java | 14 +- .../cffu/CompletableFutureUtils.java | 575 +++++++++++++++++- .../foldright/cffu/kotlin/CffuExtensions.kt | 24 +- .../kotlin/CompletableFutureExtensions.kt | 17 +- 4 files changed, 587 insertions(+), 43 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java b/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java index 0f6279df..c37e07e7 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java @@ -325,7 +325,7 @@ public Cffu allOf(CompletionStage... cfs) { } /** - * Returns a new Cffu with the results in the same order of all the given stages, + * Returns a new Cffu with the results in the same order of the given stages arguments, * the new Cffu is completed when all the given stages complete; * If any of the given stages complete exceptionally, then the returned Cffu also does so, * with a CompletionException holding this exception as its cause. @@ -389,7 +389,7 @@ public Cffu allOfFastFail(CompletionStage... cfs) { /** * Returns a new Cffu that is successful with the results in the same order - * of all the given stages when all the given stages success; + * of the given stages arguments when all the given stages success; * If any of the given stages complete exceptionally, then the returned Cffu also does so * *without* waiting other incomplete given stages, with a CompletionException holding this exception as its cause. * If no stages are provided, returns a Cffu completed with the value empty list. @@ -414,7 +414,7 @@ public final Cffu> allResultsOfFastFail(CompletionStage /** * Returns a new Cffu with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -629,7 +629,7 @@ public Cffu> allTupleOfFastFail( /** * Returns a new Cffu with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -647,7 +647,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -666,7 +666,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -686,7 +686,7 @@ public Cffu> mostTupleOfSuccess( /** * Returns a new Cffu with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index a2d0e11c..0896a632 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -27,6 +27,244 @@ * @author Jerry Lee (oldratlee at gmail dot com) */ public final class CompletableFutureUtils { + //////////////////////////////////////////////////////////////////////////////// + //# multi-actions(M*) methods + // + // - mRun*Async(Runnable): Runnable* -> CompletableFuture + // mRunAsync / mRunFastFailAsync + // - mSupply*Async(Supplier): Supplier* -> CompletableFuture> + // mSupplyAsync / mSupplyFastFailAsync / mSupplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunAsync(Runnable... actions) { + return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return CompletableFuture.allOf(wrapActions(executor, actions)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunFastFailAsync(Runnable... actions) { + return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunFastFailAsync(Executor executor, Runnable... actions) { + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return allOfFastFail(wrapActions(executor, actions)); + } + + private static CompletableFuture[] wrapActions(Executor executor, Runnable[] actions) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + * + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Supplier... suppliers) { + return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyAsync(Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOf(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + *

+ * This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior. + * + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync(Supplier... suppliers) { + return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the values obtained by calling the given Suppliers + * in the same order of the given Suppliers arguments. + *

+ * This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyFastFailAsync( + Executor executor, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return allResultsOfFastFail(wrapSuppliers(executor, suppliers)); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the most values obtained by calling the given Suppliers + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Suppliers arguments. + *

+ * If the given supplier is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier... suppliers) { + return mSupplyMostSuccessAsync(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the given Executor with the most values obtained by calling the given Suppliers + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Suppliers arguments. + *

+ * If the given supplier is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture + * @param the suppliers' return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SafeVarargs + public static CompletableFuture> mSupplyMostSuccessAsync( + Executor executor, long timeout, TimeUnit unit, + @Nullable T valueIfNotSuccess, Supplier... suppliers) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(suppliers, "suppliers is null"); + for (int i = 0; i < suppliers.length; i++) { + requireNonNull(suppliers[i], "supplier" + (i + 1) + " is null"); + } + return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, wrapSuppliers(executor, suppliers)); + } + + private static CompletableFuture[] wrapSuppliers( + Executor executor, Supplier[] suppliers) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync(suppliers[i], executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# allOf*/mostResultsOfSuccess methods //////////////////////////////////////////////////////////////////////////////// @@ -84,7 +322,7 @@ public static CompletableFuture allOf(CompletionStage... cfs) { } /** - * Returns a new CompletableFuture with the results in the same order of all the given stages, + * Returns a new CompletableFuture with the results in the same order of the given stages arguments, * the new CompletableFuture is completed when all the given stages complete; * If any of the given stages complete exceptionally, then the returned CompletableFuture also does so, * with a CompletionException holding this exception as its cause. @@ -184,7 +422,7 @@ public static CompletableFuture allOfFastFail(CompletionStage... cfs) { /** * Returns a new CompletableFuture that is successful with the results in the same order - * of all the given stages when all the given stages success; + * of the given stages arguments when all the given stages success; * If any of the given stages complete exceptionally, then the returned CompletableFuture also does so * *without* waiting other incomplete given stages, with a CompletionException holding this exception as its cause. * If no stages are provided, returns a CompletableFuture completed with the value empty list. @@ -226,7 +464,7 @@ public static CompletableFuture> allResultsOfFastFail(CompletionStag /** * Returns a new CompletableFuture with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -245,7 +483,7 @@ public static CompletableFuture> mostResultsOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -283,7 +521,7 @@ public static CompletableFuture> mostResultsOfSuccess( } /** - * Multi-Gets(MGet) the results in the same order of the given cfs, + * Multi-Gets(MGet) the results in the same order of the given cfs arguments, * use the result value if the given stage is completed successfully, else use the given valueIfNotSuccess * * @param cfs MUST be *Non-Minimal* CF instances in order to read results(`getSuccessNow`), @@ -660,7 +898,7 @@ private static T tupleOf0(Object... elements) { /** * Returns a new CompletableFuture with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -678,7 +916,7 @@ public static CompletableFuture> mostTupleOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given two stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given two stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -698,7 +936,7 @@ public static CompletableFuture> mostTupleOfSuccess( /** * Returns a new CompletableFuture with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -717,7 +955,7 @@ public static CompletableFuture> mostTupleOfSucc /** * Returns a new CompletableFuture with the most results in the same order of - * the given three stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given three stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -737,7 +975,7 @@ public static CompletableFuture> mostTupleOfSucc /** * Returns a new CompletableFuture with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -757,7 +995,7 @@ public static CompletableFuture> mostTup /** * Returns a new CompletableFuture with the most results in the same order of - * the given four stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given four stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -778,7 +1016,7 @@ public static CompletableFuture> mostTup /** * Returns a new CompletableFuture with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -798,7 +1036,7 @@ public static CompletableFuture> /** * Returns a new CompletableFuture with the most results in the same order of - * the given five stages in the given time({@code timeout}), aka as many results as possible in the given time. + * the given five stages arguments in the given time({@code timeout}, aka as many results as possible in the given time). *

* If the given stage is successful, its result is the completed value; Otherwise the value {@code null}. * @@ -828,6 +1066,315 @@ private static CompletableFuture mostTupleOfSuccess0( .handle((unused, ex) -> tupleOf0(MGetSuccessNow0(null, cfArray))); } + //////////////////////////////////////////////////////////////////////////////// + //# then-multi-actions(M*) methods + // + // - thenMRun*Async(Runnable): Runnable* -> CompletableFuture + // thenMRunAsync / thenMRunFastFailAsync + // - thenMAccept*Async(Supplier): Consumer* -> CompletableFuture + // thenMAcceptAsync / thenMAcceptFastFailAsync + // - thenMApply*Async(Supplier): Function* -> CompletableFuture> + // thenMApplyAsync / thenMApplyFastFailAsync / thenMApplyMostSuccessAsync + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Runnable... actions) { + return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunAsync(CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> CompletableFuture.allOf(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the CompletableFuture's default asynchronous execution facility. + *

+ * This method is the same as {@link #thenMRunAsync(CompletionStage, Runnable...)} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Runnable... actions) { + return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * executes the given actions using the given Executor. + *

+ * This method is the same as {@link #thenMRunAsync(CompletionStage, Executor, Runnable...)} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + public static CompletableFuture thenMRunFastFailAsync( + CompletionStage cf, Executor executor, Runnable... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(unused -> allOfFastFail(wrapActions(executor, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> CompletableFuture.allOf(wrapConsumers(executor, v, actions))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the given stage's result as the argument to the given actions. + *

+ * This method is the same as {@link #thenMAcceptAsync(CompletionStage, Executor, Consumer[])} + * except for the fast-fail behavior. + * + * @param actions the actions to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(actions, "actions is null"); + for (int i = 0; i < actions.length; i++) { + requireNonNull(actions[i], "action" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allOfFastFail(wrapConsumers(executor, v, actions))); + } + + private static CompletableFuture[] wrapConsumers(Executor executor, T v, Consumer[] actions) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor); + } + return cfs; + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Function... fns) { + return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOf(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Function... fns) { + return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the same order of the given Functions arguments. + *

+ * This method is the same as {@link #thenMApplyAsync(CompletionStage, Executor, Function[])} + * except for the fast-fail behavior. + * + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Executor executor, Function... fns) { + requireNonNull(cf, "cf is null"); + requireNonNull(executor, "executor is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> allResultsOfFastFail(wrapFunctions(executor, v, fns))); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the CompletableFuture's default asynchronous execution facility, + * with the most values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Functions arguments. + *

+ * If the given function is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns); + } + + /** + * Returns a new CompletableFuture that, when the given stage completes normally, + * is executed using the given Executor, with the most values obtained by calling the given Functions + * (with the given stage's result as the argument to the given functions) + * in the given time({@code timeout}, aka as many results as possible in the given time) + * in the same order of the given Functions arguments. + *

+ * If the given function is successful in the given time, the return result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param executor the executor to use for asynchronous execution + * @param timeout how long to wait in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter + * @param valueIfNotSuccess the value to return if not completed successfully + * @param fns the functions to use to compute the values of the returned CompletableFuture + * @param the functions' return type + * @return the new CompletableFuture + */ + @SafeVarargs + public static CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, Executor executor, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + requireNonNull(executor, "executor is null"); + requireNonNull(unit, "unit is null"); + requireNonNull(fns, "fns is null"); + for (int i = 0; i < fns.length; i++) { + requireNonNull(fns[i], "fn" + (i + 1) + " is null"); + } + return toNonMinCf(cf).thenCompose(v -> mostResultsOfSuccess( + executor, timeout, unit, valueIfNotSuccess, wrapFunctions(executor, v, fns) + )); + } + + private static CompletableFuture[] wrapFunctions( + Executor executor, T v, Function[] fns) { + @SuppressWarnings("unchecked") + CompletableFuture[] cfs = new CompletableFuture[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor); + } + return cfs; + } + //////////////////////////////////////////////////////////////////////////////// //# `then both(binary input)` methods with fast-fail support: // @@ -944,7 +1491,6 @@ public static CompletableFuture thenAcceptBothFastFail( * @return the new CompletableFuture * @see CompletionStage#thenAcceptBothAsync(CompletionStage, BiConsumer) */ - @SuppressWarnings("unchecked") public static CompletableFuture thenAcceptBothFastFailAsync( CompletionStage cf1, CompletionStage cf2, BiConsumer action) { @@ -1028,7 +1574,6 @@ public static CompletableFuture thenCombineFastFail( * @return the new CompletableFuture * @see CompletionStage#thenCombineAsync(CompletionStage, BiFunction) */ - @SuppressWarnings("unchecked") public static CompletableFuture thenCombineFastFailAsync( CompletionStage cf1, CompletionStage cf2, BiFunction fn) { diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt index d43f83fa..48561d0e 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CffuExtensions.kt @@ -66,7 +66,7 @@ private const val ERROR_MSG_FOR_COLL = "no cffuFactory argument provided when th private const val ERROR_MSG_FOR_ARRAY = "no cffuFactory argument provided when this array is empty" /** - * Returns a new Cffu with the results in the **same order** of all the given Cffus, + * Returns a new Cffu with the results in the **same order** of the given Cffus arguments, * the new Cffu is completed when all the given Cffus complete. * If any of the given Cffus complete exceptionally, then the returned Cffu * also does so, with a CompletionException holding this exception as its cause. @@ -89,7 +89,7 @@ fun Collection>.allResultsOfCffu(cffuFactory: CffuFactory = ABSE } /** - * Returns a new Cffu with the results in the **same order** of all the given Cffus, + * Returns a new Cffu with the results in the **same order** of the given Cffus arguments, * the new Cffu is completed when all the given Cffus complete. * If any of the given Cffus complete exceptionally, then the returned Cffu * also does so, with a CompletionException holding this exception as its cause. @@ -112,7 +112,7 @@ fun Array>.allResultsOfCffu(cffuFactory: CffuFactory = ABSEN } /** - * Returns a new Cffu with the results in the **same order** of all the given stages, + * Returns a new Cffu with the results in the **same order** of the given stages arguments, * the new Cffu is completed when all the given stages complete. * If any of the given stages complete exceptionally, then the returned Cffu * also does so, with a CompletionException holding this exception as its cause. @@ -130,7 +130,7 @@ fun Collection>.allResultsOfCffu(cffuFactory: CffuFac cffuFactory.allResultsOf(*toTypedArray()) /** - * Returns a new Cffu with the results in the **same order** of all the given stages, + * Returns a new Cffu with the results in the **same order** of the given stages arguments, * the new Cffu is completed when all the given stages complete. * If any of the given stages complete exceptionally, then the returned Cffu * also does so, with a CompletionException holding this exception as its cause. @@ -225,7 +225,7 @@ fun Array>.allOfCffu(cffuFactory: CffuFactory): Cffu Collection>.allResultsOfFastFailCffu(cffuFactory: CffuFactor } /** - * Returns a new Cffu with the results in the **same order** of all the given Cffus, + * Returns a new Cffu with the results in the **same order** of the given Cffus arguments, * the new Cffu success when all the given Cffus success. * If any of the given Cffus complete exceptionally, then the returned Cffu * also does so *without* waiting other incomplete given Cffus, @@ -273,7 +273,7 @@ fun Array>.allResultsOfFastFailCffu(cffuFactory: CffuFactory } /** - * Returns a new Cffu with the results in the **same order** of all the given stages, + * Returns a new Cffu with the results in the **same order** of the given stages arguments, * the new Cffu success when all the given stages success. * If any of the given stages complete exceptionally, then the returned Cffu * also does so *without* waiting other incomplete given stages, @@ -292,7 +292,7 @@ fun Collection>.allResultsOfFastFailCffu(cffuFactory: cffuFactory.allResultsOfFastFail(*toTypedArray()) /** - * Returns a new Cffu with the results in the **same order** of all the given stages, + * Returns a new Cffu with the results in the **same order** of the given stages arguments, * the new Cffu success when all the given stages success. * If any of the given stages complete exceptionally, then the returned Cffu * also does so *without* waiting other incomplete given stages, @@ -392,7 +392,7 @@ fun Array>.allOfFastFailCffu(cffuFactory: CffuFactory): C /** * Returns a new Cffu with the most results in the **same order** of - * the given Cffus in the given time(`timeout`), aka as many results as possible in the given time. + * the given Cffus arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -413,7 +413,7 @@ fun Collection>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given Cffus in the given time(`timeout`), aka as many results as possible in the given time. + * the given Cffus arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -434,7 +434,7 @@ fun Array>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -453,7 +453,7 @@ fun Collection>.mostResultsOfSuccessCffu( /** * Returns a new Cffu with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * (aka the result extraction logic is [Cffu.getSuccessNow]). diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt index 8db439e1..071fd3a7 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt @@ -5,7 +5,6 @@ package io.foldright.cffu.kotlin import io.foldright.cffu.Cffu import io.foldright.cffu.CffuState import io.foldright.cffu.CompletableFutureUtils -import java.util.* import java.util.concurrent.* import java.util.function.* import java.util.function.Function @@ -107,7 +106,7 @@ fun Array>.allOfFastFailCompletableFuture(): CompletableF /** * Returns a new CompletableFuture with the results in the **same order** of all the given - * CompletableFutures, the returned new CompletableFuture is completed when all the given CompletableFutures complete. + * CompletableFutures arguments, the returned new CompletableFuture is completed when all the given CompletableFutures complete. * If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture * also does so, with a CompletionException holding this exception as its cause. * If no CompletableFutures are provided, returns a CompletableFuture completed with the value empty list. @@ -125,7 +124,7 @@ fun Collection>.allResultsOfCompletableFuture(): Comp /** * Returns a new CompletableFuture with the results in the **same order** of all the given - * CompletableFutures, the returned new CompletableFuture is completed when all the given CompletableFutures complete. + * CompletableFutures arguments, the returned new CompletableFuture is completed when all the given CompletableFutures complete. * If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture * also does so, with a CompletionException holding this exception as its cause. * If no CompletableFutures are provided, returns a CompletableFuture completed with the value empty list. @@ -143,7 +142,7 @@ fun Array>.allResultsOfCompletableFuture(): Compl /** * Returns a new CompletableFuture with the results in the **same order** of all the given - * CompletableFutures, the new CompletableFuture success when all the given CompletableFutures success. + * CompletableFutures arguments, the new CompletableFuture success when all the given CompletableFutures success. * If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture * also does so *without* waiting other incomplete given CompletableFutures, * with a CompletionException holding this exception as its cause. @@ -162,7 +161,7 @@ fun Collection>.allResultsOfFastFailCompletableFuture /** * Returns a new CompletableFuture with the results in the **same order** of all the given - * CompletableFutures, the new CompletableFuture success when all the given CompletableFutures success. + * CompletableFutures arguments, the new CompletableFuture success when all the given CompletableFutures success. * If any of the given CompletableFutures complete exceptionally, then the returned CompletableFuture * also does so *without* waiting other incomplete given CompletableFutures, * with a CompletionException holding this exception as its cause. @@ -181,7 +180,7 @@ fun Array>.allResultsOfFastFailCompletableFuture( /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -197,7 +196,7 @@ fun Collection>.mostResultsOfSuccessCompletableFuture /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -213,7 +212,7 @@ fun Array>.mostResultsOfSuccessCompletableFuture( /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. * @@ -232,7 +231,7 @@ fun Collection>.mostResultsOfSuccessCompletableFuture /** * Returns a new CompletableFuture with the most results in the **same order** of - * the given stages in the given time(`timeout`), aka as many results as possible in the given time. + * the given stages arguments in the given time(`timeout`, aka as many results as possible in the given time). * * If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess. *