From 1f830334c90b0e8790dc48c8dbd16eba90481b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20M=C3=BCller?= Date: Sun, 29 Dec 2024 09:33:58 -0800 Subject: [PATCH] Update tokio-tungstenite dependency to 0.26 This change updates the tokio-tungstenite dependency we rely on to version 0.26. Upstream switched to a more efficient representation of message contents. However, in the theme of abstracting over their implementation details, we keep our Message enum the as it has been, which results in some arguably avoidable allocations. We can revisit this decision in the future, should it become an issue. --- .github/workflows/publish.yml | 2 +- CHANGELOG.md | 2 +- Cargo.toml | 4 ++-- src/wrap.rs | 45 +++++++++++++++++++++-------------- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4a23b64..4cc9c88 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -50,7 +50,7 @@ jobs: --request POST \ --url https://api.github.com/repos/${{ github.repository }}/releases \ --header "Accept: application/vnd.github+json" \ - --header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}"\ + --header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ --header "X-GitHub-Api-Version: 2022-11-28" \ --data "{ \"tag_name\":\"v${version}\", diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d82c1a..d764b5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ Unreleased ---------- -- Bumped `tokio-tungstenite` dependency to `0.24` +- Bumped `tokio-tungstenite` dependency to `0.26` 0.13.0 diff --git a/Cargo.toml b/Cargo.toml index 02c2f32..0c6c0b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,14 +21,14 @@ test = ["tokio-tungstenite/connect"] futures = {version = "0.3", default-features = false, features = ["async-await", "std"]} tokio = {version = "1.8", default-features = false, features = ["rt", "time"]} tracing = {version = "0.1", default-features = false, features = ["std"]} -tokio-tungstenite = {version = "0.24", default-features = false} +tokio-tungstenite = {version = "0.26", default-features = false} [dev-dependencies] rand = {version = "0.8", default-features = false, features = ["std", "std_rng"]} serde = {version = "1.0", features = ["derive"]} test-log = {version = "0.2.8", default-features = false, features = ["trace"]} tokio = {version = "1.8", default-features = false, features = ["rt", "macros", "net", "test-util"]} -tokio-tungstenite = {version = "0.24", default-features = false, features = ["connect", "url"]} +tokio-tungstenite = {version = "0.26", default-features = false, features = ["connect", "url"]} tracing-subscriber = {version = "0.3", default-features = false, features = ["ansi", "env-filter", "fmt", "local-time"]} url = "2.0" diff --git a/src/wrap.rs b/src/wrap.rs index 1de375e..0e1581a 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -6,6 +6,7 @@ use std::fmt::Formatter; use std::fmt::Result as FmtResult; use std::io; use std::marker::PhantomData; +use std::ops::Deref as _; use std::pin::Pin; use std::str::from_utf8 as str_from_utf8; use std::task::Poll; @@ -20,8 +21,10 @@ use futures::StreamExt as _; use tokio::time::interval; use tokio::time::Interval; use tokio::time::MissedTickBehavior; +use tokio_tungstenite::tungstenite::Bytes; use tokio_tungstenite::tungstenite::Error as WebSocketError; use tokio_tungstenite::tungstenite::Message as WebSocketMessage; +use tokio_tungstenite::tungstenite::Utf8Bytes; use tracing::debug; use tracing::error; @@ -58,8 +61,8 @@ pub enum Message { impl From for WebSocketMessage { fn from(message: Message) -> Self { match message { - Message::Text(data) => WebSocketMessage::Text(data), - Message::Binary(data) => WebSocketMessage::Binary(data), + Message::Text(data) => WebSocketMessage::Text(Utf8Bytes::from(data)), + Message::Binary(data) => WebSocketMessage::Binary(Bytes::from(data)), } } } @@ -180,9 +183,11 @@ impl Debug for DebugMessage<'_> { if let Ok(s) = str_from_utf8(data) { f.debug_tuple("Binary").field(&s).finish() } else { - Debug::fmt(self.message, f) + f.debug_tuple("Binary").field(&data.deref()).finish() } }, + WebSocketMessage::Ping(data) => f.debug_tuple("Ping").field(&data.deref()).finish(), + WebSocketMessage::Pong(data) => f.debug_tuple("Pong").field(&data.deref()).finish(), _ => Debug::fmt(self.message, f), } } @@ -260,7 +265,7 @@ impl Pinger { // to at the last interval. We need to make sure to actually // send a ping over the wire now to check whether our // connection is still alive. - let message = WebSocketMessage::Ping(Vec::new()); + let message = WebSocketMessage::Ping(Bytes::new()); let () = set_message(sink, &mut self.ping, message); self.ping.advance(sink, ctx)?; @@ -392,8 +397,12 @@ where let () = this.ping.as_mut().map(Pinger::activity).unwrap_or(()); match message { - WebSocketMessage::Text(data) => break Poll::Ready(Some(Ok(Message::Text(data)))), - WebSocketMessage::Binary(data) => break Poll::Ready(Some(Ok(Message::Binary(data)))), + WebSocketMessage::Text(data) => { + break Poll::Ready(Some(Ok(Message::Text(data.to_string())))) + }, + WebSocketMessage::Binary(data) => { + break Poll::Ready(Some(Ok(Message::Binary(data.to_vec())))) + }, WebSocketMessage::Ping(_) => { // Ping messages are automatically and transparently // responded to by tungstenite. @@ -485,16 +494,16 @@ mod tests { /// messages. #[test] fn debug_websocket_message() { - let message = WebSocketMessage::Binary(b"this is a test".to_vec()); + let message = WebSocketMessage::Binary(Bytes::from(b"this is a test".as_slice())); let expected = r#"Binary("this is a test")"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); // Also try with some invalid Unicode. - let message = WebSocketMessage::Binary([0xf0, 0x90, 0x80].to_vec()); + let message = WebSocketMessage::Binary(Bytes::from([0xf0, 0x90, 0x80].as_slice())); let expected = r#"Binary([240, 144, 128])"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); - let message = WebSocketMessage::Ping(Vec::new()); + let message = WebSocketMessage::Ping(Bytes::new()); let expected = r#"Ping([])"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); } @@ -576,11 +585,11 @@ mod tests { let mut stream = stream.fuse(); // Ping. - stream.send(WebSocketMessage::Ping(Vec::new())).await?; + stream.send(WebSocketMessage::Ping(Bytes::new())).await?; // Expect Pong. assert_eq!( stream.next().await.unwrap()?, - WebSocketMessage::Pong(Vec::new()), + WebSocketMessage::Pong(Bytes::new()), ); let future = stream.select_next_some(); @@ -720,11 +729,11 @@ mod tests { async fn send_messages() { async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> { stream - .send(WebSocketMessage::Text("42".to_string())) + .send(WebSocketMessage::Text(Utf8Bytes::from_static("42"))) .await?; - stream.send(WebSocketMessage::Pong(Vec::new())).await?; + stream.send(WebSocketMessage::Pong(Bytes::new())).await?; stream - .send(WebSocketMessage::Text("43".to_string())) + .send(WebSocketMessage::Text(Utf8Bytes::from_static("43"))) .await?; stream.send(WebSocketMessage::Close(None)).await?; Ok(()) @@ -747,18 +756,18 @@ mod tests { #[ignore = "stress test; test takes a long time"] async fn stress_stream() { async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> { - fn random_buf() -> Vec { + fn random_buf() -> Bytes { let len = (0..32).choose(&mut thread_rng()).unwrap(); let mut vec = Vec::new(); vec.extend((0..len).map(|_| thread_rng().gen::())); - vec + Bytes::from(vec) } - fn random_str() -> String { + fn random_str() -> Utf8Bytes { let len = (0..32).choose(&mut thread_rng()).unwrap(); let mut string = String::new(); string.extend((0..len).map(|_| thread_rng().gen::())); - string + Utf8Bytes::from(string) } for _ in 0..50000 {