Skip to content

Commit

Permalink
fix the safe behavior of timeout* methods ⏳ 🐞
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Jun 3, 2024
1 parent ffedf19 commit 81f7cfe
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
10 changes: 10 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,11 @@ public Cffu<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor
* if not otherwise completed before the given timeout.
* <p>
* Uses {@link #defaultExecutor()} as {@code executorWhenTimeout}.
* <p>
* <strong>CAUTION:<br></strong>
* If the wait timed out, the returned Cffu complete exceptionally with a CompletionException holding
* the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException}
* like {@link #unsafeOrTimeout(long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
Expand All @@ -959,6 +964,11 @@ public Cffu<T> orTimeout(long timeout, TimeUnit unit) {
/**
* Exceptionally completes this Cffu with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong>
* If the wait timed out, the returned Cffu complete exceptionally with a CompletionException holding
* the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException}
* like {@link #unsafeOrTimeout(long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}.
*
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.IS_IN_CF_DELAYER_THREAD;
import static io.foldright.cffu.Delayer.atCfDelayerThread;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

Expand Down Expand Up @@ -1487,6 +1487,11 @@ C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executo
* if not otherwise completed before the given timeout.
* <p>
* Uses CompletableFuture's default asynchronous execution facility as {@code executorWhenTimeout}.
* <p>
* <strong>CAUTION:<br></strong>
* If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException holding
* the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException}
* like {@link #orTimeout(CompletableFuture, long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}.
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
* @param unit a {@code TimeUnit} determining how to interpret the {@code timeout} parameter
Expand All @@ -1500,6 +1505,11 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(C cf, long timeou
/**
* Exceptionally completes given CompletableFuture with a {@link TimeoutException}
* if not otherwise completed before the given timeout.
* <p>
* <strong>CAUTION:<br></strong>
* If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException holding
* the {@link TimeoutException} as its cause; NOT a direct {@link TimeoutException}
* like {@link #orTimeout(CompletableFuture, long, TimeUnit)}/{@link CompletableFuture#orTimeout(long, TimeUnit)}.
*
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of {@code unit}
Expand All @@ -1513,7 +1523,7 @@ public static <C extends CompletableFuture<?>> C cffuOrTimeout(
requireNonNull(unit, "unit is null");

final C f = orTimeout(cf, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1589,7 +1599,7 @@ C cffuCompleteOnTimeout(C cf, @Nullable T value, Executor executorWhenTimeout, l
requireNonNull(unit, "unit is null");

final C f = completeOnTimeout(cf, value, timeout, unit);
return hopAsyncIf(f, IS_IN_CF_DELAYER_THREAD, executorWhenTimeout);
return hopExecutorIfAtCfDelayerThread(f, executorWhenTimeout);
}

/**
Expand Down Expand Up @@ -1635,12 +1645,19 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
}

@SuppressWarnings("unchecked")
private static <C extends CompletionStage<?>>
C hopAsyncIf(C cf, BooleanSupplier condition, Executor asyncExecutor) {
return (C) cf.handle((r, ex) -> condition.getAsBoolean()
? cf.handleAsync((r1, ex1) -> cf, asyncExecutor).thenCompose(x -> (CompletionStage<?>) x)
: cf
).thenCompose(x -> x);
public static <C extends CompletionStage<?>>
C hopExecutorIfAtCfDelayerThread(C cf, Executor asyncExecutor) {
final CompletionStage<Object> f = (CompletionStage<Object>) cf;

return (C) f.handle((v, ex) -> null).thenCompose(unused -> {
if (!atCfDelayerThread()) return f;

CompletableFuture<Void> signal = new CompletableFuture<>();
delayedExecutor(0, TimeUnit.SECONDS, asyncExecutor)
.execute(() -> signal.complete(null));

return signal.thenCompose(v -> f);
});
}

//# Advanced methods of CompletionStage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;


Expand All @@ -22,20 +21,13 @@
*/
@SuppressWarnings("JavadocReference")
final class Delayer {
private static final ScheduledThreadPoolExecutor delayer;

static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}

/**
* @return a Future that can be used to cancel the delayed task
* @see FutureCanceller
* @see DelayedExecutor#execute(Runnable)
*/
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
return DelayerHolder.delayer.schedule(command, delay, unit);
}

/**
Expand All @@ -55,7 +47,34 @@ public static <T> ScheduledFuture<?> delayToCompleteCf(
return delay(new CfCompleter<>(cf, value), delay, unit);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuDelayScheduler";
/**
* Checks whether execution is at the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static boolean atCfDelayerThread() {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
}

private static final String THREAD_NAME_OF_CFFU_DELAY_SCHEDULER = "CffuBuiltinDelayScheduler";

/**
* Holds {@link #delayer} scheduler as field of static inner class for lazy loading(init only when needed).
* <p>
* The lazy loading is need because {@link #atCfDelayerThread()} method of
* class {@link Delayer} is used on {@code Java 9+}.
*/
private static class DelayerHolder {
static final ScheduledThreadPoolExecutor delayer;

static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
}

private static final class DaemonThreadFactory implements ThreadFactory {
@Override
Expand All @@ -67,18 +86,6 @@ public Thread newThread(@NonNull Runnable r) {
}
}

/**
* Checks whether the action is executed in the thread of CompletableFuture/Cffu delayer.
* <p>
* The constant {@code "CompletableFutureDelayScheduler"} is defined
* at {@link CompletableFuture.Delayer.DaemonThreadFactory}.
*/
@SuppressWarnings("JavadocReference")
static final BooleanSupplier IS_IN_CF_DELAYER_THREAD = () -> {
final String name = Thread.currentThread().getName();
return "CompletableFutureDelayScheduler".equals(name) || THREAD_NAME_OF_CFFU_DELAY_SCHEDULER.equals(name);
};

private Delayer() {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,9 @@ fun <T, C : CompletionStage<in T>> C.exceptionallyAsync(fn: Function<Throwable,
*
* Uses CompletableFuture's default asynchronous execution facility as `executorWhenTimeout`.
*
* **CAUTION:** If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException
* holding the [TimeoutException] as its cause; NOT a direct [TimeoutException] like [orTimeout]/[CompletableFuture.orTimeout].
*
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
* @return the new CompletableFuture
Expand All @@ -782,6 +785,9 @@ fun <T, C : CompletableFuture<out T>> C.cffuOrTimeout(timeout: Long, unit: TimeU
* Exceptionally completes this CompletableFuture with a TimeoutException
* if not otherwise completed before the given timeout.
*
* **CAUTION:** If the wait timed out, the returned CompletableFuture complete exceptionally with a CompletionException
* holding the [TimeoutException] as its cause; NOT a direct [TimeoutException] like [orTimeout]/[CompletableFuture.orTimeout].
*
* @param executorWhenTimeout the async executor when triggered by timeout
* @param timeout how long to wait before completing exceptionally with a TimeoutException, in units of `unit`
* @param unit a `TimeUnit` determining how to interpret the `timeout` parameter
Expand Down Expand Up @@ -930,8 +936,8 @@ fun <T, C : CompletionStage<in T>> C.exceptionallyComposeAsync(
* .join();
* ```
*
* **CAUTION:** if the wait timed out, this method throws an (unchecked) CompletionException with the TimeoutException as its cause;
* NOT throws a (checked) TimeoutException like [CompletableFuture.get].
* **CAUTION:** If the wait timed out, this method throws an (unchecked) CompletionException
* with the TimeoutException as its cause; NOT throws a (checked) TimeoutException like [CompletableFuture.get].
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
Expand Down

0 comments on commit 81f7cfe

Please sign in to comment.