Skip to content

Commit 806bb03

Browse files
committed
Add method to terminate a worker
1 parent 6e63094 commit 806bb03

File tree

3 files changed

+137
-66
lines changed

3 files changed

+137
-66
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -360,10 +360,11 @@
360360

361361
## `worker`
362362

363-
### Version 0.4.1
363+
### Version 0.5.0
364364

365365
- Remove Cloning on WorkerBridge (#388)
366366
- Add an example of processing transferrable types with worker (#371)
367+
- Add method to terminate a worker (#420)
367368

368369
### Version 0.4.0
369370

crates/worker/src/actor/bridge.rs

+128-19
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,102 @@ use std::rc::Weak;
88
use serde::{Deserialize, Serialize};
99

1010
use super::handler_id::HandlerId;
11+
use super::messages::FromWorker;
1112
use super::messages::ToWorker;
13+
use super::native_worker::DedicatedWorker;
1214
use super::native_worker::NativeWorkerExt;
1315
use super::traits::Worker;
14-
use super::{Callback, Shared};
16+
use super::Callback;
1517
use crate::codec::Codec;
1618

1719
pub(crate) type ToWorkerQueue<W> = Vec<ToWorker<W>>;
1820
pub(crate) type CallbackMap<W> = HashMap<HandlerId, Weak<dyn Fn(<W as Worker>::Output)>>;
21+
type PostMsg<W> = Box<dyn Fn(&DedicatedWorker, ToWorker<W>)>;
1922

20-
struct WorkerBridgeInner<W>
23+
pub(crate) struct WorkerBridgeInner<W>
2124
where
2225
W: Worker,
2326
{
2427
// When worker is loaded, queue becomes None.
25-
pending_queue: Shared<Option<ToWorkerQueue<W>>>,
26-
callbacks: Shared<CallbackMap<W>>,
27-
post_msg: Rc<dyn Fn(ToWorker<W>)>,
28+
pending_queue: RefCell<Option<ToWorkerQueue<W>>>,
29+
callbacks: RefCell<CallbackMap<W>>,
30+
native_worker: RefCell<Option<DedicatedWorker>>,
31+
post_msg: PostMsg<W>,
32+
}
33+
34+
impl<W> WorkerBridgeInner<W>
35+
where
36+
W: Worker + 'static,
37+
{
38+
pub(crate) fn new<CODEC>(native_worker: DedicatedWorker, callbacks: CallbackMap<W>) -> Rc<Self>
39+
where
40+
CODEC: Codec,
41+
W::Input: Serialize + for<'de> Deserialize<'de>,
42+
W::Output: Serialize + for<'de> Deserialize<'de>,
43+
{
44+
let worker = native_worker.clone();
45+
46+
let pending_queue = RefCell::new(Some(Vec::new()));
47+
let callbacks = RefCell::new(callbacks);
48+
let native_worker = RefCell::new(Some(native_worker));
49+
let post_msg = move |worker: &DedicatedWorker, msg: ToWorker<W>| {
50+
worker.post_packed_message::<_, CODEC>(msg)
51+
};
52+
53+
let self_ = Self {
54+
pending_queue,
55+
callbacks,
56+
native_worker,
57+
post_msg: Box::new(post_msg),
58+
};
59+
let self_ = Rc::new(self_);
60+
61+
let handler = {
62+
let bridge_inner = Rc::downgrade(&self_);
63+
// If all bridges are dropped then `self_` is dropped and `upgrade` returns `None`.
64+
move |msg: FromWorker<W>| {
65+
if let Some(bridge_inner) = Weak::upgrade(&bridge_inner) {
66+
match msg {
67+
FromWorker::WorkerLoaded => {
68+
// Set pending queue to `None`. Unless `WorkerLoaded` is
69+
// sent twice, this will always be `Some`.
70+
if let Some(pending_queue) = bridge_inner.take_queue() {
71+
// Will be `None` if the worker has been terminated.
72+
if let Some(worker) =
73+
bridge_inner.native_worker.borrow_mut().as_ref()
74+
{
75+
// Send all pending messages.
76+
for to_worker in pending_queue.into_iter() {
77+
(bridge_inner.post_msg)(worker, to_worker);
78+
}
79+
}
80+
}
81+
}
82+
FromWorker::ProcessOutput(id, output) => {
83+
let mut callbacks = bridge_inner.callbacks.borrow_mut();
84+
85+
if let Some(m) = callbacks.get(&id) {
86+
if let Some(m) = Weak::upgrade(m) {
87+
m(output);
88+
} else {
89+
// The bridge has been dropped.
90+
callbacks.remove(&id);
91+
}
92+
}
93+
}
94+
}
95+
}
96+
}
97+
};
98+
99+
worker.set_on_packed_message::<_, CODEC, _>(handler);
100+
101+
self_
102+
}
103+
104+
fn take_queue(&self) -> Option<ToWorkerQueue<W>> {
105+
self.pending_queue.borrow_mut().take()
106+
}
28107
}
29108

30109
impl<W> fmt::Debug for WorkerBridgeInner<W>
@@ -49,10 +128,24 @@ where
49128
m.push(msg);
50129
}
51130
None => {
52-
(self.post_msg)(msg);
131+
if let Some(worker) = self.native_worker.borrow().as_ref() {
132+
(self.post_msg)(worker, msg);
133+
}
53134
}
54135
}
55136
}
137+
138+
/// Terminate the worker, no more messages can be sent after this.
139+
fn terminate(&self) {
140+
if let Some(worker) = self.native_worker.borrow_mut().take() {
141+
worker.terminate();
142+
}
143+
}
144+
145+
/// Returns true if the worker is terminated.
146+
fn is_terminated(&self) -> bool {
147+
self.native_worker.borrow().is_none()
148+
}
56149
}
57150

