Skip to content

Commit

Permalink
feat: implement methods completeExceptionallyAsync in `CompletableF…
Browse files Browse the repository at this point in the history
…utureUtils` 💣 ✨

TODO add method for cffu and CFExtensions
  • Loading branch information
oldratlee committed May 24, 2024
1 parent 5a5ad44 commit c01e314
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 19 deletions.
25 changes: 25 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,31 @@ public boolean completeExceptionally(Throwable ex) {
return cf.completeExceptionally(ex);
}

/**
* If not already completed, completes given Cffu with the exception result
* of the given Supplier function invoked from an asynchronous task using the default executor.
*
* @param supplier a function returning the value to be used to complete given Cffu
* @return the given Cffu
*/
public Cffu<T> completeExceptionallyAsync(Supplier<? extends Throwable> supplier) {
return completeExceptionallyAsync(supplier, fac.defaultExecutor());
}

/**
* If not already completed, completes given Cffu with the exception result
* of the given Supplier function invoked from an asynchronous task using the given executor.
*
* @param supplier a function returning the value to be used to complete given Cffu
* @param executor the executor to use for asynchronous execution
* @return the given Cffu
*/
public Cffu<T> completeExceptionallyAsync(Supplier<? extends Throwable> supplier, Executor executor) {
checkMinimalStage();
CompletableFutureUtils.completeExceptionallyAsync(cf, supplier, executor);
return this;
}

/**
* If not already completed, completes this Cffu with a {@link CancellationException}.
* Dependent Cffus that have not already completed will also complete exceptionally,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,7 @@ C completeAsync(C cf, Supplier<? extends T> supplier, Executor executor) {
if (IS_JAVA9_PLUS) {
cf.completeAsync(supplier, executor);
} else {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
Expand All @@ -1661,6 +1662,37 @@ C completeAsync(C cf, Supplier<? extends T> supplier, Executor executor) {
return cf;
}

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the default executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return the given CompletableFuture
* @see CompletableFuture#completeExceptionally(Throwable)
*/
public static <C extends CompletableFuture<?>>
C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier) {
return completeExceptionallyAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL);
}

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the given executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the given CompletableFuture
* @see CompletableFuture#completeExceptionally(Throwable)
*/
public static <C extends CompletableFuture<?>>
C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
executor.execute(new CfExCompleterBySupplier(cf, supplier));
return cf;
}

//# Re-Config methods

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

/**
* Singleton delay scheduler, used only for starting and cancelling tasks
* <p>
* code is copied from {@link CompletableFuture.Delayer} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class Delayer {
private static final ScheduledThreadPoolExecutor delayer;

Expand Down Expand Up @@ -67,7 +70,10 @@ private Delayer() {

/**
* An executor wrapper with delayed execution.
* <p>
* code is copied from {@link CompletableFuture.DelayedExecutor} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class DelayedExecutor implements Executor {
private final long delay;
private final TimeUnit unit;
Expand All @@ -91,8 +97,11 @@ public void execute(@NonNull Runnable r) {
////////////////////////////////////////////////////////////////////////////////

/**
* Action to submit task(Runnable) to executor
* Action to submit task(Runnable) to executor.
* <p>
* code is copied from {@link CompletableFuture.TaskSubmitter} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class TaskSubmitter implements Runnable {
private final Executor executor;
private final Runnable action;
Expand All @@ -109,8 +118,11 @@ public void run() {
}

/**
* Action to cf.completeExceptionally with TimeoutException
* Action to cf.completeExceptionally with TimeoutException.
* <p>
* code is copied from {@link CompletableFuture.Timeout} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class CfTimeout implements Runnable {
private final CompletableFuture<?> cf;

Expand All @@ -126,8 +138,11 @@ public void run() {
}

/**
* Action to complete cf
* Action to complete cf.
* <p>
* code is copied from {@link CompletableFuture.DelayedCompleter} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class CfCompleter<T> implements Runnable {
private final CompletableFuture<? super T> cf;
private final T value;
Expand All @@ -144,12 +159,15 @@ public void run() {
}

/**
* Action to cancel unneeded scheduled task by Future (for example timeouts)
* Action to cancel unneeded scheduled task by Future (for example timeouts).
* <p>
* code is copied from {@link CompletableFuture.Canceller} with small adoption.
*
* @see Delayer#delay(Runnable, long, TimeUnit)
* @see Delayer#delayToTimoutCf(CompletableFuture, long, TimeUnit)
* @see Delayer#delayToCompleteCf(CompletableFuture, Object, long, TimeUnit)
*/
@SuppressWarnings("JavadocReference")
final class FutureCanceller implements BiConsumer<Object, Throwable> {
private final Future<?> f;

Expand All @@ -165,9 +183,8 @@ public void accept(Object ignore, Throwable ex) {
}

/**
* code is copied from {@code CompletableFuture#AsyncSupply} with small adoption.
* code is copied from {@code CompletableFuture.AsyncSupply} with small adoption.
*/
@SuppressWarnings("serial")
@SuppressFBWarnings("SE_BAD_FIELD")
final class CfCompleterBySupplier<T> extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
Expand Down Expand Up @@ -211,3 +228,50 @@ public void run() {
}
}
}

/**
* code is adopted from {@code CompletableFuture.AsyncSupply}.
*/
@SuppressFBWarnings("SE_BAD_FIELD")
final class CfExCompleterBySupplier extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
private CompletableFuture<?> dep;
private Supplier<? extends Throwable> fn;

CfExCompleterBySupplier(CompletableFuture<?> dep, Supplier<? extends Throwable> fn) {
this.dep = dep;
this.fn = fn;
}

@Override
public Void getRawResult() {
return null;
}

@Override
public void setRawResult(Void v) {
}

