Skip to content

Commit

Permalink
WIP fix hopAsyncIf
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed May 31, 2024
1 parent 7c96dc5 commit fa1e7e0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(
requireNonNull(unit, "unit is null");

final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1576,7 +1576,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l
requireNonNull(unit, "unit is null");

final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1621,12 +1621,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
}

@SuppressWarnings("unchecked")
private static <C extends CompletionStage<?>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage<?>) x)
: cf
).thenCompose(x -> x);
public static <C extends CompletionStage<?>>
C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) {
final CompletionStage<Object> f = (CompletionStage<Object>) cf;

return (C) f.handle((r, ex) -> null).thenCompose(unused -> {
if (!IS_IN_CF_DELAYER_THREAD.getAsBoolean()) return f;

CompletableFuture<Void> signal = new CompletableFuture<>();
delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor)
.execute(() -> signal.complete(null));

return signal.thenCompose(v -> f);
});
}

//# Advanced methods of CompletionStage
Expand Down Expand Up @@ -2136,7 +2143,7 @@ private static Executor _asyncPool0() {
final CompletableFuture<Integer> cf = completedFuture(42);
try {
// `exceptionallyCompose` is the new method of CompletableFuture since java 12
cf.exceptionallyCompose(v -> cf);
cf.exceptionallyCompose(ex -> cf);
b = true;
} catch (NoSuchMethodError e) {
b = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,39 @@ import kotlin.random.Random
import kotlin.random.nextULong


const val THREAD_COUNT_OF_POOL = 5
val THREAD_COUNT_OF_POOL = 4.coerceAtLeast(Runtime.getRuntime().availableProcessors() * 2)

@JvmOverloads
fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService {
val counter = AtomicLong()
val prefix = "${threadNamePrefix}_${Random.nextULong()}-"
val prefix = "${threadNamePrefix}_${Random.nextULong()}"

val executorService = if (!isForkJoin)
ThreadPoolExecutor(
THREAD_COUNT_OF_POOL, THREAD_COUNT_OF_POOL, 1, TimeUnit.DAYS,
ArrayBlockingQueue(5000)
/* corePoolSize = */ THREAD_COUNT_OF_POOL, /* maximumPoolSize = */ THREAD_COUNT_OF_POOL,
/* keepAliveTime = */ 1, /* unit = */ TimeUnit.DAYS,
/* workQueue = */ ArrayBlockingQueue(5000)
) { r ->
Thread(r).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
isDaemon = true
}
}
else
ForkJoinPool(
THREAD_COUNT_OF_POOL, { fjPool ->
/* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool ->
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
}
},
null, false
}, /* handler = */ null, /* asyncMode = */ false
)

return object : ExecutorService by executorService, ThreadPoolAcquaintance {
override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix)

override fun unwrap(): ExecutorService = executorService

override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix"
}
}

Expand All @@ -57,8 +59,11 @@ private interface ThreadPoolAcquaintance {
fun unwrap(): ExecutorService
}

fun isRunInExecutor(executor: Executor): Boolean =
executor.doesOwnThread(currentThread())

fun assertRunInExecutor(executor: Executor) {
executor.doesOwnThread(currentThread()).shouldBeTrue()
isRunInExecutor(executor).shouldBeTrue()
}

fun assertNotRunInExecutor(executor: Executor) {
Expand Down
2 changes: 1 addition & 1 deletion cffu-core/src/test/resources/kotest.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
kotest.framework.classpath.scanning.config.disable=true
kotest.framework.classpath.scanning.autoscan.disable=true
# https://kotest.io/docs/framework/project-config.html#runtime-detection
kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig
#kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig

0 comments on commit fa1e7e0

Please sign in to comment.