Skip to content

Commit

Permalink
Update tokio-tungstenite dependency to 0.26
Browse files Browse the repository at this point in the history
This change updates the tokio-tungstenite dependency we rely on to
version 0.26. Upstream switched to a more efficient representation of
message contents. However, in the theme of abstracting over their
implementation details, we keep our Message enum the as it has been,
which results in some arguably avoidable allocations. We can revisit
this decision in the future, should it become an issue.
  • Loading branch information
d-e-s-o committed Dec 29, 2024
1 parent ea2d352 commit 1f83033
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
--request POST \
--url https://api.github.com/repos/${{ github.repository }}/releases \
--header "Accept: application/vnd.github+json" \
--header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}"\
--header "Authorization: Bearer ${{ secrets.GITHUB_TOKEN }}" \
--header "X-GitHub-Api-Version: 2022-11-28" \
--data "{
\"tag_name\":\"v${version}\",
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Unreleased
----------
- Bumped `tokio-tungstenite` dependency to `0.24`
- Bumped `tokio-tungstenite` dependency to `0.26`


0.13.0
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ test = ["tokio-tungstenite/connect"]
futures = {version = "0.3", default-features = false, features = ["async-await", "std"]}
tokio = {version = "1.8", default-features = false, features = ["rt", "time"]}
tracing = {version = "0.1", default-features = false, features = ["std"]}
tokio-tungstenite = {version = "0.24", default-features = false}
tokio-tungstenite = {version = "0.26", default-features = false}

[dev-dependencies]
rand = {version = "0.8", default-features = false, features = ["std", "std_rng"]}
serde = {version = "1.0", features = ["derive"]}
test-log = {version = "0.2.8", default-features = false, features = ["trace"]}
tokio = {version = "1.8", default-features = false, features = ["rt", "macros", "net", "test-util"]}
tokio-tungstenite = {version = "0.24", default-features = false, features = ["connect", "url"]}
tokio-tungstenite = {version = "0.26", default-features = false, features = ["connect", "url"]}
tracing-subscriber = {version = "0.3", default-features = false, features = ["ansi", "env-filter", "fmt", "local-time"]}
url = "2.0"

Expand Down
45 changes: 27 additions & 18 deletions src/wrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt::Formatter;
use std::fmt::Result as FmtResult;
use std::io;
use std::marker::PhantomData;
use std::ops::Deref as _;
use std::pin::Pin;
use std::str::from_utf8 as str_from_utf8;
use std::task::Poll;
Expand All @@ -20,8 +21,10 @@ use futures::StreamExt as _;
use tokio::time::interval;
use tokio::time::Interval;
use tokio::time::MissedTickBehavior;
use tokio_tungstenite::tungstenite::Bytes;
use tokio_tungstenite::tungstenite::Error as WebSocketError;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tokio_tungstenite::tungstenite::Utf8Bytes;

use tracing::debug;
use tracing::error;
Expand Down Expand Up @@ -58,8 +61,8 @@ pub enum Message {
impl From<Message> for WebSocketMessage {
fn from(message: Message) -> Self {
match message {
Message::Text(data) => WebSocketMessage::Text(data),
Message::Binary(data) => WebSocketMessage::Binary(data),
Message::Text(data) => WebSocketMessage::Text(Utf8Bytes::from(data)),
Message::Binary(data) => WebSocketMessage::Binary(Bytes::from(data)),
}
}
}
Expand Down Expand Up @@ -180,9 +183,11 @@ impl Debug for DebugMessage<'_> {
if let Ok(s) = str_from_utf8(data) {
f.debug_tuple("Binary").field(&s).finish()
} else {
Debug::fmt(self.message, f)
f.debug_tuple("Binary").field(&data.deref()).finish()
}
},
WebSocketMessage::Ping(data) => f.debug_tuple("Ping").field(&data.deref()).finish(),
WebSocketMessage::Pong(data) => f.debug_tuple("Pong").field(&data.deref()).finish(),
_ => Debug::fmt(self.message, f),
}
}
Expand Down Expand Up @@ -260,7 +265,7 @@ impl Pinger {
// to at the last interval. We need to make sure to actually
// send a ping over the wire now to check whether our
// connection is still alive.
let message = WebSocketMessage::Ping(Vec::new());
let message = WebSocketMessage::Ping(Bytes::new());
let () = set_message(sink, &mut self.ping, message);

self.ping.advance(sink, ctx)?;
Expand Down Expand Up @@ -392,8 +397,12 @@ where
let () = this.ping.as_mut().map(Pinger::activity).unwrap_or(());

match message {
WebSocketMessage::Text(data) => break Poll::Ready(Some(Ok(Message::Text(data)))),
WebSocketMessage::Binary(data) => break Poll::Ready(Some(Ok(Message::Binary(data)))),
WebSocketMessage::Text(data) => {
break Poll::Ready(Some(Ok(Message::Text(data.to_string()))))
},
WebSocketMessage::Binary(data) => {
break Poll::Ready(Some(Ok(Message::Binary(data.to_vec()))))
},
WebSocketMessage::Ping(_) => {
// Ping messages are automatically and transparently
// responded to by tungstenite.
Expand Down Expand Up @@ -485,16 +494,16 @@ mod tests {
/// messages.
#[test]
fn debug_websocket_message() {
let message = WebSocketMessage::Binary(b"this is a test".to_vec());
let message = WebSocketMessage::Binary(Bytes::from(b"this is a test".as_slice()));
let expected = r#"Binary("this is a test")"#;
assert_eq!(format!("{:?}", debug_message(&message)), expected);

// Also try with some invalid Unicode.
let message = WebSocketMessage::Binary([0xf0, 0x90, 0x80].to_vec());
let message = WebSocketMessage::Binary(Bytes::from([0xf0, 0x90, 0x80].as_slice()));
let expected = r#"Binary([240, 144, 128])"#;
assert_eq!(format!("{:?}", debug_message(&message)), expected);

let message = WebSocketMessage::Ping(Vec::new());
let message = WebSocketMessage::Ping(Bytes::new());
let expected = r#"Ping([])"#;
assert_eq!(format!("{:?}", debug_message(&message)), expected);
}
Expand Down Expand Up @@ -576,11 +585,11 @@ mod tests {
let mut stream = stream.fuse();

// Ping.
stream.send(WebSocketMessage::Ping(Vec::new())).await?;
stream.send(WebSocketMessage::Ping(Bytes::new())).await?;
// Expect Pong.
assert_eq!(
stream.next().await.unwrap()?,
WebSocketMessage::Pong(Vec::new()),
WebSocketMessage::Pong(Bytes::new()),
);

let future = stream.select_next_some();
Expand Down Expand Up @@ -720,11 +729,11 @@ mod tests {
async fn send_messages() {
async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
stream
.send(WebSocketMessage::Text("42".to_string()))
.send(WebSocketMessage::Text(Utf8Bytes::from_static("42")))
.await?;
stream.send(WebSocketMessage::Pong(Vec::new())).await?;
stream.send(WebSocketMessage::Pong(Bytes::new())).await?;
stream
.send(WebSocketMessage::Text("43".to_string()))
.send(WebSocketMessage::Text(Utf8Bytes::from_static("43")))
.await?;
stream.send(WebSocketMessage::Close(None)).await?;
Ok(())
Expand All @@ -747,18 +756,18 @@ mod tests {
#[ignore = "stress test; test takes a long time"]
async fn stress_stream() {
async fn test(mut stream: WebSocketStream) -> Result<(), WebSocketError> {
fn random_buf() -> Vec<u8> {
fn random_buf() -> Bytes {
let len = (0..32).choose(&mut thread_rng()).unwrap();
let mut vec = Vec::new();
vec.extend((0..len).map(|_| thread_rng().gen::<u8>()));
vec
Bytes::from(vec)
}

fn random_str() -> String {
fn random_str() -> Utf8Bytes {
let len = (0..32).choose(&mut thread_rng()).unwrap();
let mut string = String::new();
string.extend((0..len).map(|_| thread_rng().gen::<char>()));
string
Utf8Bytes::from(string)
}

for _ in 0..50000 {
Expand Down

0 comments on commit 1f83033

Please sign in to comment.