Skip to content

Commit

Permalink
feat: implement peek methods 🫣
Browse files Browse the repository at this point in the history
  • Loading branch information
oldratlee committed Apr 27, 2024
1 parent 9c4a041 commit 4aed358
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 21 deletions.
108 changes: 88 additions & 20 deletions cffu-core/src/main/java/io/foldright/cffu/Cffu.java
Original file line number Diff line number Diff line change
Expand Up @@ -1315,8 +1315,9 @@ public Cffu<T> completeOnTimeout(@Nullable T value, long timeout, TimeUnit unit)
// - thenCompose*: T -> CompletionStage<T>
// - exceptionallyCompose*: throwable -> CompletionStage<T>
//
// - whenComplete*: (T, throwable) -> Void
// - handle*: (T, throwable) -> U
// - handle*: (T, throwable) -> U
// - whenComplete*: (T, throwable) -> Void
// - peek*: (T, throwable) -> Void
//
// NOTE about advanced meaning:
// - `compose` methods, input function argument return CompletionStage
Expand Down Expand Up @@ -1504,18 +1505,85 @@ public Cffu<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action
* @return the new Cffu
*/
@Override
public Cffu<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
public Cffu<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return reset0(cf.whenCompleteAsync(action, executor));
}

/**
* Returns a new Cffu that, when this stage completes either normally or exceptionally,
* is executed with this stage's result and exception as arguments to the supplied function.
* Peeks the result by executing the given action when this cffu completes, returns this cffu.
* <p>
* When this stage is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this stage as arguments,
* and the function's result is used to complete the returned stage.
* When this cffu is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this cffu as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect this cffu.
*
* @param action the action to perform
* @return this Cffu
* @see CompletionStage#whenComplete(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public Cffu<T> peek(BiConsumer<? super T, ? super Throwable> action) {
cf.whenComplete(action);
return this;
}

/**
* Peeks the result by executing the given action when this cffu completes,
* executes the given action using {@link #defaultExecutor()}, returns this cffu.
* <p>
* When this cffu is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this cffu as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect this cffu.
*
* @param action the action to perform
* @return this Cffu
* @see CompletionStage#whenCompleteAsync(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public Cffu<T> peekAsync(BiConsumer<? super T, ? super Throwable> action) {
cf.whenCompleteAsync(action, fac.defaultExecutor());
return this;
}

/**
* Peeks the result by executing the given action when this cffu completes,
* that executes the given action using the supplied Executor when this cffu completes, returns this cffu.
* <p>
* When this cffu is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this cffu as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect this cffu.
*
* @param action the action to perform
* @return this Cffu
* @see CompletionStage#whenCompleteAsync(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public Cffu<T> peekAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
cf.whenCompleteAsync(action, executor);
return this;
}

/**
* Returns a new Cffu that, when this cffu completes either normally or exceptionally,
* is executed with this cffu's result and exception as arguments to the supplied function.
* <p>
* When this cffu is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this cffu as arguments,
* and the function's result is used to complete the returned cffu.
*
* @param fn the function to use to compute the value of the returned Cffu
* @param <U> the function's return type
Expand All @@ -1528,13 +1596,13 @@ public <U> Cffu<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
}

/**
* Returns a new Cffu that, when this stage completes either normally or exceptionally,
* Returns a new Cffu that, when this cffu completes either normally or exceptionally,
* is executed using {@link #defaultExecutor()},
* with this stage's result and exception as arguments to the supplied function.
* with this cffu's result and exception as arguments to the supplied function.
* <p>
* When this stage is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this stage as arguments,
* and the function's result is used to complete the returned stage.
* When this Cffu is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this Cffu as arguments,
* and the function's result is used to complete the returned Cffu.
*
* @param fn the function to use to compute the value of the returned Cffu
* @param <U> the function's return type
Expand All @@ -1547,15 +1615,15 @@ public <U> Cffu<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
}

/**
* Returns a new Cffu that, when this stage completes either normally or exceptionally,
* is executed using the supplied executor, with this stage's result and exception
* Returns a new Cffu that, when this cffu completes either normally or exceptionally,
* is executed using the supplied executor, with this cffu's result and exception
* as arguments to the supplied function.
* <p>
* When this stage is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this stage as arguments,
* and the function's result is used to complete the returned stage.
* When this cffu is complete, the given function is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of this cffu as arguments,
* and the function's result is used to complete the returned cffu.
*
* @param fn the function to use to compute the value of the returned Cffu
* @param fn the function to use to compute the value of the returned cffu
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new Cffu
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,82 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
return anyOfSuccess(cf1.toCompletableFuture(), cf2.toCompletableFuture()).thenApplyAsync(fn, executor);
}

////////////////////////////////////////////////////////////////////////////////
//# More new enhanced methods
////////////////////////////////////////////////////////////////////////////////

/**
* Peeks the result by executing the given action when given stage completes, returns the given stage.
* <p>
* When the given stage is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of given stage as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect the given stage.
*
* @param action the action to perform
* @return the given stage
* @see CompletionStage#whenComplete(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peek(
C cf, BiConsumer<? super T, ? super Throwable> action) {
cf.whenComplete(action);
return cf;
}

/**
* Peeks the result by executing the given action when given stage completes,
* executes the given action using given stage's default asynchronous execution facility,
* returns the given stage.
* <p>
* When the given stage is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of given stage as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect the given stage.
*
* @param action the action to perform
* @return the given stage
* @see CompletionStage#whenCompleteAsync(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peekAsync(
C cf, BiConsumer<? super T, ? super Throwable> action) {
cf.whenCompleteAsync(action);
return cf;
}

/**
* Peeks the result by executing the given action when given stage completes,
* executes the given action using the supplied Executor, returns the given stage.
* <p>
* When the given stage is complete, the given action is invoked with the result (or {@code null} if none)
* and the exception (or {@code null} if none) of given stage as arguments.
* <p>
* Unlike method {@link CompletionStage#handle handle} and like method
* {@link CompletionStage#whenComplete(BiConsumer) whenComplete},
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not,
* do <strong>NOT</strong> affect the given stage.
*
* @param action the action to perform
* @return the given stage
* @see CompletionStage#whenCompleteAsync(BiConsumer, Executor)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peekAsync(
C cf, BiConsumer<? super T, ? super Throwable> action, Executor executor) {
cf.whenCompleteAsync(action, executor);
return cf;
}

////////////////////////////////////////////////////////////////////////////////
//# Backport CF static methods
// compatibility for low Java versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* <p>
* The core class is {@link io.foldright.cffu.Cffu}. And utils for {@link java.util.concurrent.CompletableFuture},
* the key user class is {@link io.foldright.cffu.CompletableFutureUtils}
* which contains the enhanced methods for {@link java.util.concurrent.CompletableFuture}.
* which contains the enhanced and backport methods for {@link java.util.concurrent.CompletableFuture}.
*
* @see io.foldright.cffu.Cffu
* @see io.foldright.cffu.CffuFactory
Expand Down
25 changes: 25 additions & 0 deletions cffu-core/src/test/java/io/foldright/cffu/CffuTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,31 @@ void test_cffuState() {
assertEquals(CffuState.CANCELLED, incomplete.cffuState());
}

@Test
void test_peek() throws Exception {
BiConsumer<Object, Throwable> c = (v, ex) -> {
};
BiConsumer<Object, Throwable> ec = (v, ex) -> {
throw anotherRte;
};

Cffu<Object> failed = cffuFactory.failedFuture(rte);
assertSame(failed.peek(c), failed);
assertSame(failed.peekAsync(c), failed);
assertSame(failed.peekAsync(c, executorService), failed);
assertSame(failed.peek(ec), failed);
assertSame(failed.peekAsync(ec), failed);
assertSame(failed.peekAsync(ec, executorService), failed);

Cffu<Integer> success = cffuFactory.completedFuture(n);
assertEquals(n, success.peek(c).get());
assertEquals(n, success.peekAsync(c).get());
assertEquals(n, success.peekAsync(c).get());
assertEquals(n, success.peek(ec).get());
assertEquals(n, success.peekAsync(ec).get());
assertEquals(n, success.peekAsync(ec).get());
}

////////////////////////////////////////////////////////////////////////////////
//# Cffu Re-Config methods
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,35 @@ void test_either_success() throws Exception {
assertEquals(n, applyToEitherSuccessAsync(failed, cf, Function.identity(), executorService).get());
}

////////////////////////////////////////////////////////////////////////////////
//# More new enhanced methods
////////////////////////////////////////////////////////////////////////////////

@Test
void test_peek() throws Exception {
BiConsumer<Object, Throwable> c = (v, ex) -> {
};
BiConsumer<Object, Throwable> ec = (v, ex) -> {
throw anotherRte;
};

CompletableFuture<Object> failed = CompletableFuture.failedFuture(rte);
assertSame(peek(failed, c), failed);
assertSame(peekAsync(failed, c), failed);
assertSame(peekAsync(failed, c, executorService), failed);
assertSame(peek(failed, ec), failed);
assertSame(peekAsync(failed, ec), failed);
assertSame(peekAsync(failed, ec, executorService), failed);

CompletableFuture<Integer> success = completedFuture(n);
assertEquals(n, peek(success, c).get());
assertEquals(n, peekAsync(success, c).get());
assertEquals(n, peekAsync(success, c).get());
assertEquals(n, peek(success, ec).get());
assertEquals(n, peekAsync(success, ec).get());
assertEquals(n, peekAsync(success, ec).get());
}

////////////////////////////////////////////////////////////////////////////////
//# Backport CF methods
// compatibility for low Java version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,66 @@ fun <T, U> CompletionStage<out T>.applyToEitherSuccessAsync(
): CompletableFuture<U> =
CompletableFutureUtils.applyToEitherSuccessAsync(this, cf2, fn, executor)

////////////////////////////////////////////////////////////////////////////////
//# More new enhanced methods
////////////////////////////////////////////////////////////////////////////////

/**
* Peeks the result by executing the given action when this stage completes, returns this stage.
*
* When this stage is complete, the given action is invoked with the result (or `null` if none)
* and the exception (or `null` if none) of this stage as arguments.
*
* Unlike method [handle][CompletionStage.handle] and like method [whenComplete][CompletionStage.whenComplete],
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not, do **NOT** affect this stage.
*
* @param action the action to perform
* @return this stage
* @see CompletionStage.whenComplete
* @see java.util.stream.Stream.peek
*/
fun <T, C : CompletionStage<out T>> C.peek(action: BiConsumer<in T, in Throwable>): C =
CompletableFutureUtils.peek(this, action)

