Skip to content

Commit

Permalink
tupleMApplyMostSuccessAsync的实现
Browse files Browse the repository at this point in the history
  • Loading branch information
huhaosumail committed Jun 25, 2024
1 parent 503dec1 commit 74f1c94
Show file tree
Hide file tree
Showing 2 changed files with 285 additions and 0 deletions.
246 changes: 246 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,252 @@ public static <T, U1, U2, U3, U4, U5> CompletableFuture<Tuple5<U1, U2, U3, U4, U
return f_toCf(cf).thenCompose(v -> allTupleOf0(true, wrapFunctions(executor, v, fns)));
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2> CompletableFuture<Tuple2<U1, U2>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf,
long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1, Function<? super T, ? extends U2> fn2) {
return tupleMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, fn1, fn2);
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2> CompletableFuture<Tuple2<U1, U2>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, Executor executor, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1, Function<? super T, ? extends U2> fn2) {
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireArrayAndEleNonNull("fn", fn1, fn2);

return f_toCf(cf).thenCompose(v -> mostTupleOfSuccess0(executor, timeout, unit, CompletableFuture.supplyAsync(() -> fn1.apply(v), executor), CompletableFuture.supplyAsync(() -> fn2.apply(v), executor)));
}


/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3> CompletableFuture<Tuple3<U1, U2, U3>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf,
long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1, Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3) {
return tupleMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, fn1, fn2, fn3);
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3> CompletableFuture<Tuple3<U1, U2, U3>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, Executor executor, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1, Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3) {
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireArrayAndEleNonNull("fn", fn1, fn2, fn3);

return f_toCf(cf).thenCompose(v -> mostTupleOfSuccess0(executor, timeout, unit, CompletableFuture.supplyAsync(() -> fn1.apply(v), executor),
CompletableFuture.supplyAsync(() -> fn2.apply(v), executor), CompletableFuture.supplyAsync(() -> fn3.apply(v), executor)));
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param fn4 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @param <U4> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3, U4> CompletableFuture<Tuple4<U1, U2, U3, U4>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1,
Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3, Function<? super T, ? extends U4> fn4) {
return tupleMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, fn1, fn2, fn3, fn4);
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param fn4 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @param <U4> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3, U4> CompletableFuture<Tuple4<U1, U2, U3, U4>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, Executor executor, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1,
Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3, Function<? super T, ? extends U4> fn4) {
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireArrayAndEleNonNull("fn", fn1, fn2, fn3, fn4);

return f_toCf(cf).thenCompose(v -> mostTupleOfSuccess0(executor, timeout, unit, CompletableFuture.supplyAsync(() -> fn1.apply(v), executor),
CompletableFuture.supplyAsync(() -> fn2.apply(v), executor), CompletableFuture.supplyAsync(() -> fn3.apply(v), executor), CompletableFuture.supplyAsync(() -> fn4.apply(v), executor)));
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param fn4 the function to use to compute the values of the returned CompletableFuture
* @param fn5 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @param <U4> the function return type
* @param <U5> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3, U4, U5> CompletableFuture<Tuple5<U1, U2, U3, U4, U5>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1,
Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3, Function<? super T, ? extends U4> fn4,
Function<? super T, ? extends U5> fn5) {
return tupleMApplyMostSuccessAsync(cf, AsyncPoolHolder.ASYNC_POOL, timeout, unit, fn1, fn2, fn3, fn4, fn5);
}

