通过静态工厂方法(Factory
)或构造函数(Constructor
)来创建CompletableFuture
。这些方法是CompletableFuture
链的起始。
completedFuture(T value)
:返回CF<T>
completedStage(T value)
J9:返回CompletionStage<T>
failedFuture(Throwable ex)
J9:返回CF<T>
failedStage(Throwable ex)
J9:返回CompletionStage<T>
说明:
- 因为通过正常完成结果(
value
)或异常完成的异常(ex
)创建已完成的CompletableFuture
,能很快完成创建,所以并不需要用于异步执行线程池(executor
)。 - 对于
completedStage
/failedStage
方法返回的类型是CompletionStage
接口,限止了调用CompletionStage
接口之外的其它方法,通过抛UnsupportedOperationException
异常表示不支持。 - 对于通过异常完成的异常(
ex
)的CF<T>
或CompletionStage<T>
,结果类型T
可以是任意类型。
runAsync(Runnable runnable)
:返回CF<Void>
runAsync(Runnable runnable, Executor executor)
:返回CF<Void>
supplyAsync(Supplier<T> supplier)
:返回CF<T>
supplyAsync(Supplier<T> supplier, Executor executor)
:返回CF<T>
说明:
- 因为要异步运行输入的任务(
Runnable
/Supplier
),所以需要异步执行的线程池。 - 如果不指定
executor
参数,缺省是ForkJoinPool.commonPool()
。
注:严格的说,
CompletableFuture
的缺省executor
所使用的选择逻辑是:当
ForkJoinPool.getCommonPoolParallelism() > 1
时,即Runtime.getRuntime().availableProcessors() > 2
时,使用ForkJoinPool.commonPool()
(现在机器的处理器个数一般都不止2个,无论线上服务器还是个人电脑);否则使用
ThreadPerTaskExecutor
,即为每个任务新建一个线程来执行 🤯具体确定的缺省逻辑,还是去查看
CompletableFuture
与ForkJoinPool
类的源码实现。
allOf(CompletableFuture<?>... cfs)
:返回CF<Void>
- 返回的
CF
,当多个输入CF
全部成功完成时,才成功完成; - 如果输入
CF
有一个失败的,则返回的CF
立即失败,不再需要依赖其它CF
完成的状态
- 返回的
anyOf(CompletableFuture<?>... cfs)
:返回CF<Object>
- 返回的
CF
,当多个输入CF
有任一个完成(无论成功完成还是失败完成),返回这个完成的输入CF
的结果,不会关注后续输入CF
的完成情况 - 赛马模式
- 返回的
说明:
- 虽然这2个方法是静态工厂方法,但并不是
CF
链的起点,而是输入多个CF
,用于编排多路的流程。- 在功能与使用的上,应该和下面【3. 流程编排】一节的方法归类在一起。
- 这里列上,只是为了体现出是静态工厂方法这个特点。
- 这2个方法是在组合输入的多个
CF
的结果,本身复杂业务执行逻辑,逻辑简单无阻塞,所以无需Executor
。 - 这2个方法所返回的
CF
,在结果获取上,有不方便的地方: 😔- 对于
allOf
方法,返回CF
结果是Void
即无内容,并没有持有多个输入CF
的结果allOf
方法的文档给的解决方法是,再通过调用各个输入CF
的结果读取方法(如join()
)来获得:-
the results of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually.
- 对于
anyOf
方法,返回CF
结果类型是Object
,要使用这个结果一定要做强制类型转换 -
这些不方便的地方,在
cffu
库中,提供了对应的加强解决 💗
- 对于
anyOf
方法的赛马模式,任一个失败完成的输入CF
也会导致返回CF
失败完成,即使后续有成功完成的输入CF
,这样的效果可能不是业务希望的 😔业务会希望有这样的赛马模式:
- 当多个输入
CF
有任一个成功完成,返回这个完成的输入CF
的结果- 否则当所有的输入
CF
都失败时,返回失败- 在
cffu
库中,可以考虑是否要提供这种赛马模式的支持 💗
返回一个没有完成的CompletableFuture
;后续可以通过显式的写方法来写入结果以完成,如complete(T value)
、completeExceptionally(Throwable ex)
。
可以后续完成体现出命名CompletableFuture
(可完成的Future
)。
在日常的业务开发中,更推荐使用CF
来编排业务流程,几乎一定不应该使用这个构造方法。
- 构造函数创建的
CF
的使用场景:- 在用户自己的业务逻辑线程中异步处理,并通过显式调用
CF
对象的写方法设置完成的结果; - 无需由
CF
关联的Executor
来执行用户的业务逻辑。
- 在用户自己的业务逻辑线程中异步处理,并通过显式调用
- 往往是在中间件中会有必要这样使用,比如
- 在网络
IO
框架的回调(线程)中完成处理后设置CF
结果。
- 在网络
- 显式给
CompletableFuture
写入结果的方式,体现出极强灵活性与复杂性。CompletableFuture
编排的使用方式下层也是通过「显式写入结果的方式」来实现的。
T get()
阻塞❗,属于Future
接口- 返回成功完成的结果;对于执行失败的情况,抛出
ExecutionException
异常,cause 是失败异常
- 返回成功完成的结果;对于执行失败的情况,抛出
T get(long timeout, TimeUnit unit)
阻塞❗〚1〛,属于Future
接口- 同上
- 如果等待超时,则抛出
TimeoutException
异常
T join()
阻塞❗️- 功能与
T get()
一样,区别是抛的不是受检异常 - 对于执行失败的情况,抛出
CompletionException
异常,cause 是失败异常
- 功能与
T getNow(T valueIfAbsent)
- 返回已正常完成
CF
的正常结果;如果CF
不是正常完成(未完成/被取消/异常完成)则抛出IllegalStateException
异常
- 返回已正常完成
T resultNow()
J19,属于Future
接口- 返回已异常完成
CF
的出错异常;如果CF
不是异常完成(未完成/被取消/正常完成)则抛出IllegalStateException
异常
- 返回已异常完成
Throwable exceptionNow()
J19,属于Future
接口
注:
- 〚1〛:
T get(long timeout, TimeUnit unit)
如果设置的超时是0
,不会BLOCKING;这个情况下往往应该调用T getNow(T valueIfAbsent)
。
boolean isDone()
,属于Future
接口- 是否 完成状态
- 注意:对于「取消」,这个方法也是返回
true
的;即不是运行中,则是完成的。
boolean isCompletedExceptionally()
- 是否是 异常完成状态
- 注意:对于「取消」,这个方法也是返回
true
的。即不是运行中或完成完成,则是异常完成
boolean isCancelled()
,属于Future
接口- 是否是 取消状态
State state()
J19,属于Future
接口- 获取完成状态
- 对应4个枚举值:
RUNNING
、SUCCESS
、FAILED
、CANCELLED
- 显式写入 成功结果 (x3)
boolean complete(T value)
completeAsync(Supplier<T> supplier)
J9- 在缺省线程池中计算结果(
Supplier
)
- 在缺省线程池中计算结果(
completeAsync(Supplier<T> supplier, Executor executor)
J9- 在指定的线程池中计算结果(
Supplier
)
- 在指定的线程池中计算结果(
- 显式写入 失败结果/异常 (x1)
boolean completeExceptionally(Throwable ex)
- 取消任务 (x1)
boolean cancel(boolean mayInterruptIfRunning)
,属于Future
接口
WIP...
thenRun(Runnable action)
thenRunAsync(Runnable action)
thenRunAsync(Runnable action, Executor executor)
thenAccept(Consumer<? super T> action)
thenAcceptAsync(Consumer<? super T> action)
thenAcceptAsync(Consumer<? super T> action, Executor executor)
thenApply(Function<? super T, ? extends U> fn)
thenApplyAsync(Function<? super T, ? extends U> fn)
thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBothAsync(CompletionStage<?> other, Runnable action)
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)
runAfterEither*
acceptEither*
applyToEither*
exceptionally(Function<Throwable, ? extends T> fn)
exceptionallyAsync(Function<Throwable, ? extends T> fn)
exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)
completeOnTimeout(T value, long timeout, TimeUnit unit)
J9- 如果超时了,返回的
CF
会成功,结果是指定的值
- 如果超时了,返回的
orTimeout(long timeout, TimeUnit unit)
J9- 如果超时了,返回的
CF
会失败,失败异常是TimeoutException
- 如果超时了,返回的
延时执行与超时控制强相关,放在这一节里。 (实现超时控制 使用了延时执行功能)
delayedExecutor(long delay, TimeUnit unit, Executor executor)
J9- 返回延时执行的包装
Executor
- 返回延时执行的包装
thenCompose*
exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn)
exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn)
exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)
whenComplete*
handle*
从CF
的功能使用上,这些方法不是必须的。
但通过这些CF
的非功能方法可以
- 提升实现的安全性
- 如防御式拷贝防止被使用方意外写结果
- 获取额外信息
- 如用于监控
- ……
toCompletableFuture()
,属于CompletionStage
接口- 转换一个普通的
CF
,比如- 不再是
MinimalCompletionStage
,可以做显式的写操作
- 不再是
- 如果对象已经是普通的
CF
,则会返回this
- 转换一个普通的
CompletionStage<T> minimalCompletionStage()
J9〚1〛- 转换一个
MinimalCompletionStage
,限制CompletionStage
接口之外的方法,不能做显式写操作
- 转换一个
CompletableFuture<T> copy()
- 生成一个(防御性)拷贝
- 对返回的
CF
做写操作,不会影响原来的CF
int getNumberOfDependents()
- 返回依赖这个
CF
的CF
个数,可以用于监控
- 返回依赖这个
Executor defaultExecutor()
J9- 返回缺省的
Executor
- 主要是用于
CompletableFuture
子类的模板方法,扩展用
- 返回缺省的
CompletableFuture<U> newIncompleteFuture()
J9〚1〛- 主要是用于
CompletableFuture
子类的模板方法,扩展用 - 业务使用中,不需要使用这个方法;如果要用,推荐使用
new CompletableFuture()
- 主要是用于
注:
- 〚1〛:
CompletableFuture<U> newIncompleteFuture()
功能与CompletableFuture<T>()
是一样,实际上代码实现就只是调用构造函数。- 相比构造函数,工厂方法形式的一个好处是可以无需指定泛型参数;在很多库的
API
中都可以看到这样的设计方式。
- 相比构造函数,工厂方法形式的一个好处是可以无需指定泛型参数;在很多库的
void obtrudeValue(T value)
- 强制设置成功结果为
value
,可以多次改写
- 强制设置成功结果为
void obtrudeException(Throwable ex)
- 强制设置失败异常为
ex
,可以多次改写
- 强制设置失败异常为