-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpubsub.rs
85 lines (73 loc) · 2.53 KB
/
pubsub.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use std::{sync::{Arc, Mutex}, marker::PhantomData};
use anymap::{Map, any::Any};
use crossbeam::channel;
// `anymap` map restricted to `Send + Sync` values, so it can be shared
// between threads behind the `Arc<Mutex<..>>` fields of `Pubsub`.
type AnyMap = Map<dyn Any + Send + Sync>;
/// A cloneable publish/subscribe hub keyed by message type.
///
/// Every field sits behind an `Arc<Mutex<..>>`, so the derived `Clone`
/// yields another handle to the same shared state rather than a copy of it.
#[derive(Clone)]
pub struct Pubsub {
    /// Holds a `Vec<channel::Sender<M>>` for each message type `M` that has
    /// been subscribed to.
    subscribers: Arc<Mutex<AnyMap>>,
    /// Publisher-side channel endpoints, keyed by message type, that
    /// `update` drains.
    rx_channels: Arc<Mutex<AnyMap>>,
    /// Type-erased updaters registered by `create_channel`; `update` runs
    /// each of them.
    updaters: Arc<Mutex<Vec<Box<dyn PubsubUpdaterTrait + Send + Sync>>>>,
}
impl Pubsub {
    /// Creates an empty hub with no channels, subscribers or updaters.
    pub fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(AnyMap::new())),
            rx_channels: Arc::new(Mutex::new(AnyMap::new())),
            updaters: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Registers a new subscriber for messages of type `M`.
    ///
    /// The returned receiver gets a clone of every `M` message that is
    /// forwarded by a later call to [`Pubsub::update`]. Dropping the receiver
    /// unsubscribes: the matching sender is pruned on the next forwarded
    /// message.
    ///
    /// # Panics
    ///
    /// Panics if the subscribers mutex is poisoned.
    pub fn subscribe<M: Send + Sync + Clone + 'static>(&self) -> channel::Receiver<M> {
        let (tx, rx) = channel::unbounded();
        self.subscribers
            .lock()
            .expect("subscribers mutex poisoned")
            .entry::<Vec<channel::Sender<M>>>()
            .or_insert_with(Vec::new)
            .push(tx);
        rx
    }

    /// Returns a sender that publishes messages of type `M`.
    ///
    /// The first call for a given `M` creates the underlying unbounded
    /// channel and registers the updater that drains it. Later calls return
    /// another sender for that same channel, so earlier senders stay
    /// connected and no duplicate updaters pile up. Sent messages are
    /// buffered until the next [`Pubsub::update`].
    ///
    /// # Panics
    ///
    /// Panics if the channel or updater mutex is poisoned.
    pub fn create_channel<M: Send + Sync + Clone + 'static>(&self) -> channel::Sender<M> {
        let mut channels = self.rx_channels.lock().expect("channels mutex poisoned");
        if let Some(existing) = channels.get::<channel::Sender<M>>() {
            return existing.clone();
        }

        let (tx, rx) = channel::unbounded();
        channels.insert(rx);
        // Keep one sender in the map so later calls can hand out clones of it.
        channels.insert(tx.clone());

        // Release the channel lock before taking the updater lock: `update`
        // acquires them in the opposite order (updaters, then channels).
        drop(channels);
        self.updaters
            .lock()
            .expect("updaters mutex poisoned")
            .push(Box::new(PubsubUpdater::<M>::new()));
        tx
    }

    /// Forwards every pending message on every created channel to the
    /// current subscribers of its message type.
    ///
    /// # Panics
    ///
    /// Panics if any of the internal mutexes is poisoned.
    pub fn update(&self) {
        let updaters = self.updaters.lock().expect("updaters mutex poisoned");
        for updater in updaters.iter() {
            updater.update(self);
        }
    }

    /// Drains the `M` channel, sending a clone of each message to every live
    /// `M` subscriber and pruning subscribers whose receiver has been dropped.
    fn update_channel<M: Send + Sync + Clone + 'static>(&self) {
        let channels = self.rx_channels.lock().expect("channels mutex poisoned");
        let rx = match channels.get::<channel::Receiver<M>>() {
            Some(rx) => rx,
            None => return,
        };

        // Lock the subscriber map once for the whole drain instead of once
        // per message.
        let mut subscribers = self.subscribers.lock().expect("subscribers mutex poisoned");
        match subscribers.get_mut::<Vec<channel::Sender<M>>>() {
            Some(senders) => {
                for msg in rx.try_iter() {
                    // A failed send means the receiver is gone; drop that sender.
                    senders.retain(|tx| tx.try_send(msg.clone()).is_ok());
                }
            }
            // Nobody is listening: still drain so the queue cannot grow
            // without bound.
            None => rx.try_iter().for_each(drop),
        }
    }
}

impl Default for Pubsub {
    fn default() -> Self {
        Self::new()
    }
}

/// Type-erased hook that lets `Pubsub` keep updaters for different message
/// types in a single `Vec`: each implementor knows its own message type and
/// drains the matching channel.
trait PubsubUpdaterTrait {
    /// Forwards the pending messages of this updater's message type.
    fn update(&self, pubsub: &Pubsub);
}

/// Zero-sized marker that remembers the message type `M` of one channel.
#[derive(Default)]
struct PubsubUpdater<M> {
    _p: PhantomData<M>,
}

impl<M: Sync + Send + Clone + 'static> PubsubUpdater<M> {
    /// Creates the marker; unlike the derived `Default`, this does not
    /// require `M: Default`.
    fn new() -> Self {
        Self { _p: PhantomData }
    }
}

impl<M: Sync + Send + Clone + 'static> PubsubUpdaterTrait for PubsubUpdater<M> {
    fn update(&self, pubsub: &Pubsub) {
        pubsub.update_channel::<M>();
    }
}