Skip to content

Commit

Permalink
feat: implement mostResultsOfSuccess methods ☘️
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed May 18, 2024
1 parent 6595ccf commit 68f4359
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 9 deletions.
24 changes: 23 additions & 1 deletion cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,11 @@ public final <T> Cffu<T>[] toCffuArray(CompletionStage<T>... stages) {
}

////////////////////////////////////////////////////////////////////////////////
//# allOf* methods
//# allOf*/mostResultsOfSuccess methods
//
// - allOf / allOfFastFail
// - allResultsOf / allResultsOfFastFail
// - allResultsOf / allResultsOfFastFail
////////////////////////////////////////////////////////////////////////////////

/**
Expand Down Expand Up @@ -425,6 +426,27 @@ public final <T> Cffu<List<T>> allResultsOfFastFail(CompletionStage<? extends T>
return new0(CompletableFutureUtils.allResultsOfFastFail(cfs));
}

/**
* Returns a new Cffu with the most results in the <strong>same order</strong> of
* the given stages in the given time({@code timeout}), aka as many results as possible in the given time.
* <p>
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is {@link Cffu#getSuccessNow(Object)}).
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @param cfs the stages
* @see Cffu#getSuccessNow(Object)
* @see CompletableFutureUtils#batchGetSuccessNow(Object, CompletionStage[])
*/
@Contract(pure = true)
@SafeVarargs
public final <T> Cffu<List<T>> mostResultsOfSuccess(
long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, CompletionStage<? extends T>... cfs) {
return new0(CompletableFutureUtils.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, cfs));
}

////////////////////////////////////////////////////////////////////////////////
//# anyOf* methods:
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@ReturnValuesAreNonnullByDefault
public final class CompletableFutureUtils {
////////////////////////////////////////////////////////////////////////////////
//# allOf* methods
//# allOf*/mostResultsOfSuccess methods
////////////////////////////////////////////////////////////////////////////////

/**
Expand Down Expand Up @@ -185,6 +185,38 @@ public static <T> CompletableFuture<List<T>> allResultsOfFastFail(CompletionStag
return f_cast(ret);
}

/**
* Returns a new CompletableFuture with the most results in the <strong>same order</strong> of
* the given stages in the given time({@code timeout}), aka as many results as possible in the given time.
* <p>
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is {@link #getSuccessNow(CompletionStage, Object)}).
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @param cfs the stages
* @see #batchGetSuccessNow(Object, CompletionStage[])
* @see #getSuccessNow(CompletionStage, Object)
*/
@Contract(pure = true)
@SafeVarargs
public static <T> CompletableFuture<List<T>> mostResultsOfSuccess(
long timeout, TimeUnit unit, @Nullable T valueIfNotSuccess, CompletionStage<? extends T>... cfs) {
requireNonNull(unit, "unit is null");
requireCfsAndEleNonNull(cfs);

if (cfs.length == 0) return CompletableFuture.completedFuture(arrayList());
if (cfs.length == 1) {
final CompletableFuture<T> f = copy(toCf(cfs[0]));
return orTimeout(f, timeout, unit).handle((unused, ex) -> arrayList(getSuccessNow(f, valueIfNotSuccess)));
}

final CompletableFuture<T>[] cfArray = f_toCfArray(cfs);
return orTimeout(CompletableFuture.allOf(cfArray), timeout, unit)
.handle((unused, ex) -> batchGetSuccessNow(valueIfNotSuccess, cfArray));
}

@SafeVarargs
private static <S extends CompletionStage<?>> S[] requireCfsAndEleNonNull(S... css) {
requireNonNull(css, "cfs is null");
Expand Down Expand Up @@ -1444,6 +1476,7 @@ public static CffuState state(CompletableFuture<?> cf) {
* (aka the result extraction logic is {@link #getSuccessNow(CompletionStage, Object)}).
*
* @param cfs the stages
* @see #mostResultsOfSuccess(long, TimeUnit, Object, CompletionStage[])
* @see #getSuccessNow(CompletionStage, Object)
*/
@Contract(pure = true)
Expand Down
25 changes: 25 additions & 0 deletions cffu-core/src/test/java/io/foldright/cffu/CffuFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.*;
import java.util.function.Function;

import static io.foldright.cffu.CompletableFutureUtils.failedFuture;
import static io.foldright.cffu.CompletableFutureUtils.toCompletableFutureArray;
Expand Down Expand Up @@ -294,6 +295,30 @@ void test_allResultsOf_exceptionally() throws Exception {
}
}

@Test
void test_mostOf() throws Exception {
final Cffu<Integer> completed = cffuFactory.completedFuture(n);
final Cffu<Integer> failed = cffuFactory.failedFuture(rte);
final Cffu<Integer> cancelled = cffuFactory.toCffu(createCancelledFuture());
final Cffu<Integer> incomplete = cffuFactory.toCffu(createIncompleteFuture());

assertEquals(Arrays.asList(n, null, null, null), cffuFactory.mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, completed, failed, cancelled, incomplete
).get());
assertEquals(Arrays.asList(n, anotherN, anotherN, anotherN), cffuFactory.mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, completed, failed, cancelled, incomplete
).get());

assertEquals(Arrays.asList(anotherN, anotherN, anotherN), cffuFactory.mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, failed, cancelled, incomplete
).get());

// do not wait for failed and cancelled
assertEquals(Arrays.asList(anotherN, anotherN), cffuFactory.mostResultsOfSuccess(
10, TimeUnit.DAYS, anotherN, failed, cancelled
).get());
}

@Test
void test_anyOf() throws Exception {
assertEquals(n, cffuFactory.anyOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,51 @@ void test_allOf__exceptionally() throws Exception {
}
}

@Test
void test_mostOf() throws Exception {
final CompletableFuture<Integer> completed = completedFuture(n);
final CompletableFuture<Integer> failed = failedFuture(rte);
final CompletableFuture<Integer> cancelled = createCancelledFuture();
final CompletableFuture<Integer> incomplete = createIncompleteFuture();

assertEquals(0, mostResultsOfSuccess(10, TimeUnit.MILLISECONDS, null).get().size());

assertEquals(Collections.singletonList(n), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, completed).get());

assertEquals(Arrays.asList(n, null, null, null), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, completed, failed, cancelled, incomplete
).get());
assertEquals(Arrays.asList(n, anotherN, anotherN, anotherN), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, completed, failed, cancelled, incomplete
).get());

