Skip to content

Latest commit

 

History

History
274 lines (211 loc) · 15 KB

cf-functions-intro.md

File metadata and controls

274 lines (211 loc) · 15 KB

🔧 CF的功能介绍 | 💪 CF方法分类说明



1. CF的创建

通过静态工厂方法(Factory)或构造函数(Constructor)来创建CompletableFuture。这些方法是CompletableFuture链的起始。

直接创建已完成的CompletableFuture的工厂方法 (x4)

  1. completedFuture(T value):返回CF<T>
  2. completedStage(T value)J9:返回CompletionStage<T>
  3. failedFuture(Throwable ex)J9:返回CF<T>
  4. failedStage(Throwable ex)J9:返回CompletionStage<T>

说明:

  • 因为通过正常完成结果(value)或异常完成的异常(ex)创建已完成的CompletableFuture,能很快完成创建,所以并不需要用于异步执行线程池(executor)。
  • 对于completedStage/failedStage方法返回的类型是CompletionStage接口,限止了调用CompletionStage接口之外的其它方法,通过抛UnsupportedOperationException异常表示不支持。
  • 对于通过异常完成的异常(ex)的CF<T>CompletionStage<T>,结果类型T可以是任意类型。

创建异步完成的CompletableFuture的工厂方法 (x4)

  1. runAsync(Runnable runnable):返回CF<Void>
  2. runAsync(Runnable runnable, Executor executor):返回CF<Void>
  3. supplyAsync(Supplier<T> supplier):返回CF<T>
  4. supplyAsync(Supplier<T> supplier, Executor executor):返回CF<T>

说明:

  • 因为要异步运行输入的任务(Runnable/Supplier),所以需要异步执行的线程池。
  • 如果不指定executor参数,缺省是ForkJoinPool.commonPool()

注:严格的说,CompletableFuture的缺省executor所使用的选择逻辑是:

ForkJoinPool.getCommonPoolParallelism() > 1时,即Runtime.getRuntime().availableProcessors() > 2时,使用ForkJoinPool.commonPool() (现在机器的处理器个数一般都不止2个,无论线上服务器还是个人电脑);

否则使用ThreadPerTaskExecutor,即为每个任务新建一个线程来执行 🤯

具体确定的缺省逻辑,还是去查看CompletableFutureForkJoinPool类的源码实现。

allOf/anyOf静态工厂方法 (x2)

  • allOf(CompletableFuture<?>... cfs):返回CF<Void>
    • 返回的CF,当多个输入CF全部成功完成时,才成功完成;
    • 如果输入CF有一个失败的,则返回的CF立即失败,不再需要依赖其它CF完成的状态
  • anyOf(CompletableFuture<?>... cfs):返回CF<Object>
    • 返回的CF,当多个输入CF有任一个完成(无论成功完成还是失败完成),返回这个完成的输入CF的结果,不会关注后续输入CF的完成情况
    • 赛马模式

说明:

  • 虽然这2个方法是静态工厂方法,但并不是CF链的起点,而是输入多个CF,用于编排多路的流程。
    • 在功能与使用的上,应该和下面【3. 流程编排】一节的方法归类在一起。
    • 这里列上,只是为了体现出是静态工厂方法这个特点。
  • 这2个方法是在组合输入的多个CF的结果,本身复杂业务执行逻辑,逻辑简单无阻塞,所以无需Executor
  • 这2个方法所返回的CF,在结果获取上,有不方便的地方: 😔
    • 对于allOf方法,返回CF结果是Void即无内容,并没有持有多个输入CF的结果
      • allOf方法的文档给的解决方法是,再通过调用各个输入CF的结果读取方法(如join())来获得:
      • the results of the given CompletableFutures are not reflected in the returned CompletableFuture, but may be obtained by inspecting them individually.

    • 对于anyOf方法,返回CF结果类型是Object,要使用这个结果一定要做强制类型转换
    • 这些不方便的地方,在cffu库中,提供了对应的加强解决 💗


anyOf方法的赛马模式,任一个失败完成的输入CF也会导致返回CF失败完成,即使后续有成功完成的输入CF,这样的效果可能不是业务希望的 😔

业务会希望有这样的赛马模式:

  • 当多个输入CF有任一个成功完成,返回这个完成的输入CF的结果
  • 否则当所有的输入CF都失败时,返回失败
  • cffu库中,可以考虑是否要提供这种赛马模式的支持 💗

构造函数CompletableFuture<T>() (x1)

返回一个没有完成的CompletableFuture;后续可以通过显式的写方法来写入结果以完成,如complete(T value)completeExceptionally(Throwable ex)
可以后续完成体现出命名CompletableFuture(可完成的Future)。

在日常的业务开发中,更推荐使用CF来编排业务流程,几乎一定不应该使用这个构造方法。

  • 构造函数创建的CF的使用场景:
    • 在用户自己的业务逻辑线程中异步处理,并通过显式调用CF对象的写方法设置完成的结果;
    • 无需由CF关联的Executor来执行用户的业务逻辑。
  • 往往是在中间件中会有必要这样使用,比如
    • 在网络IO框架的回调(线程)中完成处理后设置CF结果。
  • 显式给CompletableFuture写入结果的方式,体现出极强灵活性与复杂性。
    • CompletableFuture编排的使用方式下层也是通过「显式写入结果的方式」来实现的。

2. CF的显式读写方法

