Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Aug 9, 2024
1 parent e2899f8 commit 34c4d7e
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import java.util.concurrent.*;
import java.util.function.Supplier;

import static io.foldright.cffu.CffuTestHelper.unwrapMadeExecutor;
import static io.foldright.cffu.CompletableFutureUtils.failedFuture;
import static io.foldright.cffu.CompletableFutureUtils.toCompletableFutureArray;
import static io.foldright.cffu.DefaultExecutorTestUtils.unwrapMadeExecutor;
import static io.foldright.test_utils.CffuTestConstants.*;
import static io.foldright.test_utils.TestUtils.*;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ForkJoinPool.commonPool;
Expand Down
5 changes: 3 additions & 2 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import static io.foldright.cffu.DefaultExecutorTestUtils.unwrapMadeExecutor;
import static io.foldright.test_utils.TestUtils.*;
import static io.foldright.cffu.CffuTestHelper.unwrapMadeExecutor;
import static io.foldright.test_utils.CffuTestConstants.*;
import static io.foldright.test_utils.TestUtils.sleep;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.*;
Expand Down
16 changes: 16 additions & 0 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTestHelper.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
@file:JvmName("CffuTestHelper")

package io.foldright.cffu

import io.foldright.cffu.CffuFactoryBuilder.CffuMadeExecutor
import java.util.concurrent.Executor

fun CffuFactory.unwrapMadeExecutor(): Executor {
val executor = defaultExecutor() as CffuMadeExecutor
return executor.unwrap()
}

