Skip to content

Commit

Permalink
WIP feat: implement methods completeExceptionallyAsync in `Completa…
Browse files Browse the repository at this point in the history
…bleFutureUtils` 💣 ✨

TODO add method for cffu and CFExtensions
  • Loading branch information
oldratlee committed May 23, 2024
1 parent 6bbf0af commit 027f42f
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,7 @@ C completeAsync(C cf, Supplier<? extends T> supplier, Executor executor) {
if (IS_JAVA9_PLUS) {
cf.completeAsync(supplier, executor);
} else {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletableFuture#completeAsync with small adoption
Expand All @@ -1709,6 +1710,37 @@ C completeAsync(C cf, Supplier<? extends T> supplier, Executor executor) {
return cf;
}

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the default executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return the given CompletableFuture
* @see CompletableFuture#completeExceptionally(Throwable)
*/
public static <C extends CompletableFuture<?>>
C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier) {
return completeExceptionallyAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL);
}

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the given executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the given CompletableFuture
* @see CompletableFuture#completeExceptionally(Throwable)
*/
public static <C extends CompletableFuture<?>>
C completeExceptionallyAsync(C cf, Supplier<? extends Throwable> supplier, Executor executor) {
requireNonNull(cf, "cf is null");
requireNonNull(supplier, "supplier is null");
requireNonNull(executor, "executor is null");
executor.execute(new CfExCompleterBySupplier(cf, supplier));
return cf;
}

//# Re-Config methods

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

/**
* Singleton delay scheduler, used only for starting and cancelling tasks
* <p>
* code is copied from {@link CompletableFuture.Delayer} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class Delayer {
private static final ScheduledThreadPoolExecutor delayer;

Expand Down Expand Up @@ -82,7 +85,10 @@ private Delayer() {

/**
* An executor wrapper with delayed execution.
* <p>
* code is copied from {@link CompletableFuture.DelayedExecutor} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class DelayedExecutor implements Executor {
private final long delay;
private final TimeUnit unit;
Expand All @@ -106,8 +112,11 @@ public void execute(@NonNull Runnable r) {
////////////////////////////////////////////////////////////////////////////////

/**
* Action to submit task(Runnable) to executor
* Action to submit task(Runnable) to executor.
* <p>
* code is copied from {@link CompletableFuture.TaskSubmitter} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class TaskSubmitter implements Runnable {
private final Executor executor;
private final Runnable action;
Expand All @@ -124,8 +133,11 @@ public void run() {
}

/**
* Action to cf.completeExceptionally with TimeoutException
* Action to cf.completeExceptionally with TimeoutException.
* <p>
* code is copied from {@link CompletableFuture.Timeout} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class CfTimeout implements Runnable {
private final CompletableFuture<?> cf;

Expand All @@ -141,8 +153,11 @@ public void run() {
}

/**
* Action to complete cf
* Action to complete cf.
* <p>
* code is copied from {@link CompletableFuture.DelayedCompleter} with small adoption.
*/
@SuppressWarnings("JavadocReference")
final class CfCompleter<T> implements Runnable {
private final CompletableFuture<? super T> cf;
private final T value;
Expand All @@ -159,12 +174,15 @@ public void run() {
}

/**
* Action to cancel unneeded scheduled task by Future (for example timeouts)
* Action to cancel unneeded scheduled task by Future (for example timeouts).
* <p>
* code is copied from {@link CompletableFuture.Canceller} with small adoption.
*
* @see Delayer#delay(Runnable, long, TimeUnit)
* @see Delayer#delayToTimoutCf(CompletableFuture, long, TimeUnit)
* @see Delayer#delayToCompleteCf(CompletableFuture, Object, long, TimeUnit)
*/
@SuppressWarnings("JavadocReference")
final class FutureCanceller implements BiConsumer<Object, Throwable> {
private final Future<?> f;

Expand All @@ -180,9 +198,8 @@ public void accept(Object ignore, Throwable ex) {
}

/**
* code is copied from {@code CompletableFuture#AsyncSupply} with small adoption.
* code is copied from {@code CompletableFuture.AsyncSupply} with small adoption.
*/
@SuppressWarnings("serial")
@SuppressFBWarnings("SE_BAD_FIELD")
final class CfCompleterBySupplier<T> extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
Expand Down Expand Up @@ -226,3 +243,50 @@ public void run() {
}
}
}

/**
* code is adopted from {@code CompletableFuture.AsyncSupply}.
*/
@SuppressFBWarnings("SE_BAD_FIELD")
final class CfExCompleterBySupplier extends ForkJoinTask<Void>
implements Runnable, CompletableFuture.AsynchronousCompletionTask {
private CompletableFuture<?> dep;
private Supplier<? extends Throwable> fn;

CfExCompleterBySupplier(CompletableFuture<?> dep, Supplier<? extends Throwable> fn) {
this.dep = dep;
this.fn = fn;
}

@Override
public Void getRawResult() {
return null;
}

@Override
public void setRawResult(Void v) {
}

@Override
public boolean exec() {
run();
return false;
}

@Override
public void run() {
CompletableFuture<?> d;
Supplier<? extends Throwable> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null;
fn = null;
if (!d.isDone()) {
try {
d.completeExceptionally(f.get());
} catch (Throwable ex) {
d.completeExceptionally(ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,6 @@ fun <T> CompletableFuture<T>.resultNow(): T =
* @return the exception thrown by the task
* @throws IllegalStateException if the task has not completed, the task completed normally,
* or the task was cancelled
* @see CompletableFuture.resultNow
*/
fun CompletableFuture<*>.exceptionNow(): Throwable =
CompletableFutureUtils.exceptionNow(this)
Expand Down Expand Up @@ -1152,6 +1151,29 @@ fun <T, C : CompletableFuture<in T>> C.completeAsync(supplier: Supplier<out T>):
fun <T, C : CompletableFuture<in T>> C.completeAsync(supplier: Supplier<out T>, executor: Executor): C =
CompletableFutureUtils.completeAsync(this, supplier, executor)

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the default executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return the given CompletableFuture
* @see CompletableFuture.completeExceptionally
*/
fun <C : CompletableFuture<*>> C.completeExceptionallyAsync(supplier: Supplier<out Throwable>): C =
CompletableFutureUtils.completeExceptionallyAsync(this, supplier)

/**
* If not already completed, completes given CompletableFuture with the exception result
* of the given Supplier function invoked from an asynchronous task using the given executor.
*
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @param executor the executor to use for asynchronous execution
* @return the given CompletableFuture
* @see CompletableFuture.completeExceptionally
*/
fun <C : CompletableFuture<*>> C.completeExceptionallyAsync(supplier: Supplier<out Throwable>, executor: Executor): C =
CompletableFutureUtils.completeExceptionallyAsync(this, supplier, executor)

//# Re-Config methods

/**
Expand Down

0 comments on commit 027f42f

Please sign in to comment.