From a476fdbd9a5b22ee3f47d02a4834c0c0eb8116ef Mon Sep 17 00:00:00 2001 From: Jerry Lee Date: Sat, 18 May 2024 22:41:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20methods=20`completeExceptio?= =?UTF-8?q?nallyAsync`=20in=20`CompletableFutureUtils`=20=F0=9F=92=A3=20?= =?UTF-8?q?=E2=9C=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TODO add method for cffu and CFExtensions --- .../src/main/java/io/foldright/cffu/Cffu.java | 25 ++++++ .../cffu/CompletableFutureUtils.java | 32 ++++++++ .../foldright/cffu/DelayExecutionHelpers.java | 76 +++++++++++++++++-- .../cffu/CompletableFutureUtilsTest.java | 22 +++--- .../java/io/foldright/test_utils/TestUtils.kt | 10 +++ .../kotlin/CompletableFutureExtensions.kt | 24 +++++- .../test/CompletableFutureExtensionsTest.kt | 14 +++- 7 files changed, 184 insertions(+), 19 deletions(-) diff --git a/cffu-core/src/main/java/io/foldright/cffu/Cffu.java b/cffu-core/src/main/java/io/foldright/cffu/Cffu.java index 8a672324..da00c0ff 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/Cffu.java +++ b/cffu-core/src/main/java/io/foldright/cffu/Cffu.java @@ -1842,6 +1842,31 @@ public boolean completeExceptionally(Throwable ex) { return cf.completeExceptionally(ex); } + /** + * If not already completed, completes given Cffu with the exception result + * of the given Supplier function invoked from an asynchronous task using the default executor. + * + * @param supplier a function returning the value to be used to complete given Cffu + * @return the given Cffu + */ + public Cffu completeExceptionallyAsync(Supplier supplier) { + return completeExceptionallyAsync(supplier, fac.defaultExecutor()); + } + + /** + * If not already completed, completes given Cffu with the exception result + * of the given Supplier function invoked from an asynchronous task using the given executor. + * + * @param supplier a function returning the value to be used to complete given Cffu + * @param executor the executor to use for asynchronous execution + * @return the given Cffu + */ + public Cffu completeExceptionallyAsync(Supplier supplier, Executor executor) { + checkMinimalStage(); + CompletableFutureUtils.completeExceptionallyAsync(cf, supplier, executor); + return this; + } + /** * If not already completed, completes this Cffu with a {@link CancellationException}. * Dependent Cffus that have not already completed will also complete exceptionally, diff --git a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java index 64fa5410..115e7f78 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java +++ b/cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java @@ -1653,6 +1653,7 @@ C completeAsync(C cf, Supplier supplier, Executor executor) { if (IS_JAVA9_PLUS) { cf.completeAsync(supplier, executor); } else { + requireNonNull(cf, "cf is null"); requireNonNull(supplier, "supplier is null"); requireNonNull(executor, "executor is null"); // below code is copied from CompletableFuture#completeAsync with small adoption @@ -1661,6 +1662,37 @@ C completeAsync(C cf, Supplier supplier, Executor executor) { return cf; } + /** + * If not already completed, completes given CompletableFuture with the exception result + * of the given Supplier function invoked from an asynchronous task using the default executor. + * + * @param supplier a function returning the value to be used to complete given CompletableFuture + * @return the given CompletableFuture + * @see CompletableFuture#completeExceptionally(Throwable) + */ + public static > + C completeExceptionallyAsync(C cf, Supplier supplier) { + return completeExceptionallyAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL); + } + + /** + * If not already completed, completes given CompletableFuture with the exception result + * of the given Supplier function invoked from an asynchronous task using the given executor. + * + * @param supplier a function returning the value to be used to complete given CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the given CompletableFuture + * @see CompletableFuture#completeExceptionally(Throwable) + */ + public static > + C completeExceptionallyAsync(C cf, Supplier supplier, Executor executor) { + requireNonNull(cf, "cf is null"); + requireNonNull(supplier, "supplier is null"); + requireNonNull(executor, "executor is null"); + executor.execute(new CfExCompleterBySupplier(cf, supplier)); + return cf; + } + //# Re-Config methods /** diff --git a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java index d5116ea9..054e6fff 100644 --- a/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java +++ b/cffu-core/src/main/java/io/foldright/cffu/DelayExecutionHelpers.java @@ -16,7 +16,10 @@ /** * Singleton delay scheduler, used only for starting and cancelling tasks + *

