Skip to content

Commit

Permalink
refactor: improve generic type declaration 🧬
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed May 4, 2024
1 parent 00319fa commit b6b8df0
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 85 deletions.
143 changes: 78 additions & 65 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public final class CompletableFutureUtils {
* @return a new CompletableFuture that is completed when all the given stages complete
* @throws NullPointerException if the array or any of its elements are {@code null}
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static CompletableFuture<Void> allOf(CompletionStage<?>... cfs) {
return CompletableFuture.allOf(toCompletableFutureArray((CompletionStage[]) cfs));
return CompletableFuture.allOf(f_toCfArray(cfs));
}

/**
Expand All @@ -77,7 +76,6 @@ public static CompletableFuture<Void> allOf(CompletionStage<?>... cfs) {
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked"})
public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
Expand All @@ -87,7 +85,7 @@ public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? exte
final Object[] result = new Object[size];
final CompletableFuture<Void>[] resultSetterCfs = createResultSetterCfs(cfs, result);

return CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> (List<T>) arrayList(result));
return f_cast(CompletableFuture.allOf(resultSetterCfs).thenApply(unused -> arrayList(result)));
}

/**
Expand All @@ -107,23 +105,22 @@ public static <T> CompletableFuture<List<T>> allResultsOf(CompletionStage<? exte
* @see CompletableFuture#allOf(CompletableFuture[])
*/
@Contract(pure = true)
@SuppressWarnings({"unchecked", "rawtypes"})
public static CompletableFuture<Void> allOfFastFail(CompletionStage<?>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return CompletableFuture.completedFuture(null);
if (size == 1) return cfs[0].toCompletableFuture().thenApply(v -> null);

final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size + 1];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE:
// a cf that is successful when all given cfs success, otherwise be incomplete
failedOrBeIncomplete[size] = CompletableFuture.allOf(successOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete);
return f_cast(CompletableFuture.anyOf(failedOrBeIncomplete));
}