58151
impl<W> Drop for WorkerBridgeInner<W>
@@ -66,6 +159,15 @@ where
66159
}
67160

68161
/// A connection manager for components interaction with workers.
162+
///
163+
/// Dropping this object will send a disconnect message to the worker and drop
164+
/// the callback if set, but will have no effect on forked bridges. Note that
165+
/// the worker will still receive and process any messages sent over the bridge
166+
/// up to that point, but the reply will not trigger a callback. If all forked
167+
/// bridges for a worker are dropped, the worker will be sent a destroy message.
168+
///
169+
/// To terminate the worker and stop execution immediately, use
170+
/// [`terminate`](#method.terminate).
69171
pub struct WorkerBridge<W>
70172
where
71173
W: Worker,
@@ -84,26 +186,16 @@ where
84186
self.inner.send_message(ToWorker::Connected(self.id));
85187
}
86188

87-
pub(crate) fn new<CODEC>(
189+
pub(crate) fn new(
88190
id: HandlerId,
89-
native_worker: web_sys::Worker,
90-
pending_queue: Rc<RefCell<Option<ToWorkerQueue<W>>>>,
91-
callbacks: Rc<RefCell<CallbackMap<W>>>,
191+
inner: Rc<WorkerBridgeInner<W>>,
92192
callback: Option<Callback<W::Output>>,
93193
) -> Self
94194
where
95-
CODEC: Codec,
96195
W::Input: Serialize + for<'de> Deserialize<'de>,
97196
{
98-
let post_msg = move |msg: ToWorker<W>| native_worker.post_packed_message::<_, CODEC>(msg);
99-
100197
let self_ = Self {
101-
inner: WorkerBridgeInner {
102-
pending_queue,
103-
callbacks,
104-
post_msg: Rc::new(post_msg),
105-
}
106-
.into(),
198+
inner,
107199
id,
108200
_worker: PhantomData,
109201
_cb: callback,
@@ -146,6 +238,23 @@ where
146238

147239
self_
148240
}
241+
242+
/// Immediately terminates the worker and stops any execution in progress,
243+
/// for this and all forked bridges. All messages will be dropped without
244+
/// the worker receiving them. No disconnect or destroy message is sent. Any
245+
/// messages sent after this point are dropped (from this bridge or any
246+
/// forks).
247+
///
248+
/// For more details see
249+
/// [`web_sys::Worker::terminate`](https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Worker.html#method.terminate).
250+
pub fn terminate(&self) {
251+
self.inner.terminate()
252+
}
253+
254+
/// Returns true if the worker is terminated.
255+
pub fn is_terminated(&self) -> bool {
256+
self.inner.is_terminated()
257+
}
149258
}
150259

151260
impl<W> Drop for WorkerBridge<W>

crates/worker/src/actor/spawner.rs

+7-46
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
use std::cell::RefCell;
21
use std::collections::HashMap;
32
use std::fmt;
43
use std::marker::PhantomData;
5-
use std::rc::{Rc, Weak};
4+
use std::rc::Rc;
65

76
use gloo_utils::window;
87
use js_sys::Array;
98
use serde::de::Deserialize;
109
use serde::ser::Serialize;
1110
use web_sys::{Blob, BlobPropertyBag, Url};
1211

13-
use super::bridge::{CallbackMap, WorkerBridge};
12+
use super::bridge::{WorkerBridge, WorkerBridgeInner};
1413
use super::handler_id::HandlerId;
15-
use super::messages::FromWorker;
16-
use super::native_worker::{DedicatedWorker, NativeWorkerExt};
14+
use super::native_worker::DedicatedWorker;
1715
use super::traits::Worker;
18-
use super::{Callback, Shared};
16+
use super::Callback;
1917
use crate::codec::{Bincode, Codec};
2018

2119
fn create_worker(path: &str) -> DedicatedWorker {
@@ -110,53 +108,16 @@ where
110108
W::Input: Serialize + for<'de> Deserialize<'de>,
111109
W::Output: Serialize + for<'de> Deserialize<'de>,
112110
{
113-
let pending_queue = Rc::new(RefCell::new(Some(Vec::new())));
114111
let handler_id = HandlerId::new();
115112
let mut callbacks = HashMap::new();
116113

117114
if let Some(m) = self.callback.as_ref().map(Rc::downgrade) {
118115
callbacks.insert(handler_id, m);
119116
}
120117

121-
let callbacks: Shared<CallbackMap<W>> = Rc::new(RefCell::new(callbacks));
122-
123-
let handler = {
124-
let pending_queue = pending_queue.clone();
125-
let callbacks = callbacks.clone();
126-
127-
let worker = worker.clone();
128-
129-
move |msg: FromWorker<W>| match msg {
130-
FromWorker::WorkerLoaded => {
131-
if let Some(pending_queue) = pending_queue.borrow_mut().take() {
132-
for to_worker in pending_queue.into_iter() {
133-
worker.post_packed_message::<_, CODEC>(to_worker);
134-
}
135-
}
136-
}
137-
FromWorker::ProcessOutput(id, output) => {
138-
let mut callbacks = callbacks.borrow_mut();
139-
140-
if let Some(m) = callbacks.get(&id) {
141-
if let Some(m) = Weak::upgrade(m) {
142-
m(output);
143-
} else {
144-
callbacks.remove(&id);
145-
}
146-
}
147-
}
148-
}
149-
};
150-
151-
worker.set_on_packed_message::<_, CODEC, _>(handler);
152-
153-
WorkerBridge::<W>::new::<CODEC>(
154-
handler_id,
155-
worker,
156-
pending_queue,
157-
callbacks,
158-
self.callback.clone(),
159-
)
118+
let inner = WorkerBridgeInner::<W>::new::<CODEC>(worker, callbacks);
119+
120+
WorkerBridge::<W>::new(handler_id, inner, self.callback.clone())
160121
}
161122

162123
/// Spawns a Worker.

0 commit comments

Comments
 (0)