From fd60918c81ff78436604cc24354c9d859ffcafdb Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sat, 25 May 2024 19:58:50 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20complement=20the=20`null=20check`?= =?UTF-8?q?=20logic=20=F0=9F=91=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtils.java | 103 ++++++++++++------ 1 file changed, 68 insertions(+), 35 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index f9ef98f9..1f4fa7c7 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -63,9 +63,11 @@ public final class CompletableFutureUtils { * @see CompletableFuture#allOf(CompletableFuture[]) */ public static CompletableFuture allOf(CompletionStage... cfs) { + requireNonNull(cfs, "cfs is null"); + if (cfs.length == 0) return completedFuture(null); // Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input // in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException) - if (cfs.length == 1) return toNonMinCfCopy(cfs[0]).thenApply(unused -> null); + if (cfs.length == 1) return toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null")).thenApply(unused -> null); return CompletableFuture.allOf(f_toCfArray(cfs)); } @@ -91,7 +93,9 @@ public static CompletableFuture> allResultsOf(CompletionStage[] resultSetterCfs = createResultSetterCfs(cfs, result); @@ -177,7 +181,9 @@ public static CompletableFuture> allResultsOfFastFail(CompletionStag requireCfsAndEleNonNull(cfs); final int size = cfs.length; if (size == 0) return completedFuture(arrayList()); - if (size == 1) return csToListCf(cfs[0]); + // Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input + // in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException) + if (size == 1) return toNonMinCfCopy(cfs[0]).thenApply(CompletableFutureUtils::arrayList); final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size]; // NOTE: fill ONE MORE element of failedOrBeIncomplete LATER @@ -229,15 +235,16 @@ public static CompletableFuture> mostResultsOfSuccess( public static CompletableFuture> mostResultsOfSuccess( Executor executorWhenTimeout, long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, CompletionStage... cfs) { + requireNonNull(executorWhenTimeout, "executorWhenTimeout is null"); requireNonNull(unit, "unit is null"); - requireCfsAndEleNonNull(cfs); + requireNonNull(cfs, "cfs is null"); if (cfs.length == 0) return completedFuture(arrayList()); if (cfs.length == 1) { // Defensive copy input cf to non-minimal-stage instance in order to // 1. avoid writing it by `completeOnTimeout` and is able to read its result(`getSuccessNow`) // 2. ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException) - final CompletableFuture f = toNonMinCfCopy(cfs[0]); + final CompletableFuture f = toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null")); return cffuOrTimeout(f, executorWhenTimeout, timeout, unit) .handle((unused, ex) -> arrayList(getSuccessNow(f, valueIfNotSuccess))); } @@ -286,12 +293,6 @@ private static List arrayList(T... elements) { return ret; } - private static CompletableFuture> csToListCf(CompletionStage s) { - // Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input - // in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException) - return toNonMinCfCopy(s).thenApply(CompletableFutureUtils::arrayList); - } - /** * Returns a cf array whose elements do the result collection. */ @@ -420,10 +421,12 @@ static boolean isMinStageCf(CompletionStage s) { @Contract(pure = true) @SafeVarargs public static CompletableFuture anyOf(CompletionStage... cfs) { + requireNonNull(cfs, "cfs is null"); + if (cfs.length == 0) return new CompletableFuture<>(); // Defensive copy input cf to non-minimal-stage instance for SINGLE input in order to ensure that // 1. avoid writing the input cf unexpectedly it by caller code // 2. the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException) - if (cfs.length == 1) return toNonMinCfCopy(cfs[0]); + if (cfs.length == 1) return toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null")); CompletableFuture ret = CompletableFuture.anyOf(f_toCfArray(cfs)); return f_cast(ret); } @@ -665,7 +668,7 @@ public static CompletableFuture> mostTupleOfSuccess( public static CompletableFuture> mostTupleOfSuccess( Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage cf1, CompletionStage cf2) { - return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2)); + return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2); } /** @@ -704,7 +707,7 @@ public static CompletableFuture> mostTupleOfSucc public static CompletableFuture> mostTupleOfSuccess( Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage cf1, CompletionStage cf2, CompletionStage cf3) { - return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3)); + return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3); } /** @@ -745,7 +748,7 @@ public static CompletableFuture> mostTup Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage cf1, CompletionStage cf2, CompletionStage cf3, CompletionStage cf4) { - return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3, cf4)); + return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3, cf4); } /** @@ -786,11 +789,12 @@ public static CompletableFuture> Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage cf1, CompletionStage cf2, CompletionStage cf3, CompletionStage cf4, CompletionStage cf5) { - return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3, cf4, cf5)); + return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3, cf4, cf5); } private static CompletableFuture mostTupleOfSuccess0( - Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage[] css) { + Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage... css) { + requireNonNull(executorWhenTimeout, "executorWhenTimeout is null"); requireNonNull(unit, "unit is null"); // MUST be *Non-Minimal* CF instances in order to read results(`getSuccessNow`), // otherwise UnsupportedOperationException @@ -1263,6 +1267,9 @@ public static CompletableFuture applyToEitherSuccessAsync( */ public static > C peek(C cf, BiConsumer action) { + requireNonNull(cf, "cf is null"); + requireNonNull(action, "action is null"); + cf.whenComplete(action); return cf; } @@ -1287,8 +1294,7 @@ C peek(C cf, BiConsumer action) { */ public static > C peekAsync(C cf, BiConsumer action) { - cf.whenCompleteAsync(action); - return cf; + return peekAsync(cf, action, AsyncPoolHolder.ASYNC_POOL); } /** @@ -1310,6 +1316,10 @@ C peekAsync(C cf, BiConsumer action) { */ public static > C peekAsync(C cf, BiConsumer action, Executor executor) { + requireNonNull(cf, "cf is null"); + requireNonNull(action, "action is null"); + requireNonNull(executor, "executor is null"); + cf.whenCompleteAsync(action, executor); return cf; } @@ -1330,10 +1340,10 @@ C peekAsync(C cf, BiConsumer action, Executor exec */ @Contract(pure = true) public static CompletableFuture failedFuture(Throwable ex) { + requireNonNull(ex, "ex is null"); if (IS_JAVA9_PLUS) { return CompletableFuture.failedFuture(ex); } - requireNonNull(ex, "ex is null"); final CompletableFuture cf = new CompletableFuture<>(); cf.completeExceptionally(ex); return cf; @@ -1374,9 +1384,7 @@ public static CompletionStage failedStage(Throwable ex) { if (IS_JAVA9_PLUS) { return CompletableFuture.failedStage(ex); } - CompletableFuture cf = new CompletableFuture<>(); - cf.completeExceptionally(ex); - return cf; + return failedFuture(ex); } //# Delay Execution @@ -1405,11 +1413,11 @@ public static Executor delayedExecutor(long delay, TimeUnit unit) { */ @Contract(pure = true) public static Executor delayedExecutor(long delay, TimeUnit unit, Executor executor) { + requireNonNull(unit, "unit is null"); + requireNonNull(executor, "executor is null"); if (IS_JAVA9_PLUS) { return CompletableFuture.delayedExecutor(delay, unit, executor); } - requireNonNull(unit, "unit is null"); - requireNonNull(executor, "executor is null"); return new DelayedExecutor(delay, unit, executor); } @@ -1447,11 +1455,12 @@ C exceptionallyAsync(C cf, Function fn) { @SuppressWarnings("unchecked") public static > C exceptionallyAsync(C cf, Function fn, Executor executor) { + requireNonNull(cf, "cf is null"); + requireNonNull(fn, "fn is null"); + requireNonNull(executor, "executor is null"); if (IS_JAVA12_PLUS) { return (C) cf.exceptionallyAsync(fn, executor); } - requireNonNull(fn, "fn is null"); - requireNonNull(executor, "executor is null"); // below code is copied from CompletionStage#exceptionallyAsync return (C) cf.handle((r, ex) -> (ex == null) ? cf : cf.handleAsync((r1, ex1) -> fn.apply(ex1), executor) @@ -1487,6 +1496,10 @@ public static > C cffuOrTimeout( */ public static > C cffuOrTimeout( C cf, Executor executorWhenTimeout, long timeout, TimeUnit unit) { + requireNonNull(cf, "cf is null"); + requireNonNull(executorWhenTimeout, "executorWhenTimeout is null"); + requireNonNull(unit, "unit is null"); + final C f = orTimeout(cf, timeout, unit); return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); } @@ -1517,10 +1530,11 @@ public static > C cffuOrTimeout( * @see #cffuOrTimeout(CompletableFuture, long, TimeUnit) */ public static > C orTimeout(C cf, long timeout, TimeUnit unit) { + requireNonNull(cf, "cf is null"); + requireNonNull(unit, "unit is null"); if (IS_JAVA9_PLUS) { cf.orTimeout(timeout, unit); } else { - requireNonNull(unit, "unit is null"); // below code is copied from CompletableFuture#orTimeout with small adoption if (!cf.isDone()) { ScheduledFuture f = Delayer.delayToTimoutCf(cf, timeout, unit); @@ -1557,6 +1571,10 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { */ public static > C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, long timeout, TimeUnit unit) { + requireNonNull(cf, "cf is null"); + requireNonNull(executorWhenTimeout, "executorWhenTimeout is null"); + requireNonNull(unit, "unit is null"); + final C f = completeOnTimeout(cf, value, timeout, unit); return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout); } @@ -1588,10 +1606,11 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l */ public static > C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) { + requireNonNull(cf, "cf is null"); + requireNonNull(unit, "unit is null"); if (IS_JAVA9_PLUS) { cf.completeOnTimeout(value, timeout, unit); } else { - requireNonNull(unit, "unit is null"); // below code is copied from CompletableFuture#completeOnTimeout with small adoption if (!cf.isDone()) { ScheduledFuture f = Delayer.delayToCompleteCf(cf, value, timeout, unit); @@ -1623,10 +1642,11 @@ C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) { @SuppressWarnings({"unchecked", "rawtypes"}) public static > C exceptionallyCompose(C cf, Function> fn) { + requireNonNull(cf, "cf is null"); + requireNonNull(fn, "fn is null"); if (IS_JAVA12_PLUS) { return (C) cf.exceptionallyCompose((Function) fn); } - requireNonNull(fn, "fn is null"); // below code is copied from CompletionStage.exceptionallyCompose return (C) cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(x -> x); } @@ -1657,11 +1677,12 @@ C exceptionallyComposeAsync(C cf, Function> C exceptionallyComposeAsync(C cf, Function> fn, Executor executor) { + requireNonNull(cf, "cf is null"); + requireNonNull(fn, "fn is null"); + requireNonNull(executor, "executor is null"); if (IS_JAVA12_PLUS) { return (C) cf.exceptionallyComposeAsync((Function) fn, executor); } - requireNonNull(fn, "fn is null"); - requireNonNull(executor, "executor is null"); // below code is copied from CompletionStage.exceptionallyComposeAsync return (C) cf.handle((r, ex) -> (ex == null) ? cf : cf.handleAsync((r1, ex1) -> fn.apply(ex1), executor).thenCompose(x -> x) @@ -1703,6 +1724,9 @@ C exceptionallyComposeAsync(C cf, Function T join(CompletableFuture cf, long timeout, TimeUnit unit) { + requireNonNull(cf, "cf is null"); + requireNonNull(unit, "unit is null"); + if (cf.isDone()) return cf.join(); // defensive copy input cf to avoid writing it by `orTimeout` return orTimeout(copy(cf), timeout, unit).join(); @@ -1720,6 +1744,7 @@ public static T join(CompletableFuture cf, long timeout, TimeUnit unit) { @Contract(pure = true) @Nullable public static T getSuccessNow(CompletableFuture cf, @Nullable T valueIfNotSuccess) { + requireNonNull(cf, "cf is null"); return cf.isDone() && !cf.isCompletedExceptionally() ? cf.join() : valueIfNotSuccess; } @@ -1739,6 +1764,7 @@ public static T getSuccessNow(CompletableFuture cf, @Nullable T @Contract(pure = true) @Nullable public static T resultNow(Future cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA19_PLUS) { return cf.resultNow(); } @@ -1786,6 +1812,7 @@ public static T resultNow(Future cf) { */ @Contract(pure = true) public static Throwable exceptionNow(Future cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA19_PLUS) { return cf.exceptionNow(); } @@ -1824,6 +1851,7 @@ public static Throwable exceptionNow(Future cf) { */ @Contract(pure = true) public static CffuState state(Future cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA19_PLUS) { return CffuState.toCffuState(cf.state()); } @@ -1884,12 +1912,12 @@ C completeAsync(C cf, Supplier supplier) { */ public static > C completeAsync(C cf, Supplier supplier, Executor executor) { + requireNonNull(cf, "cf is null"); + requireNonNull(supplier, "supplier is null"); + requireNonNull(executor, "executor is null"); if (IS_JAVA9_PLUS) { cf.completeAsync(supplier, executor); } else { - requireNonNull(cf, "cf is null"); - requireNonNull(supplier, "supplier is null"); - requireNonNull(executor, "executor is null"); // below code is copied from CompletableFuture#completeAsync with small adoption executor.execute(new CfCompleterBySupplier<>(cf, supplier)); } @@ -1923,6 +1951,7 @@ C completeExceptionallyAsync(C cf, Supplier supplier, Execu requireNonNull(cf, "cf is null"); requireNonNull(supplier, "supplier is null"); requireNonNull(executor, "executor is null"); + executor.execute(new CfExCompleterBySupplier(cf, supplier)); return cf; } @@ -1943,6 +1972,7 @@ C completeExceptionallyAsync(C cf, Supplier supplier, Execu */ @Contract(pure = true) public static CompletionStage minimalCompletionStage(CompletableFuture cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA9_PLUS) { return cf.minimalCompletionStage(); } @@ -1960,6 +1990,7 @@ public static CompletionStage minimalCompletionStage(CompletableFuture */ @Contract(pure = true) public static CompletableFuture copy(CompletableFuture cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA9_PLUS) { return cf.copy(); } @@ -1974,6 +2005,7 @@ public static CompletableFuture copy(CompletableFuture cf) { */ @Contract(pure = true) public static CompletableFuture newIncompleteFuture(CompletableFuture cf) { + requireNonNull(cf, "cf is null"); if (IS_JAVA9_PLUS) { return cf.newIncompleteFuture(); } @@ -2030,6 +2062,7 @@ public static CompletableFuture[] toCompletableFutureArray(CompletionStag */ @Contract(pure = true) public static CompletableFuture[] completableFutureListToArray(List> cfList) { + requireNonNull(cfList, "cfList is null"); @SuppressWarnings("unchecked") final CompletableFuture[] a = new CompletableFuture[cfList.size()]; return cfList.toArray(a);