Skip to content

Commit

Permalink
Start work on the web backend
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Apr 3, 2024
1 parent 03833e8 commit 04f635e
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions ewebsock/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{ops::ControlFlow, rc::Rc};

use crate::{EventHandler, Options, Result, WsEvent, WsMessage};

#[allow(clippy::needless_pass_by_value)]
Expand All @@ -14,7 +16,7 @@ fn string_from_js_string(s: js_sys::JsString) -> String {
///
/// When this is dropped, the connection is closed.
pub struct WsSender {
ws: Option<web_sys::WebSocket>,
socket: Option<Rc<web_sys::WebSocket>>,
}

impl Drop for WsSender {
Expand All @@ -28,13 +30,13 @@ impl Drop for WsSender {
impl WsSender {
/// Send the message to the server.
pub fn send(&mut self, msg: WsMessage) {
if let Some(ws) = &mut self.ws {
if let Some(socket) = &mut self.socket {
let result = match msg {
WsMessage::Binary(data) => {
ws.set_binary_type(web_sys::BinaryType::Blob);
ws.send_with_u8_array(&data)
socket.set_binary_type(web_sys::BinaryType::Blob);
socket.send_with_u8_array(&data)
}
WsMessage::Text(text) => ws.send_with_str(&text),
WsMessage::Text(text) => socket.send_with_str(&text),
unknown => {
panic!("Don't know how to send message: {:?}", unknown);
}
Expand All @@ -49,17 +51,17 @@ impl WsSender {
///
/// This is called automatically when the sender is dropped.
pub fn close(&mut self) -> Result<()> {
if let Some(ws) = self.ws.take() {
if let Some(socket) = self.socket.take() {
log::debug!("Closing WebSocket");
ws.close().map_err(string_from_js_value)
socket.close().map_err(string_from_js_value)
} else {
Ok(())
}
}

/// Forget about this sender without closing the connection.
pub fn forget(mut self) {
self.ws = None;
self.socket = None;
}
}

Expand All @@ -78,53 +80,72 @@ pub(crate) fn ws_connect_impl(
use wasm_bindgen::JsCast as _;

// Connect to an server
let ws = web_sys::WebSocket::new(&url).map_err(string_from_js_value)?;
let socket = web_sys::WebSocket::new(&url).map_err(string_from_js_value)?;
let socket = Rc::new(socket);

// For small binary messages, like CBOR, Arraybuffer is more efficient than Blob handling
ws.set_binary_type(web_sys::BinaryType::Arraybuffer);
socket.set_binary_type(web_sys::BinaryType::Arraybuffer);

// Allow it to be shared by the different callbacks:
let on_event: std::rc::Rc<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>> =
on_event.into();
let on_event: Rc<dyn Send + Fn(WsEvent) -> std::ops::ControlFlow<()>> = on_event.into();

// onmessage callback
{
let on_event = on_event.clone();
let (mut socket, on_event) = (socket, on_event).clone();
let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| {

Check failure on line 95 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`

Check failure on line 95 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
let (mut socket, mut on_event) = (socket, on_event).clone();

// Handle difference Text/Binary,...
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let control = if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(&abuf);
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())))
} else if let Ok(blob) = e.data().dyn_into::<web_sys::Blob>() {
// better alternative to juggling with FileReader is to use https://crates.io/crates/gloo-file
let file_reader = web_sys::FileReader::new().expect("Failed to create FileReader");
let file_reader_clone = file_reader.clone();
// create onLoadEnd callback
let on_event = on_event.clone();
let (mut socket, mut on_event) = (socket, on_event).clone();
let onloadend_cb = Closure::wrap(Box::new(move |_e: web_sys::ProgressEvent| {

Check failure on line 108 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`

Check failure on line 108 in ewebsock/src/web.rs

View workflow job for this annotation

GitHub Actions / Check wasm32

expected a closure that implements the `FnMut` trait, but this closure only implements `FnOnce`
let (mut socket, mut on_event) = (socket, on_event).clone();
let array = js_sys::Uint8Array::new(&file_reader_clone.result().unwrap());
on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
let control = on_event(WsEvent::Message(WsMessage::Binary(array.to_vec())));
if control.is_break() {
log::debug!("Closing WebSocket");
let mut socket = socket.clone();
socket.close();
}
})
as Box<dyn FnMut(web_sys::ProgressEvent)>);
file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref()));
file_reader
.read_as_array_buffer(&blob)
.expect("blob not readable");
onloadend_cb.forget();
ControlFlow::Continue(())
} else if let Ok(txt) = e.data().dyn_into::<js_sys::JsString>() {
on_event(WsEvent::Message(WsMessage::Text(string_from_js_string(
txt,
))));
))))
} else {
log::debug!("Unknown websocket message received: {:?}", e.data());
on_event(WsEvent::Message(WsMessage::Unknown(string_from_js_value(
e.data(),
))));
))))
};

if control.is_break() {
socket.close();
e.target()
.unwrap()
.dyn_into::<web_sys::WebSocket>()
.unwrap()
.close()
.unwrap();
}
}) as Box<dyn FnMut(web_sys::MessageEvent)>);

// set message event handler on WebSocket
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
socket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));

// forget the callback to keep it alive
onmessage_callback.forget();
Expand All @@ -140,26 +161,31 @@ pub(crate) fn ws_connect_impl(
);
on_event(WsEvent::Error(error_event.message()));
}) as Box<dyn FnMut(web_sys::ErrorEvent)>);
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
socket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
}

{
let on_event = on_event.clone();
let (socket, on_event) = (socket, on_event).clone();
let onopen_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Opened);
let control = on_event(WsEvent::Opened);
if control.is_break() {
socket.close();
}
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
socket.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
}

{
let onclose_callback = Closure::wrap(Box::new(move |_| {
on_event(WsEvent::Closed);
}) as Box<dyn FnMut(wasm_bindgen::JsValue)>);
ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
socket.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
}

Ok(WsSender { ws: Some(ws) })
Ok(WsSender {
socket: Some(socket),
})
}

0 comments on commit 04f635e

Please sign in to comment.