Skip to content

Commit

Permalink
Cffu新增multi-actions的实现及相关单元测试
Browse files Browse the repository at this point in the history
  • Loading branch information
huhaosumail committed Jun 25, 2024
1 parent 4dc50e0 commit 4afd763
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 24 deletions.
85 changes: 84 additions & 1 deletion cffu-core/src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.Contract;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.*;

Expand Down Expand Up @@ -210,7 +211,89 @@ public Cffu<Void> thenRunAsync(Runnable action, Executor executor) {
// region## Then-Multi-Actions(thenM*) Methods
////////////////////////////////////////////////////////////

// TODO: TO BE implemented!!

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* with the values obtained by calling the given Suppliers
* in the <strong>same order</strong> of the given Suppliers arguments.
* <p>
* This method is the same as {@link #mSupplyAsync(Supplier[])} except for the fast-fail behavior.
*
* @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture
* @param <T> the suppliers' return type
* @return the new CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public <T> Cffu<List<T>> mSupplyFastFailAsync(Supplier<? extends T>... suppliers) {
return reset0(CompletableFutureUtils.mSupplyFastFailAsync(suppliers));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* with the most values obtained by calling the given Suppliers
* in the given time({@code timeout}, aka as many results as possible in the given time)
* in the <strong>same order</strong> of the given Suppliers arguments.
* <p>
* If the given supplier is successful in the given time, the return result is the completed value;
* Otherwise the given valueIfNotSuccess.
*
* @param valueIfNotSuccess the value to return if not completed successfully
* @param timeout how long to wait in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
* @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture
* @param <T> the suppliers' return type
* @return the new CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public <T> Cffu<List<T>> mSupplyMostSuccessAsync(
@Nullable T valueIfNotSuccess, long timeout, TimeUnit unit, Supplier<? extends T>... suppliers) {
return reset0(CompletableFutureUtils.mSupplyMostSuccessAsync(valueIfNotSuccess, timeout, unit, suppliers));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* with the values obtained by calling the given Suppliers
* in the <strong>same order</strong> of the given Suppliers arguments.
*
* @param suppliers the suppliers returning the value to be used to complete the returned CompletableFuture
* @param <T> the suppliers' return type
* @return the new CompletableFuture
* @see CompletableFuture#supplyAsync(Supplier)
*/
public <T> Cffu<List<T>> mSupplyAsync(Supplier<? extends T>... suppliers) {
return reset0(CompletableFutureUtils.mSupplyAsync(suppliers));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* after runs the given actions.
* <p>
* This method is the same as {@link #mRunAsync(Runnable...)} except for the fast-fail behavior.
*
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see CompletableFuture#runAsync(Runnable)
*/
public Cffu<Void> mRunFastFailAsync(Runnable... actions) {
return reset0(CompletableFutureUtils.mRunFastFailAsync(actions));
}

/**
* Returns a new CompletableFuture that is asynchronously completed
* by tasks running in the CompletableFuture's default asynchronous execution facility
* after runs the given actions.
*
* @param actions the actions to run before completing the returned CompletableFuture
* @return the new CompletableFuture
* @see CompletableFuture#runAsync(Runnable)
*/
public Cffu<Void> mRunAsync(Runnable... actions) {
return reset0(CompletableFutureUtils.mRunAsync(actions));
}

// endregion
////////////////////////////////////////////////////////////
Expand Down
45 changes: 22 additions & 23 deletions cffu-core/src/main/java/io/foldright/cffu/CffuFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public Cffu<Void> runAsync(Runnable action, Executor executor) {
* @see #allResultsOfFastFail(CompletionStage[])
* @see CompletableFuture#supplyAsync(Supplier)
*/
public <T> Cffu<List<T>> mSupplyFastFailAsync(Supplier<? extends T>... suppliers) {
public <T> Cffu<List<T>> mSupplyFastFailAsync(Supplier<? extends T>... suppliers) {
return create(CompletableFutureUtils.mSupplyFastFailAsync(suppliers));
}

Expand Down Expand Up @@ -223,7 +223,7 @@ public <T> Cffu<List<T>> mSupplyAsync(Supplier<? extends T>... suppliers) {
* @see #allOfFastFail(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable)
*/
public Cffu<Void> mRunFastFailAsync(Runnable... actions) {
public Cffu<Void> mRunFastFailAsync(Runnable... actions) {
return create(CompletableFutureUtils.mRunFastFailAsync(actions));
}

Expand All @@ -237,7 +237,7 @@ public Cffu<Void> mRunFastFailAsync(Runnable... actions) {
* @see #allOf(CompletionStage[])
* @see CompletableFuture#runAsync(Runnable)
*/
public Cffu<Void> mRunAsync(Runnable... actions) {
public Cffu<Void> mRunAsync(Runnable... actions) {
return create(CompletableFutureUtils.mRunAsync(actions));
}

Expand Down Expand Up @@ -273,7 +273,7 @@ public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyFastFailAsync(
*/
public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyFastFailAsync(
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2) {
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor,supplier1, supplier2));
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor, supplier1, supplier2));
}

/**
Expand Down Expand Up @@ -302,8 +302,8 @@ public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyFastFailAsync(
* @see #allResultsOfFastFail(CompletionStage[])
*/
public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyFastFailAsync(
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor,supplier1, supplier2, supplier3));
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor, supplier1, supplier2, supplier3));
}

/**
Expand Down Expand Up @@ -333,9 +333,9 @@ public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyFastFailAsync(
* @see #allResultsOfFastFail(CompletionStage[])
*/
public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyFastFailAsync(
Executor executor,Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4) {
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor,supplier1, supplier2, supplier3, supplier4));
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor, supplier1, supplier2, supplier3, supplier4));
}

/**
Expand Down Expand Up @@ -367,7 +367,7 @@ public <T1, T2, T3, T4, T5> Cffu<Tuple5<T1, T2, T3, T4, T5>> tupleMSupplyFastFai
public <T1, T2, T3, T4, T5> Cffu<Tuple5<T1, T2, T3, T4, T5>> tupleMSupplyFastFailAsync(
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3,
Supplier<? extends T4> supplier4, Supplier<? extends T5> supplier5) {
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor,supplier1, supplier2, supplier3, supplier4, supplier5));
return create(CompletableFutureUtils.tupleMSupplyFastFailAsync(executor, supplier1, supplier2, supplier3, supplier4, supplier5));
}


Expand Down Expand Up @@ -397,7 +397,7 @@ public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyMostSuccessAsync(
*/
public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyMostSuccessAsync(Executor executor,
long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(executor,timeout, unit, supplier1, supplier2));
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(executor, timeout, unit, supplier1, supplier2));
}

