diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4a23b64..4cc9c88 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -50,7 +50,7 @@ jobs: --request POST \ --url https://api.github.com/repos/${{ github.repository }}/releases \ --header "Accept: application/vnd.github+json" \ - --header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}"\ + --header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \ --header "X-GitHub-Api-Version: 2022-11-28" \ --data "{ \"tag_name\":\"v${version}\", diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d82c1a..d764b5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ Unreleased ---------- -- Bumped `tokio-tungstenite` dependency to `0.24` +- Bumped `tokio-tungstenite` dependency to `0.26` 0.13.0 diff --git a/Cargo.toml b/Cargo.toml index 02c2f32..0c6c0b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,14 +21,14 @@ test = ["tokio-tungstenite/connect"] futures = {version = "0.3", default-features = false, features = ["async-await", "std"]} tokio = {version = "1.8", default-features = false, features = ["rt", "time"]} tracing = {version = "0.1", default-features = false, features = ["std"]} -tokio-tungstenite = {version = "0.24", default-features = false} +tokio-tungstenite = {version = "0.26", default-features = false} [dev-dependencies] rand = {version = "0.8", default-features = false, features = ["std", "std_rng"]} serde = {version = "1.0", features = ["derive"]} test-log = {version = "0.2.8", default-features = false, features = ["trace"]} tokio = {version = "1.8", default-features = false, features = ["rt", "macros", "net", "test-util"]} -tokio-tungstenite = {version = "0.24", default-features = false, features = ["connect", "url"]} +tokio-tungstenite = {version = "0.26", default-features = false, features = ["connect", "url"]} tracing-subscriber = {version = "0.3", default-features = false, features = ["ansi", "env-filter", "fmt", "local-time"]} url = "2.0" diff --git a/src/wrap.rs b/src/wrap.rs index 1de375e..0e1581a 100644 --- a/src/wrap.rs +++ b/src/wrap.rs @@ -6,6 +6,7 @@ use std::fmt::Formatter; use std::fmt::Result as FmtResult; use std::io; use std::marker::PhantomData; +use std::ops::Deref as _; use std::pin::Pin; use std::str::from_utf8 as str_from_utf8; use std::task::Poll; @@ -20,8 +21,10 @@ use futures::StreamExt as _; use tokio::time::interval; use tokio::time::Interval; use tokio::time::MissedTickBehavior; +use tokio_tungstenite::tungstenite::Bytes; use tokio_tungstenite::tungstenite::Error as WebSocketError; use tokio_tungstenite::tungstenite::Message as WebSocketMessage; +use tokio_tungstenite::tungstenite::Utf8Bytes; use tracing::debug; use tracing::error; @@ -58,8 +61,8 @@ pub enum Message { impl From for WebSocketMessage { fn from(message: Message) -> Self { match message { - Message::Text(data) => WebSocketMessage::Text(data), - Message::Binary(data) => WebSocketMessage::Binary(data), + Message::Text(data) => WebSocketMessage::Text(Utf8Bytes::from(data)), + Message::Binary(data) => WebSocketMessage::Binary(Bytes::from(data)), } } } @@ -180,9 +183,11 @@ impl Debug for DebugMessage<'_> { if let Ok(s) = str_from_utf8(data) { f.debug_tuple("Binary").field(&s).finish() } else { - Debug::fmt(self.message, f) + f.debug_tuple("Binary").field(&data.deref()).finish() } }, + WebSocketMessage::Ping(data) => f.debug_tuple("Ping").field(&data.deref()).finish(), + WebSocketMessage::Pong(data) => f.debug_tuple("Pong").field(&data.deref()).finish(), _ => Debug::fmt(self.message, f), } } @@ -260,7 +265,7 @@ impl Pinger { // to at the last interval. We need to make sure to actually // send a ping over the wire now to check whether our // connection is still alive. - let message = WebSocketMessage::Ping(Vec::new()); + let message = WebSocketMessage::Ping(Bytes::new()); let () = set_message(sink, &mut self.ping, message); self.ping.advance(sink, ctx)?; @@ -392,8 +397,12 @@ where let () = this.ping.as_mut().map(Pinger::activity).unwrap_or(()); match message { - WebSocketMessage::Text(data) => break Poll::Ready(Some(Ok(Message::Text(data)))), - WebSocketMessage::Binary(data) => break Poll::Ready(Some(Ok(Message::Binary(data)))), + WebSocketMessage::Text(data) => { + break Poll::Ready(Some(Ok(Message::Text(data.to_string())))) + }, + WebSocketMessage::Binary(data) => { + break Poll::Ready(Some(Ok(Message::Binary(data.to_vec())))) + }, WebSocketMessage::Ping(_) => { // Ping messages are automatically and transparently // responded to by tungstenite. @@ -485,16 +494,16 @@ mod tests { /// messages. #[test] fn debug_websocket_message() { - let message = WebSocketMessage::Binary(b"this is a test".to_vec()); + let message = WebSocketMessage::Binary(Bytes::from(b"this is a test".as_slice())); let expected = r#"Binary("this is a test")"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); // Also try with some invalid Unicode. - let message = WebSocketMessage::Binary([0xf0, 0x90, 0x80].to_vec()); + let message = WebSocketMessage::Binary(Bytes::from([0xf0, 0x90, 0x80].as_slice())); let expected = r#"Binary([240, 144, 128])"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); - let message = WebSocketMessage::Ping(Vec::new()); + let message = WebSocketMessage::Ping(Bytes::new()); let expected = r#"Ping([])"#; assert_eq!(format!("{:?}", debug_message(&message)), expected); } @@ -576,11 +585,11 @@ mod tests { let mut stream = stream.fuse(); // Ping. - stream.send(WebSocketMessage::Ping(Vec::new())).await?; + stream.send(WebSocketMessage::Ping(Bytes::new())).await?; // Expect Pong. assert_eq!( stream.next().await.unwrap()?, - WebSocketMessage::Pong(Vec::new()), + WebSocketMessage::Pong(Bytes::new()), ); let future = stream.select_next_some(); @@ -720,11 +729,11 @@ mod tests { async fn send_messages() { async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> { stream - .send(WebSocketMessage::Text("42".to_string())) + .send(WebSocketMessage::Text(Utf8Bytes::from_static("42"))) .await?; - stream.send(WebSocketMessage::Pong(Vec::new())).await?; + stream.send(WebSocketMessage::Pong(Bytes::new())).await?; stream - .send(WebSocketMessage::Text("43".to_string())) + .send(WebSocketMessage::Text(Utf8Bytes::from_static("43"))) .await?; stream.send(WebSocketMessage::Close(None)).await?; Ok(()) @@ -747,18 +756,18 @@ mod tests { #[ignore = "stress test; test takes a long time"] async fn stress_stream() { async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> { - fn random_buf() -> Vec { + fn random_buf() -> Bytes { let len = (0..32).choose(&mut thread_rng()).unwrap(); let mut vec = Vec::new(); vec.extend((0..len).map(|_| thread_rng().gen::())); - vec + Bytes::from(vec) } - fn random_str() -> String { + fn random_str() -> Utf8Bytes { let len = (0..32).choose(&mut thread_rng()).unwrap(); let mut string = String::new(); string.extend((0..len).map(|_| thread_rng().gen::())); - string + Utf8Bytes::from(string) } for _ in 0..50000 {