/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
* with the most values obtained by calling the given Functions
* (with the given stage's result as the argument to the given functions)
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Functions arguments.
* <p>
* If the given function is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param fn1 the function to use to compute the values of the returned CompletableFuture
* @param fn2 the function to use to compute the values of the returned CompletableFuture
* @param fn3 the function to use to compute the values of the returned CompletableFuture
* @param fn4 the function to use to compute the values of the returned CompletableFuture
* @param fn5 the function to use to compute the values of the returned CompletableFuture
* @param <U1> the function return type
* @param <U2> the function return type
* @param <U3> the function return type
* @param <U4> the function return type
* @param <U5> the function return type
* @return the new CompletableFuture
*/
public static <T, U1, U2, U3, U4, U5> CompletableFuture<Tuple5<U1, U2, U3, U4, U5>> tupleMApplyMostSuccessAsync(
CompletionStage<? extends T> cf, Executor executor, long timeout, TimeUnit unit, Function<? super T, ? extends U1> fn1,
Function<? super T, ? extends U2> fn2, Function<? super T, ? extends U3> fn3, Function<? super T, ? extends U4> fn4,
Function<? super T, ? extends U5> fn5) {
requireNonNull(executor, "executor is null");
requireNonNull(unit, "unit is null");
requireArrayAndEleNonNull("fn", fn1, fn2, fn3, fn4, fn5);

return f_toCf(cf).thenCompose(v -> mostTupleOfSuccess0(executor, timeout, unit, CompletableFuture.supplyAsync(() -> fn1.apply(v), executor),
CompletableFuture.supplyAsync(() -> fn2.apply(v), executor), CompletableFuture.supplyAsync(() -> fn3.apply(v), executor),
CompletableFuture.supplyAsync(() -> fn4.apply(v), executor), CompletableFuture.supplyAsync(() -> fn5.apply(v), executor)));
}


/**
* Returns a new CompletableFuture that, when the given stage completes normally,
* is executed using the CompletableFuture's default asynchronous execution facility,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,45 @@ void test_tupleMSupplyMostSuccessAsync() throws Exception {
assertEquals(Tuple5.of(n, s, d, anotherN, n + n), tupleMSupplyMostSuccessAsync(defaultExecutor(), 100, TimeUnit.MILLISECONDS, supplier_n, supplier_s, supplier_d, supplier_an, supplier_nn).get());
}


@Test
void test_tupleMApplyMostSuccessAsync() throws Exception {
final CompletableFuture<Integer> completed = completedFuture(n);
final Function<Integer, Integer> function_n = (x) -> {
sleep(100);
return n;
};

final Function<Integer, String> function_s = (x) -> {
sleep(100);
return s;
};

final Function<Integer, Double> function_d = (x) -> {
sleep(100);
return d;
};
final Function<Integer, Integer> function_an = (x) -> {
sleep(100);
return anotherN;
};
final Function<Integer, Integer> function_nn = (x) -> {
sleep(100);
return n + n;
};
assertEquals(Tuple2.of(n, s), tupleMApplyMostSuccessAsync(completed, 100, TimeUnit.MILLISECONDS, function_n, function_s).get());
assertEquals(Tuple2.of(n, s), tupleMApplyMostSuccessAsync(completed, defaultExecutor(), 100, TimeUnit.MILLISECONDS, function_n, function_s).get());

assertEquals(Tuple3.of(n, s, d), tupleMApplyMostSuccessAsync(completed, 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d).get());
assertEquals(Tuple3.of(n, s, d), tupleMApplyMostSuccessAsync(completed, defaultExecutor(), 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d).get());

assertEquals(Tuple4.of(n, s, d, anotherN), tupleMApplyMostSuccessAsync(completed, 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d, function_an).get());
assertEquals(Tuple4.of(n, s, d, anotherN), tupleMApplyMostSuccessAsync(completed, defaultExecutor(), 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d, function_an).get());

assertEquals(Tuple5.of(n, s, d, anotherN, n + n), tupleMApplyMostSuccessAsync(completed, 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d, function_an, function_nn).get());
assertEquals(Tuple5.of(n, s, d, anotherN, n + n), tupleMApplyMostSuccessAsync(completed, defaultExecutor(), 100, TimeUnit.MILLISECONDS, function_n, function_s, function_d, function_an, function_nn).get());
}

@Test
void test_thenTupleMApplyAsync() throws Exception {
final CompletableFuture<Integer> completed = completedFuture(n);
Expand Down

0 comments on commit 74f1c94

Please sign in to comment.