/**
Expand All @@ -413,7 +413,7 @@ public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyMostSuccessAsync(
long timeout, TimeUnit unit,
Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
timeout, unit, supplier1, supplier2, supplier3));
timeout, unit, supplier1, supplier2, supplier3));
}

/**
Expand All @@ -426,7 +426,7 @@ public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyMostSuccessAsync(
* @return the new Cffu
*/
public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyMostSuccessAsync(
Executor executor,long timeout, TimeUnit unit,
Executor executor, long timeout, TimeUnit unit,
Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
executor, timeout, unit, supplier1, supplier2, supplier3));
Expand All @@ -445,7 +445,7 @@ public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyMostSuccessAsyn
long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
timeout, unit, supplier1, supplier2, supplier3, supplier4));
timeout, unit, supplier1, supplier2, supplier3, supplier4));
}

/**
Expand All @@ -458,7 +458,7 @@ public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyMostSuccessAsyn
* @return the new Cffu
*/
public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyMostSuccessAsync(
Executor executor, long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Executor executor, long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
executor, timeout, unit, supplier1, supplier2, supplier3, supplier4));
Expand All @@ -477,7 +477,7 @@ public <T1, T2, T3, T4, T5> Cffu<Tuple5<T1, T2, T3, T4, T5>> tupleMSupplyMostSuc
long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4, Supplier<? extends T5> supplier5) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
timeout, unit, supplier1, supplier2, supplier3, supplier4, supplier5));
timeout, unit, supplier1, supplier2, supplier3, supplier4, supplier5));
}