assertEquals(Arrays.asList(anotherN, anotherN, anotherN), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, failed, cancelled, incomplete
).get());

// do not wait for failed and cancelled
assertEquals(Arrays.asList(anotherN, anotherN), mostResultsOfSuccess(
10, TimeUnit.DAYS, anotherN, failed, cancelled
).get());
}

@Test
void test_mostOf_wontModifyInputCf() throws Exception {
final CompletableFuture<Integer> incomplete = createIncompleteFuture();
final CompletableFuture<Integer> incomplete2 = createIncompleteFuture();

assertEquals(Collections.singletonList(null), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, incomplete
).get());
assertEquals(Arrays.asList(null, null), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, incomplete, incomplete2
).get());

assertEquals(CffuState.RUNNING, state(incomplete));
assertEquals(CffuState.RUNNING, state(incomplete2));
}

////////////////////////////////////////////////////////////////////////////////
//# anyOf* methods
////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package io.foldright.cffu.kotlin

import io.foldright.cffu.Cffu
import io.foldright.cffu.CffuFactory
import io.foldright.cffu.CompletableFutureUtils
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit


////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -49,7 +51,7 @@ fun <T> Array<out CompletionStage<T>>.toCffu(cffuFactory: CffuFactory): Array<Cf
cffuFactory.toCffuArray(*this)

////////////////////////////////////////
// allOf* methods
//# allOf*/mostResultsOfSuccess* methods for Array/Collection
//
// - allResultsOfCffu
// - allOfCffu
Expand Down Expand Up @@ -389,8 +391,91 @@ fun Collection<CompletionStage<*>>.allOfFastFailCffu(cffuFactory: CffuFactory):
fun Array<out CompletionStage<*>>.allOfFastFailCffu(cffuFactory: CffuFactory): Cffu<Void> =
cffuFactory.allOfFastFail(*this)