/**
* Peeks the result by executing the given action when this stage completes,
* executes the given action using this stage's default asynchronous execution facility, returns this stage.
*
* When this stage is complete, the given action is invoked with the result (or `null` if none)
* and the exception (or `null` if none) of this stage as arguments.
*
* Unlike method [handle][CompletionStage.handle] and like method [whenComplete][CompletionStage.whenComplete],
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not, do **NOT** affect this stage.
*
* @param action the action to perform
* @return this stage
* @see CompletionStage.whenCompleteAsync
* @see java.util.stream.Stream.peek
*/
fun <T, C : CompletionStage<out T>> C.peekAsync(action: BiConsumer<in T, in Throwable>): C =
CompletableFutureUtils.peekAsync(this, action)

/**
* Peeks the result by executing the given action when this stage completes,
* executes the given action using the supplied Executor, returns this stage.
*
* When this stage is complete, the given action is invoked with the result (or `null` if none)
* and the exception (or `null` if none) of this stage as arguments.
*
* Unlike method [handle][CompletionStage.handle] and like method [whenComplete][CompletionStage.whenComplete],
* this method is not designed to translate completion outcomes;
* whether the supplied action should not throw an exception or not, do **NOT** affect this stage.
*
* @param action the action to perform
* @return this stage
* @see CompletionStage.whenCompleteAsync
* @see java.util.stream.Stream.peek
*/
fun <T, C : CompletionStage<out T>> C.peekAsync(action: BiConsumer<in T, in Throwable>, executor: Executor): C =
CompletableFutureUtils.peekAsync(this, action, executor)

////////////////////////////////////////////////////////////////////////////////
//# Backport CF instance methods
// compatibility for low Java version
Expand Down

0 comments on commit 4aed358

Please sign in to comment.