Skip to content

Commit

Permalink
refactor: improve generic type declaration 🧬
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed May 4, 2024
1 parent f9c2f44 commit 908a0fe
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 75 deletions.
141 changes: 75 additions & 66 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public final class CompletableFutureUtils {
* @return a new CompletableFuture that is completed when all the given stages complete
* @throws NullPointerException if the array or any of its elements are {@code null}
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static CompletableFuture<Void> allOf(CompletionStage<?>... cfs) {
return CompletableFuture.allOf(toCompletableFutureArray((CompletionStage[]) cfs));
return CompletableFuture.allOf(fToCfArray(cfs));
}

/**
Expand All @@ -77,7 +76,6 @@ public static CompletableFuture<Void> allOf(CompletionStage<?>... cfs) {
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked"})
public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
Expand All @@ -87,7 +85,7 @@ public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? exte
final Object[] result = new Object[size];
final CompletableFuture<Void>[] resultSetterCfs = createResultSetterCfs(cfs, result);

return CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> (List<T>) arrayList(result));
return fCast(CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> arrayList(result)));
}

/**
Expand All @@ -107,23 +105,22 @@ public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? exte
* @see CompletableFuture#allOf(CompletableFuture[])
*/
@Contract(pure = true)
@SuppressWarnings({"unchecked", "rawtypes"})
public static CompletableFuture<Void> allOfFastFail(CompletionStage<?>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return CompletableFuture.completedFuture(null);
if (size == 1) return cfs[0].toCompletableFuture().thenApply(v -> null);

final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size + 1];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE:
// a cf that is successful when all given cfs success, otherwise be incomplete
failedOrBeIncomplete[size] = CompletableFuture.allOf(successOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete);
return fCast(CompletableFuture.anyOf(failedOrBeIncomplete));
}

