diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 0450fcc7..5da48060 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -16,7 +16,6 @@ import java.util.function.*; import static io.foldright.cffu.Delayer.atCfDelayerThread; -import static io.foldright.cffu.ExceptionReporter.reportUncaughtException; import static io.foldright.cffu.InternalCommonUtils.*; import static io.foldright.cffu.LLCF.*; import static java.util.Objects.requireNonNull; @@ -3061,15 +3060,10 @@ C completeOnTimeout(C cfThis, @Nullable T value, long timeout, TimeUnit unit) { private static > C hopExecutorIfAtCfDelayerThread(C cf, Executor executor) { CompletableFuture ret = newIncompleteFuture(cf); - cf.whenComplete((v, ex) -> { - try { - if (!atCfDelayerThread()) completeCf(ret, v, ex); - else screenExecutor(executor).execute(() -> completeCf(ret, v, ex)); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("handle of executor hop", e); - } - }); + peek0(cf, (v, ex) -> { + if (!atCfDelayerThread()) completeCf(ret, v, ex); + else screenExecutor(executor).execute(() -> completeCf(ret, v, ex)); + }, "handle of executor hop"); return (C) ret; } @@ -3252,15 +3246,7 @@ C peek(C cfThis, BiConsumer action) { requireNonNull(cfThis, "cfThis is null"); requireNonNull(action, "action is null"); - cfThis.whenComplete((v, ex) -> { - try { - action.accept(v, ex); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("the action of peek", e); - } - }); - return cfThis; + return peek0(cfThis, action, "the action of peek"); } /** @@ -3320,15 +3306,7 @@ C peekAsync(C cfThis, BiConsumer action, Executor requireNonNull(action, "action is null"); requireNonNull(executor, "executor is null"); - cfThis.whenCompleteAsync((v, ex) -> { - try { - action.accept(v, ex); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("the action of peekAsync", e); - } - }, executor); - return cfThis; + return peekAsync0(cfThis, action, "the action of peekAsync", executor); } // endregion diff --git a/cffu-core/src/main/java/io/foldright/cffu/LLCF.java b/cffu-core/src/main/java/io/foldright/cffu/LLCF.java index 9cbcac27..e8f7d9d8 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/LLCF.java +++ b/cffu-core/src/main/java/io/foldright/cffu/LLCF.java @@ -1,13 +1,17 @@ package io.foldright.cffu; +import org.jetbrains.annotations.Contract; + import java.util.ArrayList; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; import java.util.function.Supplier; +import static io.foldright.cffu.ExceptionReporter.reportUncaughtException; import static io.foldright.cffu.InternalCommonUtils.mapArray; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -123,6 +127,46 @@ static CompletableFuture toNonMinCfCopy0(CompletionStage s) return isMinStageCf(f) ? f.toCompletableFuture() : IS_JAVA9_PLUS ? f.copy() : f.thenApply(x -> x); } + /** + * Peeks the result by executing the given action when the given stage completes, returns the given stage. + * The uncaught exceptions thrown by the action are reported. + * + * @see CompletableFutureUtils#peek(CompletionStage, BiConsumer) + */ + @Contract("_, _, _ -> param1") + static > + C peek0(C cfThis, BiConsumer action, String where) { + cfThis.whenComplete((v, ex) -> { + try { + action.accept(v, ex); + } catch (Throwable e) { + if (ex != null) e.addSuppressed(ex); + reportUncaughtException(where, e); + } + }); + return cfThis; + } + + /** + * Peeks the result by executing the given action using the supplied executor when the given stage completes, + * returns the given stage. The uncaught exceptions thrown by the action are reported. + * + * @see CompletableFutureUtils#peekAsync(CompletionStage, BiConsumer, Executor) + */ + @Contract("_, _, _, _ -> param1") + static > + C peekAsync0(C cfThis, BiConsumer action, String where, Executor executor) { + cfThis.whenCompleteAsync((v, ex) -> { + try { + action.accept(v, ex); + } catch (Throwable e) { + if (ex != null) e.addSuppressed(ex); + reportUncaughtException(where, e); + } + }, executor); + return cfThis; + } + public static boolean isMinStageCf(CompletableFuture cf) { return cf.getClass().equals(MIN_STAGE_CLASS); }