Skip to content

Commit

Permalink
test: add test cases for the safe behavior of timeout* methods ⏳ te…
Browse files Browse the repository at this point in the history
…st should FAIL 💥
  • Loading branch information
oldratlee committed Jun 3, 2024
1 parent 0f58ff6 commit 3fe4478
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static io.foldright.cffu.CompletableFutureUtils.*;
import static io.foldright.test_utils.TestUtils.*;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -1031,6 +1032,58 @@ void test_timeout() throws Exception {
assertEquals(n, cffuCompleteOnTimeout(completedFuture(n), anotherN, defaultExecutor(), 1, TimeUnit.MILLISECONDS).get());
}

@Test
void test_safeBehavior_orTimeout() {
final Thread testThread = currentThread();

assertEquals(n, orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertTrue(Delayer.atCfDelayerThread());
return n;
}).join());

assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
// FIXME: Not TimeoutException, not compatible with orTimeout method
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(TimeoutException.class, ex.getCause());
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return n;
}).join());
assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
// FIXME: Not TimeoutException, not compatible with orTimeout method
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(TimeoutException.class, ex.getCause());
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return n;
}).join());
}

@Test
void test_safeBehavior_completeOnTimeout() {
final Thread testThread = currentThread();

assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertTrue(Delayer.atCfDelayerThread());
return r;
}).join());

assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertNotSame(testThread, currentThread());
return r;
}).join());
assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertFalse(Delayer.atCfDelayerThread());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return r;
}).join());
}

@Test
void test_exceptionallyCompose() throws Exception {
CompletableFuture<Object> completed = completedFuture(n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,39 @@ import kotlin.random.Random
import kotlin.random.nextULong


const val THREAD_COUNT_OF_POOL = 5
val THREAD_COUNT_OF_POOL: Int = (Runtime.getRuntime().availableProcessors() * 2).coerceAtLeast(4).coerceAtMost(15)

@JvmOverloads
fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService {
val counter = AtomicLong()
val prefix = "${threadNamePrefix}_${Random.nextULong()}-"
val prefix = "${threadNamePrefix}_${Random.nextULong()}"

val executorService = if (!isForkJoin)
ThreadPoolExecutor(
THREAD_COUNT_OF_POOL, THREAD_COUNT_OF_POOL, 1, TimeUnit.DAYS,
ArrayBlockingQueue(5000)
/* corePoolSize = */ THREAD_COUNT_OF_POOL, /* maximumPoolSize = */ THREAD_COUNT_OF_POOL,
/* keepAliveTime = */ 1, /* unit = */ TimeUnit.DAYS,
/* workQueue = */ ArrayBlockingQueue(5000)
) { r ->
Thread(r).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
isDaemon = true
}
}
else
ForkJoinPool(
THREAD_COUNT_OF_POOL, { fjPool ->
/* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool ->
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
}
},
null, false
}, /* handler = */ null, /* asyncMode = */ false
)

return object : ExecutorService by executorService, ThreadPoolAcquaintance {
override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix)

override fun unwrap(): ExecutorService = executorService

override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix"
}
}

Expand All @@ -57,8 +59,11 @@ private interface ThreadPoolAcquaintance {
fun unwrap(): ExecutorService
}

fun isRunInExecutor(executor: Executor): Boolean =
executor.doesOwnThread(currentThread())

fun assertRunInExecutor(executor: Executor) {
executor.doesOwnThread(currentThread()).shouldBeTrue()
isRunInExecutor(executor).shouldBeTrue()
}

fun assertNotRunInExecutor(executor: Executor) {
Expand Down

0 comments on commit 3fe4478

Please sign in to comment.