显式结果读取方法 (x5)

  • T get() 阻塞❗,属于Future接口
    • 返回成功完成的结果;对于执行失败的情况,抛出ExecutionException异常,cause 是失败异常
  • T get(long timeout, TimeUnit unit) 阻塞❗〚1〛,属于Future接口
    • 同上
    • 如果等待超时,则抛出TimeoutException异常
  • T join() 阻塞❗️
    • 功能与T get()一样,区别是抛的不是受检异常
    • 对于执行失败的情况,抛出CompletionException异常,cause 是失败异常
  • T getNow(T valueIfAbsent)
    • 返回已正常完成CF的正常结果;如果CF不是正常完成(未完成/被取消/异常完成)则抛出IllegalStateException异常
  • T resultNow()J19,属于Future接口
    • 返回已异常完成CF的出错异常;如果CF不是异常完成(未完成/被取消/正常完成)则抛出IllegalStateException异常
  • Throwable exceptionNow()J19,属于Future接口

注:

  • 〚1〛:T get(long timeout, TimeUnit unit)如果设置的超时是0,不会BLOCKING;这个情况下往往应该调用T getNow(T valueIfAbsent)

获取任务状态的方法 (x4)

  • boolean isDone(),属于Future接口
    • 是否 完成状态
    • 注意:对于「取消」,这个方法也是返回true的;即不是运行中,则是完成的。
  • boolean isCompletedExceptionally()
    • 是否是 异常完成状态
    • 注意:对于「取消」,这个方法也是返回true的。即不是运行中或完成完成,则是异常完成
  • boolean isCancelled(),属于Future接口
    • 是否是 取消状态
  • State state()J19,属于Future接口
    • 获取完成状态
    • 对应4个枚举值:RUNNINGSUCCESSFAILEDCANCELLED

显式结果写入方法 (x5)

  • 显式写入 成功结果 (x3)
    • boolean complete(T value)
    • completeAsync(Supplier<T> supplier)J9
      • 在缺省线程池中计算结果(Supplier
    • completeAsync(Supplier<T> supplier, Executor executor)J9
      • 在指定的线程池中计算结果(Supplier
  • 显式写入 失败结果/异常 (x1)
    • boolean completeExceptionally(Throwable ex)
  • 取消任务 (x1)
    • boolean cancel(boolean mayInterruptIfRunning),属于Future接口

3. CF的流程编排

WIP...

简单then方法 (x9)

  • thenRun(Runnable action)
    • thenRunAsync(Runnable action)
    • thenRunAsync(Runnable action, Executor executor)
  • thenAccept(Consumer<? super T> action)
    • thenAcceptAsync(Consumer<? super T> action)
    • thenAcceptAsync(Consumer<? super T> action, Executor executor)
  • thenApply(Function<? super T, ? extends U> fn)
    • thenApplyAsync(Function<? super T, ? extends U> fn)
    • thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

两个都完成 - Both (x9)

  • runAfterBoth(CompletionStage<?> other, Runnable action)
    • runAfterBothAsync(CompletionStage<?> other, Runnable action)
    • runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
  • thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    • thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
    • thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
  • thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
    • thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
    • thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)

两个任一完成 -Either (x9)

  • runAfterEither*
  • acceptEither*
  • applyToEither*

错误处理 (x6)

  • exceptionally(Function<Throwable, ? extends T> fn)
    • exceptionallyAsync(Function<Throwable, ? extends T> fn)
    • exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor)

超时控制 (x2)

  • completeOnTimeout(T value, long timeout, TimeUnit unit)J9
    • 如果超时了,返回的CF会成功,结果是指定的值
  • orTimeout(long timeout, TimeUnit unit)J9
    • 如果超时了,返回的CF会失败,失败异常是TimeoutException

延时执行与超时控制强相关,放在这一节里。 (实现超时控制 使用了延时执行功能)

  • delayedExecutor(long delay, TimeUnit unit, Executor executor)J9
    • 返回延时执行的包装Executor

高阶方法 (x12)

  • thenCompose*
  • exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn)
    • exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn)
    • exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)
  • whenComplete*
  • handle*

4. 其它辅助类方法

CF的功能使用上,这些方法不是必须的。

但通过这些CF的非功能方法可以

  • 提升实现的安全性
    • 如防御式拷贝防止被使用方意外写结果
  • 获取额外信息
    • 如用于监控
  • ……

转换方法

  • toCompletableFuture(),属于CompletionStage接口
    • 转换一个普通的CF,比如
      • 不再是MinimalCompletionStage,可以做显式的写操作
    • 如果对象已经是普通的CF,则会返回this
  • CompletionStage<T> minimalCompletionStage()J9〚1〛
    • 转换一个MinimalCompletionStage,限制CompletionStage接口之外的方法,不能做显式写操作
  • CompletableFuture<T> copy()
    • 生成一个(防御性)拷贝
    • 对返回的CF做写操作,不会影响原来的CF

属性查看/子类扩展方法

  • int getNumberOfDependents()
    • 返回依赖这个CFCF个数,可以用于监控
  • Executor defaultExecutor()J9
    • 返回缺省的Executor
    • 主要是用于 CompletableFuture子类的模板方法,扩展用
  • CompletableFuture<U> newIncompleteFuture()J9〚1〛
    • 主要是用于 CompletableFuture子类的模板方法,扩展用
    • 业务使用中,不需要使用这个方法;如果要用,推荐使用new CompletableFuture()

注:

  • 〚1〛:CompletableFuture<U> newIncompleteFuture()功能与CompletableFuture<T>()是一样,实际上代码实现就只是调用构造函数。
    • 相比构造函数,工厂方法形式的一个好处是可以无需指定泛型参数;在很多库的API中都可以看到这样的设计方式。

强制改写完成结果的后门方法

  • void obtrudeValue(T value)
    • 强制设置成功结果为value,可以多次改写
  • void obtrudeException(Throwable ex)
    • 强制设置失败异常为ex,可以多次改写