From 50fbd806115aa08d9c06f0399f83deb47ba2abb2 Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Wed, 12 Jun 2024 12:56:37 +0800 Subject: [PATCH] WIP --- .../cffu/CompletableFutureUtils.java | 305 ++++++++++++++++++ 1 file changed, 305 insertions(+) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 655239e0..268a14d8 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -27,6 +27,197 @@ * @author Jerry Lee (oldratlee at gmail dot com) */ public final class CompletableFutureUtils { + //////////////////////////////////////////////////////////////////////////////// + //# multi-run methods + //////////////////////////////////////////////////////////////////////////////// + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunAsync(Runnable... actions) { + return mRunAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor after runs the given actions. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOf(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunAsync(Executor executor, Runnable... actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return CompletableFuture.allOf(cfs); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior. + * + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable) + */ + public static CompletableFuture mRunFastFailAsync(Runnable... actions) { + return mRunFastFailAsync(AsyncPoolHolder.ASYNC_POOL, actions); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor after runs the given actions. + *

+ * This method is the same as {@link #mRunAsync(Executor, Runnable...)} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param actions the actions to run before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see #allOfFastFail(CompletionStage[]) + * @see CompletableFuture#runAsync(Runnable, Executor) + */ + public static CompletableFuture mRunFastFailAsync(Executor executor, Runnable... actions) { + CompletableFuture[] cfs = new CompletableFuture[actions.length]; + for (int i = 0; i < actions.length; i++) { + cfs[i] = CompletableFuture.runAsync(actions[i], executor); + } + return allOfFastFail(cfs); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + public static CompletableFuture> mSupplyAsync(Supplier... suppliers) { + return mSupplyAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor with the values obtained by calling the given Suppliers. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOf(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SuppressWarnings("unchecked") + public static CompletableFuture> mSupplyAsync(Executor executor, Supplier... suppliers) { + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync((Supplier) suppliers[i], executor); + } + return allResultsOf(cfs); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the CompletableFuture's default asynchronous execution facility + * with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + @SuppressWarnings("unchecked") + public static CompletableFuture> mSupplyFastFailAsync(Supplier... suppliers) { + return mSupplyFastFailAsync(AsyncPoolHolder.ASYNC_POOL, suppliers); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by tasks running in the supplied Executor with the values obtained by calling the given Suppliers. + *

+ * This method is the same as {@link #mSupplyAsync(Executor, Supplier[])} except for the fast-fail behavior. + * + * @param executor the executor to use for asynchronous execution + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #allResultsOfFastFail(CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SuppressWarnings("unchecked") + public static CompletableFuture> mSupplyFastFailAsync(Executor executor, Supplier... suppliers) { + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync((Supplier) suppliers[i], executor); + } + return allResultsOfFastFail(cfs); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the CompletableFuture's default asynchronous execution facility + * in the given time({@code timeout}), aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier) + */ + public static CompletableFuture> mSupplyMostSuccess( + long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, Supplier... suppliers) { + return mSupplyMostSuccess(AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, suppliers); + } + + /** + * Returns a new CompletableFuture with the most results in the same order of tasks + * running in the supplied Executor in the given time({@code timeout}), + * aka as many results as possible in the given time. + *

+ * If the given supplier is successful in time, its result is the completed value; + * Otherwise the given valueIfNotSuccess. + * + * @param suppliers functions returning the value to be used to complete the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see #mostResultsOfSuccess(Executor, long, TimeUnit, Object, CompletionStage[]) + * @see CompletableFuture#supplyAsync(Supplier, Executor) + */ + @SuppressWarnings("unchecked") + public static CompletableFuture> mSupplyMostSuccess( + Executor executor, long timeout, TimeUnit unit, + @Nullable T valueIfNotSuccess, Supplier... suppliers) { + CompletableFuture[] cfs = new CompletableFuture[suppliers.length]; + for (int i = 0; i < suppliers.length; i++) { + cfs[i] = CompletableFuture.supplyAsync((Supplier) suppliers[i], executor); + } + return mostResultsOfSuccess(executor, timeout, unit, valueIfNotSuccess, cfs); + } + //////////////////////////////////////////////////////////////////////////////// //# allOf*/mostResultsOfSuccess methods //////////////////////////////////////////////////////////////////////////////// @@ -836,6 +1027,120 @@ private static CompletableFuture mostTupleOfSuccess0( // - thenCombineFastFail*(BiFunction): (T1, T2) -> U //////////////////////////////////////////////////////////////////////////////// + public static CompletableFuture thenMRunAsync(CompletionStage cf, Runnable... actions) { + return thenMRunAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMRunAsync(CompletionStage cf, Executor executor, Runnable... actions) { + return toNonMinCf(cf).thenCompose(unused -> mRunAsync(executor, actions)); + } + + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Runnable... actions) { + return thenMRunFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMRunFastFailAsync(CompletionStage cf, Executor executor, Runnable... actions) { + return toNonMinCf(cf).thenCompose(unused -> mRunFastFailAsync(executor, actions)); + } + + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMAcceptAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + return toNonMinCf(cf).thenCompose(v -> { + Runnable[] rs = new Runnable[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + rs[idx] = () -> actions[idx].accept(v); + } + return mRunAsync(executor, rs); + }); + } + + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Consumer... actions) { + return thenMAcceptFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, actions); + } + + public static CompletableFuture thenMAcceptFastFailAsync( + CompletionStage cf, Executor executor, Consumer... actions) { + return toNonMinCf(cf).thenCompose(v -> { + Runnable[] rs = new Runnable[actions.length]; + for (int i = 0; i < actions.length; i++) { + final int idx = i; + rs[idx] = () -> actions[idx].accept(v); + } + return mRunFastFailAsync(executor, rs); + }); + } + + public CompletableFuture> thenMApplyAsync( + CompletionStage cf, Function... fns) { + return thenMApplyAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + public CompletableFuture> thenMApplyAsync( + CompletionStage cf, Executor executor, Function... fns) { + return toNonMinCf(cf).thenCompose(v -> { + @SuppressWarnings("unchecked") + Supplier[] suppliers = new Supplier[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + suppliers[idx] = () -> fns[idx].apply(v); + } + return mSupplyAsync(executor, suppliers); + }); + } + + public CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Function... fns) { + return thenMApplyFastFailAsync(cf, AsyncPoolHolder.ASYNC_POOL, fns); + } + + public CompletableFuture> thenMApplyFastFailAsync( + CompletionStage cf, Executor executor, Function... fns) { + return toNonMinCf(cf).thenCompose(v -> { + @SuppressWarnings("unchecked") + Supplier[] suppliers = new Supplier[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + suppliers[idx] = () -> fns[idx].apply(v); + } + return mSupplyFastFailAsync(executor, suppliers); + }); + } + + public CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return thenMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, valueIfNotSuccess, fns); + } + + public CompletableFuture> thenMApplyMostSuccessAsync( + CompletionStage cf, Executor executor, long timeout, TimeUnit unit, + @Nullable U valueIfNotSuccess, Function... fns) { + return toNonMinCf(cf).thenCompose(v -> { + @SuppressWarnings("unchecked") + Supplier[] suppliers = new Supplier[fns.length]; + for (int i = 0; i < fns.length; i++) { + final int idx = i; + suppliers[idx] = () -> fns[idx].apply(v); + } + return mSupplyMostSuccess(executor, timeout, unit, valueIfNotSuccess, suppliers); + }); + } + + //////////////////////////////////////////////////////////////////////////////// + //# `then both(binary input)` methods with fast-fail support: + // + // - runAfterBothFastFail*(Runnable): Void, Void -> Void + // - thenAcceptBothFastFail*(BiConsumer): (T1, T2) -> Void + // - thenCombineFastFail*(BiFunction): (T1, T2) -> U + //////////////////////////////////////////////////////////////////////////////// + /** * Returns a new CompletableFuture that, when two given stages both complete normally, executes the given action. * if any of the given stage complete exceptionally, then the returned CompletableFuture also does so