@Override
public boolean exec() {
run();
return false;
}

@Override
public void run() {
CompletableFuture<?> d;
Supplier<? extends Throwable> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null;
fn = null;
if (!d.isDone()) {
try {
d.completeExceptionally(f.get());
} catch (Throwable ex) {
d.completeExceptionally(ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ void test_exceptionallyCompose() throws Exception {
}

@Test
@SuppressWarnings({"ResultOfMethodCallIgnored", "ThrowableNotThrown"})
@SuppressWarnings("ThrowableNotThrown")
void test_read() {
final CompletableFuture<Integer> completed = completedFuture(n);
final FutureTask<Integer> completedTask = new FutureTask<>(() -> n);
Expand Down Expand Up @@ -1090,17 +1090,21 @@ void test_read() {
void test_write() throws Exception {
assertEquals(n, completeAsync(createIncompleteFuture(), () -> n).get());
assertEquals(n, completeAsync(createIncompleteFuture(), () -> n, commonPool()).get());
try {
completeAsync(createIncompleteFuture(), () -> {
throw rte;
}).get();
fail();
} catch (ExecutionException expected) {
assertSame(rte, expected.getCause());
}
assertSame(rte, assertThrows(ExecutionException.class, () ->
completeAsync(createIncompleteFuture(), () -> {
throw rte;
}).get()
).getCause());

CompletableFuture<Integer> completed = completedFuture(n);
assertEquals(n, completeAsync(completed, () -> anotherN).get());

////////////////////////////////////////

assertSame(rte, assertThrows(ExecutionException.class, () ->
completeExceptionallyAsync(createIncompleteFuture(), () -> rte).get()
).getCause());
assertEquals(n, completeExceptionallyAsync(completedFuture(n), () -> rte).get());
}

@Test
Expand Down
10 changes: 10 additions & 0 deletions cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,14 @@ private fun <T> Cffu<T>.shouldMinCffu(recursive: Boolean = false) {
@Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS")
completeExceptionally(null)
}.message shouldBe "unsupported because this is a minimal stage"
shouldThrow<UnsupportedOperationException> {
@Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS")
completeExceptionallyAsync(null)
}.message shouldBe "unsupported because this is a minimal stage"
shouldThrow<UnsupportedOperationException> {
@Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS")
completeExceptionallyAsync(null, null)
}.message shouldBe "unsupported because this is a minimal stage"
shouldThrow<UnsupportedOperationException> {
cancel(false)
}.message shouldBe "unsupported because this is a minimal stage"
Expand Down Expand Up @@ -475,6 +483,8 @@ private fun <T> Cffu<T>.shouldNotMinCffu(recursive: Boolean = false) {
completeAsync { null }.shouldNotMinCffu()
completeAsync({ null }, blackHoleExecutor).shouldNotMinCffu()
completeExceptionally(RuntimeException())
completeExceptionallyAsync { null }.shouldNotMinCffu()
completeExceptionallyAsync({ null }, blackHoleExecutor).shouldNotMinCffu()
cancel(false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,6 @@ fun <T> CompletableFuture<T>.resultNow(): T =
* @return the exception thrown by the task
* @throws IllegalStateException if the task has not completed, the task completed normally,
* or the task was cancelled
* @see CompletableFuture.resultNow
*/
fun CompletableFuture<*>.exceptionNow(): Throwable =
CompletableFutureUtils.exceptionNow(this)
Expand Down Expand Up @@ -1152,6 +1151,29 @@ fun <T, C : CompletableFuture<in T>> C.completeAsync(supplier: Supplier<out T>):
fun <T, C : CompletableFuture<in T>> C.completeAsync(supplier: Supplier<out T>, executor: Executor): C =
CompletableFutureUtils.completeAsync(this, supplier, executor)

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the default executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return the given CompletableFuture
* @see CompletableFuture.completeExceptionally
*/
fun <C : CompletableFuture<*>> C.completeExceptionallyAsync(supplier: Supplier<out Throwable>): C =
CompletableFutureUtils.completeExceptionallyAsync(this, supplier)

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the given executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the given CompletableFuture
* @see CompletableFuture.completeExceptionally
*/
fun <C : CompletableFuture<*>> C.completeExceptionallyAsync(supplier: Supplier<out Throwable>, executor: Executor): C =
CompletableFutureUtils.completeExceptionallyAsync(this, supplier, executor)

//# Re-Config methods

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.foldright.cffu.tuple.Tuple2
import io.foldright.cffu.tuple.Tuple3
import io.foldright.cffu.tuple.Tuple4
import io.foldright.cffu.tuple.Tuple5
import io.foldright.test_utils.createIncompleteFuture
import io.foldright.test_utils.sleep
import io.foldright.test_utils.testThreadPoolExecutor
import io.kotest.assertions.throwables.shouldThrow
Expand Down Expand Up @@ -406,9 +407,16 @@ class CompletableFutureExtensionsTest : FunSpec({
}

test("write methods") {
val cf = CompletableFuture<Int>()
cf.completeAsync { n }.get() shouldBe n
cf.completeAsync({ n }, testThreadPoolExecutor).get() shouldBe n
createIncompleteFuture<Int>().completeAsync { n }.get() shouldBe n
createIncompleteFuture<Int>().completeAsync({ n }, testThreadPoolExecutor).get() shouldBe n

shouldThrow<ExecutionException> {
createIncompleteFuture<Int>().completeExceptionallyAsync { rte }.get()
}.cause shouldBeSameInstanceAs rte

shouldThrow<ExecutionException> {
createIncompleteFuture<Int>().completeExceptionallyAsync({ rte }, testThreadPoolExecutor).get()
}.cause shouldBeSameInstanceAs rte
}

test("re_config") {
Expand Down

0 comments on commit c01e314

Please sign in to comment.