fun Cffu<*>.unwrapMadeExecutor(): Executor {
val executor = defaultExecutor() as CffuMadeExecutor
return executor.unwrap()
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.stream.IntStream;

import static io.foldright.cffu.CompletableFutureUtils.*;
import static io.foldright.test_utils.CffuTestConstants.*;
import static io.foldright.test_utils.TestThreadPoolManager.assertRunningInExecutor;
import static io.foldright.test_utils.TestUtils.*;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.CompletableFuture.completedFuture;
Expand Down Expand Up @@ -1507,7 +1509,7 @@ void test_safeBehavior_orTimeout() {
cffuOrTimeout(createIncompleteFuture(), executorService, 100, TimeUnit.MILLISECONDS).handle((v, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
assertRunningInExecutor(executorService);
return i;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
Expand Down Expand Up @@ -1538,7 +1540,7 @@ void test_safeBehavior_completeOnTimeout() {
cffuCompleteOnTimeout(createIncompleteFuture(), i, executorService, 100, TimeUnit.MILLISECONDS).handle((v, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
assertRunningInExecutor(executorService);
return v;
})
).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList()));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import java.util.concurrent.*;

import static io.foldright.cffu.ListenableFutureUtils.*;
import static io.foldright.test_utils.TestUtils.*;
import static io.foldright.test_utils.CffuTestConstants.*;
import static io.foldright.test_utils.TestUtils.sleep;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.*;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.foldright.cffu.spi

import io.foldright.cffu.CffuFactory
import io.foldright.cffu.DefaultExecutorTestUtils.unwrapMadeExecutor
import io.foldright.cffu.unwrapMadeExecutor
import io.foldright.test_utils.testThreadPoolExecutor
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.types.shouldBeSameInstanceAs
Expand All @@ -11,15 +11,15 @@ class ExecutorWrapperProviderTest : FunSpec({
test("disable TestExecutorWrapper") {
val factory = CffuFactory.builder(testThreadPoolExecutor).build()
val cffu = factory.runAsync {}
unwrapMadeExecutor(cffu).shouldBeSameInstanceAs(testThreadPoolExecutor)
cffu.unwrapMadeExecutor() shouldBeSameInstanceAs testThreadPoolExecutor
}

test("enable TestExecutorWrapper") {
enableTestExecutorWrapper()

val factory = CffuFactory.builder(testThreadPoolExecutor).build()
val cffu = factory.runAsync {}
unwrapMadeExecutor(cffu).shouldNotBeSameInstanceAs(testThreadPoolExecutor)
cffu.unwrapMadeExecutor() shouldNotBeSameInstanceAs testThreadPoolExecutor
}

beforeTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static org.junit.jupiter.api.Assertions.*;


@SuppressWarnings({"AssertBetweenInconvertibleTypes", "EqualsWithItself", "SimplifiableAssertion", "ConstantValue", "EqualsBetweenInconvertibleTypes"})
@SuppressWarnings({"EqualsWithItself", "SimplifiableAssertion", "ConstantValue", "EqualsBetweenInconvertibleTypes"})
public class TupleTest {
int e1 = 1;
String e2 = "2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static io.foldright.cffu.CompletableFutureUtils.*;


@SuppressWarnings({"ThrowablePrintedToSystemOut", "SameParameterValue"})
public class ConcurrencyStrategyDemo {
public static final ExecutorService myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = CffuFactory.builder(myBizExecutor).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean

class CompletableFutureUsageShowcaseTest : FunSpec({
val n = 42
val anotherN = 424242
val rte = RuntimeException("Bang")

test("execution thread/executor behavior: then*(non-Async) operations chained after COMPLETED CF(completed by immediate value), trigger by then* invocation and run in place SEQUENTIALLY").config(
invocations = 100
Expand Down Expand Up @@ -71,7 +68,7 @@ class CompletableFutureUsageShowcaseTest : FunSpec({
sequenceChecker.assertSeq("supplyAsync completed CF", seq++)

currentThread() shouldNotBe mainThread
assertRunInExecutor(testThreadPoolExecutor)
assertRunningInExecutor(testThreadPoolExecutor)

n
}, testThreadPoolExecutor)
Expand Down Expand Up @@ -123,12 +120,12 @@ class CompletableFutureUsageShowcaseTest : FunSpec({
blocker.block()

currentThread() shouldNotBe mainThread
assertNotRunInExecutor(testThreadPoolExecutor)
assertNotRunningInExecutor(testThreadPoolExecutor)
}
.thenRunAsync({
thenNonAsyncOpThread = currentThread()

assertRunInExecutor(testThreadPoolExecutor)
assertRunningInExecutor(testThreadPoolExecutor)
}, testThreadPoolExecutor) // ! switch executor !
.thenApply {
// when NOT async,
Expand All @@ -152,15 +149,15 @@ class CompletableFutureUsageShowcaseTest : FunSpec({
//
// - executor is NOT inherited after switch!
// - use the DEFAULT EXECUTOR of CompletableFuture, if no executor specified.
assertNotRunInExecutor(testThreadPoolExecutor)
assertNotRunningInExecutor(testThreadPoolExecutor)
}
}
f.join()
}

fun checkThreadSwitchBehaviorThenApplyAsync(executor: ExecutorService) {
val f0 = CompletableFuture.supplyAsync({
assertRunInExecutor(executor)
assertRunningInExecutor(executor)
emptyList<String>()
}, executor)

Expand Down Expand Up @@ -245,7 +242,7 @@ class CompletableFutureUsageShowcaseTest : FunSpec({
sequenceChecker.assertSeq("create exceptionallyAsync", 1)

currentThread() shouldNotBe mainThread
assertRunInExecutor(testThreadPoolExecutor)
assertRunningInExecutor(testThreadPoolExecutor)

it.shouldBeTypeOf<CompletionException>()
it.cause shouldBeSameInstanceAs rte
Expand Down Expand Up @@ -312,7 +309,7 @@ class CompletableFutureUsageShowcaseTest : FunSpec({
f0.completeAsync({
sequenceChecker.assertSeq("in completeAsync", 2)

assertRunInExecutor(testThreadPoolExecutor)
assertRunningInExecutor(testThreadPoolExecutor)

"done"
}, testThreadPoolExecutor)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
@file:JvmName("CffuTestConstants")

package io.foldright.test_utils


////////////////////////////////////////////////////////////////////////////////
// constants for testing
////////////////////////////////////////////////////////////////////////////////

const val n = 42
const val anotherN = 424242
const val s = "S42"
const val d = 42.1

@JvmField
val rte = RuntimeException("Bang")

@JvmField
val anotherRte = RuntimeException("AnotherBang")
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,26 @@ import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
import kotlin.random.nextULong

////////////////////////////////////////////////////////////////////////////////
// region## test executors for kotest
////////////////////////////////////////////////////////////////////////////////

val THREAD_COUNT_OF_POOL: Int = (Runtime.getRuntime().availableProcessors() * 2).coerceAtLeast(4).coerceAtMost(15)

val testThreadPoolExecutor: ExecutorService =
createThreadPool("CompletableFutureUseTest_ThreadPool")

val testCffuFactory: CffuFactory = CffuFactory.builder(testThreadPoolExecutor).build()

val testForkJoinPoolExecutor: ExecutorService =
createThreadPool("CompletableFutureUseTest_ForkJoinPool", true)

val testForkJoinCffuFactory: CffuFactory = CffuFactory.builder(testForkJoinPoolExecutor).build()

@JvmOverloads
fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService {
val counter = AtomicLong()
val prefix = "${threadNamePrefix}_${Random.nextULong()}"
val prefix = "${threadNamePrefix}_${Random.nextULong()}_"

val executorService = if (!isForkJoin)
ThreadPoolExecutor(
Expand All @@ -29,47 +42,59 @@ fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): Exe
/* workQueue = */ ArrayBlockingQueue(5000)
) { r ->
Thread(r).apply {
name = "${prefix}_${counter.getAndIncrement()}"
name = "${prefix}${counter.getAndIncrement()}"
isDaemon = true
}
}
else
ForkJoinPool(
/* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool ->
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply {
name = "${prefix}_${counter.getAndIncrement()}"
name = "${prefix}${counter.getAndIncrement()}"
}
}, /* handler = */ null, /* asyncMode = */ false
)

return object : ExecutorService by executorService, ThreadPoolAcquaintance {
override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix)
override fun own(thread: Thread): Boolean = thread.name.startsWith(prefix)

override fun unwrap(): ExecutorService = executorService

override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix"
override fun toString(): String =
"test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} with prefix $prefix"
}
}

private fun Executor.doesOwnThread(thread: Thread): Boolean = (this as ThreadPoolAcquaintance).isMyThread(thread)

private interface ThreadPoolAcquaintance {
fun isMyThread(thread: Thread): Boolean
fun own(thread: Thread): Boolean

fun unwrap(): ExecutorService
}

fun isRunInExecutor(executor: Executor): Boolean =
executor.doesOwnThread(currentThread())
// endregion
////////////////////////////////////////////////////////////////////////////////
// region# check methods for thread/executor relationship
////////////////////////////////////////////////////////////////////////////////

fun assertRunInExecutor(executor: Executor) {
isRunInExecutor(executor).shouldBeTrue()
fun assertRunningInExecutor(executor: Executor) {
isRunningInExecutor(executor).shouldBeTrue()
}

fun assertNotRunInExecutor(executor: Executor) {
executor.doesOwnThread(currentThread()).shouldBeFalse()
fun assertNotRunningInExecutor(executor: Executor) {
isRunningInExecutor(executor).shouldBeFalse()
}

private fun isRunningInExecutor(executor: Executor): Boolean =
currentThread().belongsTo(executor)

private fun Thread.belongsTo(executor: Executor): Boolean =
(executor as ThreadPoolAcquaintance).own(this)

// endregion
////////////////////////////////////////////////////////////////////////////////
// region# util method for executors
////////////////////////////////////////////////////////////////////////////////

fun warmupExecutorService(vararg executors: ExecutorService) {
executors.flatMap { executor ->
(0 until THREAD_COUNT_OF_POOL * 2).map {
Expand All @@ -90,18 +115,11 @@ fun shutdownExecutorService(vararg executors: ExecutorService) {
}
}

// endregion
////////////////////////////////////////////////////////////////////////////////
// executors for kotest
// region# Kotest Listener
////////////////////////////////////////////////////////////////////////////////

val testThreadPoolExecutor: ExecutorService =
createThreadPool("CompletableFutureUseTest_ThreadPool")

val testCffuFactory: CffuFactory = CffuFactory.builder(testThreadPoolExecutor).build()

val testForkJoinPoolExecutor: ExecutorService =
createThreadPool("CompletableFutureUseTest_ForkJoinPool", true)

/**
* https://kotest.io/docs/framework/project-config.html
*/
Expand Down
21 changes: 3 additions & 18 deletions cffu-core/src/test/java/io/foldright/test_utils/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,6 @@ import org.apache.commons.lang3.SystemUtils.isJavaVersionAtLeast
import java.util.concurrent.*


////////////////////////////////////////////////////////////////////////////////
// constants for testing
////////////////////////////////////////////////////////////////////////////////

const val n = 42
const val anotherN = 424242
const val s = "S42"
const val d = 42.1

@JvmField
val rte = RuntimeException("Bang")

@JvmField
val anotherRte = RuntimeException("AnotherBang")

////////////////////////////////////////////////////////////////////////////////
// util methods for testing
////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -84,15 +69,15 @@ fun assertCompletableFutureRunInDefaultThread(executorService: ExecutorService)
}

fun assertCompletableFutureRunInThreadOf(executorService: ExecutorService) {
assertRunInExecutor(executorService)
assertRunningInExecutor(executorService)
}

fun assertCffuRunInDefaultThread(executorService: ExecutorService) {
assertRunInExecutor(executorService)
assertRunningInExecutor(executorService)
}

fun assertCffuRunInThreadOf(executorService: ExecutorService) {
assertRunInExecutor(executorService)
assertRunningInExecutor(executorService)
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 34c4d7e

Please sign in to comment.