Skip to content

Commit

Permalink
fix receive_impl ignoring options.delay_blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk committed Oct 23, 2024
1 parent dc40f02 commit 4081659
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 3 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["ewebsock", "example_app", "echo_server"]
members = ["ewebsock", "example_app", "echo_server", "stress_test"]

[patch.crates-io]
# If you want to use the bleeding edge version of egui/eframe:
Expand Down
15 changes: 13 additions & 2 deletions ewebsock/src/native_tungstenite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHand
/// # Errors
/// All errors are returned to the caller, and NOT reported via `on_event`.
pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler) -> Result<()> {
let delay = options.delay_blocking;
let uri: tungstenite::http::Uri = url
.parse()
.map_err(|err| format!("Failed to parse URL {url:?}: {err}"))?;
Expand Down Expand Up @@ -135,7 +136,12 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler
.map_err(|err| format!("Failed to close connection: {err}"));
}

std::thread::sleep(std::time::Duration::from_millis(10));
// without the check we wouldn't yield at all on some platforms
if delay == std::time::Duration::ZERO {
std::thread::yield_now();
} else {
std::thread::sleep(delay);
}
}
}

Expand Down Expand Up @@ -287,7 +293,12 @@ pub fn ws_connect_blocking(
}

if !did_work {
std::thread::sleep(delay);
// without the check we wouldn't yield at all on some platforms
if delay == std::time::Duration::ZERO {
std::thread::yield_now();
} else {
std::thread::sleep(delay);
}
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions stress_test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "stress_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = "1"
tungstenite = { workspace = true }

[lints]
workspace = true
94 changes: 94 additions & 0 deletions stress_test/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#![allow(clippy::unwrap_used, clippy::disallowed_methods)] // We are just testing here.

use std::net::TcpListener;
use std::net::TcpStream;
use std::time::Instant;

use tungstenite::Message;

fn main() {
let args = std::env::args().skip(1).collect::<Vec<_>>();
let [kind, action, address] = &args[..] else {
eprintln!("Usage: stress_test <tcp|ws> <send|recv> <address>");
return;
};
match (kind.as_str(), action.as_str()) {
("tcp", "send") => send_tcp(address),
("tcp", "recv") => recv_tcp(address),
("ws", "send") => send_ws(address),
("ws", "recv") => recv_ws(address),
_ => {
eprintln!("Usage: stress_test <tcp|ws> <send|recv> <address>");
}
}
}

fn send_tcp(address: &str) {
use std::io::Write as _;

println!("Connecting to: {address}");
let mut stream = TcpStream::connect(address).unwrap();

println!("Sending 1M messages");
let start = Instant::now();
for i in 0..1_000_000 {
stream.write_all(&vec![i as u8; 4 * 1024]).unwrap();
}
let duration = start.elapsed();
println!("Sent all messages in {}ms", duration.as_millis());
}

fn recv_tcp(address: &str) {
use std::io::Read as _;

let server = TcpListener::bind(address).unwrap();
println!("Listening on: {address}");
println!("Waiting for client");
let (mut client, _) = server.accept().unwrap();
let mut buf = vec![0; 4 * 1024];
println!("Client connected");
let start = Instant::now();
for i in 0..1_000_000 {
client.read_exact(&mut buf).unwrap();
assert_eq!(buf[0], i as u8, "Invalid message");
}
let duration = start.elapsed();
println!("Received all messages in {}ms", duration.as_millis());
}

fn send_ws(address: &str) {
println!("Connecting to: ws://{address}");
let stream = TcpStream::connect(address).unwrap();
let (mut stream, _) = tungstenite::client(format!("ws://{address}"), stream).unwrap();
println!("{:?}", stream.get_config());

println!("Sending 1M messages");
let start = Instant::now();
for i in 0..1_000_000 {
stream
.send(Message::Binary(vec![i as u8; 4 * 1024]))
.unwrap();
}
let duration = start.elapsed();
println!("Sent all messages in {}ms", duration.as_millis());
}

fn recv_ws(address: &str) {
let server = TcpListener::bind(address).unwrap();
println!("Listening on: ws://{address}");
println!("Waiting for client");
let (client, _) = server.accept().unwrap();
let mut client = tungstenite::accept(client).unwrap();
println!("{:?}", client.get_config());
println!("Client connected");
let start = Instant::now();
for _ in 0..1_000_000 {
let message = client.read().unwrap();
assert!(
matches!(message, Message::Binary(_)),
"unexpected message: {message:?}"
);
}
let duration = start.elapsed();
println!("Received all messages in {}ms", duration.as_millis());
}

0 comments on commit 4081659

Please sign in to comment.