/**
* Returns a new Cffu with the most results in the **same order** of
* the given Cffus in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [Cffu.getSuccessNow]).
*
* The result extraction logic can be customized using another overloaded [mostResultsOfSuccessCffu] method.
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see Cffu.getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
fun <T> Collection<Cffu<out T>>.mostResultsOfSuccessCffu(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T, cffuFactory: CffuFactory = ABSENT
): Cffu<List<T>> {
val factory: CffuFactory = if (cffuFactory !== ABSENT) cffuFactory
else firstOrNull()?.cffuFactory() ?: throw IllegalArgumentException(ERROR_MSG_FOR_COLL)
return factory.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this.toTypedArray())
}

/**
* Returns a new Cffu with the most results in the **same order** of
* the given Cffus in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given Cffu is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [Cffu.getSuccessNow]).
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see Cffu.getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
fun <T> Array<out Cffu<out T>>.mostResultsOfSuccessCffu(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T, cffuFactory: CffuFactory = ABSENT
): Cffu<List<T>> {
val factory: CffuFactory = if (cffuFactory !== ABSENT) cffuFactory
else firstOrNull()?.cffuFactory() ?: throw IllegalArgumentException(ERROR_MSG_FOR_COLL)
return factory.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this)
}

/**
* Returns a new Cffu with the most results in the **same order** of
* the given stages in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [Cffu.getSuccessNow]).
*
* The result extraction logic can be customized using another overloaded [mostResultsOfSuccessCffu] method.
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see Cffu.getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
@JvmName("mostResultsOfSuccessCffuCs")
fun <T> Collection<CompletionStage<out T>>.mostResultsOfSuccessCffu(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T, cffuFactory: CffuFactory
): Cffu<List<T>> =
cffuFactory.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this.toTypedArray())

/**
* Returns a new Cffu with the most results in the **same order** of
* the given stages in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [Cffu.getSuccessNow]).
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see Cffu.getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
fun <T> Array<out CompletionStage<out T>>.mostResultsOfSuccessCffu(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T, cffuFactory: CffuFactory
): Cffu<List<T>> =
cffuFactory.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this)

////////////////////////////////////////
// anyOf* methods
//# anyOf* methods for Array/Collection
//
// - anyOfCffu
// - anyOfSuccessCffu
Expand Down Expand Up @@ -527,7 +612,7 @@ fun <T> Array<out CompletionStage<out T>>.anyOfSuccessCffu(cffuFactory: CffuFact
cffuFactory.anyOfSuccess(*this)

////////////////////////////////////////
// cffuUnwrap methods
// cffuUnwrap methods for Array/Collection
////////////////////////////////////////

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import java.util.function.Function
////////////////////////////////////////////////////////////////////////////////

////////////////////////////////////////
//# allOf* methods for Array/Collection
//# allOf*/mostResultsOfSuccess* methods for Array/Collection
//
// - allOfCompletableFuture / allOfFastFailCompletableFuture
// - allResultsOfCompletableFuture / allResultsOfFastFailCompletableFuture
// - allOfCompletableFuture
// - allOfFastFailCompletableFuture
// - allResultsOfCompletableFuture
// - allResultsOfFastFailCompletableFuture
////////////////////////////////////////

/**
Expand Down Expand Up @@ -205,8 +207,44 @@ fun <T> Collection<CompletionStage<out T>>.allResultsOfFastFailCompletableFuture
fun <T> Array<out CompletionStage<out T>>.allResultsOfFastFailCompletableFuture(): CompletableFuture<List<T>> =
CompletableFutureUtils.allResultsOfFastFail(*this)

/**
* Returns a new CompletableFuture with the most results in the **same order** of
* the given stages in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [getSuccessNow]).
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
fun <T> Collection<CompletionStage<out T>>.mostResultsOfSuccessCompletableFuture(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T
): CompletableFuture<List<T>> =
CompletableFutureUtils.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this.toTypedArray())

/**
* Returns a new CompletableFuture with the most results in the **same order** of
* the given stages in the given time(`timeout`), aka as many results as possible in the given time.
*
* If the given stage is successful, its result is the completed value; Otherwise the given valueIfNotSuccess.
* (aka the result extraction logic is [getSuccessNow]).
*
* @param timeout how long to wait in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @param valueIfNotSuccess the value to return if not completed successfully
* @see getSuccessNow
* @see CompletableFutureUtils.batchGetSuccessNow
*/
fun <T> Array<out CompletionStage<out T>>.mostResultsOfSuccessCompletableFuture(
timeout: Long, unit: TimeUnit, valueIfNotSuccess: T
): CompletableFuture<List<T>> =
CompletableFutureUtils.mostResultsOfSuccess(timeout, unit, valueIfNotSuccess, *this)

////////////////////////////////////////
//# anyOf* methods
//# anyOf* methods for Array/Collection
//
// - anyOfCompletableFuture
// - anyOfSuccessCompletableFuture
Expand Down
Loading

0 comments on commit 68f4359

Please sign in to comment.