From ac6552ecb70f058d81dd8fafaecd4b1daba24685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Proch=C3=A1zka?= Date: Thu, 24 Oct 2024 14:55:13 +0200 Subject: [PATCH] Fix blocking receiver sleeping after every read (#48) Currently, the `ws_receiver_blocking` function would always sleep after every read. Additionally, it did not use the value from `options.delay_blocking` for the sleep delay. This would result in that thread being blocked for 10ms _after every read_ no matter what the user configured it to, meaning it would receive data at an unnecessarily slow rate. In this PR: - Instead of using `set_nonblocking` or calling `std::thread::sleep` after every read, we now use `set_read_timeout` in the non-tokio native impls. - The `delay_blocking` option is removed in favor of `read_timeout` - The default for `read_timeout` is `Some(10ms)` - The ideal thing to do here would be to not block at all and instead use native polling APIs. This work is left as a TODO for another PR. - https://github.com/rerun-io/ewebsock/issues/49 --- ewebsock/src/lib.rs | 12 ++- ewebsock/src/native_tungstenite.rs | 160 +++++++++++++++-------------- example_app/src/lib.rs | 2 + 3 files changed, 92 insertions(+), 82 deletions(-) diff --git a/ewebsock/src/lib.rs b/ewebsock/src/lib.rs index e2be0c3..98fd4df 100644 --- a/ewebsock/src/lib.rs +++ b/ewebsock/src/lib.rs @@ -147,8 +147,12 @@ pub struct Options { /// Currently only supported on native. pub subprotocols: Vec, - /// Delay blocking in ms - default 10ms - pub delay_blocking: std::time::Duration, + /// Socket read timeout. + /// + /// Reads will block forever if this is set to `None` or `Some(Duration::ZERO)`. + /// + /// Defaults to 10ms. + pub read_timeout: Option, } impl Default for Options { @@ -157,7 +161,9 @@ impl Default for Options { max_incoming_frame_size: 64 * 1024 * 1024, additional_headers: vec![], subprotocols: vec![], - delay_blocking: std::time::Duration::from_millis(10), // default value 10ms, + // let the OS schedule something else, otherwise busy-loop + // TODO: use polling on native instead + read_timeout: Some(std::time::Duration::from_millis(10)), } } } diff --git a/ewebsock/src/native_tungstenite.rs b/ewebsock/src/native_tungstenite.rs index f7f11c7..3e0b2dc 100644 --- a/ewebsock/src/native_tungstenite.rs +++ b/ewebsock/src/native_tungstenite.rs @@ -1,10 +1,14 @@ //! Native implementation of the WebSocket client using the `tungstenite` crate. +use std::net::TcpStream; use std::{ ops::ControlFlow, sync::mpsc::{Receiver, TryRecvError}, }; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::WebSocket; + use crate::tungstenite_common::into_requester; use crate::{EventHandler, Options, Result, WsEvent, WsMessage}; @@ -76,6 +80,7 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler let config = tungstenite::protocol::WebSocketConfig::from(options.clone()); let max_redirects = 3; // tungstenite default + let read_timeout = options.read_timeout; let (mut socket, response) = match tungstenite::client::connect_with_config( into_requester(uri, options), Some(config), @@ -87,6 +92,8 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler } }; + set_read_timeout(&mut socket, read_timeout)?; + log::debug!("WebSocket HTTP response code: {}", response.status()); log::trace!( "WebSocket response contains the following headers: {:?}", @@ -102,31 +109,7 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler } loop { - let control = match socket.read() { - Ok(incoming_msg) => match incoming_msg { - tungstenite::protocol::Message::Text(text) => { - on_event(WsEvent::Message(WsMessage::Text(text))) - } - tungstenite::protocol::Message::Binary(data) => { - on_event(WsEvent::Message(WsMessage::Binary(data))) - } - tungstenite::protocol::Message::Ping(data) => { - on_event(WsEvent::Message(WsMessage::Ping(data))) - } - tungstenite::protocol::Message::Pong(data) => { - on_event(WsEvent::Message(WsMessage::Pong(data))) - } - tungstenite::protocol::Message::Close(close) => { - on_event(WsEvent::Closed); - log::debug!("WebSocket close received: {close:?}"); - return Ok(()); - } - tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), - }, - Err(err) => { - return Err(format!("read: {err}")); - } - }; + let control = read_from_socket(&mut socket, on_event)?; if control.is_break() { log::trace!("Closing connection due to Break"); @@ -135,7 +118,7 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler .map_err(|err| format!("Failed to close connection: {err}")); } - std::thread::sleep(std::time::Duration::from_millis(10)); + std::thread::yield_now(); } } @@ -172,12 +155,13 @@ pub fn ws_connect_blocking( on_event: &EventHandler, rx: &Receiver, ) -> Result<()> { - let delay = options.delay_blocking; let config = tungstenite::protocol::WebSocketConfig::from(options.clone()); let max_redirects = 3; // tungstenite default let uri: tungstenite::http::Uri = url .parse() .map_err(|err| format!("Failed to parse URL {url:?}: {err}"))?; + + let read_timeout = options.read_timeout; let (mut socket, response) = match tungstenite::client::connect_with_config( into_requester(uri, options), Some(config), @@ -189,6 +173,8 @@ pub fn ws_connect_blocking( } }; + set_read_timeout(&mut socket, read_timeout)?; + log::debug!("WebSocket HTTP response code: {}", response.status()); log::trace!( "WebSocket response contains the following headers: {:?}", @@ -203,26 +189,9 @@ pub fn ws_connect_blocking( .map_err(|err| format!("Failed to close connection: {err}")); } - match socket.get_mut() { - tungstenite::stream::MaybeTlsStream::Plain(stream) => stream.set_nonblocking(true), - - // tungstenite::stream::MaybeTlsStream::NativeTls(stream) => { - // stream.get_mut().set_nonblocking(true) - // } - #[cfg(feature = "tls")] - tungstenite::stream::MaybeTlsStream::Rustls(stream) => { - stream.get_mut().set_nonblocking(true) - } - _ => return Err(format!("Unknown tungstenite stream {:?}", socket.get_mut())), - } - .map_err(|err| format!("Failed to make WebSocket non-blocking: {err}"))?; - loop { - let mut did_work = false; - match rx.try_recv() { Ok(outgoing_message) => { - did_work = true; let outgoing_message = match outgoing_message { WsMessage::Text(text) => tungstenite::protocol::Message::Text(text), WsMessage::Binary(data) => tungstenite::protocol::Message::Binary(data), @@ -245,39 +214,7 @@ pub fn ws_connect_blocking( Err(TryRecvError::Empty) => {} }; - let control = match socket.read() { - Ok(incoming_msg) => { - did_work = true; - match incoming_msg { - tungstenite::protocol::Message::Text(text) => { - on_event(WsEvent::Message(WsMessage::Text(text))) - } - tungstenite::protocol::Message::Binary(data) => { - on_event(WsEvent::Message(WsMessage::Binary(data))) - } - tungstenite::protocol::Message::Ping(data) => { - on_event(WsEvent::Message(WsMessage::Ping(data))) - } - tungstenite::protocol::Message::Pong(data) => { - on_event(WsEvent::Message(WsMessage::Pong(data))) - } - tungstenite::protocol::Message::Close(close) => { - on_event(WsEvent::Closed); - log::debug!("Close received: {close:?}"); - return Ok(()); - } - tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), - } - } - Err(tungstenite::Error::Io(io_err)) - if io_err.kind() == std::io::ErrorKind::WouldBlock => - { - ControlFlow::Continue(()) // Ignore - } - Err(err) => { - return Err(format!("read: {err}")); - } - }; + let control = read_from_socket(&mut socket, on_event)?; if control.is_break() { log::trace!("Closing connection due to Break"); @@ -286,10 +223,75 @@ pub fn ws_connect_blocking( .map_err(|err| format!("Failed to close connection: {err}")); } - if !did_work { - std::thread::sleep(delay); + std::thread::yield_now(); + } +} + +fn read_from_socket( + socket: &mut WebSocket>, + on_event: &EventHandler, +) -> Result> { + let control = match socket.read() { + Ok(incoming_msg) => match incoming_msg { + tungstenite::protocol::Message::Text(text) => { + on_event(WsEvent::Message(WsMessage::Text(text))) + } + tungstenite::protocol::Message::Binary(data) => { + on_event(WsEvent::Message(WsMessage::Binary(data))) + } + tungstenite::protocol::Message::Ping(data) => { + on_event(WsEvent::Message(WsMessage::Ping(data))) + } + tungstenite::protocol::Message::Pong(data) => { + on_event(WsEvent::Message(WsMessage::Pong(data))) + } + tungstenite::protocol::Message::Close(close) => { + on_event(WsEvent::Closed); + log::debug!("WebSocket close received: {close:?}"); + ControlFlow::Break(()) + } + tungstenite::protocol::Message::Frame(_) => ControlFlow::Continue(()), + }, + // If we get `WouldBlock`, then the read timed out. + // Windows may emit `TimedOut` instead. + Err(tungstenite::Error::Io(io_err)) + if io_err.kind() == std::io::ErrorKind::WouldBlock + || io_err.kind() == std::io::ErrorKind::TimedOut => + { + ControlFlow::Continue(()) // Ignore } + Err(err) => { + return Err(format!("read: {err}")); + } + }; + + Ok(control) +} + +fn set_read_timeout( + s: &mut WebSocket>, + value: Option, +) -> Result<()> { + // zero timeout is the same as no timeout + if value.is_none() || value.is_some_and(|value| value.is_zero()) { + return Ok(()); } + + match s.get_mut() { + MaybeTlsStream::Plain(s) => { + s.set_read_timeout(value) + .map_err(|err| format!("failed to set read timeout: {err}"))?; + } + #[cfg(feature = "tls")] + MaybeTlsStream::Rustls(s) => { + s.get_mut() + .set_read_timeout(value) + .map_err(|err| format!("failed to set read timeout: {err}"))?; + } + _ => {} + }; + + Ok(()) } #[test] diff --git a/example_app/src/lib.rs b/example_app/src/lib.rs index 38e3d8a..19f4211 100644 --- a/example_app/src/lib.rs +++ b/example_app/src/lib.rs @@ -1,3 +1,5 @@ +//! Example application. + mod app; pub use app::ExampleApp;