From 908a0feb994f75ffe3fcbdf67f10d169f667802c Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Tue, 30 Apr 2024 00:56:13 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20improve=20generic=20type=20declarat?= =?UTF-8?q?ion=20=F0=9F=A7=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cffu/CompletableFutureUtils.java | 141 ++++++++++-------- .../foldright/cffu/DelayExecutionHelpers.java | 16 +- .../cffu/CompletableFutureUtilsTest.java | 30 +++- 3 files changed, 112 insertions(+), 75 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index fc43455f..70ae0f37 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -55,9 +55,8 @@ public final class CompletableFutureUtils { * @return a new CompletableFuture that is completed when all the given stages complete * @throws NullPointerException if the array or any of its elements are {@code null} */ - @SuppressWarnings({"unchecked", "rawtypes"}) public static CompletableFuture allOf(CompletionStage... cfs) { - return CompletableFuture.allOf(toCompletableFutureArray((CompletionStage[]) cfs)); + return CompletableFuture.allOf(fToCfArray(cfs)); } /** @@ -77,7 +76,6 @@ public static CompletableFuture allOf(CompletionStage... cfs) { */ @Contract(pure = true) @SafeVarargs - @SuppressWarnings({"unchecked"}) public static CompletableFuture> allResultsOf(CompletionStage... cfs) { requireCfsAndEleNonNull(cfs); final int size = cfs.length; @@ -87,7 +85,7 @@ public static CompletableFuture> allResultsOf(CompletionStage[] resultSetterCfs = createResultSetterCfs(cfs, result); - return CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> (List) arrayList(result)); + return fCast(CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> arrayList(result))); } /** @@ -107,23 +105,22 @@ public static CompletableFuture> allResultsOf(CompletionStage allOfFastFail(CompletionStage... cfs) { requireCfsAndEleNonNull(cfs); final int size = cfs.length; if (size == 0) return CompletableFuture.completedFuture(null); if (size == 1) return cfs[0].toCompletableFuture().thenApply(v -> null); - final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size]; + final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size]; // NOTE: fill ONE MORE element of failedOrBeIncomplete LATER - final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1]; + final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1]; fill(cfs, successOrBeIncomplete, failedOrBeIncomplete); // NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE: // a cf that is successful when all given cfs success, otherwise be incomplete failedOrBeIncomplete[size] = CompletableFuture.allOf(successOrBeIncomplete); - return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete); + return fCast(CompletableFuture.anyOf(failedOrBeIncomplete)); } /** @@ -146,27 +143,26 @@ public static CompletableFuture allOfFastFail(CompletionStage... cfs) { */ @Contract(pure = true) @SafeVarargs - @SuppressWarnings({"unchecked", "rawtypes"}) public static CompletableFuture> allResultsOfFastFail(CompletionStage... cfs) { requireCfsAndEleNonNull(cfs); final int size = cfs.length; if (size == 0) return CompletableFuture.completedFuture(arrayList()); if (size == 1) return csToListCf(cfs[0]); - final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size]; + final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size]; // NOTE: fill ONE MORE element of failedOrBeIncomplete LATER - final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1]; + final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1]; fill(cfs, successOrBeIncomplete, failedOrBeIncomplete); // NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE: // a cf that is successful when all given cfs success, otherwise be incomplete failedOrBeIncomplete[size] = allResultsOf(successOrBeIncomplete); - return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete); + return fCast(CompletableFuture.anyOf(failedOrBeIncomplete)); } @SafeVarargs - private static CompletionStage[] requireCfsAndEleNonNull(CompletionStage... css) { + private static > S[] requireCfsAndEleNonNull(S... css) { requireNonNull(css, "cfs is null"); for (int i = 0; i < css.length; i++) { requireNonNull(css[i], "cf" + (i + 1) + " is null"); @@ -202,23 +198,38 @@ private static CompletableFuture[] createResultSetterCfs(CompletionSta return resultSetterCfs; } - @SuppressWarnings({"unchecked", "rawtypes"}) - private static void fill(CompletionStage[] cfs, - CompletableFuture[] successOrBeIncomplete, - CompletableFuture[] failedOrBeIncomplete) { - final CompletableFuture incomplete = new CompletableFuture(); + private static void fill(CompletionStage[] cfs, + CompletableFuture[] successOrBeIncomplete, + CompletableFuture[] failedOrBeIncomplete) { + final CompletableFuture incomplete = new CompletableFuture<>(); for (int i = 0; i < cfs.length; i++) { - final CompletionStage cf = cfs[i]; + final CompletionStage cf = cfs[i]; successOrBeIncomplete[i] = cf.toCompletableFuture() - .handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(identity()); + .handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(x -> x); failedOrBeIncomplete[i] = cf.toCompletableFuture() - .handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(identity()); + .handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(x -> x); } } + /** + * force cast the value type of CompletableFuture. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private static CompletableFuture fCast(CompletableFuture f) { + return (CompletableFuture) f; + } + + /** + * force converts input {@link CompletionStage} array element to {@link CompletableFuture}. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private static CompletableFuture[] fToCfArray(CompletionStage[] cfs) { + return toCompletableFutureArray((CompletionStage[]) cfs); + } + //////////////////////////////////////////////////////////////////////////////// //# anyOf* methods //////////////////////////////////////////////////////////////////////////////// @@ -243,9 +254,8 @@ private static void fill(CompletionStage[] cfs, */ @Contract(pure = true) @SafeVarargs - @SuppressWarnings({"unchecked", "rawtypes"}) public static CompletableFuture anyOf(CompletionStage... cfs) { - return (CompletableFuture) CompletableFuture.anyOf(toCompletableFutureArray((CompletionStage[]) cfs)); + return fCast(CompletableFuture.anyOf(fToCfArray(cfs))); } /** @@ -267,23 +277,22 @@ public static CompletableFuture anyOf(CompletionStage... cfs */ @Contract(pure = true) @SafeVarargs - @SuppressWarnings({"unchecked", "rawtypes"}) public static CompletableFuture anyOfSuccess(CompletionStage... cfs) { requireCfsAndEleNonNull(cfs); final int size = cfs.length; if (size == 0) return failedFuture(new NoCfsProvidedException()); - if (size == 1) return (CompletableFuture) copy(cfs[0].toCompletableFuture()); + if (size == 1) return fCast(copy(cfs[0].toCompletableFuture())); // NOTE: fill ONE MORE element of successOrBeIncompleteCfs LATER - final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size + 1]; - final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size]; + final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size + 1]; + final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size]; fill(cfs, successOrBeIncomplete, failedOrBeIncomplete); // NOTE: fill the ONE MORE element of successOrBeIncompleteCfs HERE // a cf that is failed when all given cfs fail, otherwise be incomplete successOrBeIncomplete[size] = CompletableFuture.allOf(failedOrBeIncomplete); - return (CompletableFuture) CompletableFuture.anyOf(successOrBeIncomplete); + return fCast(CompletableFuture.anyOf(successOrBeIncomplete)); } //////////////////////////////////////////////////////////////////////////////// @@ -886,7 +895,7 @@ public static CompletableFuture acceptEitherSuccessAsync( * @see CompletionStage#applyToEither(CompletionStage, Function) */ public static CompletableFuture applyToEitherSuccess( - CompletionStage cf1, CompletionStage cf2, Function fn) { + CompletionStage cf1, CompletionStage cf2, Function fn) { final CompletionStage[] css = requireCfsAndEleNonNull(cf1, cf2); requireNonNull(fn, "fn is null"); @@ -907,7 +916,7 @@ public static CompletableFuture applyToEitherSuccess( * @see CompletionStage#applyToEitherAsync(CompletionStage, Function) */ public static CompletableFuture applyToEitherSuccessAsync( - CompletionStage cf1, CompletionStage cf2, Function fn) { + CompletionStage cf1, CompletionStage cf2, Function fn) { final CompletionStage[] css = requireCfsAndEleNonNull(cf1, cf2); requireNonNull(fn, "fn is null"); @@ -929,7 +938,7 @@ public static CompletableFuture applyToEitherSuccessAsync( */ public static CompletableFuture applyToEitherSuccessAsync( CompletionStage cf1, CompletionStage cf2, - Function fn, Executor executor) { + Function fn, Executor executor) { final CompletionStage[] css = requireCfsAndEleNonNull(cf1, cf2); requireNonNull(fn, "fn is null"); requireNonNull(executor, "executor is null"); @@ -1029,6 +1038,7 @@ public static CompletableFuture failedFuture(Throwable ex) { if (IS_JAVA9_PLUS) { return CompletableFuture.failedFuture(ex); } + requireNonNull(ex, "ex is null"); final CompletableFuture cf = new CompletableFuture<>(); cf.completeExceptionally(ex); return cf; @@ -1163,16 +1173,16 @@ public static CompletableFuture exceptionallyAsync( * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter * @return given CompletableFuture */ - public static CompletableFuture orTimeout(CompletableFuture cf, long timeout, TimeUnit unit) { + public static > C orTimeout(C cf, long timeout, TimeUnit unit) { if (IS_JAVA9_PLUS) { - return cf.orTimeout(timeout, unit); - } - - requireNonNull(unit, "unit is null"); - // below code is copied from CompletableFuture#orTimeout with small adoption - if (!cf.isDone()) { - ScheduledFuture f = Delayer.delayToTimoutCf(cf, timeout, unit); - cf.whenComplete(new FutureCanceller(f)); + cf.orTimeout(timeout, unit); + } else { + requireNonNull(unit, "unit is null"); + // below code is copied from CompletableFuture#orTimeout with small adoption + if (!cf.isDone()) { + ScheduledFuture f = Delayer.delayToTimoutCf(cf, timeout, unit); + cf.whenComplete(new FutureCanceller(f)); + } } return cf; } @@ -1185,17 +1195,17 @@ public static CompletableFuture orTimeout(CompletableFuture cf, long t * @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter * @return given CompletableFuture */ - public static CompletableFuture completeOnTimeout( - CompletableFuture cf, @Nullable T value, long timeout, TimeUnit unit) { + public static > C completeOnTimeout( + C cf, @Nullable T value, long timeout, TimeUnit unit) { if (IS_JAVA9_PLUS) { - return cf.completeOnTimeout(value, timeout, unit); - } - - requireNonNull(unit, "unit is null"); - // below code is copied from CompletableFuture#completeOnTimeout with small adoption - if (!cf.isDone()) { - ScheduledFuture f = Delayer.delayToCompleteCf(cf, value, timeout, unit); - cf.whenComplete(new FutureCanceller(f)); + cf.completeOnTimeout(value, timeout, unit); + } else { + requireNonNull(unit, "unit is null"); + // below code is copied from CompletableFuture#completeOnTimeout with small adoption + if (!cf.isDone()) { + ScheduledFuture f = Delayer.delayToCompleteCf(cf, value, timeout, unit); + cf.whenComplete(new FutureCanceller(f)); + } } return cf; } @@ -1215,7 +1225,6 @@ public static CompletableFuture exceptionallyCompose( if (IS_JAVA12_PLUS) { return cf.exceptionallyCompose(fn); } - requireNonNull(fn, "fn is null"); // below code is copied from CompletionStage.exceptionallyCompose return cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(identity()); @@ -1249,7 +1258,6 @@ public static CompletableFuture exceptionallyComposeAsync( if (IS_JAVA12_PLUS) { return cf.exceptionallyComposeAsync(fn, executor); } - requireNonNull(fn, "fn is null"); requireNonNull(executor, "executor is null"); // below code is copied from CompletionStage.exceptionallyComposeAsync @@ -1294,7 +1302,6 @@ public static CompletableFuture exceptionallyComposeAsync( @Nullable public static T join(CompletableFuture cf, long timeout, TimeUnit unit) { if (cf.isDone()) return cf.join(); - return orTimeout(copy(cf), timeout, unit).join(); } @@ -1313,7 +1320,7 @@ public static T join(CompletableFuture cf, long timeout, TimeUnit unit) { */ @Contract(pure = true) @Nullable - public static T resultNow(CompletableFuture cf) { + public static T resultNow(CompletableFuture cf) { if (IS_JAVA19_PLUS) { return cf.resultNow(); } @@ -1350,7 +1357,7 @@ public static T resultNow(CompletableFuture cf) { * @see CompletableFuture#resultNow() */ @Contract(pure = true) - public static Throwable exceptionNow(CompletableFuture cf) { + public static Throwable exceptionNow(CompletableFuture cf) { if (IS_JAVA19_PLUS) { return cf.exceptionNow(); } @@ -1388,7 +1395,7 @@ public static Throwable exceptionNow(CompletableFuture cf) { * @see Future.State */ @Contract(pure = true) - public static CffuState state(CompletableFuture cf) { + public static CffuState state(CompletableFuture cf) { if (IS_JAVA19_PLUS) { return CffuState.toCffuState(cf.state()); } @@ -1424,7 +1431,7 @@ public static CffuState state(CompletableFuture cf) { * @param supplier a function returning the value to be used to complete given CompletableFuture * @return given CompletableFuture */ - public static CompletableFuture completeAsync(CompletableFuture cf, Supplier supplier) { + public static > C completeAsync(C cf, Supplier supplier) { return completeAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL); } @@ -1436,15 +1443,16 @@ public static CompletableFuture completeAsync(CompletableFuture cf, Su * @param executor the executor to use for asynchronous execution * @return given CompletableFuture */ - public static CompletableFuture completeAsync( - CompletableFuture cf, Supplier supplier, Executor executor) { + public static > C completeAsync( + C cf, Supplier supplier, Executor executor) { if (IS_JAVA9_PLUS) { - return cf.completeAsync(supplier, executor); + cf.completeAsync(supplier, executor); + } else { + requireNonNull(supplier, "supplier is null"); + requireNonNull(executor, "executor is null"); + // below code is copied from CompletableFuture#completeAsync with small adoption + executor.execute(new CfCompleterBySupplier<>(cf, supplier)); } - requireNonNull(supplier, "supplier is null"); - requireNonNull(executor, "executor is null"); - // below code is copied from CompletableFuture#completeAsync with small adoption - executor.execute(new CfCompleterBySupplier<>(cf, supplier)); return cf; } @@ -1490,11 +1498,11 @@ public static CompletableFuture copy(CompletableFuture cf) { /** * Returns a new incomplete CompletableFuture of the type to be returned by a CompletionStage method. * - * @param the type of the value + * @param the type of the value * @return a new CompletableFuture */ @Contract(pure = true) - public static CompletableFuture newIncompleteFuture(CompletableFuture cf) { + public static CompletableFuture newIncompleteFuture(CompletableFuture cf) { if (IS_JAVA9_PLUS) { return cf.newIncompleteFuture(); } @@ -1536,6 +1544,7 @@ public static Executor defaultExecutor() { @Contract(pure = true) @SafeVarargs public static CompletableFuture[] toCompletableFutureArray(CompletionStage... stages) { + requireNonNull(stages, "stages is null"); @SuppressWarnings("unchecked") CompletableFuture[] ret = new CompletableFuture[stages.length]; for (int i = 0; i < stages.length; i++) { diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index 65de22da..d5116ea9 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -46,7 +46,8 @@ public static ScheduledFuture delayToTimoutCf(CompletableFuture cf, long d * @return a Future can be used to cancel the delayed task(complete CF) * @see FutureCanceller */ - public static ScheduledFuture delayToCompleteCf(CompletableFuture cf, T value, long delay, TimeUnit unit) { + public static ScheduledFuture delayToCompleteCf( + CompletableFuture cf, T value, long delay, TimeUnit unit) { return delay(new CfCompleter<>(cf, value), delay, unit); } @@ -128,11 +129,10 @@ public void run() { * Action to complete cf */ final class CfCompleter implements Runnable { - private final CompletableFuture cf; + private final CompletableFuture cf; private final T value; - @SuppressWarnings("BoundedWildcard") - CfCompleter(CompletableFuture cf, T value) { + CfCompleter(CompletableFuture cf, T value) { this.cf = cf; this.value = value; } @@ -171,10 +171,10 @@ public void accept(Object ignore, Throwable ex) { @SuppressFBWarnings("SE_BAD_FIELD") final class CfCompleterBySupplier extends ForkJoinTask implements Runnable, CompletableFuture.AsynchronousCompletionTask { - CompletableFuture dep; - Supplier fn; + private CompletableFuture dep; + private Supplier fn; - CfCompleterBySupplier(CompletableFuture dep, Supplier fn) { + CfCompleterBySupplier(CompletableFuture dep, Supplier fn) { this.dep = dep; this.fn = fn; } @@ -196,7 +196,7 @@ public boolean exec() { @Override public void run() { - CompletableFuture d; + CompletableFuture d; Supplier f; if ((d = dep) != null && (f = fn) != null) { dep = null; diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 3a1b5e90..6c31b022 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Function; +import java.util.function.Supplier; import static io.foldright.cffu.CompletableFutureUtils.*; import static io.foldright.test_utils.TestUtils.*; @@ -1005,6 +1005,34 @@ void test_executor() { assertSame(e, screenExecutor(e)); } + //////////////////////////////////////////////////////////////////////////////// + //# check type parameter declaration, Variance(covariance/contravariance) + //////////////////////////////////////////////////////////////////////////////// + + @Test + @SuppressWarnings("UnnecessaryLocalVariable") + void checkTypeParameterDeclaration() throws Exception { + final CompletableFuture f = completedFuture(42); + + final CompletableFuture fe = f; + final BiConsumer c = (v, ex) -> { + }; + CompletableFutureUtils.peek(fe, c).get(); + CompletableFutureUtils.peekAsync(fe, c).get(); + CompletableFutureUtils.peekAsync(fe, c, executorService).get(); + + final CompletableFuture fs = f; + final Supplier s = () -> 0; + fs.complete(0); + CompletableFutureUtils.completeAsync(fs, s).complete(1); + CompletableFutureUtils.completeAsync(fs, s, executorService).complete(1); + + CompletableFuture fq = f; + orTimeout(fq, 1, TimeUnit.MILLISECONDS); + orTimeout(fs, 1, TimeUnit.MILLISECONDS); + orTimeout(fe, 1, TimeUnit.MILLISECONDS); + } + //////////////////////////////////////////////////////////////////////////////// //# test helper fields ////////////////////////////////////////////////////////////////////////////////