diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 79ca16ae..fea4462a 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -208,11 +208,12 @@ public static <T> CompletableFuture<List<T>> mostResultsOfSuccess( if (cfs.length == 0) return CompletableFuture.completedFuture(arrayList()); if (cfs.length == 1) { - final CompletableFuture<T> f = copy(toCf(cfs[0])); + final CompletableFuture<T> f = copy(toNonMinCf(cfs[0])); return orTimeout(f, timeout, unit).handle((unused, ex) -> arrayList(getSuccessNow(f, valueIfNotSuccess))); } - final CompletableFuture<T>[] cfArray = f_toCfArray(cfs); + // MUST be Non-Minimal CF instances in order to read results, otherwise UnsupportedOperationException + final CompletableFuture<T>[] cfArray = f_toNonMinCfArray(cfs); return orTimeout(CompletableFuture.allOf(cfArray), timeout, unit) .handle((unused, ex) -> MGetSuccessNow(valueIfNotSuccess, cfArray)); } @@ -279,17 +280,31 @@ private static <T> CompletableFuture<T> f_cast(CompletableFuture<?> f) { * IGNORE the compile-time type check. */ private static <T> CompletableFuture<T>[] f_toCfArray(CompletionStage<? extends T>[] stages) { + return toCfArray0(stages, CompletableFutureUtils::toCf); + } + + /** + * Force converts {@link CompletionStage} array to {@link CompletableFuture} array, + * IGNORE the compile-time type check. + */ + private static <T> CompletableFuture<T>[] f_toNonMinCfArray(CompletionStage<? extends T>[] stages) { + return toCfArray0(stages, CompletableFutureUtils::toNonMinCf); + } + + private static <T> CompletableFuture<T>[] toCfArray0( + CompletionStage<? extends T>[] stages, + Function<CompletionStage<? extends T>, CompletableFuture<T>> converter) { requireNonNull(stages, "cfs is null"); @SuppressWarnings("unchecked") CompletableFuture<T>[] ret = new CompletableFuture[stages.length]; for (int i = 0; i < stages.length; i++) { - ret[i] = toCf(requireNonNull(stages[i], "cf" + (i + 1) + " is null")); + ret[i] = converter.apply(requireNonNull(stages[i], "cf" + (i + 1) + " is null")); } return ret; } /** - * Converts CompletionStage to CompletableFuture, reuse cf instance as much as possible. + * Converts CompletionStage to CompletableFuture, reuse cf instance as many as possible. * <p> * <strong>CAUTION:</strong> because reused the CF instances, * so the returned CF instances MUST NOT be written(e.g. {@link CompletableFuture#complete(Object)}). @@ -302,6 +317,24 @@ private static <T> CompletableFuture<T> toCf(CompletionStage<? extends T> s) { else return (CompletableFuture) s.toCompletableFuture(); } + /** + * Converts CompletionStage to Non-MinimalStage CompletableFuture, reuse cf instance as many as possible. + * <p> + * <strong>CAUTION:</strong> because reused the CF instances, + * so the returned CF instances MUST NOT be written(e.g. {@link CompletableFuture#complete(Object)}). + * Otherwise, the caller should defensive copy instead of writing it directly. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private static <T> CompletableFuture<T> toNonMinCf(CompletionStage<? extends T> s) { + final CompletableFuture<T> f; + if (s instanceof CompletableFuture) f = (CompletableFuture<T>) s; + else if (s instanceof Cffu) f = ((Cffu) s).cffuUnwrap(); + else return (CompletableFuture) s.toCompletableFuture(); + + return "java.util.concurrent.CompletableFuture$MinimalStage".equals(s.getClass().getName()) + ? f.toCompletableFuture() : f; + } + //////////////////////////////////////////////////////////////////////////////// //# anyOf* methods //////////////////////////////////////////////////////////////////////////////// diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 5f884be3..ec22a29c 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -33,14 +33,18 @@ class CompletableFutureUtilsTest { void test_allOf__success__trivial_case() throws Exception { assertEquals(Arrays.asList(n, n + 1, n + 2), allResultsOf( completedFuture(n), - completedFuture(n + 1), + completedStage(n + 1), completedFuture(n + 2) ).get()); - assertEquals(Arrays.asList(n, n + 1), allResultsOf(completedFuture(n), completedFuture(n + 1) + assertEquals(Arrays.asList(n, n + 1), allResultsOf( + completedFuture(n), + completedStage(n + 1) ).get()); assertEquals(Collections.singletonList(n), allResultsOf(completedFuture(n)).get()); + assertEquals(Collections.singletonList(n), allResultsOf(completedStage(n)).toCompletableFuture().get()); + assertThrows(UnsupportedOperationException.class, () -> allResultsOf(completedStage(n)).get()); assertEquals(Collections.emptyList(), allResultsOf().get()); @@ -48,28 +52,29 @@ void test_allOf__success__trivial_case() throws Exception { assertEquals(Arrays.asList(n, n + 1, n + 2), allResultsOfFastFail( completedFuture(n), - completedFuture(n + 1), + completedStage(n + 1), completedFuture(n + 2) ).get()); assertEquals(Arrays.asList(n, n + 1), allResultsOfFastFail( completedFuture(n), - completedFuture(n + 1) + completedStage(n + 1) ).get()); assertEquals(Collections.singletonList(n), allResultsOfFastFail(completedFuture(n)).get()); + assertEquals(Collections.singletonList(n), allResultsOfFastFail(completedStage(n)).toCompletableFuture().get()); + assertThrows(UnsupportedOperationException.class, () -> allResultsOfFastFail(completedStage(n)).get()); assertEquals(Collections.emptyList(), allResultsOfFastFail().get()); //////////////////////////////////////////////////////////////////////////////// - assertNull(allOfFastFail(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get()); - - assertNull(allOfFastFail(completedFuture(n), completedFuture(n + 1)).get()); - - assertNull(allOfFastFail(completedFuture(n)).get()); - - assertNull(allOfFastFail().get()); + Arrays.asList( + allOfFastFail(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)), + allOfFastFail(completedFuture(n), completedFuture(n + 1)), + allOfFastFail(completedFuture(n)), + allOfFastFail() + ).forEach(f -> assertNull(f.join())); } @Test @@ -319,6 +324,7 @@ void test_allOf__exceptionally() throws Exception { @Test void test_mostOf() throws Exception { final CompletableFuture<Integer> completed = completedFuture(n); + final CompletableFuture<Integer> completedStage = completedFuture(n); final CompletableFuture<Integer> failed = failedFuture(rte); final CompletableFuture<Integer> cancelled = createCancelledFuture(); final CompletableFuture<Integer> incomplete = createIncompleteFuture(); @@ -327,12 +333,14 @@ void test_mostOf() throws Exception { assertEquals(Collections.singletonList(n), mostResultsOfSuccess( 10, TimeUnit.MILLISECONDS, null, completed).get()); + assertEquals(Collections.singletonList(n), mostResultsOfSuccess( + 10, TimeUnit.MILLISECONDS, anotherN, completedStage).get()); assertEquals(Arrays.asList(n, null, null, null), mostResultsOfSuccess( 10, TimeUnit.MILLISECONDS, null, completed, failed, cancelled, incomplete ).get()); assertEquals(Arrays.asList(n, anotherN, anotherN, anotherN), mostResultsOfSuccess( - 10, TimeUnit.MILLISECONDS, anotherN, completed, failed, cancelled, incomplete + 10, TimeUnit.MILLISECONDS, anotherN, completedStage, failed, cancelled, incomplete ).get()); assertEquals(Arrays.asList(anotherN, anotherN, anotherN), mostResultsOfSuccess( @@ -367,10 +375,12 @@ void test_mostOf_wontModifyInputCf() throws Exception { @Test void test_anyOf__success__trivial_case() throws Exception { - assertEquals(n, anyOf(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get()); - assertEquals(n, anyOf(completedFuture(n), completedFuture(n + 1)).get()); + assertEquals(n, anyOf(completedFuture(n), completedStage(n + 1), completedFuture(n + 2)).get()); + assertEquals(n, anyOf(completedFuture(n), completedStage(n + 1)).get()); assertEquals(n, anyOf(completedFuture(n)).get()); + assertEquals(n, anyOf(completedStage(n)).toCompletableFuture().get()); + assertThrows(UnsupportedOperationException.class, () -> anyOf(completedStage(n)).get()); assertFalse(anyOf().isDone()); // success with incomplete CF @@ -378,20 +388,21 @@ void test_anyOf__success__trivial_case() throws Exception { //////////////////////////////////////// - assertEquals(n, anyOfSuccess(completedFuture(n), completedFuture(n + 1), completedFuture(n + 2)).get()); - assertEquals(n, anyOfSuccess(completedFuture(n), completedFuture(n + 1)).get()); + assertEquals(n, anyOfSuccess(completedFuture(n), completedStage(n + 1), completedFuture(n + 2)).get()); + assertEquals(n, anyOfSuccess(completedFuture(n), completedStage(n + 1)).get()); assertEquals(n, anyOfSuccess(completedFuture(n)).get()); - try { - anyOfSuccess().get(); + assertEquals(n, anyOfSuccess(completedStage(n)).toCompletableFuture().get()); + assertThrows(UnsupportedOperationException.class, () -> anyOfSuccess(completedStage(n)).get()); - fail(); - } catch (ExecutionException expected) { - assertSame(NoCfsProvidedException.class, expected.getCause().getClass()); - } + assertInstanceOf(NoCfsProvidedException.class, + assertThrows(ExecutionException.class, + () -> anyOfSuccess().get() + ).getCause()); // success with incomplete CF assertEquals(n, anyOfSuccess(createIncompleteFuture(), createIncompleteFuture(), completedFuture(n)).get()); + assertEquals(n, anyOfSuccess(createIncompleteFuture(), createIncompleteFuture(), completedStage(n)).get()); } @Test @@ -569,9 +580,9 @@ void test_anyOf__concurrent() throws Exception { @Test void test_allTupleOf() throws Exception { final CompletableFuture<Integer> cf_n = completedFuture(n); - final CompletableFuture<String> cf_s = completedFuture(s); + final CompletionStage<String> cf_s = completedStage(s); final CompletableFuture<Double> cf_d = completedFuture(d); - final CompletableFuture<Integer> cf_an = completedFuture(anotherN); + final CompletionStage<Integer> cf_an = completedStage(anotherN); final CompletableFuture<Integer> cf_nn = completedFuture(n + n); assertEquals(Tuple2.of(n, s), allTupleOf(cf_n, cf_s).get()); @@ -593,9 +604,9 @@ void test_allTupleOf_exceptionally() throws Exception { final CompletableFuture<Object> fail = failedFuture(rte); final CompletableFuture<Integer> cf_n = completedFuture(n); - final CompletableFuture<String> cf_s = completedFuture(s); + final CompletionStage<String> cf_s = completedStage(s); final CompletableFuture<Double> cf_d = completedFuture(d); - final CompletableFuture<Integer> cf_an = completedFuture(anotherN); + final CompletionStage<Integer> cf_an = completedStage(anotherN); try { allTupleOf(cf_n, fail).get(); @@ -665,7 +676,7 @@ void test_allTupleOf_NotFastFail() throws Exception { final CompletableFuture<String> cf_s = completedFuture(s); final CompletableFuture<Double> cf_d = completedFuture(d); - final CompletableFuture<Integer> cf_an = completedFuture(anotherN); + final CompletionStage<Integer> cf_an = completedStage(anotherN); try { allTupleOf(incomplete, fail).get(100, TimeUnit.MILLISECONDS);