/**
Expand All @@ -146,27 +143,26 @@ public static CompletableFuture<Void> allOfFastFail(CompletionStage<?>... cfs) {
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<List<T>> allResultsOfFastFail(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return CompletableFuture.completedFuture(arrayList());
if (size == 1) return csToListCf(cfs[0]);

final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size + 1];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE:
// a cf that is successful when all given cfs success, otherwise be incomplete
failedOrBeIncomplete[size] = allResultsOf(successOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(failedOrBeIncomplete);
return f_cast(CompletableFuture.anyOf(failedOrBeIncomplete));
}

@SafeVarargs
private static <T> CompletionStage<? extends T>[] requireCfsAndEleNonNull(CompletionStage<? extends T>... css) {
private static <S extends CompletionStage<?>> S[] requireCfsAndEleNonNull(S... css) {
requireNonNull(css, "cfs is null");
for (int i = 0; i < css.length; i++) {
requireNonNull(css[i], "cf" + (i + 1) + " is null");
Expand Down Expand Up @@ -202,23 +198,40 @@ private static <T> CompletableFuture<Void>[] createResultSetterCfs(CompletionSta
return resultSetterCfs;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static void fill(CompletionStage[] cfs,
CompletableFuture[] successOrBeIncomplete,
CompletableFuture[] failedOrBeIncomplete) {
final CompletableFuture incomplete = new CompletableFuture();
private static <T> void fill(CompletionStage<? extends T>[] cfs,
CompletableFuture<? extends T>[] successOrBeIncomplete,
CompletableFuture<? extends T>[] failedOrBeIncomplete) {
final CompletableFuture<T> incomplete = new CompletableFuture<>();

for (int i = 0; i < cfs.length; i++) {
final CompletionStage cf = cfs[i];
final CompletionStage<? extends T> cf = cfs[i];

successOrBeIncomplete[i] = cf.toCompletableFuture()
.handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(identity());
.handle((v, ex) -> ex == null ? cf : incomplete).thenCompose(x -> x);

failedOrBeIncomplete[i] = cf.toCompletableFuture()
.handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(identity());
.handle((v, ex) -> ex == null ? incomplete : cf).thenCompose(x -> x);
}
}

/**
* Force cast CompletableFuture with the value type,
* IGNORE the compile-time type check.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> CompletableFuture<T> f_cast(CompletableFuture<?> f) {
return (CompletableFuture) f;
}

/**
* Force converts {@link CompletionStage} array to {@link CompletableFuture} array,
* IGNORE the compile-time type check.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> CompletableFuture<T>[] f_toCfArray(CompletionStage<? extends T>[] cfs) {
return toCompletableFutureArray((CompletionStage[]) cfs);
}

////////////////////////////////////////////////////////////////////////////////
//# anyOf* methods
////////////////////////////////////////////////////////////////////////////////
Expand All @@ -243,9 +256,8 @@ private static void fill(CompletionStage[] cfs,
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<T> anyOf(CompletionStage<? extends T>... cfs) {
return (CompletableFuture) CompletableFuture.anyOf(toCompletableFutureArray((CompletionStage[]) cfs));
return f_cast(CompletableFuture.anyOf(f_toCfArray(cfs)));
}

/**
Expand All @@ -267,23 +279,22 @@ public static <T> CompletableFuture<T> anyOf(CompletionStage<? extends T>... cfs
*/
@Contract(pure = true)
@SafeVarargs
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> CompletableFuture<T> anyOfSuccess(CompletionStage<? extends T>... cfs) {
requireCfsAndEleNonNull(cfs);
final int size = cfs.length;
if (size == 0) return failedFuture(new NoCfsProvidedException());
if (size == 1) return (CompletableFuture) copy(cfs[0].toCompletableFuture());
if (size == 1) return f_cast(copy(cfs[0].toCompletableFuture()));

// NOTE: fill ONE MORE element of successOrBeIncompleteCfs LATER
final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[size];
final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[size + 1];
final CompletableFuture<?>[] failedOrBeIncomplete = new CompletableFuture[size];
fill(cfs, successOrBeIncomplete, failedOrBeIncomplete);

// NOTE: fill the ONE MORE element of successOrBeIncompleteCfs HERE
// a cf that is failed when all given cfs fail, otherwise be incomplete
successOrBeIncomplete[size] = CompletableFuture.allOf(failedOrBeIncomplete);

return (CompletableFuture) CompletableFuture.anyOf(successOrBeIncomplete);
return f_cast(CompletableFuture.anyOf(successOrBeIncomplete));
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -886,7 +897,7 @@ public static <T> CompletableFuture<Void> acceptEitherSuccessAsync(
* @see CompletionStage#applyToEither(CompletionStage, Function)
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccess(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, U> fn) {
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, ? extends U> fn) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");

Expand All @@ -907,7 +918,7 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccess(
* @see CompletionStage#applyToEitherAsync(CompletionStage, Function)
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, U> fn) {
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2, Function<? super T, ? extends U> fn) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");

Expand All @@ -929,7 +940,7 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
*/
public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
CompletionStage<? extends T> cf1, CompletionStage<? extends T> cf2,
Function<? super T, U> fn, Executor executor) {
Function<? super T, ? extends U> fn, Executor executor) {
final CompletionStage<? extends T>[] css = requireCfsAndEleNonNull(cf1, cf2);
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
Expand Down Expand Up @@ -1164,16 +1175,16 @@ public static <T> CompletableFuture<T> exceptionallyAsync(
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
public static <C extends CompletableFuture<?>> C orTimeout(C cf, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
return cf.orTimeout(timeout, unit);
}

requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
cf.orTimeout(timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#orTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToTimoutCf(cf, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
}
}
return cf;
}
Expand All @@ -1186,17 +1197,17 @@ public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> cf, long t
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeOnTimeout(
CompletableFuture<T> cf, @Nullable T value, long timeout, TimeUnit unit) {
public static <T, C extends CompletableFuture<? super T>> C completeOnTimeout(
C cf, @Nullable T value, long timeout, TimeUnit unit) {
if (IS_JAVA9_PLUS) {
return cf.completeOnTimeout(value, timeout, unit);
}

requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cf, value, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
cf.completeOnTimeout(value, timeout, unit);
} else {
requireNonNull(unit, "unit is null");
// below code is copied from CompletableFuture#completeOnTimeout with small adoption
if (!cf.isDone()) {
ScheduledFuture<?> f = Delayer.delayToCompleteCf(cf, value, timeout, unit);
cf.whenComplete(new FutureCanceller(f));
}
}
return cf;
}
Expand Down Expand Up @@ -1311,7 +1322,7 @@ public static <T> T join(CompletableFuture<T> cf, long timeout, TimeUnit unit) {
*/
@Contract(pure = true)
@Nullable
public static <T> T resultNow(CompletableFuture<T> cf) {
public static <T> T resultNow(CompletableFuture<? extends T> cf) {
if (IS_JAVA19_PLUS) {
return cf.resultNow();
}
Expand Down Expand Up @@ -1348,7 +1359,7 @@ public static <T> T resultNow(CompletableFuture<T> cf) {
* @see CompletableFuture#resultNow()
*/
@Contract(pure = true)
public static <T> Throwable exceptionNow(CompletableFuture<T> cf) {
public static Throwable exceptionNow(CompletableFuture<?> cf) {
if (IS_JAVA19_PLUS) {
return cf.exceptionNow();
}
Expand Down Expand Up @@ -1386,7 +1397,7 @@ public static <T> Throwable exceptionNow(CompletableFuture<T> cf) {
* @see Future.State
*/
@Contract(pure = true)
public static <T> CffuState state(CompletableFuture<T> cf) {
public static CffuState state(CompletableFuture<?> cf) {
if (IS_JAVA19_PLUS) {
return CffuState.toCffuState(cf.state());
}
Expand Down Expand Up @@ -1422,7 +1433,7 @@ public static <T> CffuState state(CompletableFuture<T> cf) {
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> cf, Supplier<? extends T> supplier) {
public static <T, C extends CompletableFuture<? super T>> C completeAsync(C cf, Supplier<? extends T> supplier) {
return completeAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL);
}

Expand All @@ -1434,15 +1445,16 @@ public static <T> CompletableFuture<T> completeAsync(CompletableFuture<T> cf, Su
* @param executor the executor to use for asynchronous execution
* @return given CompletableFuture
*/
public static <T> CompletableFuture<T> completeAsync(
CompletableFuture<T> cf, Supplier<? extends T> supplier, Executor executor) {
public static <T, C extends CompletableFuture<? super T>> C completeAsync(
C cf, Supplier<? extends T> supplier, Executor executor) {
if (IS_JAVA9_PLUS) {
return cf.completeAsync(supplier, executor);
cf.completeAsync(supplier, executor);
} else {
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
executor.execute(new CfCompleterBySupplier<>(cf, supplier));
}
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
executor.execute(new CfCompleterBySupplier<>(cf, supplier));
return cf;
}

Expand Down Expand Up @@ -1488,11 +1500,11 @@ public static <T> CompletableFuture<T> copy(CompletableFuture<T> cf) {
/**
* Returns a new incomplete CompletableFuture of the type to be returned by a CompletionStage method.
*
* @param <T> the type of the value
* @param <U> the type of the value
* @return a new CompletableFuture
*/
@Contract(pure = true)
public static <T, U> CompletableFuture<U> newIncompleteFuture(CompletableFuture<T> cf) {
public static <U> CompletableFuture<U> newIncompleteFuture(CompletableFuture<?> cf) {
if (IS_JAVA9_PLUS) {
return cf.newIncompleteFuture();
}
Expand Down Expand Up @@ -1547,9 +1559,10 @@ public static <T> CompletableFuture<T>[] toCompletableFutureArray(CompletionStag
* Convert CompletableFuture list to CompletableFuture array.
*/
@Contract(pure = true)
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T>[] completableFutureListToArray(List<CompletableFuture<T>> cfList) {
return cfList.toArray(new CompletableFuture[0]);
@SuppressWarnings("unchecked")
final CompletableFuture<T>[] a = new CompletableFuture[0];
return cfList.toArray(a);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public static ScheduledFuture<?> delayToTimoutCf(CompletableFuture<?> cf, long d
* @return a Future can be used to cancel the delayed task(complete CF)
* @see FutureCanceller
*/
public static <T> ScheduledFuture<?> delayToCompleteCf(CompletableFuture<T> cf, T value, long delay, TimeUnit unit) {
public static <T> ScheduledFuture<?> delayToCompleteCf(
CompletableFuture<? super T> cf, T value, long delay, TimeUnit unit) {
return delay(new CfCompleter<>(cf, value), delay, unit);
}

Expand Down Expand Up @@ -128,11 +129,10 @@ public void run() {
* Action to complete cf
*/
final class CfCompleter<T> implements Runnable {
private final CompletableFuture<T> cf;
private final CompletableFuture<? super T> cf;
private final T value;

@SuppressWarnings("BoundedWildcard")
CfCompleter(CompletableFuture<T> cf, T value) {
CfCompleter(CompletableFuture<? super T> cf, T value) {
this.cf = cf;
this.value = value;
}
Expand Down Expand Up @@ -171,10 +171,10 @@ public void accept(Object ignore, Throwable ex) {
@SuppressFBWarnings("SE_BAD_FIELD")
final class CfCompleterBySupplier<T> extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
CompletableFuture<T> dep;
Supplier<? extends T> fn;
private CompletableFuture<? super T> dep;
private Supplier<? extends T> fn;

CfCompleterBySupplier(CompletableFuture<T> dep, Supplier<? extends T> fn) {
CfCompleterBySupplier(CompletableFuture<? super T> dep, Supplier<? extends T> fn) {
this.dep = dep;
this.fn = fn;
}
Expand All @@ -196,7 +196,7 @@ public boolean exec() {

@Override
public void run() {
CompletableFuture<T> d;
CompletableFuture<? super T> d;
Supplier<? extends T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null;
Expand Down
Loading

0 comments on commit b6b8df0

Please sign in to comment.