Skip to content

Commit

Permalink
更新 pub-sub.md
Browse files Browse the repository at this point in the history
  • Loading branch information
BERADQ authored Mar 25, 2024
1 parent e1884fc commit c12e54c
Showing 1 changed file with 25 additions and 45 deletions.
70 changes: 25 additions & 45 deletions content/docs/default-implement/pub-sub.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,83 +18,63 @@ toc = true
top = false
+++

## kokoro-flume-channel
既然发布订阅模式很容易实现动态特性,那么 Kokoro 也就没有理由不具备这种能力了。

所以我们有 `kokoro-flume-channel (下称 flume-channel)` 实现以 [flume](https://github.com/zesterer/flume) 为核心的发布-订阅模式。

[flume](https://github.com/zesterer/flume) 的具体特性,请查看其[仓库](https://github.com/zesterer/flume)

我们对 flume 进行了简单的包装,实现了简单的发布-订阅模式。

### 使用例
### kokoro-flume-channel
鉴于发布-订阅模式支持动态特性,Kokoro 也采用了这种模式。我们提供了 `kokoro-flume-channel`(以下简称 `flume-channel`),它基于 [flume](https://github.com/zesterer/flume) 库实现了进程内的发布-订阅模式。`flume` 库的详细特性可在其[官方仓库](https://github.com/zesterer/flume)中查看。我们对 `flume` 进行了封装,以简化发布-订阅模式的实现。

#### 示例用法
```rust
/// 该函数也是 flume-channel 所提供,
/// 用于实例化一个 `Mode` 为 `MPSC` 的 `Context`
/// 其 `Resources` 为 `()`
// flume-channel 提供的函数,用于创建一个 `Mode` 为 `MPSC` 的 `Context`,其 `Resources` 为 `()`
let ctx = channel_ctx();

/// 注册一个 `Subscriber`
ctx.subscribe(..)
// 注册一个 `Subscriber`
ctx.subscribe(..);
```

这里需要解释,何为 `Subscriber`

其为 `trait` ,所以实现了 `trait Subscriber` 的任何类型,都可以被注册为订阅者。

默认实现有:
`Subscriber` 是一个 `trait`,任何实现了 `trait Subscriber` 的类型都可以注册为订阅者。默认实现包括:

1. `FnMut()`
2. `FnMut(&Context)`
3. `FnMut(&Event)`
4. `FnMut(&Context, &Event)`
3. `FnMut(Query)`
4. `FnMut(&Context, Query)`

实现了 `trait Event``trait EventID` 的类型可被订阅,`Subscriber``sub(&Event)` 决定了是否订阅该类型。在 `FuMut` 中,参数类型决定了其订阅类型。
若未给出类型,则默认订阅了 `PhantomEvent`
订阅者被执行的时机由 `Query` 决定,有关 `Query` 的相关信息,请参阅 [关于订阅查询 (待补充)](#)

```rust
/// 事件 `Hello`
// 定义事件 `Hello`
#[derive(Event)]
struct Hello(String);

fn foo(e:& Hello){
fn foo(e: &Hello) {
println!("{}", e.0);
}

let ctx = channel_ctx();
ctx.subscribe(foo); /// 订阅了事件 `Hello`
ctx.publish(Hello("Hello World".to_string())); /// 发布了事件 `Hello`
ctx.subscribe(foo); // 订阅事件 `Hello`
ctx.publish(Hello("Hello World".to_string())); // 发布事件 `Hello`
```

此时订阅者并不会被执行,因为发布的原理其实就是 `sender.send`,很明显还需要调用 `receiver.recv`。所以就有:

1. `ctx.run()` 用于迭代 `receiver` (会阻塞线程)
2. `ctx.next()` 用于单次 `recv`
3. `ctx.run_no_block()` 用于非阻塞迭代 `receiver`
在上述代码中,订阅者不会立即执行,因为发布操作本质上是 `sender.send`,还需要调用 `receiver.recv`。因此,我们提供了以下方法:

2、3 暂未实现
1. `ctx.run()` - 迭代 `receiver`(会阻塞线程)
2. `ctx.next()` - 单次 `recv`(暂未实现)
3. `ctx.run_no_block()` - 非阻塞迭代 `receiver`(暂未实现)

最终我们实现了发布-订阅的 **Hello World**
最终,我们实现了一个简单的发布-订阅示例 **Hello World**

```rust
#[derive(Event)]
struct Hello(String);
fn foo(e:& Hello){
fn foo(e: &Hello) {
println!("{}", e.0);
}
let ctx = channel_ctx();
ctx.subscribe(foo);
ctx.publish(Hello("Hello World".to_string()));
/// 运行
ctx.run()
// 将会输出:Hello World
// 运行
ctx.run();
// 输出:Hello World
```

一个事件可以由多个发布者发布,也可以由多个订阅者订阅。需要注意,`kokoro-flume-channel` 是广播而不是单消费者。

**Hello World** 中的发布,并不是常用的发布方式。

发布者应工作在单独的线程中,用于和 `ctx.run()` 协作运行。
`kokoro-flume-channel` 中,一个事件可以由多个发布者发布,并且可以由多个订阅者订阅。需要注意的是,这是一个广播系统,而不是单一消费者模型。

至于线程应该什么时候生成,应该什么时候终止。详见 [**关于默认实现 (thread篇)**](#) (还没写)。
**Hello World** 示例中,发布操作并不是常规的发布方式。发布者应在单独的线程中工作,以便与 `ctx.run()` 协同运行。关于线程的生成和终止时机,请参阅 [**关于线程** (待补充)](#)

0 comments on commit c12e54c

Please sign in to comment.