Skip to content

Commit

Permalink
WIP fix hopAsyncIf
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Jun 3, 2024
1 parent d857f8d commit b122487
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD;
import static io.foldright.cffu.Delayer.atCfDelayerThread;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

Expand Down Expand Up @@ -1513,7 +1513,7 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(
requireNonNull(unit, "unit is null");

final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1589,7 +1589,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l
requireNonNull(unit, "unit is null");

final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1635,12 +1635,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
}

@SuppressWarnings("unchecked")
private static <C extends CompletionStage<?>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage<?>) x)
: cf
).thenCompose(x -> x);
public static <C extends CompletionStage<?>>
C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) {
final CompletionStage<Object> f = (CompletionStage<Object>) cf;

return (C) f.handle((v, ex) -> null).thenCompose(unused -> {
if (!atCfDelayerThread()) return f;

CompletableFuture<Void> signal = new CompletableFuture<>();
delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor)
.execute(() -> signal.complete(null));

return signal.thenCompose(v -> f);
});
}

//# Advanced methods of CompletionStage
Expand Down Expand Up @@ -2119,7 +2126,7 @@ private static class AsyncPoolHolder {
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot support parallelism.
*/
private static final Executor ASYNC_POOL = _asyncPool0();
static final Executor ASYNC_POOL = _asyncPool0();

private static Executor _asyncPool0() {
if (IS_JAVA9_PLUS) return completedFuture(null).defaultExecutor();
Expand Down Expand Up @@ -2153,7 +2160,7 @@ private static Executor _asyncPool0() {
final CompletableFuture<Integer> cf = completedFuture(42);
try {
// `exceptionallyCompose` is the new method of CompletableFuture since java 12
cf.exceptionallyCompose(v -> cf);
cf.exceptionallyCompose(ex -> cf);
b = true;
} catch (NoSuchMethodError e) {
b = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;


Expand All @@ -22,20 +21,13 @@
*/
@SuppressWarnings("JavadocReference")
final class Delayer {
private static final ScheduledThreadPoolExecutor delayer;

static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}

/**
* @return a Future that can be used to cancel the delayed task
* @see FutureCanceller
* @see DelayedExecutor#execute(Runnable)
*/
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
return DelayerHolder.delayer.schedule(command, delay, unit);
}

/**
Expand All @@ -55,7 +47,34 @@ public static <T> ScheduledFuture<?> delayToCompleteCf(
return delay(new CfCompleter<>(cf, value), delay, unit);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler";
/**
* Checks whether execution is at the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static boolean atCfDelayerThread() {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuBuiltinDelayScheduler";

/**
* Holds {@link #delayer} scheduler as field of static inner class for lazy loading(init only when needed).
* <p>
* The lazy loading is need because {@link #atCfDelayerThread()} method of
* class {@link Delayer} is used on {@code Java 9+}.
*/
private static class DelayerHolder {
static final ScheduledThreadPoolExecutor delayer;

static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}

private static final class DaemonThreadFactory implements ThreadFactory {
@Override
Expand All @@ -67,18 +86,6 @@ public Thread newThread(@NonNull Runnable r) {
}
}

/**
* Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
};

private Delayer() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static io.foldright.cffu.CompletableFutureUtils.*;
import static io.foldright.test_utils.TestUtils.*;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.ForkJoinPool.commonPool;
import static java.util.function.Function.identity;
Expand Down Expand Up @@ -1031,6 +1032,52 @@ void test_timeout() throws Exception {
assertEquals(n, cffuCompleteOnTimeout(completedFuture(n), anotherN, defaultExecutor(), 1, TimeUnit.MILLISECONDS).get());
}

@Test
void test_safeBehavior_orTimeout() {
final Thread testThread = currentThread();

assertEquals(n, orTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertInstanceOf(TimeoutException.class, ex);
assertEquals("CompletableFutureDelayScheduler", currentThread().getName());
return n;
}).join());
assertEquals(n, cffuOrTimeout(createIncompleteFuture(), 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
// FIXME: Not TimeoutException, not compatible with orTimeout method
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(TimeoutException.class, ex.getCause());
assertNotSame(testThread, currentThread());
return n;
}).join());
assertEquals(n, cffuOrTimeout(createIncompleteFuture(), executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
// FIXME: Not TimeoutException, not compatible with orTimeout method
assertInstanceOf(CompletionException.class, ex);
assertInstanceOf(TimeoutException.class, ex.getCause());
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return n;
}).join());
}

@Test
void test_safeBehavior_completeOnTimeout() {
final Thread testThread = currentThread();

assertEquals(n, completeOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertEquals("CompletableFutureDelayScheduler", currentThread().getName());
return r;
}).join());
assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertNotSame(testThread, currentThread());
return r;
}).join());
assertEquals(n, cffuCompleteOnTimeout(createIncompleteFuture(), n, executorService, 1, TimeUnit.MILLISECONDS).handle((r, ex) -> {
assertNull(ex);
assertTrue(TestThreadPoolManager.isRunInExecutor(executorService));
return r;
}).join());
}

@Test
void test_exceptionallyCompose() throws Exception {
CompletableFuture<Object> completed = completedFuture(n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,39 @@ import kotlin.random.Random
import kotlin.random.nextULong


const val THREAD_COUNT_OF_POOL = 5
val THREAD_COUNT_OF_POOL = 4.coerceAtLeast(Runtime.getRuntime().availableProcessors() * 2)

@JvmOverloads
fun createThreadPool(threadNamePrefix: String, isForkJoin: Boolean = false): ExecutorService {
val counter = AtomicLong()
val prefix = "${threadNamePrefix}_${Random.nextULong()}-"
val prefix = "${threadNamePrefix}_${Random.nextULong()}"

val executorService = if (!isForkJoin)
ThreadPoolExecutor(
THREAD_COUNT_OF_POOL, THREAD_COUNT_OF_POOL, 1, TimeUnit.DAYS,
ArrayBlockingQueue(5000)
/* corePoolSize = */ THREAD_COUNT_OF_POOL, /* maximumPoolSize = */ THREAD_COUNT_OF_POOL,
/* keepAliveTime = */ 1, /* unit = */ TimeUnit.DAYS,
/* workQueue = */ ArrayBlockingQueue(5000)
) { r ->
Thread(r).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
isDaemon = true
}
}
else
ForkJoinPool(
THREAD_COUNT_OF_POOL, { fjPool ->
/* parallelism = */ THREAD_COUNT_OF_POOL,/* factory = */ { fjPool ->
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(fjPool).apply {
name = "$prefix${counter.getAndIncrement()}"
name = "${prefix}_${counter.getAndIncrement()}"
}
},
null, false
}, /* handler = */ null, /* asyncMode = */ false
)

return object : ExecutorService by executorService, ThreadPoolAcquaintance {
override fun isMyThread(thread: Thread): Boolean = thread.name.startsWith(prefix)

override fun unwrap(): ExecutorService = executorService

override fun toString(): String = "test ${if (isForkJoin) "ForkJoinPool" else "ThreadPoolExecutor"} $prefix"
}
}

Expand All @@ -57,8 +59,11 @@ private interface ThreadPoolAcquaintance {
fun unwrap(): ExecutorService
}

fun isRunInExecutor(executor: Executor): Boolean =
executor.doesOwnThread(currentThread())

fun assertRunInExecutor(executor: Executor) {
executor.doesOwnThread(currentThread()).shouldBeTrue()
isRunInExecutor(executor).shouldBeTrue()
}

fun assertNotRunInExecutor(executor: Executor) {
Expand Down
2 changes: 1 addition & 1 deletion cffu-core/src/test/resources/kotest.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
kotest.framework.classpath.scanning.config.disable=true
kotest.framework.classpath.scanning.autoscan.disable=true
# https://kotest.io/docs/framework/project-config.html#runtime-detection
kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig
#kotest.framework.config.fqn=io.foldright.test_utils.CffuKotestProjectConfig

0 comments on commit b122487

Please sign in to comment.