/**
Expand All @@ -490,7 +490,7 @@ public <T1, T2, T3, T4, T5> Cffu<Tuple5<T1, T2, T3, T4, T5>> tupleMSupplyMostSuc
* @return the new Cffu
*/
public <T1, T2, T3, T4, T5> Cffu<Tuple5<T1, T2, T3, T4, T5>> tupleMSupplyMostSuccessAsync(
Executor executor,long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Executor executor, long timeout, TimeUnit unit, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4, Supplier<? extends T5> supplier5) {
return create(CompletableFutureUtils.tupleMSupplyMostSuccessAsync(
executor, timeout, unit, supplier1, supplier2, supplier3, supplier4, supplier5));
Expand Down Expand Up @@ -518,8 +518,8 @@ public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyAsync(
* @see #allResultsOf(CompletionStage[])
*/
public <T1, T2> Cffu<Tuple2<T1, T2>> tupleMSupplyAsync(
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2) {
return create(CompletableFutureUtils.tupleMSupplyAsync(executor,supplier1, supplier2));
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2) {
return create(CompletableFutureUtils.tupleMSupplyAsync(executor, supplier1, supplier2));
}

/**
Expand All @@ -544,8 +544,8 @@ public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyAsync(
* @see #allResultsOf(CompletionStage[])
*/
public <T1, T2, T3> Cffu<Tuple3<T1, T2, T3>> tupleMSupplyAsync(
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyAsync(executor,supplier1, supplier2, supplier3));
Executor executor, Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2, Supplier<? extends T3> supplier3) {
return create(CompletableFutureUtils.tupleMSupplyAsync(executor, supplier1, supplier2, supplier3));
}

/**
Expand Down Expand Up @@ -574,7 +574,7 @@ public <T1, T2, T3, T4> Cffu<Tuple4<T1, T2, T3, T4>> tupleMSupplyAsync(
Executor executor,
Supplier<? extends T1> supplier1, Supplier<? extends T2> supplier2,
Supplier<? extends T3> supplier3, Supplier<? extends T4> supplier4) {
return create(CompletableFutureUtils.tupleMSupplyAsync(executor,supplier1, supplier2, supplier3, supplier4));
return create(CompletableFutureUtils.tupleMSupplyAsync(executor, supplier1, supplier2, supplier3, supplier4));
}

/**
Expand Down Expand Up @@ -1181,8 +1181,7 @@ public static <T> CompletableFuture<T>[] cffuArrayUnwrap(Cffu<T>... cfs) {
*/
@Contract(pure = true)
public static <T> Cffu<T>[] cffuListToArray(List<Cffu<T>> cffuList) {
@SuppressWarnings("unchecked")
final Cffu<T>[] a = new Cffu[cffuList.size()];
@SuppressWarnings("unchecked") final Cffu<T>[] a = new Cffu[cffuList.size()];
return cffuList.toArray(a);
}

Expand Down
49 changes: 49 additions & 0 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import static io.foldright.test_utils.TestUtils.*;
import static java.util.function.Function.identity;
Expand All @@ -33,6 +36,52 @@ class CffuTest {

private static CffuFactory forbidObtrudeMethodsCffuFactory;

////////////////////////////////////////////////////////////////////////////////
//# multi-actions(M*) methods
////////////////////////////////////////////////////////////////////////////////

@Test
void test_mRun() throws Exception {
final Cffu<Integer> completed = cffuFactory.completedFuture(n);

final Runnable runnable = () -> sleep(100);

final long tick = System.currentTimeMillis();
@SuppressWarnings("unchecked")
Cffu<Void>[] cfs = new Cffu[]{
completed.mRunAsync(runnable, runnable),
completed.mRunFastFailAsync(runnable, runnable)
};

assertTrue(System.currentTimeMillis() - tick < 50);
for (Cffu<Void> cf : cfs) {
assertNull(cf.get());
}
}

@Test
void test_mSupply() throws Exception {
final Cffu<Integer> completed = cffuFactory.completedFuture(n);
final Supplier<Integer> supplier = () -> {
sleep(100);
return n;
};

final long tick = System.currentTimeMillis();

@SuppressWarnings("unchecked")
Cffu<List<Integer>>[] cfs = new Cffu[]{
completed.mSupplyAsync(supplier, supplier),
completed.mSupplyFastFailAsync(supplier, supplier),
completed.mSupplyMostSuccessAsync(anotherN, 500, TimeUnit.MILLISECONDS, supplier, supplier)
};

assertTrue(System.currentTimeMillis() - tick < 50);
for (Cffu<List<Integer>> cf : cfs) {
assertEquals(Arrays.asList(n, n), cf.get());
}
}

@Test
void test_thenTupleMApplyAsync() throws Exception {
final Cffu<Integer> completed = cffuFactory.completedFuture(n);
Expand Down

0 comments on commit 4afd763

Please sign in to comment.