diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 0450fcc7..d41a6799 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -16,7 +16,6 @@ import java.util.function.*; import static io.foldright.cffu.Delayer.atCfDelayerThread; -import static io.foldright.cffu.ExceptionReporter.reportUncaughtException; import static io.foldright.cffu.InternalCommonUtils.*; import static io.foldright.cffu.LLCF.*; import static java.util.Objects.requireNonNull; @@ -3061,15 +3060,10 @@ C completeOnTimeout(C cfThis, @Nullable T value, long timeout, TimeUnit unit) { private static > C hopExecutorIfAtCfDelayerThread(C cf, Executor executor) { CompletableFuture ret = newIncompleteFuture(cf); - cf.whenComplete((v, ex) -> { - try { - if (!atCfDelayerThread()) completeCf(ret, v, ex); - else screenExecutor(executor).execute(() -> completeCf(ret, v, ex)); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("handle of executor hop", e); - } - }); + safeHandle0(cf, (v, ex) -> { + if (!atCfDelayerThread()) completeCf(ret, v, ex); + else screenExecutor(executor).execute(() -> completeCf(ret, v, ex)); + }, "handle of executor hop"); return (C) ret; } @@ -3252,15 +3246,7 @@ C peek(C cfThis, BiConsumer action) { requireNonNull(cfThis, "cfThis is null"); requireNonNull(action, "action is null"); - cfThis.whenComplete((v, ex) -> { - try { - action.accept(v, ex); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("the action of peek", e); - } - }); - return cfThis; + return safeHandle0(cfThis, action, "the action of peek"); } /** @@ -3320,15 +3306,7 @@ C peekAsync(C cfThis, BiConsumer action, Executor requireNonNull(action, "action is null"); requireNonNull(executor, "executor is null"); - cfThis.whenCompleteAsync((v, ex) -> { - try { - action.accept(v, ex); - } catch (Throwable e) { - if (ex != null) e.addSuppressed(ex); - reportUncaughtException("the action of peekAsync", e); - } - }, executor); - return cfThis; + return safeHandleAsync0(cfThis, action, "the action of peekAsync", executor); } // endregion diff --git a/cffu-core/src/main/java/io/foldright/cffu/LLCF.java b/cffu-core/src/main/java/io/foldright/cffu/LLCF.java index 9cbcac27..ae648f28 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/LLCF.java +++ b/cffu-core/src/main/java/io/foldright/cffu/LLCF.java @@ -1,13 +1,17 @@ package io.foldright.cffu; +import org.jetbrains.annotations.Contract; + import java.util.ArrayList; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; import java.util.function.Supplier; +import static io.foldright.cffu.ExceptionReporter.reportUncaughtException; import static io.foldright.cffu.InternalCommonUtils.mapArray; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -111,6 +115,33 @@ static CompletableFuture toNonMinCf0(CompletionStage s) { return isMinStageCf(f) ? f.toCompletableFuture() : f; } + @Contract("_, _, _ -> param1") + static > + C safeHandle0(C cfThis, BiConsumer action, String where) { + cfThis.whenComplete((v, ex) -> { + try { + action.accept(v, ex); + } catch (Throwable e) { + if (ex != null) e.addSuppressed(ex); + reportUncaughtException(where, e); + } + }); + return cfThis; + } + + @Contract("_, _, _, _ -> param1") + static > + C safeHandleAsync0(C cfThis, BiConsumer action, String where, Executor executor) { + cfThis.whenCompleteAsync((v, ex) -> { + try { + action.accept(v, ex); + } catch (Throwable e) { + if (ex != null) e.addSuppressed(ex); + reportUncaughtException(where, e); + } + }, executor); + return cfThis; + } /** * Converts CompletionStage to a non-minimal-stage CompletableFuture copy. This method is type safe. *