Skip to content

Commit

Permalink
refactor: complement the null check logic 👻
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed May 30, 2024
1 parent e1a88fc commit ed36ec1
Showing 1 changed file with 68 additions and 35 deletions.
103 changes: 68 additions & 35 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ public final class CompletableFutureUtils {
* @see CompletableFuture#allOf(CompletableFuture[])
*/
public static CompletableFuture<Void> allOf(CompletionStage<?>... cfs) {
requireNonNull(cfs, "cfs is null");
if (cfs.length == 0) return completedFuture(null);
// Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input
// in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
if (cfs.length == 1) return toNonMinCfCopy(cfs[0]).thenApply(unused -> null);
if (cfs.length == 1) return toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null")).thenApply(unused -> null);
return CompletableFuture.allOf(f_toCfArray(cfs));
}

Expand All @@ -91,7 +93,9 @@ public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? exte
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return completedFuture(arrayList());
if (size == 1) return csToListCf(cfs[0]);
// Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input
// in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
if (size == 1) return toNonMinCfCopy(cfs[0]).thenApply(CompletableFutureUtils::arrayList);

final Object[] result = new Object[size];
final CompletableFuture<Void>[] resultSetterCfs = createResultSetterCfs(cfs, result);
Expand Down Expand Up @@ -177,7 +181,9 @@ public static <T> CompletableFuture<List<T>> allResultsOfFastFail(CompletionStag
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return completedFuture(arrayList());
if (size == 1) return csToListCf(cfs[0]);
// Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input
// in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
if (size == 1) return toNonMinCfCopy(cfs[0]).thenApply(CompletableFutureUtils::arrayList);

final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
Expand Down Expand Up @@ -229,15 +235,16 @@ public static <T> CompletableFuture<List<T>> mostResultsOfSuccess(
public static <T> CompletableFuture<List<T>> mostResultsOfSuccess(
Executor executorWhenTimeout, long timeout, TimeUnit unit,
@Nullable T valueIfNotSuccess, CompletionStage<? extends T>... cfs) {
requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");
requireNonNull(unit, "unit is null");
requireCfsAndEleNonNull(cfs);
requireNonNull(cfs, "cfs is null");

if (cfs.length == 0) return completedFuture(arrayList());
if (cfs.length == 1) {
// Defensive copy input cf to non-minimal-stage instance in order to
// 1. avoid writing it by `completeOnTimeout` and is able to read its result(`getSuccessNow`)
// 2. ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
final CompletableFuture<T> f = toNonMinCfCopy(cfs[0]);
final CompletableFuture<T> f = toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null"));
return cffuOrTimeout(f, executorWhenTimeout, timeout, unit)
.handle((unused, ex) -> arrayList(getSuccessNow(f, valueIfNotSuccess)));
}
Expand Down Expand Up @@ -286,12 +293,6 @@ private static <T> List<T> arrayList(T... elements) {
return ret;
}

private static <T> CompletableFuture<List<T>> csToListCf(CompletionStage<? extends T> s) {
// Defensive copy input cf to non-minimal-stage instance(toNonMinCfCopy) for SINGLE input
// in order to ensure that the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
return toNonMinCfCopy(s).thenApply(CompletableFutureUtils::arrayList);
}

/**
* Returns a cf array whose elements do the result collection.
*/
Expand Down Expand Up @@ -420,10 +421,12 @@ static <T> boolean isMinStageCf(CompletionStage<? extends T> s) {
@Contract(pure = true)
@SafeVarargs
public static <T> CompletableFuture<T> anyOf(CompletionStage<? extends T>... cfs) {
requireNonNull(cfs, "cfs is null");
if (cfs.length == 0) return new CompletableFuture<>();
// Defensive copy input cf to non-minimal-stage instance for SINGLE input in order to ensure that
// 1. avoid writing the input cf unexpectedly it by caller code
// 2. the returned cf is not non-minimal-stage CF instance(UnsupportedOperationException)
if (cfs.length == 1) return toNonMinCfCopy(cfs[0]);
if (cfs.length == 1) return toNonMinCfCopy(requireNonNull(cfs[0], "cf1 is null"));
CompletableFuture<Object> ret = CompletableFuture.anyOf(f_toCfArray(cfs));
return f_cast(ret);
}
Expand Down Expand Up @@ -665,7 +668,7 @@ public static <T1, T2> CompletableFuture<Tuple2<T1, T2>> mostTupleOfSuccess(
public static <T1, T2> CompletableFuture<Tuple2<T1, T2>> mostTupleOfSuccess(
Executor executorWhenTimeout, long timeout, TimeUnit unit,
CompletionStage<? extends T1> cf1, CompletionStage<? extends T2> cf2) {
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2));
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2);
}

/**
Expand Down Expand Up @@ -704,7 +707,7 @@ public static <T1, T2, T3> CompletableFuture<Tuple3<T1, T2, T3>> mostTupleOfSucc
public static <T1, T2, T3> CompletableFuture<Tuple3<T1, T2, T3>> mostTupleOfSuccess(
Executor executorWhenTimeout, long timeout, TimeUnit unit,
CompletionStage<? extends T1> cf1, CompletionStage<? extends T2> cf2, CompletionStage<? extends T3> cf3) {
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3));
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3);
}

/**
Expand Down Expand Up @@ -745,7 +748,7 @@ public static <T1, T2, T3, T4> CompletableFuture<Tuple4<T1, T2, T3, T4>> mostTup
Executor executorWhenTimeout, long timeout, TimeUnit unit,
CompletionStage<? extends T1> cf1, CompletionStage<? extends T2> cf2,
CompletionStage<? extends T3> cf3, CompletionStage<? extends T4> cf4) {
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3, cf4));
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3, cf4);
}

/**
Expand Down Expand Up @@ -786,11 +789,12 @@ public static <T1, T2, T3, T4, T5> CompletableFuture<Tuple5<T1, T2, T3, T4, T5>>
Executor executorWhenTimeout, long timeout, TimeUnit unit,
CompletionStage<? extends T1> cf1, CompletionStage<? extends T2> cf2, CompletionStage<? extends T3> cf3,
CompletionStage<? extends T4> cf4, CompletionStage<? extends T5> cf5) {
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, requireCfsAndEleNonNull(cf1, cf2, cf3, cf4, cf5));
return mostTupleOfSuccess0(executorWhenTimeout, timeout, unit, cf1, cf2, cf3, cf4, cf5);
}

private static <T> CompletableFuture<T> mostTupleOfSuccess0(
Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage<?>[] css) {
Executor executorWhenTimeout, long timeout, TimeUnit unit, CompletionStage<?>... css) {
requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");
requireNonNull(unit, "unit is null");
// MUST be *Non-Minimal* CF instances in order to read results(`getSuccessNow`),
// otherwise UnsupportedOperationException
Expand Down Expand Up @@ -1263,6 +1267,9 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
*/
public static <T, C extends CompletionStage<? extends T>>
C peek(C cf, BiConsumer<? super T, ? super Throwable> action) {
requireNonNull(cf, "cf is null");
requireNonNull(action, "action is null");

cf.whenComplete(action);
return cf;
}
Expand All @@ -1287,8 +1294,7 @@ C peek(C cf, BiConsumer<? super T, ? super Throwable> action) {
*/
public static <T, C extends CompletionStage<? extends T>>
C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action) {
cf.whenCompleteAsync(action);
return cf;
return peekAsync(cf, action, AsyncPoolHolder.ASYNC_POOL);
}

/**
Expand All @@ -1310,6 +1316,10 @@ C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action) {
*/
public static <T, C extends CompletionStage<? extends T>>
C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(action, "action is null");
requireNonNull(executor, "executor is null");

cf.whenCompleteAsync(action, executor);
return cf;
}
Expand All @@ -1330,10 +1340,10 @@ C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action, Executor exec
*/
@Contract(pure = true)
public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
requireNonNull(ex, "ex is null");
if (IS_JAVA9_PLUS) {
return CompletableFuture.failedFuture(ex);
}
requireNonNull(ex, "ex is null");
final CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(ex);
return cf;
Expand Down Expand Up @@ -1374,9 +1384,7 @@ public static <T> CompletionStage<T> failedStage(Throwable ex) {
if (IS_JAVA9_PLUS) {
return CompletableFuture.failedStage(ex);
}
CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(ex);
return cf;
return failedFuture(ex);
}

//# Delay Execution
Expand Down Expand Up @@ -1405,11 +1413,11 @@ public static Executor delayedExecutor(long delay, TimeUnit unit) {
*/
@Contract(pure = true)
public static Executor delayedExecutor(long delay, TimeUnit unit, Executor executor) {
requireNonNull(unit, "unit is null");
requireNonNull(executor, "executor is null");
if (IS_JAVA9_PLUS) {
return CompletableFuture.delayedExecutor(delay, unit, executor);
}
requireNonNull(unit, "unit is null");
requireNonNull(executor, "executor is null");
return new DelayedExecutor(delay, unit, executor);
}

Expand Down Expand Up @@ -1447,11 +1455,12 @@ C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn) {
@SuppressWarnings("unchecked")
public static <T, C extends CompletionStage<? super T>>
C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
if (IS_JAVA12_PLUS) {
return (C) cf.exceptionallyAsync(fn, executor);
}
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletionStage#exceptionallyAsync
return (C) cf.handle((r, ex) -> (ex == null) ? cf :
cf.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor)
Expand Down Expand Up @@ -1487,6 +1496,10 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(
*/
public static <C extends CompletableFuture<?>> C cffuOrTimeout(
C cf, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
requireNonNull(cf, "cf is null");
requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");
requireNonNull(unit, "unit is null");

final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}
Expand Down Expand Up @@ -1517,10 +1530,11 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(
* @see #cffuOrTimeout(CompletableFuture, long, TimeUnit)
*/
public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, TimeUnit unit) {
requireNonNull(cf, "cf is null");
requireNonNull(unit, "unit is null");
if (IS_JAVA9_PLUS) {
cf.orTimeout(timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
Expand Down Expand Up @@ -1557,6 +1571,10 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
*/
public static <T, C extends CompletableFuture<? super T>>
C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, long timeout, TimeUnit unit) {
requireNonNull(cf, "cf is null");
requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");
requireNonNull(unit, "unit is null");

final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
}
Expand Down Expand Up @@ -1588,10 +1606,11 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l
*/
public static <T, C extends CompletableFuture<? super T>>
C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
requireNonNull(cf, "cf is null");
requireNonNull(unit, "unit is null");
if (IS_JAVA9_PLUS) {
cf.completeOnTimeout(value, timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cf, value, timeout, unit);
Expand Down Expand Up @@ -1623,10 +1642,11 @@ C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T, C extends CompletionStage<? super T>>
C exceptionallyCompose(C cf, Function<Throwable, ? extends CompletionStage<T>> fn) {
requireNonNull(cf, "cf is null");
requireNonNull(fn, "fn is null");
if (IS_JAVA12_PLUS) {
return (C) cf.exceptionallyCompose((Function) fn);
}
requireNonNull(fn, "fn is null");
// below code is copied from CompletionStage.exceptionallyCompose
return (C) cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(x -> x);
}
Expand Down Expand Up @@ -1657,11 +1677,12 @@ C exceptionallyComposeAsync(C cf, Function<Throwable, ? extends CompletionStage<
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T, C extends CompletionStage<? super T>>
C exceptionallyComposeAsync(C cf, Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
if (IS_JAVA12_PLUS) {
return (C) cf.exceptionallyComposeAsync((Function) fn, executor);
}
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletionStage.exceptionallyComposeAsync
return (C) cf.handle((r, ex) -> (ex == null) ? cf :
cf.handleAsync((r1, ex1) -> fn.apply(ex1), executor).thenCompose(x -> x)
Expand Down Expand Up @@ -1703,6 +1724,9 @@ C exceptionallyComposeAsync(C cf, Function<Throwable, ? extends CompletionStage<
@Blocking
@Nullable
public static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
requireNonNull(cf, "cf is null");
requireNonNull(unit, "unit is null");

if (cf.isDone()) return cf.join();
// defensive copy input cf to avoid writing it by `orTimeout`
return orTimeout(copy(cf), timeout, unit).join();
Expand All @@ -1720,6 +1744,7 @@ public static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
@Contract(pure = true)
@Nullable
public static <T> T getSuccessNow(CompletableFuture<? extends T> cf, @Nullable T valueIfNotSuccess) {
requireNonNull(cf, "cf is null");
return cf.isDone() && !cf.isCompletedExceptionally() ? cf.join() : valueIfNotSuccess;
}

Expand All @@ -1739,6 +1764,7 @@ public static <T> T getSuccessNow(CompletableFuture<? extends T> cf, @Nullable T
@Contract(pure = true)
@Nullable
public static <T> T resultNow(Future<T> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA19_PLUS) {
return cf.resultNow();
}
Expand Down Expand Up @@ -1786,6 +1812,7 @@ public static <T> T resultNow(Future<T> cf) {
*/
@Contract(pure = true)
public static Throwable exceptionNow(Future<?> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA19_PLUS) {
return cf.exceptionNow();
}
Expand Down Expand Up @@ -1824,6 +1851,7 @@ public static Throwable exceptionNow(Future<?> cf) {
*/
@Contract(pure = true)
public static CffuState state(Future<?> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA19_PLUS) {
return CffuState.toCffuState(cf.state());
}
Expand Down Expand Up @@ -1884,12 +1912,12 @@ C completeAsync(C cf, Supplier<? extends T> supplier) {
*/
public static <T, C extends CompletableFuture<? super T>>
C completeAsync(C cf, Supplier<? extends T> supplier, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
if (IS_JAVA9_PLUS) {
cf.completeAsync(supplier, executor);
} else {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
executor.execute(new CfCompleterBySupplier<>(cf, supplier));
}
Expand Down Expand Up @@ -1923,6 +1951,7 @@ C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier, Execu
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");

executor.execute(new CfExCompleterBySupplier(cf, supplier));
return cf;
}
Expand All @@ -1943,6 +1972,7 @@ C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier, Execu
*/
@Contract(pure = true)
public static <T> CompletionStage<T> minimalCompletionStage(CompletableFuture<T> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA9_PLUS) {
return cf.minimalCompletionStage();
}
Expand All @@ -1960,6 +1990,7 @@ public static <T> CompletionStage<T> minimalCompletionStage(CompletableFuture<T>
*/
@Contract(pure = true)
public static <T> CompletableFuture<T> copy(CompletableFuture<T> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA9_PLUS) {
return cf.copy();
}
Expand All @@ -1974,6 +2005,7 @@ public static <T> CompletableFuture<T> copy(CompletableFuture<T> cf) {
*/
@Contract(pure = true)
public static <U> CompletableFuture<U> newIncompleteFuture(CompletableFuture<?> cf) {
requireNonNull(cf, "cf is null");
if (IS_JAVA9_PLUS) {
return cf.newIncompleteFuture();
}
Expand Down Expand Up @@ -2030,6 +2062,7 @@ public static <T> CompletableFuture<T>[] toCompletableFutureArray(CompletionStag
*/
@Contract(pure = true)
public static <T> CompletableFuture<T>[] completableFutureListToArray(List<CompletableFuture<T>> cfList) {
requireNonNull(cfList, "cfList is null");
@SuppressWarnings("unchecked")
final CompletableFuture<T>[] a = new CompletableFuture[cfList.size()];
return cfList.toArray(a);
Expand Down

0 comments on commit ed36ec1

Please sign in to comment.