diff --git a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt index 6cc0fd52..af22da67 100644 --- a/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt +++ b/cffu-kotlin/src/main/java/io/foldright/cffu/kotlin/CompletableFutureExtensions.kt @@ -12,8 +12,8 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletionStage import java.util.concurrent.Executor import java.util.concurrent.TimeUnit +import java.util.function.* import java.util.function.Function -import java.util.function.Supplier //////////////////////////////////////////////////////////////////////////////// @@ -192,7 +192,6 @@ fun Collection>.allOfFastFailCompletableFuture(): Completab fun Array>.allOfFastFailCompletableFuture(): CompletableFuture = CompletableFutureUtils.allOfFastFail(*this) - //////////////////////////////////////// //# anyOf* methods // @@ -396,6 +395,338 @@ fun CompletableFuture.combineFastFail( ): CompletableFuture> = CompletableFutureUtils.combineFastFail(this, cf2, cf3, cf4, cf5) +//////////////////////////////////////////////////////////////////////////////// +//# `then both(binary input)` methods with fast-fail support: +// +// - runAfterBothFastFail*(Runnable): Void, Void -> Void +// - thenAcceptBothFastFail*(BiConsumer): (T1, T2) -> Void +// - thenCombineFastFail*(BiFunction): (T1, T2) -> U +//////////////////////////////////////////////////////////////////////////////// + +/** + * Returns a new CompletableFuture that, when two given stages both complete normally, executes the given action. + * if any of the given stage complete exceptionally, then the returned CompletableFuture also does so + * **without** waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.runAfterBoth] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterBoth + */ +fun CompletionStage<*>.runAfterBothFastFail(other: CompletionStage<*>, action: Runnable): CompletableFuture = + CompletableFutureUtils.runAfterBothFastFail(this, other, action) + +/** + * Returns a new CompletableFuture that, when two given stages both complete normally, + * executes the given action using CompletableFuture's default asynchronous execution facility. + * if any of the given stage complete exceptionally, then the returned CompletableFuture also does so + * **without** waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.runAfterBothAsync] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterBothAsync + */ +fun CompletionStage<*>.runAfterBothFastFailAsync(other: CompletionStage<*>, action: Runnable): CompletableFuture = + CompletableFutureUtils.runAfterBothFastFailAsync(this, other, action) + +/** + * Returns a new CompletableFuture that, when two given stages both complete normally, + * executes the given action using the supplied executor. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so **without** waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.runAfterBothAsync] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterBothAsync + */ +fun CompletionStage<*>.runAfterBothFastFailAsync( + other: CompletionStage<*>, action: Runnable, executor: Executor +): CompletableFuture = + CompletableFutureUtils.runAfterBothFastFailAsync(this, other, action, executor) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed with the two results as arguments to the supplied action. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenAcceptBoth] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenAcceptBoth + */ +fun CompletionStage.thenAcceptBothFastFail( + cf2: CompletionStage, action: BiConsumer +): CompletableFuture = + CompletableFutureUtils.thenAcceptBothFastFail(this, cf2, action) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed using CompletableFuture's default asynchronous execution facility, + * with the two results as arguments to the supplied action. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenAcceptBothAsync] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenAcceptBothAsync + */ +fun CompletionStage.thenAcceptBothFastFailAsync( + cf2: CompletionStage, action: BiConsumer +): CompletableFuture = + CompletableFutureUtils.thenAcceptBothFastFailAsync(this, cf2, action) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed using the supplied executor, + * with the two results as arguments to the supplied action. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenAcceptBothAsync] except for the fast-fail behavior. + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenAcceptBothAsync + */ +fun CompletionStage.thenAcceptBothFastFailAsync( + cf2: CompletionStage, action: BiConsumer, executor: Executor +): CompletableFuture = + CompletableFutureUtils.thenAcceptBothFastFailAsync(this, cf2, action, executor) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed with the two results as arguments to the supplied function. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenCombine] except for the fast-fail behavior. + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenCombine + */ +fun CompletionStage.thenCombineFastFail( + cf2: CompletionStage, fn: BiFunction +): CompletableFuture = + CompletableFutureUtils.thenCombineFastFail(this, cf2, fn) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed using CompletableFuture's default asynchronous execution facility, + * with the two results as arguments to the supplied function. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenCombineAsync] except for the fast-fail behavior. + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenCombineAsync + */ +fun CompletionStage.thenCombineFastFailAsync( + cf2: CompletionStage, fn: BiFunction +): CompletableFuture = + CompletableFutureUtils.thenCombineFastFailAsync(this, cf2, fn) + +/** + * Returns a new CompletableFuture that, when tow given stage both complete normally, + * is executed using the supplied executor, + * with the two results as arguments to the supplied function. + * if any of the given stage complete exceptionally, then the returned CompletableFuture + * also does so *without* waiting other incomplete given CompletionStage, + * with a CompletionException holding this exception as its cause. + * + * This method is the same as [CompletableFuture.thenCombineAsync] except for the fast-fail behavior. + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.thenCombineAsync + */ +fun CompletionStage.thenCombineFastFailAsync( + cf2: CompletionStage, fn: BiFunction, executor: Executor +): CompletableFuture = + CompletableFutureUtils.thenCombineFastFailAsync(this, cf2, fn, executor) + +//////////////////////////////////////////////////////////////////////////////// +//# `then either(binary input)` methods with either(any)-success support: +// +// - runAfterEitherSuccess*(Runnable): Void, Void -> Void +// - acceptEitherSuccess*(Consumer): (T, T) -> Void +// - applyToEitherSuccess*(Function): (T, T) -> U +//////////////////////////////////////////////////////////////////////////////// + +/** + * Returns a new CompletableFuture that, when either given stage success, executes the given action. + * Otherwise, all two given CompletionStage complete exceptionally, + * the returned CompletableFuture also does so, with a CompletionException holding + * an exception from any of the given CompletionStage as its cause. + * + * This method is the same as [CompletableFuture.runAfterEither] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterEither + */ +fun CompletionStage<*>.runAfterEitherSuccess(cf2: CompletionStage<*>, action: Runnable): CompletableFuture = + CompletableFutureUtils.runAfterEitherSuccess(this, cf2, action) + +/** + * Returns a new CompletableFuture that, when either given stage success, executes the given action + * using CompletableFuture's default asynchronous execution facility. + * Otherwise, all two given CompletionStage complete exceptionally, + * the returned CompletableFuture also does so, with a CompletionException holding + * an exception from any of the given CompletionStage as its cause. + * + * This method is the same as [CompletableFuture.runAfterEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterEitherAsync + */ +fun CompletionStage<*>.runAfterEitherSuccessAsync(cf2: CompletionStage<*>, action: Runnable): CompletableFuture = + CompletableFutureUtils.runAfterEitherSuccessAsync(this, cf2, action) + +/** + * Returns a new CompletableFuture that, when either given stage success, executes the given action + * using the supplied executor. + * Otherwise, all two given CompletionStage complete exceptionally, + * the returned CompletableFuture also does so, with a CompletionException holding + * an exception from any of the given CompletionStage as its cause. + * + * This method is the same as [CompletableFuture.runAfterEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.runAfterEitherAsync + */ +fun CompletionStage<*>.runAfterEitherSuccessAsync( + cf2: CompletionStage<*>, action: Runnable, executor: Executor +): CompletableFuture = + CompletableFutureUtils.runAfterEitherSuccessAsync(this, cf2, action, executor) + +/** + * Returns a new CompletableFuture that, when either given stage success, + * is executed with the corresponding result as argument to the supplied action. + * + * This method is the same as [CompletableFuture.acceptEither] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.acceptEither + */ +fun CompletionStage.acceptEitherSuccess( + cf2: CompletionStage, action: Consumer +): CompletableFuture = + CompletableFutureUtils.acceptEitherSuccess(this, cf2, action) + +/** + * Returns a new CompletionStage that, when either given stage success, + * is executed using this stage's default asynchronous execution facility, + * with the corresponding result as argument to the supplied action. + * + * This method is the same as [CompletableFuture.acceptEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @return the new CompletableFuture + * @see CompletableFuture.acceptEitherAsync(CompletionStage, Consumer) + */ +fun CompletionStage.acceptEitherSuccessAsync( + cf2: CompletionStage, action: Consumer +): CompletableFuture = + CompletableFutureUtils.acceptEitherSuccessAsync(this, cf2, action) + +/** + * Returns a new CompletionStage that, when either given stage success, + * is executed using the supplied executor, with the corresponding result as argument to the supplied action. + * + * This method is the same as [CompletableFuture.acceptEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param action the action to perform before completing the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @return the new CompletableFuture + * @see CompletableFuture.acceptEitherAsync(CompletionStage, Consumer, Executor) + */ +fun CompletionStage.acceptEitherSuccessAsync( + cf2: CompletionStage, action: Consumer, executor: Executor +): CompletableFuture = + CompletableFutureUtils.acceptEitherSuccessAsync(this, cf2, action, executor) + +/** + * Returns a new CompletionStage that, when either given stage success, + * is executed with the corresponding result as argument to the supplied function. + * + * This method is the same as [CompletableFuture.applyToEither] + * except for the either-**success** behavior(not either-**complete**). + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see CompletableFuture.applyToEither + */ +fun CompletionStage.applyToEitherSuccess( + cf2: CompletionStage, fn: Function +): CompletableFuture = + CompletableFutureUtils.applyToEitherSuccess(this, cf2, fn) + +/** + * Returns a new CompletionStage that, when either given stage success, + * is executed using this stage's default asynchronous execution facility, + * with the corresponding result as argument to the supplied function. + * + * This method is the same as [CompletableFuture.applyToEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @param the function's return type + * @return the new CompletableFuture + * @see CompletableFuture.applyToEitherAsync + */ +fun CompletionStage.applyToEitherSuccessAsync( + cf2: CompletionStage, fn: Function +): CompletableFuture = + CompletableFutureUtils.applyToEitherSuccessAsync(this, cf2, fn) + +/** + * Returns a new CompletionStage that, when either given stage success, + * is executed using the supplied executor, with the corresponding result as argument to the supplied function. + * + * This method is the same as [CompletableFuture.applyToEitherAsync] + * except for the either-**success** behavior(not either-**complete**). + * + * @param fn the function to use to compute the value of the returned CompletableFuture + * @param executor the executor to use for asynchronous execution + * @param the function's return type + * @return the new CompletableFuture + * @see CompletableFuture.applyToEitherAsync + */ +fun CompletionStage.applyToEitherSuccessAsync( + cf2: CompletionStage, fn: Function, executor: Executor +): CompletableFuture = + CompletableFutureUtils.applyToEitherSuccessAsync(this, cf2, fn, executor) + //////////////////////////////////////////////////////////////////////////////// //# Backport CF instance methods // compatibility for low Java version @@ -554,7 +885,7 @@ fun CompletableFuture.resultNow(): T = * @return the exception thrown by the task * @throws IllegalStateException if the task has not completed, the task completed normally, * or the task was cancelled - * @see CompletableFuture#resultNow() + * @see CompletableFuture.resultNow() */ fun CompletableFuture.exceptionNow(): Throwable = CompletableFutureUtils.exceptionNow(this) diff --git a/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt b/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt index 95ac14fa..db75c724 100644 --- a/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt +++ b/cffu-kotlin/src/test/java/io/foldright/cffu/test/CompletableFutureExtensionsTest.kt @@ -8,6 +8,7 @@ import io.foldright.cffu.tuple.Tuple2 import io.foldright.cffu.tuple.Tuple3 import io.foldright.cffu.tuple.Tuple4 import io.foldright.cffu.tuple.Tuple5 +import io.foldright.test_utils.sleep import io.foldright.test_utils.testThreadPoolExecutor import io.kotest.assertions.throwables.shouldThrow import io.kotest.core.spec.style.FunSpec @@ -21,6 +22,9 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.ExecutionException import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException +import java.util.function.BiConsumer +import java.util.function.Consumer +import java.util.function.Function class CompletableFutureExtensionsTest : FunSpec({ test("allOf*") { @@ -206,7 +210,6 @@ class CompletableFutureExtensionsTest : FunSpec({ shouldThrow { arrayOf>().anyOfSuccessCompletableFuture().await() } - } //////////////////////////////////////// @@ -261,6 +264,74 @@ class CompletableFutureExtensionsTest : FunSpec({ ).get() shouldBe Tuple5.of(n, s, d, anotherN, n + n) } + + //////////////////////////////////////////////////////////////////////////////// + //# both methods + //////////////////////////////////////////////////////////////////////////////// + + test("both fastFail") { + val cf = CompletableFuture.supplyAsync { + sleep(2_000) + n + } + val failed = CompletableFuture.failedFuture(rte) + + val runnable = Runnable {} + shouldThrow { + cf.runAfterBothFastFail(failed, runnable)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.runAfterBothFastFailAsync(failed, runnable)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.runAfterBothFastFailAsync(failed, runnable, testThreadPoolExecutor)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + + val bc = BiConsumer { _: Int, _: Int -> } + shouldThrow { + cf.thenAcceptBothFastFail(failed, bc)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.thenAcceptBothFastFailAsync(failed, bc)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.thenAcceptBothFastFailAsync(failed, bc, testThreadPoolExecutor)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + + shouldThrow { + cf.thenCombineFastFail(failed, Integer::sum)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.thenCombineFastFailAsync(failed, Integer::sum)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + shouldThrow { + cf.thenCombineFastFailAsync(failed, Integer::sum, testThreadPoolExecutor)[1, TimeUnit.MILLISECONDS] + }.cause shouldBeSameInstanceAs rte + } + + //////////////////////////////////////////////////////////////////////////////// + //# either methods + //////////////////////////////////////////////////////////////////////////////// + + test("either success") { + val failed = CompletableFuture.failedFuture(rte) + val cf = CompletableFuture.completedFuture(n) + + val runnable = Runnable {} + failed.runAfterEitherSuccess(cf, runnable).get().shouldBeNull() + failed.runAfterEitherSuccessAsync(cf, runnable).get().shouldBeNull() + failed.runAfterEitherSuccessAsync(cf, runnable, testThreadPoolExecutor).get().shouldBeNull() + + val c = Consumer {} + failed.acceptEitherSuccess(cf, c).get().shouldBeNull() + failed.acceptEitherSuccessAsync(cf, c).get().shouldBeNull() + failed.acceptEitherSuccessAsync(cf, c, testThreadPoolExecutor).get().shouldBeNull() + + failed.applyToEitherSuccess(cf, Function.identity()).get() shouldBe n + failed.applyToEitherSuccessAsync(cf, Function.identity()).get() shouldBe n + failed.applyToEitherSuccessAsync(cf, Function.identity(), testThreadPoolExecutor).get() shouldBe n + } + //////////////////////////////////////////////////////////////////////////////// //# Backport CF instance methods // compatibility for low Java version