Skip to content

Commit

Permalink
Update 5.多线程通信.md
Browse files Browse the repository at this point in the history
  • Loading branch information
fyhhub authored Nov 12, 2023
1 parent 2e5e1d2 commit 7775c96
Showing 1 changed file with 97 additions and 31 deletions.
128 changes: 97 additions & 31 deletions src/rust-learn/5.多线程通信.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,75 +26,104 @@ fn main() {
+ **接收消息的操作rx.recv()会阻塞当前线程,直到读取到值,或者通道被关闭**
+ **需要使用move将tx的所有权转移到子线程的闭包中**

## 二、多发送者,多接收者
## 二、单发送者,多接收者
可以直接对接收者 for 循环,这样就不需要一个个的rec了
```rust
use std::{sync::{Arc, Weak, Barrier, mpsc}, cell::RefCell, thread::{self, JoinHandle}, time::Duration};

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
let (p, s) = mpsc::channel();
let handle = thread::spawn(move || {
p.send("你好").unwrap();
p.send("你好1").unwrap();
});
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];

for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});

println!("主线程接收:{}", s.recv().unwrap());
println!("主线程接收:{}", s.recv().unwrap());
println!("主线程结束");
for received in rx {
println!("Got: {}", received);
}
}
```

## 三、传输具有所有权的数据
## 三、多发送者,多接收者
使用发送者的 `clone`方法,就可以拷贝一个发送者
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
});

thread::spawn(move || {
let s = String::from("我,飞走咯!");
tx.send(s).unwrap();
println!("val is {}", s);
tx1.send(String::from("hi from cloned tx")).unwrap();
});

let received = rx.recv().unwrap();
println!("Got: {}", received);
for received in rx {
println!("Got: {}", received);
}
}
```

以上代码中,String底层的字符串是存储在堆上,并没有实现Copy特征,当它被发送后,会将所有权从发送端的s转移给接收端的received,之后s将无法被使用。
## 四、不阻塞的 try_recv 方法
在下面的代码中,`try_recv`并不会阻塞主线程的执行,只有当接收到消息才会打印值。增加一点延时,就可以正常接收到消息了,有点类似异步?

也就是说发送的数据,需要实现`Copy`特征


## 四、使用多发送者
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();

thread::spawn(move || {
tx.send(String::from("hi from raw tx")).unwrap();
tx.send(1).unwrap();
});
// 这里可以增加一点延时,就可以正常接收到消息了,有点类似异步?
// thread::sleep(Duration::from_millis(200));
println!("receive {:?}", rx.try_recv());
}
```

## 五、传输具有所有权的数据
```rust
use std::sync::mpsc;
use std::thread;

fn main() {
let (tx, rx) = mpsc::channel();

thread::spawn(move || {
tx1.send(String::from("hi from cloned tx")).unwrap();
let s = String::from("我,飞走咯!");
tx.send(s).unwrap();
println!("val is {}", s);
});

for received in rx {
println!("Got: {}", received);
}
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
```

如果使用多发送者,需要对发送者进行拷贝,只有这样,接受者才是同一个。否则不是同一个接收者
以上代码中,String底层的字符串是存储在堆上,并没有实现Copy特征,当它被发送后,会将所有权从发送端的s转移给接收端的received,之后s将无法被使用

也就是说发送的数据,需要实现`Copy`特征

## 五、同步channel

## 六、同步channel
**之前我们使用的都是异步通道:无论接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞**

使用`mpsc::sync_channel`可以创建同步管道。必须等待接收到消息后,才可以继续执行:

Expand All @@ -120,7 +149,44 @@ fn main() {
}
```

## 六、关闭channel
但是我们可以发现,`sync_channel`有一个参数 0, 这是用来干嘛的呢?
**该值可以用来指定同步通道的消息缓存条数,当你设定为N时,发送者就可以无阻塞的往通道中发送N条消息,当消息缓冲队列满了后,新的消息发送将被阻塞(如果没有接收者消费缓冲队列中的消息,那么第N+1条消息就将触发发送阻塞**

**因此,使用异步消息虽然能非常高效且不会造成发送线程的阻塞,但是存在消息未及时消费,最终内存过大的问题。在实际项目中,可以考虑使用一个带缓冲值的同步通道来避免这种风险。**

## 七、关闭channel
通道关闭的两个条件:发送者全部drop或接收者被drop,要结束for循环显然是要求发送者全部drop,但是由于send自身没有被drop,会导致该循环永远无法结束,最终主线程会一直阻塞。

`drop(send)` 可以关闭通道

## 八、如何传输多种类型的数据?
`let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel()` 通过这一行,我们指定了,发送和接收的类型为一个枚举值,也就代表我们可以放进去任何类型的值

```rust
use std::sync::mpsc::{self, Receiver, Sender};

enum Fruit {
Apple(u8),
Orange(String)
}

fn main() {
let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();

tx.send(Fruit::Orange("sweet".to_string())).unwrap();
tx.send(Fruit::Apple(2)).unwrap();

for _ in 0..2 {
match rx.recv().unwrap() {
Fruit::Apple(count) => println!("received {} apples", count),
Fruit::Orange(flavor) => println!("received {} oranges", flavor),
}
}
}
```

## 九、常用的开源库
如果你需要 mpmc(多发送者,多接收者)或者需要更高的性能,可以考虑第三方库:

- [**crossbeam-channel**](https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel), 老牌强库,功能较全,性能较强,之前是独立的库,但是后面合并到了`crossbeam`主仓库中
- [**flume**](https://github.com/zesterer/flume), 官方给出的性能数据某些场景要比 crossbeam 更好些

0 comments on commit 7775c96

Please sign in to comment.