/**
Expand All @@ -146,27 +143,26 @@ public static CompletableFuture<Void> allOfFastFail(CompletionStage<?>... cfs) {
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<List<T>> allResultsOfFastFail(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return CompletableFuture.completedFuture(arrayList());
if (size == 1) return csToListCf(cfs[0]);

final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size + 1];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE:
// a cf that is successful when all given cfs success, otherwise be incomplete
failedOrBeIncomplete[size] = allResultsOf(successOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete);
return fCast(CompletableFuture.anyOf(failedOrBeIncomplete));
}

@SafeVarargs
private static <T> CompletionStage<? extends T>[] requireCfsAndEleNonNull(CompletionStage<? extends T>... css) {
private static <S extends CompletionStage<?>> S[] requireCfsAndEleNonNull(S... css) {
requireNonNull(css, "cfs is null");
for (int i = 0; i < css.length; i++) {
requireNonNull(css[i], "cf" + (i + 1) + " is null");
Expand Down Expand Up @@ -202,23 +198,38 @@ private static <T> CompletableFuture<Void>[] createResultSetterCfs(CompletionSta
return resultSetterCfs;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static void fill(CompletionStage[] cfs,
CompletableFuture[] successOrBeIncomplete,
CompletableFuture[] failedOrBeIncomplete) {
final CompletableFuture incomplete = new CompletableFuture();
private static <T> void fill(CompletionStage<? extends T>[] cfs,
CompletableFuture<? extends T>[] successOrBeIncomplete,
CompletableFuture<? extends T>[] failedOrBeIncomplete) {
final CompletableFuture<T> incomplete = new CompletableFuture<>();

for (int i = 0; i < cfs.length; i++) {
final CompletionStage cf = cfs[i];
final CompletionStage<? extends T> cf = cfs[i];

successOrBeIncomplete[i] = cf.toCompletableFuture()
.handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(identity());
.handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(x -> x);

failedOrBeIncomplete[i] = cf.toCompletableFuture()
.handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(identity());
.handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(x -> x);
}
}

/**
* force cast the value type of CompletableFuture.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> CompletableFuture<T> fCast(CompletableFuture<?> f) {
return (CompletableFuture) f;
}

/**
* force converts input {@link CompletionStage} array element to {@link CompletableFuture}.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> CompletableFuture<T>[] fToCfArray(CompletionStage<? extends T>[] cfs) {
return toCompletableFutureArray((CompletionStage[]) cfs);
}

////////////////////////////////////////////////////////////////////////////////
//# anyOf* methods
////////////////////////////////////////////////////////////////////////////////
Expand All @@ -243,9 +254,8 @@ private static void fill(CompletionStage[] cfs,
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<T> anyOf(CompletionStage<? extends T>... cfs) {
return (CompletableFuture) CompletableFuture.anyOf(toCompletableFutureArray((CompletionStage[]) cfs));
return fCast(CompletableFuture.anyOf(fToCfArray(cfs)));
}

/**
Expand All @@ -267,23 +277,22 @@ public static <T> CompletableFuture<T> anyOf(CompletionStage<? extends T>... cfs
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<T> anyOfSuccess(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return failedFuture(new NoCfsProvidedException());
if (size == 1) return (CompletableFuture) copy(cfs[0].toCompletableFuture());
if (size == 1) return fCast(copy(cfs[0].toCompletableFuture()));

// NOTE: fill ONE MORE element of successOrBeIncompleteCfs LATER
final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of successOrBeIncompleteCfs HERE
// a cf that is failed when all given cfs fail, otherwise be incomplete
successOrBeIncomplete[size] = CompletableFuture.allOf(failedOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(successOrBeIncomplete);
return fCast(CompletableFuture.anyOf(successOrBeIncomplete));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -886,7 +895,7 @@ public static <T> CompletableFuture<Void> acceptEitherSuccessAsync(
* @see CompletionStage#applyToEither(CompletionStage, Function)
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccess(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, U> fn) {
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, ? extends U> fn) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");

Expand All @@ -907,7 +916,7 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccess(
* @see CompletionStage#applyToEitherAsync(CompletionStage, Function)
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, U> fn) {
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, ? extends U> fn) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");

Expand All @@ -929,7 +938,7 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2,
Function<? super T, U> fn, Executor executor) {
Function<? super T, ? extends U> fn, Executor executor) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
Expand Down Expand Up @@ -1029,6 +1038,7 @@ public static <T> CompletableFuture<T> failedFuture(Throwable ex) {
if (IS_JAVA9_PLUS) {
return CompletableFuture.failedFuture(ex);
}
requireNonNull(ex, "ex is null");
final CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(ex);
return cf;
Expand Down Expand Up @@ -1163,16 +1173,16 @@ public static <T> CompletableFuture<T> exceptionallyAsync(
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
return cf.orTimeout(timeout, unit);
}

requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
cf.orTimeout(timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
}
}
return cf;
}
Expand All @@ -1185,17 +1195,17 @@ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> cf, long t
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeOnTimeout(
CompletableFuture<T> cf, @Nullable T value, long timeout, TimeUnit unit) {
public static <T, C extends CompletableFuture<? super T>> C completeOnTimeout(
C cf, @Nullable T value, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
return cf.completeOnTimeout(value, timeout, unit);
}

requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cf, value, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
cf.completeOnTimeout(value, timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cf, value, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
}
}
return cf;
}
Expand All @@ -1215,7 +1225,6 @@ public static <T> CompletableFuture<T> exceptionallyCompose(
if (IS_JAVA12_PLUS) {
return cf.exceptionallyCompose(fn);
}

requireNonNull(fn, "fn is null");
// below code is copied from CompletionStage.exceptionallyCompose
return cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(identity());
Expand Down Expand Up @@ -1249,7 +1258,6 @@ public static <T> CompletableFuture<T> exceptionallyComposeAsync(
if (IS_JAVA12_PLUS) {
return cf.exceptionallyComposeAsync(fn, executor);
}

requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletionStage.exceptionallyComposeAsync
Expand Down Expand Up @@ -1294,7 +1302,6 @@ public static <T> CompletableFuture<T> exceptionallyComposeAsync(
@Nullable
public static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
if (cf.isDone()) return cf.join();

return orTimeout(copy(cf), timeout, unit).join();
}

Expand All @@ -1313,7 +1320,7 @@ public static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
*/
@Contract(pure = true)
@Nullable
public static <T> T resultNow(CompletableFuture<T> cf) {
public static <T> T resultNow(CompletableFuture<? extends T> cf) {
if (IS_JAVA19_PLUS) {
return cf.resultNow();
}
Expand Down Expand Up @@ -1350,7 +1357,7 @@ public static <T> T resultNow(CompletableFuture<T> cf) {
* @see CompletableFuture#resultNow()
*/
@Contract(pure = true)
public static <T> Throwable exceptionNow(CompletableFuture<T> cf) {
public static Throwable exceptionNow(CompletableFuture<?> cf) {
if (IS_JAVA19_PLUS) {
return cf.exceptionNow();
}
Expand Down Expand Up @@ -1388,7 +1395,7 @@ public static <T> Throwable exceptionNow(CompletableFuture<T> cf) {
* @see Future.State
*/
@Contract(pure = true)
public static <T> CffuState state(CompletableFuture<T> cf) {
public static CffuState state(CompletableFuture<?> cf) {
if (IS_JAVA19_PLUS) {
return CffuState.toCffuState(cf.state());
}
Expand Down Expand Up @@ -1424,7 +1431,7 @@ public static <T> CffuState state(CompletableFuture<T> cf) {
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> cf, Supplier<? extends T> supplier) {
public static <T, C extends CompletableFuture<? super T>> C completeAsync(C cf, Supplier<? extends T> supplier) {
return completeAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL);
}

Expand All @@ -1436,15 +1443,16 @@ public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> cf, Su
* @param executor the executor to use for asynchronous execution
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeAsync(
CompletableFuture<T> cf, Supplier<? extends T> supplier, Executor executor) {
public static <T, C extends CompletableFuture<? super T>> C completeAsync(
C cf, Supplier<? extends T> supplier, Executor executor) {
if (IS_JAVA9_PLUS) {
return cf.completeAsync(supplier, executor);
cf.completeAsync(supplier, executor);
} else {
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
executor.execute(new CfCompleterBySupplier<>(cf, supplier));
}
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
executor.execute(new CfCompleterBySupplier<>(cf, supplier));
return cf;
}

Expand Down Expand Up @@ -1490,11 +1498,11 @@ public static <T> CompletableFuture<T> copy(CompletableFuture<T> cf) {
/**
* Returns a new incomplete CompletableFuture of the type to be returned by a CompletionStage method.
*
* @param <T> the type of the value
* @param <U> the type of the value
* @return a new CompletableFuture
*/
@Contract(pure = true)
public static <T, U> CompletableFuture<U> newIncompleteFuture(CompletableFuture<T> cf) {
public static <U> CompletableFuture<U> newIncompleteFuture(CompletableFuture<?> cf) {
if (IS_JAVA9_PLUS) {
return cf.newIncompleteFuture();
}
Expand Down Expand Up @@ -1536,6 +1544,7 @@ public static Executor defaultExecutor() {
@Contract(pure = true)
@SafeVarargs
public static <T> CompletableFuture<T>[] toCompletableFutureArray(CompletionStage<T>... stages) {
requireNonNull(stages, "stages is null");
@SuppressWarnings("unchecked")
CompletableFuture<T>[] ret = new CompletableFuture[stages.length];
for (int i = 0; i < stages.length; i++) {
Expand Down
Loading

0 comments on commit 908a0fe

Please sign in to comment.