diff --git a/cffu-core/src/test/java/io/foldright/cffu/CheckExecutorTests.kt b/cffu-core/src/test/java/io/foldright/cffu/CheckExecutorTests.kt new file mode 100644 index 00000000..4fe51ec7 --- /dev/null +++ b/cffu-core/src/test/java/io/foldright/cffu/CheckExecutorTests.kt @@ -0,0 +1,479 @@ +package io.foldright.cffu + +import io.foldright.test_utils.* +import io.kotest.core.spec.style.FunSpec +import java.lang.Thread.currentThread +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.FutureTask +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.function.* +import java.util.function.Function + +class CheckExecutorTests : FunSpec({ + (1..3).forEach { count -> + test("Multi-Actions(M*) Methods with $count actions") { + val sInCP = createSuppliers(count) + val sInTE = createSuppliers(count, testExecutor) + + val rInCP = createRunnables(count) + val rInTE = createRunnables(count, testExecutor) + + val fs = listOf( + CompletableFutureUtils.mSupplyFailFastAsync(*sInCP), + CompletableFutureUtils.mSupplyFailFastAsync(testExecutor, *sInTE), + + CompletableFutureUtils.mSupplyAllSuccessAsync(null, *sInCP), + CompletableFutureUtils.mSupplyAllSuccessAsync(null, testExecutor, *sInTE), + + CompletableFutureUtils.mSupplyMostSuccessAsync(null, LONG_WAIT_MS, MILLISECONDS, *sInCP), + CompletableFutureUtils.mSupplyMostSuccessAsync(null, testExecutor, LONG_WAIT_MS, MILLISECONDS, *sInTE), + + CompletableFutureUtils.mSupplyAsync(*sInCP), + CompletableFutureUtils.mSupplyAsync(testExecutor, *sInTE), + + CompletableFutureUtils.mSupplyAnySuccessAsync(*sInCP), + CompletableFutureUtils.mSupplyAnySuccessAsync(testExecutor, *sInTE), + + CompletableFutureUtils.mSupplyAnyAsync(*sInCP), + CompletableFutureUtils.mSupplyAnyAsync(testExecutor, *sInTE), + + CompletableFutureUtils.mRunFailFastAsync(*rInCP), + CompletableFutureUtils.mRunFailFastAsync(testExecutor, *rInTE), + + CompletableFutureUtils.mRunAsync(*rInCP), + CompletableFutureUtils.mRunAsync(testExecutor, *rInTE), + + CompletableFutureUtils.mRunAnySuccessAsync(*rInCP), + CompletableFutureUtils.mRunAnySuccessAsync(testExecutor, *rInTE), + + CompletableFutureUtils.mRunAnyAsync(*rInCP), + CompletableFutureUtils.mRunAnyAsync(testExecutor, *rInTE), + ) + + fs.forEach { it.get() } + } + } + + test("Multi-Actions-Tuple(MTuple*) Methods(create by actions)") { + val (sInCP1, sInCP2, sInCP3, sInCP4, sInCP5) = createSuppliers(5) + val (sInTE1, sInTE2, sInTE3, sInTE4, sInTE5) = createSuppliers(5, testExecutor) + + val fs = listOf( + CompletableFutureUtils.mSupplyTupleFailFastAsync(sInCP1, sInCP2), + CompletableFutureUtils.mSupplyTupleFailFastAsync(sInCP1, sInCP2, sInCP3), + CompletableFutureUtils.mSupplyTupleFailFastAsync(sInCP1, sInCP2, sInCP3, sInCP4), + CompletableFutureUtils.mSupplyTupleFailFastAsync(sInCP1, sInCP2, sInCP3, sInCP4, sInCP5), + CompletableFutureUtils.mSupplyTupleFailFastAsync(testExecutor, sInTE1, sInTE2), + CompletableFutureUtils.mSupplyTupleFailFastAsync(testExecutor, sInTE1, sInTE2, sInTE3), + CompletableFutureUtils.mSupplyTupleFailFastAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4), + CompletableFutureUtils.mSupplyTupleFailFastAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4, sInTE5), + + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(sInCP1, sInCP2), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(sInCP1, sInCP2, sInCP3), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(sInCP1, sInCP2, sInCP3, sInCP4), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(sInCP1, sInCP2, sInCP3, sInCP4, sInCP5), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(testExecutor, sInTE1, sInTE2), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4), + CompletableFutureUtils.mSupplyAllSuccessTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4, sInTE5), + + CompletableFutureUtils.mSupplyMostSuccessTupleAsync(LONG_WAIT_MS, MILLISECONDS, sInCP1, sInCP2), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync(LONG_WAIT_MS, MILLISECONDS, sInCP1, sInCP2, sInCP3), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + LONG_WAIT_MS, + MILLISECONDS, + sInCP1, + sInCP2, + sInCP3, + sInCP4 + ), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + LONG_WAIT_MS, + MILLISECONDS, + sInCP1, + sInCP2, + sInCP3, + sInCP4, + sInCP5 + ), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + sInTE1, + sInTE2 + ), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + sInTE1, + sInTE2, + sInTE3 + ), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + sInTE1, + sInTE2, + sInTE3, + sInTE4 + ), + CompletableFutureUtils.mSupplyMostSuccessTupleAsync( + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + sInTE1, + sInTE2, + sInTE3, + sInTE4, + sInTE5 + ), + + CompletableFutureUtils.mSupplyTupleAsync(sInCP1, sInCP2), + CompletableFutureUtils.mSupplyTupleAsync(sInCP1, sInCP2, sInCP3), + CompletableFutureUtils.mSupplyTupleAsync(sInCP1, sInCP2, sInCP3, sInCP4), + CompletableFutureUtils.mSupplyTupleAsync(sInCP1, sInCP2, sInCP3, sInCP4, sInCP5), + CompletableFutureUtils.mSupplyTupleAsync(testExecutor, sInTE1, sInTE2), + CompletableFutureUtils.mSupplyTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3), + CompletableFutureUtils.mSupplyTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4), + CompletableFutureUtils.mSupplyTupleAsync(testExecutor, sInTE1, sInTE2, sInTE3, sInTE4, sInTE5), + ) + + fs.forEach { it.get() } + } + + test("Delay Execution") { + val fs = listOf( + createFutureTask().also { + CompletableFutureUtils.delayedExecutor(1, MILLISECONDS).execute(it) + }, + createFutureTask(testExecutor).also { + CompletableFutureUtils.delayedExecutor(1, MILLISECONDS, testExecutor).execute(it) + }, + ) + + fs.forEach { it.get() } + } + + val cfThis = CompletableFuture.completedFuture(n) + + (1..3).forEach { count -> + test("Then-Multi-Actions(thenM*) Methods with $count actions") { + val fInCP = createFunctions(count) + val fInTE = createFunctions(count, testExecutor) + val cInCP = createConsumers(count) + val cInTE = createConsumers(count, testExecutor) + val rInCP = createRunnables(count) + val rInTE = createRunnables(count, testExecutor) + + val fs = listOf( + CompletableFutureUtils.thenMApplyFailFastAsync(cfThis, *fInCP), + CompletableFutureUtils.thenMApplyFailFastAsync(cfThis, testExecutor, *fInTE), + + CompletableFutureUtils.thenMApplyAllSuccessAsync(cfThis, null, *fInCP), + CompletableFutureUtils.thenMApplyAllSuccessAsync(cfThis, null, testExecutor, *fInTE), + + CompletableFutureUtils.thenMApplyMostSuccessAsync(cfThis, null, LONG_WAIT_MS, MILLISECONDS, *fInCP), + CompletableFutureUtils.thenMApplyMostSuccessAsync( + cfThis, + null, + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + *fInTE + ), + + CompletableFutureUtils.thenMApplyAsync(cfThis, *fInCP), + CompletableFutureUtils.thenMApplyAsync(cfThis, testExecutor, *fInTE), + + CompletableFutureUtils.thenMApplyAnySuccessAsync(cfThis, *fInCP), + CompletableFutureUtils.thenMApplyAnySuccessAsync(cfThis, testExecutor, *fInTE), + + CompletableFutureUtils.thenMApplyAnyAsync(cfThis, *fInCP), + CompletableFutureUtils.thenMApplyAnyAsync(cfThis, testExecutor, *fInTE), + + CompletableFutureUtils.thenMAcceptFailFastAsync(cfThis, *cInCP), + CompletableFutureUtils.thenMAcceptFailFastAsync(cfThis, testExecutor, *cInTE), + + CompletableFutureUtils.thenMAcceptAsync(cfThis, *cInCP), + CompletableFutureUtils.thenMAcceptAsync(cfThis, testExecutor, *cInTE), + + CompletableFutureUtils.thenMAcceptAnySuccessAsync(cfThis, *cInCP), + CompletableFutureUtils.thenMAcceptAnySuccessAsync(cfThis, testExecutor, *cInTE), + + CompletableFutureUtils.thenMAcceptAnyAsync(cfThis, *cInCP), + CompletableFutureUtils.thenMAcceptAnyAsync(cfThis, testExecutor, *cInTE), + + CompletableFutureUtils.thenMRunFailFastAsync(cfThis, *rInCP), + CompletableFutureUtils.thenMRunFailFastAsync(cfThis, testExecutor, *rInTE), + + CompletableFutureUtils.thenMRunAsync(cfThis, *rInCP), + CompletableFutureUtils.thenMRunAsync(cfThis, testExecutor, *rInTE), + + CompletableFutureUtils.thenMRunAnySuccessAsync(cfThis, *rInCP), + CompletableFutureUtils.thenMRunAnySuccessAsync(cfThis, testExecutor, *rInTE), + + CompletableFutureUtils.thenMRunAnyAsync(cfThis, *rInCP), + CompletableFutureUtils.thenMRunAnyAsync(cfThis, testExecutor, *rInTE), + ) + + fs.forEach { it.get() } + } + } + + test("Then-Multi-Actions-Tuple(thenMTuple*) Methods") { + val (fInCP1, fInCP2, fInCP3, fInCP4, fInCP5) = createFunctions(5) + val (fInTE1, fInTE2, fInTE3, fInTE4, fInTE5) = createFunctions(5, testExecutor) + + val fs = listOf( + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, fInCP1, fInCP2), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, fInCP1, fInCP2, fInCP3), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4, fInCP5), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, testExecutor, fInTE1, fInTE2), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3), + CompletableFutureUtils.thenMApplyTupleFailFastAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3, fInTE4), + CompletableFutureUtils.thenMApplyTupleFailFastAsync( + cfThis, + testExecutor, + fInTE1, + fInTE2, + fInTE3, + fInTE4, + fInTE5 + ), + + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, fInCP1, fInCP2), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, fInCP1, fInCP2, fInCP3), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4, fInCP5), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, testExecutor, fInTE1, fInTE2), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3, fInTE4), + CompletableFutureUtils.thenMApplyAllSuccessTupleAsync( + cfThis, + testExecutor, + fInTE1, + fInTE2, + fInTE3, + fInTE4, + fInTE5 + ), + + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync(cfThis, LONG_WAIT_MS, MILLISECONDS, fInCP1, fInCP2), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + LONG_WAIT_MS, + MILLISECONDS, + fInCP1, + fInCP2, + fInCP3 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + LONG_WAIT_MS, + MILLISECONDS, + fInCP1, + fInCP2, + fInCP3, + fInCP4 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + LONG_WAIT_MS, + MILLISECONDS, + fInCP1, + fInCP2, + fInCP3, + fInCP4, + fInCP5 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + fInTE1, + fInTE2 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + fInTE1, + fInTE2, + fInTE3 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + fInTE1, + fInTE2, + fInTE3, + fInTE4 + ), + CompletableFutureUtils.thenMApplyMostSuccessTupleAsync( + cfThis, + testExecutor, + LONG_WAIT_MS, + MILLISECONDS, + fInTE1, + fInTE2, + fInTE3, + fInTE4, + fInTE5 + ), + + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, fInCP1, fInCP2), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, fInCP1, fInCP2, fInCP3), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, fInCP1, fInCP2, fInCP3, fInCP4, fInCP5), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, testExecutor, fInTE1, fInTE2), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3, fInTE4), + CompletableFutureUtils.thenMApplyTupleAsync(cfThis, testExecutor, fInTE1, fInTE2, fInTE3, fInTE4, fInTE5), + ) + + fs.forEach { it.get() } + } + + val other = CompletableFuture.completedFuture(anotherN) + + test("thenBoth* Methods(binary input) with fail-fast support") { + val fInCP = createBiFunction() + val fInTE = createBiFunction(testExecutor) + val cInCP = createBiConsumer() + val cInTE = createBiConsumer(testExecutor) + val rInCP = createFutureTask() + val rInTE = createFutureTask(testExecutor) + + val fs = listOf( + CompletableFutureUtils.thenCombineFailFastAsync(cfThis, other, fInCP), + CompletableFutureUtils.thenCombineFailFastAsync(cfThis, other, fInTE, testExecutor), + + CompletableFutureUtils.thenAcceptBothFailFastAsync(cfThis, other, cInCP), + CompletableFutureUtils.thenAcceptBothFailFastAsync(cfThis, other, cInTE, testExecutor), + + CompletableFutureUtils.runAfterBothFailFastAsync(cfThis, other, rInCP), + CompletableFutureUtils.runAfterBothFailFastAsync(cfThis, other, rInTE, testExecutor), + ) + + fs.forEach { it.get() } + } + + test("thenEither* Methods(binary input) with either(any)-success support") { + val (fInCP) = createFunctions(1) + val (fInTE) = createFunctions(1, testExecutor) + val (cInCP) = createConsumers(1) + val (cInTE) = createConsumers(1, testExecutor) + val rInCP = createFutureTask() + val rInTE = createFutureTask(testExecutor) + + val fs = listOf( + CompletableFutureUtils.applyToEitherSuccessAsync(cfThis, other, fInCP), + CompletableFutureUtils.applyToEitherSuccessAsync(cfThis, other, fInTE, testExecutor), + + CompletableFutureUtils.acceptEitherSuccessAsync(cfThis, other, cInCP), + CompletableFutureUtils.acceptEitherSuccessAsync(cfThis, other, cInTE, testExecutor), + + CompletableFutureUtils.runAfterEitherSuccessAsync(cfThis, other, rInCP), + CompletableFutureUtils.runAfterEitherSuccessAsync(cfThis, other, rInTE, testExecutor), + ) + + fs.forEach { it.get() } + } + + test("Error Handling Methods of CompletionStage") { + val testingThread = currentThread() + val fInCP = Function { + assertRunningByFjCommonPool(testingThread) + n + } + val fInTE = Function { + assertRunningInExecutor(testExecutor) + n + } + val failedCf = CompletableFutureUtils.failedFuture(RuntimeException("Failed")) + + val fs = listOf( + CompletableFutureUtils.catchingAsync(failedCf, RuntimeException::class.java, fInCP), + CompletableFutureUtils.catchingAsync(failedCf, RuntimeException::class.java, fInTE, testExecutor), + + CompletableFutureUtils.exceptionallyAsync(cfThis, fInCP), + CompletableFutureUtils.exceptionallyAsync(cfThis, fInTE, testExecutor), + ) + + fs.forEach { it.get() } + } +}) + +private val commonPool = ForkJoinPool.commonPool() + +private fun createSuppliers(size: Int, executor: Executor = commonPool): Array> = Array(size) { idx -> + val testingThread = currentThread() + Supplier { + if (executor === commonPool) assertRunningByFjCommonPool(testingThread) + else assertRunningInExecutor(executor) + idx + } +} + +private fun createRunnables(size: Int, executor: Executor = commonPool): Array = Array(size) { _ -> + val testingThread = currentThread() + Runnable { + if (executor === commonPool) assertRunningByFjCommonPool(testingThread) + else assertRunningInExecutor(executor) + } +} + +private fun createFunctions( + size: Int, executor: Executor = commonPool +): Array> = Array(size) { idx -> + val callingThread = currentThread() + Function { + if (executor === commonPool) assertRunningByFjCommonPool(callingThread) + else assertRunningInExecutor(executor) + idx + it + } +} + +private fun createConsumers(size: Int, executor: Executor = commonPool): Array> = Array(size) { _ -> + val callingThread = currentThread() + Consumer { + if (executor === commonPool) assertRunningByFjCommonPool(callingThread) + else assertRunningInExecutor(executor) + } +} + +private fun createFutureTask(executor: Executor = commonPool): FutureTask { + val callingThread = currentThread() + return FutureTask { + if (executor === commonPool) assertRunningByFjCommonPool(callingThread) + else assertRunningInExecutor(executor) + n + } +} + +private fun createBiFunction(executor: Executor = commonPool): BiFunction { + val callingThread = currentThread() + return BiFunction { x, y -> + if (executor === commonPool) assertRunningByFjCommonPool(callingThread) + else assertRunningInExecutor(executor) + x + y + } +} + +private fun createBiConsumer(executor: Executor = commonPool): BiConsumer { + val callingThread = currentThread() + return BiConsumer { _, _ -> + if (executor === commonPool) assertRunningByFjCommonPool(callingThread) + else assertRunningInExecutor(executor) + } +} diff --git a/cffu-core/src/test/java/io/foldright/test_utils/TestingExecutorUtils.kt b/cffu-core/src/test/java/io/foldright/test_utils/TestingExecutorUtils.kt index d955ae0d..1cb260e6 100644 --- a/cffu-core/src/test/java/io/foldright/test_utils/TestingExecutorUtils.kt +++ b/cffu-core/src/test/java/io/foldright/test_utils/TestingExecutorUtils.kt @@ -3,6 +3,7 @@ package io.foldright.test_utils import io.foldright.cffu.CffuFactory +import io.kotest.assertions.fail import io.kotest.core.config.AbstractProjectConfig import io.kotest.core.listeners.BeforeProjectListener import io.kotest.matchers.booleans.shouldBeFalse @@ -89,6 +90,30 @@ private fun isRunningInExecutor(executor: Executor): Boolean = private fun Thread.belongsTo(executor: Executor): Boolean = (executor as ThreadPoolAcquaintance).own(this) +fun assertRunningByFjCommonPool(callingThread: Thread) { + val runningThread = currentThread() + + val runInCallingThread = runningThread == callingThread + val runInCpThread = runningThread.name.startsWith("ForkJoinPool.commonPool-worker-") + + val actualMsg = "actual" + + (if (!runInCallingThread) " not" else "") + + "running in calling thread" + + (if (!runInCpThread) " not" else "") + + " running in common pool thread" + + val isCpParallel = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism") != "1" + val (expected, expectedMsg) = if (isCpParallel) + runInCpThread to "expect running in common pool thread" + else (!runInCallingThread && !runInCpThread) to "expect not running in calling thread" + + "and not in common pool thread(because common pool is not parallel)" + + if (!expected) fail( + "assertRunningByFjCommonPool failed.\n$expectedMsg $actualMsg.\ncontext info:\n" + + " running thread: $runningThread\n calling thread: $callingThread" + ) +} + // endregion //////////////////////////////////////////////////////////////////////////////// // region# util method for executors