+ * code is copied from {@link CompletableFuture.Delayer} with small adoption. */ +@SuppressWarnings("JavadocReference") final class Delayer { private static final ScheduledThreadPoolExecutor delayer; @@ -67,7 +70,10 @@ private Delayer() { /** * An executor wrapper with delayed execution. + *

+ * code is copied from {@link CompletableFuture.DelayedExecutor} with small adoption. */ +@SuppressWarnings("JavadocReference") final class DelayedExecutor implements Executor { private final long delay; private final TimeUnit unit; @@ -91,8 +97,11 @@ public void execute(@NonNull Runnable r) { //////////////////////////////////////////////////////////////////////////////// /** - * Action to submit task(Runnable) to executor + * Action to submit task(Runnable) to executor. + *

+ * code is copied from {@link CompletableFuture.TaskSubmitter} with small adoption. */ +@SuppressWarnings("JavadocReference") final class TaskSubmitter implements Runnable { private final Executor executor; private final Runnable action; @@ -109,8 +118,11 @@ public void run() { } /** - * Action to cf.completeExceptionally with TimeoutException + * Action to cf.completeExceptionally with TimeoutException. + *

+ * code is copied from {@link CompletableFuture.Timeout} with small adoption. */ +@SuppressWarnings("JavadocReference") final class CfTimeout implements Runnable { private final CompletableFuture cf; @@ -126,8 +138,11 @@ public void run() { } /** - * Action to complete cf + * Action to complete cf. + *

+ * code is copied from {@link CompletableFuture.DelayedCompleter} with small adoption. */ +@SuppressWarnings("JavadocReference") final class CfCompleter implements Runnable { private final CompletableFuture cf; private final T value; @@ -144,12 +159,15 @@ public void run() { } /** - * Action to cancel unneeded scheduled task by Future (for example timeouts) + * Action to cancel unneeded scheduled task by Future (for example timeouts). + *

+ * code is copied from {@link CompletableFuture.Canceller} with small adoption. * * @see Delayer#delay(Runnable, long, TimeUnit) * @see Delayer#delayToTimoutCf(CompletableFuture, long, TimeUnit) * @see Delayer#delayToCompleteCf(CompletableFuture, Object, long, TimeUnit) */ +@SuppressWarnings("JavadocReference") final class FutureCanceller implements BiConsumer { private final Future f; @@ -165,9 +183,8 @@ public void accept(Object ignore, Throwable ex) { } /** - * code is copied from {@code CompletableFuture#AsyncSupply} with small adoption. + * code is copied from {@code CompletableFuture.AsyncSupply} with small adoption. */ -@SuppressWarnings("serial") @SuppressFBWarnings("SE_BAD_FIELD") final class CfCompleterBySupplier extends ForkJoinTask implements Runnable, CompletableFuture.AsynchronousCompletionTask { @@ -211,3 +228,50 @@ public void run() { } } } + +/** + * code is adopted from {@code CompletableFuture.AsyncSupply}. + */ +@SuppressFBWarnings("SE_BAD_FIELD") +final class CfExCompleterBySupplier extends ForkJoinTask + implements Runnable, CompletableFuture.AsynchronousCompletionTask { + private CompletableFuture dep; + private Supplier fn; + + CfExCompleterBySupplier(CompletableFuture dep, Supplier fn) { + this.dep = dep; + this.fn = fn; + } + + @Override + public Void getRawResult() { + return null; + } + + @Override + public void setRawResult(Void v) { + } + + @Override + public boolean exec() { + run(); + return false; + } + + @Override + public void run() { + CompletableFuture d; + Supplier f; + if ((d = dep) != null && (f = fn) != null) { + dep = null; + fn = null; + if (!d.isDone()) { + try { + d.completeExceptionally(f.get()); + } catch (Throwable ex) { + d.completeExceptionally(ex); + } + } + } + } +} diff --git a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java index 8ec807aa..9301163a 100644 --- a/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java +++ b/cffu-core/src/test/java/io/foldright/cffu/CompletableFutureUtilsTest.java @@ -982,7 +982,7 @@ void test_exceptionallyCompose() throws Exception { } @Test - @SuppressWarnings({"ResultOfMethodCallIgnored", "ThrowableNotThrown"}) + @SuppressWarnings("ThrowableNotThrown") void test_read() { final CompletableFuture completed = completedFuture(n); final FutureTask completedTask = new FutureTask<>(() -> n); @@ -1090,17 +1090,21 @@ void test_read() { void test_write() throws Exception { assertEquals(n, completeAsync(createIncompleteFuture(), () -> n).get()); assertEquals(n, completeAsync(createIncompleteFuture(), () -> n, commonPool()).get()); - try { - completeAsync(createIncompleteFuture(), () -> { - throw rte; - }).get(); - fail(); - } catch (ExecutionException expected) { - assertSame(rte, expected.getCause()); - } + assertSame(rte, assertThrows(ExecutionException.class, () -> + completeAsync(createIncompleteFuture(), () -> { + throw rte; + }).get() + ).getCause()); CompletableFuture completed = completedFuture(n); assertEquals(n, completeAsync(completed, () -> anotherN).get()); + + //////////////////////////////////////// + + assertSame(rte, assertThrows(ExecutionException.class, () -> + completeExceptionallyAsync(createIncompleteFuture(), () -> rte).get() + ).getCause()); + assertEquals(n, completeExceptionallyAsync(completedFuture(n), () -> rte).get()); } @Test diff --git a/cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt b/cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt index 7166d652..34ca1d32 100644 --- a/cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt +++ b/cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt @@ -369,6 +369,14 @@ private fun Cffu.shouldMinCffu(recursive: Boolean = false) { @Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS") completeExceptionally(null) }.message shouldBe "unsupported because this is a minimal stage" + shouldThrow { + @Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS") + completeExceptionallyAsync(null) + }.message shouldBe "unsupported because this is a minimal stage" + shouldThrow { + @Suppress("NULLABILITY_MISMATCH_BASED_ON_JAVA_ANNOTATIONS") + completeExceptionallyAsync(null, null) + }.message shouldBe "unsupported because this is a minimal stage" shouldThrow { cancel(false) }.message shouldBe "unsupported because this is a minimal stage" @@ -475,6 +483,8 @@ private fun Cffu.shouldNotMinCffu(recursive: Boolean = false) { completeAsync { null }.shouldNotMinCffu() completeAsync({ null }, blackHoleExecutor).shouldNotMinCffu() completeExceptionally(RuntimeException()) + completeExceptionallyAsync { null }.shouldNotMinCffu() + completeExceptionallyAsync({ null }, blackHoleExecutor).shouldNotMinCffu() cancel(false) } diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt index 70ab2c13..c8d8f7c3 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt @@ -1113,7 +1113,6 @@ fun CompletableFuture.resultNow(): T = * @return the exception thrown by the task * @throws IllegalStateException if the task has not completed, the task completed normally, * or the task was cancelled - * @see CompletableFuture.resultNow */ fun CompletableFuture<*>.exceptionNow(): Throwable = CompletableFutureUtils.exceptionNow(this) @@ -1152,6 +1151,29 @@ fun > C.completeAsync(supplier: Supplier): fun > C.completeAsync(supplier: Supplier, executor: Executor): C = CompletableFutureUtils.completeAsync(this, supplier, executor) +/** + * If not already completed, completes given CompletableFuture with the exception result + * of the given Supplier function invoked from an asynchronous task using the default executor. + * + * @param supplier a function returning the value to be used to complete given CompletableFuture + * @return the given CompletableFuture + * @see CompletableFuture.completeExceptionally + */ +fun > C.completeExceptionallyAsync(supplier: Supplier): C = + CompletableFutureUtils.completeExceptionallyAsync(this, supplier) + +/** + * If not already completed, completes given CompletableFuture with the exception result + * of the given Supplier function invoked from an asynchronous task using the given executor. + * + * @param supplier a function returning the value to be used to complete given CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the given CompletableFuture + * @see CompletableFuture.completeExceptionally + */ +fun > C.completeExceptionallyAsync(supplier: Supplier, executor: Executor): C = + CompletableFutureUtils.completeExceptionallyAsync(this, supplier, executor) + //# Re-Config methods /** diff --git a/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt b/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt index 5ed99355..d9b82b2b 100644 --- a/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt +++ b/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt @@ -8,6 +8,7 @@ import io.foldright.cffu.tuple.Tuple2 import io.foldright.cffu.tuple.Tuple3 import io.foldright.cffu.tuple.Tuple4 import io.foldright.cffu.tuple.Tuple5 +import io.foldright.test_utils.createIncompleteFuture import io.foldright.test_utils.sleep import io.foldright.test_utils.testThreadPoolExecutor import io.kotest.assertions.throwables.shouldThrow @@ -406,9 +407,16 @@ class CompletableFutureExtensionsTest : FunSpec({ } test("write methods") { - val cf = CompletableFuture() - cf.completeAsync { n }.get() shouldBe n - cf.completeAsync({ n }, testThreadPoolExecutor).get() shouldBe n + createIncompleteFuture().completeAsync { n }.get() shouldBe n + createIncompleteFuture().completeAsync({ n }, testThreadPoolExecutor).get() shouldBe n + + shouldThrow { + createIncompleteFuture().completeExceptionallyAsync { rte }.get() + }.cause shouldBeSameInstanceAs rte + + shouldThrow { + createIncompleteFuture().completeExceptionallyAsync({ rte }, testThreadPoolExecutor).get() + }.cause shouldBeSameInstanceAs rte } test("re_config") {