Skip to content

Commit

Permalink
fix: use toNonMinCf instead of toCf in method `mostResultsOfSucce…
Browse files Browse the repository at this point in the history
…ss` 🔄💣

avoid `UnsupportedOperationException` for minimal stage input
  • Loading branch information
oldratlee committed May 24, 2024
1 parent 61125ed commit 3c9ffd1
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,12 @@ public static <T> CompletableFuture<List<T>> mostResultsOfSuccess(

if (cfs.length == 0) return CompletableFuture.completedFuture(arrayList());
if (cfs.length == 1) {
final CompletableFuture<T> f = copy(toCf(cfs[0]));
final CompletableFuture<T> f = copy(toNonMinCf(cfs[0]));
return orTimeout(f, timeout, unit).handle((unused, ex) -> arrayList(getSuccessNow(f, valueIfNotSuccess)));
}

final CompletableFuture<T>[] cfArray = f_toCfArray(cfs);
// MUST be Non-Minimal CF instances in order to read results, otherwise UnsupportedOperationException
final CompletableFuture<T>[] cfArray = f_toNonMinCfArray(cfs);
return orTimeout(CompletableFuture.allOf(cfArray), timeout, unit)
.handle((unused, ex) -> MGetSuccessNow(valueIfNotSuccess, cfArray));
}
Expand Down Expand Up @@ -279,17 +280,31 @@ private static <T> CompletableFuture<T> f_cast(CompletableFuture<?> f) {
* IGNORE the compile-time type check.
*/
private static <T> CompletableFuture<T>[] f_toCfArray(CompletionStage<? extends T>[] stages) {
return toCfArray0(stages, CompletableFutureUtils::toCf);
}

/**
* Force converts {@link CompletionStage} array to {@link CompletableFuture} array,
* IGNORE the compile-time type check.
*/
private static <T> CompletableFuture<T>[] f_toNonMinCfArray(CompletionStage<? extends T>[] stages) {
return toCfArray0(stages, CompletableFutureUtils::toNonMinCf);
}

private static <T> CompletableFuture<T>[] toCfArray0(
CompletionStage<? extends T>[] stages,
Function<CompletionStage<? extends T>, CompletableFuture<T>> converter) {
requireNonNull(stages, "cfs is null");
@SuppressWarnings("unchecked")
CompletableFuture<T>[] ret = new CompletableFuture[stages.length];
for (int i = 0; i < stages.length; i++) {
ret[i] = toCf(requireNonNull(stages[i], "cf" + (i + 1) + " is null"));
ret[i] = converter.apply(requireNonNull(stages[i], "cf" + (i + 1) + " is null"));
}
return ret;
}

/**
* Converts CompletionStage to CompletableFuture, reuse cf instance as much as possible.
* Converts CompletionStage to CompletableFuture, reuse cf instance as many as possible.
* <p>
* <strong>CAUTION:</strong> because reused the CF instances,
* so the returned CF instances MUST NOT be written(e.g. {@link CompletableFuture#complete(Object)}).
Expand All @@ -302,6 +317,24 @@ private static <T> CompletableFuture<T> toCf(CompletionStage<? extends T> s) {
else return (CompletableFuture) s.toCompletableFuture();
}

/**
* Converts CompletionStage to Non-MinimalStage CompletableFuture, reuse cf instance as many as possible.
* <p>
* <strong>CAUTION:</strong> because reused the CF instances,
* so the returned CF instances MUST NOT be written(e.g. {@link CompletableFuture#complete(Object)}).
* Otherwise, the caller should defensive copy instead of writing it directly.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> CompletableFuture<T> toNonMinCf(CompletionStage<? extends T> s) {
final CompletableFuture<T> f;
if (s instanceof CompletableFuture) f = (CompletableFuture<T>) s;
else if (s instanceof Cffu) f = ((Cffu) s).cffuUnwrap();
else return (CompletableFuture) s.toCompletableFuture();

return "java.util.concurrent.CompletableFuture$MinimalStage".equals(s.getClass().getName())
? f.toCompletableFuture() : f;
}

////////////////////////////////////////////////////////////////////////////////
//# anyOf* methods
////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,48 @@ class CompletableFutureUtilsTest {
void test_allOf__success__trivial_case() throws Exception {
assertEquals(Arrays.asList(n, n + 1, n + 2), allResultsOf(
completedFuture(n),
completedFuture(n + 1),
completedStage(n + 1),
completedFuture(n + 2)
).get());

assertEquals(Arrays.asList(n, n + 1), allResultsOf(completedFuture(n), completedFuture(n + 1)
assertEquals(Arrays.asList(n, n + 1), allResultsOf(
completedFuture(n),
completedStage(n + 1)
).get());

assertEquals(Collections.singletonList(n), allResultsOf(completedFuture(n)).get());
assertEquals(Collections.singletonList(n), allResultsOf(completedStage(n)).toCompletableFuture().get());
assertThrows(UnsupportedOperationException.class, () -> allResultsOf(completedStage(n)).get());

assertEquals(Collections.emptyList(), allResultsOf().get());

////////////////////////////////////////////////////////////////////////////////

assertEquals(Arrays.asList(n, n + 1, n + 2), allResultsOfFastFail(
completedFuture(n),
completedFuture(n + 1),
completedStage(n + 1),
completedFuture(n + 2)
).get());

assertEquals(Arrays.asList(n, n + 1), allResultsOfFastFail(
completedFuture(n),
completedFuture(n + 1)
completedStage(n + 1)
).get());

assertEquals(Collections.singletonList(n), allResultsOfFastFail(completedFuture(n)).get());
assertEquals(Collections.singletonList(n), allResultsOfFastFail(completedStage(n)).toCompletableFuture().get());
assertThrows(UnsupportedOperationException.class, () -> allResultsOfFastFail(completedStage(n)).get());

assertEquals(Collections.emptyList(), allResultsOfFastFail().get());

////////////////////////////////////////////////////////////////////////////////

assertNull(allOfFastFail(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get());

assertNull(allOfFastFail(completedFuture(n), completedFuture(n + 1)).get());

assertNull(allOfFastFail(completedFuture(n)).get());

assertNull(allOfFastFail().get());
Arrays.asList(
allOfFastFail(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)),
allOfFastFail(completedFuture(n), completedFuture(n + 1)),
allOfFastFail(completedFuture(n)),
allOfFastFail()
).forEach(f -> assertNull(f.join()));
}

@Test
Expand Down Expand Up @@ -319,6 +324,7 @@ void test_allOf__exceptionally() throws Exception {
@Test
void test_mostOf() throws Exception {
final CompletableFuture<Integer> completed = completedFuture(n);
final CompletableFuture<Integer> completedStage = completedFuture(n);
final CompletableFuture<Integer> failed = failedFuture(rte);
final CompletableFuture<Integer> cancelled = createCancelledFuture();
final CompletableFuture<Integer> incomplete = createIncompleteFuture();
Expand All @@ -327,12 +333,14 @@ void test_mostOf() throws Exception {

assertEquals(Collections.singletonList(n), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, completed).get());
assertEquals(Collections.singletonList(n), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, completedStage).get());

assertEquals(Arrays.asList(n, null, null, null), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, null, completed, failed, cancelled, incomplete
).get());
assertEquals(Arrays.asList(n, anotherN, anotherN, anotherN), mostResultsOfSuccess(
10, TimeUnit.MILLISECONDS, anotherN, completed, failed, cancelled, incomplete
10, TimeUnit.MILLISECONDS, anotherN, completedStage, failed, cancelled, incomplete
).get());

assertEquals(Arrays.asList(anotherN, anotherN, anotherN), mostResultsOfSuccess(
Expand Down Expand Up @@ -367,31 +375,34 @@ void test_mostOf_wontModifyInputCf() throws Exception {

@Test
void test_anyOf__success__trivial_case() throws Exception {
assertEquals(n, anyOf(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get());
assertEquals(n, anyOf(completedFuture(n), completedFuture(n + 1)).get());
assertEquals(n, anyOf(completedFuture(n), completedStage(n + 1), completedFuture(n + 2)).get());
assertEquals(n, anyOf(completedFuture(n), completedStage(n + 1)).get());

assertEquals(n, anyOf(completedFuture(n)).get());
assertEquals(n, anyOf(completedStage(n)).toCompletableFuture().get());
assertThrows(UnsupportedOperationException.class, () -> anyOf(completedStage(n)).get());
assertFalse(anyOf().isDone());

// success with incomplete CF
assertEquals(n, anyOf(createIncompleteFuture(), createIncompleteFuture(), completedFuture(n)).get());

////////////////////////////////////////

assertEquals(n, anyOfSuccess(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get());
assertEquals(n, anyOfSuccess(completedFuture(n), completedFuture(n + 1)).get());
assertEquals(n, anyOfSuccess(completedFuture(n), completedStage(n + 1), completedFuture(n + 2)).get());
assertEquals(n, anyOfSuccess(completedFuture(n), completedStage(n + 1)).get());

assertEquals(n, anyOfSuccess(completedFuture(n)).get());
try {
anyOfSuccess().get();
assertEquals(n, anyOfSuccess(completedStage(n)).toCompletableFuture().get());
assertThrows(UnsupportedOperationException.class, () -> anyOfSuccess(completedStage(n)).get());

fail();
} catch (ExecutionException expected) {
assertSame(NoCfsProvidedException.class, expected.getCause().getClass());
}
assertInstanceOf(NoCfsProvidedException.class,
assertThrows(ExecutionException.class,
() -> anyOfSuccess().get()
).getCause());

// success with incomplete CF
assertEquals(n, anyOfSuccess(createIncompleteFuture(), createIncompleteFuture(), completedFuture(n)).get());
assertEquals(n, anyOfSuccess(createIncompleteFuture(), createIncompleteFuture(), completedStage(n)).get());
}

@Test
Expand Down Expand Up @@ -569,9 +580,9 @@ void test_anyOf__concurrent() throws Exception {
@Test
void test_allTupleOf() throws Exception {
final CompletableFuture<Integer> cf_n = completedFuture(n);
final CompletableFuture<String> cf_s = completedFuture(s);
final CompletionStage<String> cf_s = completedStage(s);
final CompletableFuture<Double> cf_d = completedFuture(d);
final CompletableFuture<Integer> cf_an = completedFuture(anotherN);
final CompletionStage<Integer> cf_an = completedStage(anotherN);
final CompletableFuture<Integer> cf_nn = completedFuture(n + n);

assertEquals(Tuple2.of(n, s), allTupleOf(cf_n, cf_s).get());
Expand All @@ -593,9 +604,9 @@ void test_allTupleOf_exceptionally() throws Exception {
final CompletableFuture<Object> fail = failedFuture(rte);

final CompletableFuture<Integer> cf_n = completedFuture(n);
final CompletableFuture<String> cf_s = completedFuture(s);
final CompletionStage<String> cf_s = completedStage(s);
final CompletableFuture<Double> cf_d = completedFuture(d);
final CompletableFuture<Integer> cf_an = completedFuture(anotherN);
final CompletionStage<Integer> cf_an = completedStage(anotherN);

try {
allTupleOf(cf_n, fail).get();
Expand Down Expand Up @@ -665,7 +676,7 @@ void test_allTupleOf_NotFastFail() throws Exception {

final CompletableFuture<String> cf_s = completedFuture(s);
final CompletableFuture<Double> cf_d = completedFuture(d);
final CompletableFuture<Integer> cf_an = completedFuture(anotherN);
final CompletionStage<Integer> cf_an = completedStage(anotherN);

try {
allTupleOf(incomplete, fail).get(100, TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit 3c9ffd1

Please sign in to comment.