Skip to content

Commit

Permalink
Merge pull request #219 from Totodore/test-socketio-client
Browse files Browse the repository at this point in the history
test(socketio): improve testing by directly using the `rust_socketio` client
  • Loading branch information
Totodore authored Dec 24, 2023
2 parents b9e22e0 + 38b0b2d commit 39bc2f9
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
axum = "0.7.2"
salvo = { version = "0.63.0", features = ["tower-compat"] }
rust_socketio = { version = "0.4.2", features = ["async"] }

[workspace.package]
version = "0.9.1"
Expand Down
1 change: 1 addition & 0 deletions socketioxide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ engineioxide = { path = "../engineioxide", features = [
"test-utils",
] }
tokio-tungstenite.workspace = true
rust_socketio.workspace = true
axum.workspace = true
salvo.workspace = true
tokio = { workspace = true, features = [
Expand Down
152 changes: 76 additions & 76 deletions socketioxide/tests/extractors.rs
Original file line number Diff line number Diff line change
@@ -1,99 +1,99 @@
use futures::SinkExt;
use socketioxide::extract::{Data, SocketRef, State};
//! Tests for extractors
use serde_json::json;
use socketioxide::extract::{Data, SocketRef, State, TryData};
use tokio::sync::mpsc;

use fixture::{create_server, create_server_with_state};

use crate::fixture::socketio_client;

mod fixture;
use fixture::{create_server, create_server_with_state, create_ws_connection};
use tokio_tungstenite::tungstenite::Message;
mod utils;

#[tokio::test]
pub async fn state_extractor() {
const PORT: u16 = 2000;
let state = 1112i32;
let io = create_server_with_state(2000, state).await;
let io = create_server_with_state(PORT, state).await;
let (tx, mut rx) = mpsc::channel::<i32>(4);
io.ns("/", move |socket: SocketRef, state: State<i32>| {
println!("Socket connected on / namespace with id: {}", socket.id);
tx.try_send(*state).unwrap();

let tx1 = tx.clone();
let tx2 = tx.clone();
let tx3 = tx.clone();
assert_ok!(tx.try_send(*state));
socket.on("test", move |State(state): State<i32>| {
println!("test event received");
tx.try_send(*state).unwrap();
});
socket.on("async_test", move |State(state): State<i32>| async move {
println!("async_test event received");
tx2.try_send(*state).unwrap();
});
// This handler should not be called
socket.on("ko_test", move |State(_): State<String>| {
println!("ko_test event received");
tx3.try_send(1213231).unwrap();
});
socket.on_disconnect(move |State(state): State<i32>| {
println!("close event received");
tx1.try_send(*state).unwrap();
assert_ok!(tx.try_send(*state))
});
});

let mut stream = create_ws_connection(2000).await;
stream
.send(Message::Text("42[\"test\", 1]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"async_test\", 2]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"ko_test\", 2]".to_string()))
.await
.unwrap();
let client = assert_ok!(socketio_client(PORT, ()).await);
assert_eq!(rx.recv().await.unwrap(), state);

assert_ok!(client.emit("test", json!("foo")).await);
assert_eq!(rx.recv().await.unwrap(), state);
assert_eq!(rx.recv().await.unwrap(), state);
stream.close(None).await.unwrap();
assert_eq!(rx.recv().await.unwrap(), state);

assert_ok!(client.disconnect().await);
}

#[tokio::test]
pub async fn data_extractor() {
let io = create_server(2001).await;
let (tx, mut rx) = mpsc::channel::<i32>(4);
io.ns("/", move |socket: SocketRef| {
println!("Socket connected on / namespace with id: {}", socket.id);
let tx1 = tx.clone();
let tx2 = tx.clone();
socket.on("test", move |Data(data): Data<i32>| {
println!("test event received");
tx1.try_send(data).unwrap();
});
socket.on("async_test", move |Data(data): Data<i32>| async move {
println!("async_test event received");
tx2.try_send(data).unwrap();
const PORT: u16 = 2001;
let io = create_server(PORT).await;
let (tx, mut rx) = mpsc::channel::<String>(4);
let tx1 = tx.clone();
io.ns("/", move |socket: SocketRef, Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
socket.on("test", move |Data(data): Data<String>| {
assert_ok!(tx.try_send(data));
});
// This handler should not be called
socket.on("ko_test", move |Data(_): Data<String>| {
println!("ko_test event received");
tx.try_send(1213231).unwrap();
});

assert_ok!(socketio_client(PORT, ()).await);
assert_ok!(socketio_client(PORT, 1321).await);

// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

let client = assert_ok!(socketio_client(PORT, "foo").await);
assert_eq!(rx.recv().await.unwrap(), "foo");

assert_ok!(client.emit("test", json!("oof")).await);
assert_eq!(rx.recv().await.unwrap(), "oof");

assert_ok!(client.emit("test", json!({ "test": 132 })).await);
// Capacity should be the same as the handler should not be called
assert_eq!(tx1.capacity(), 4);

assert_ok!(client.disconnect().await);
}

#[tokio::test]
pub async fn try_data_extractor() {
const PORT: u16 = 2002;
let io = create_server(PORT).await;
let (tx, mut rx) = mpsc::channel::<Result<String, serde_json::Error>>(4);
io.ns("/", move |s: SocketRef, TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
s.on("test", move |TryData(data): TryData<String>| {
assert_ok!(tx.try_send(data));
});
});

let mut stream = create_ws_connection(2001).await;
stream
.send(Message::Text("42[\"test\", 1]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"async_test\", 2]".to_string()))
.await
.unwrap();
stream
.send(Message::Text("42[\"ko_test\", 2]".to_string()))
.await
.unwrap();
assert_eq!(rx.recv().await.unwrap(), 1);
assert_eq!(rx.recv().await.unwrap(), 2);
stream.close(None).await.unwrap();
// Non deserializable data
assert_ok!(socketio_client(PORT, ()).await);
assert_err!(rx.recv().await.unwrap());

// Non deserializable data
assert_ok!(socketio_client(PORT, 1321).await);
assert_err!(rx.recv().await.unwrap());

let client = assert_ok!(socketio_client(PORT, "foo").await);
let res = assert_ok!(rx.recv().await.unwrap());
assert_eq!(res, "foo");

assert_ok!(client.emit("test", json!("oof")).await);
let res = assert_ok!(rx.recv().await.unwrap());
assert_eq!(res, "oof");

// Non deserializable data
assert_ok!(client.emit("test", json!({ "test": 132 })).await);
assert_err!(rx.recv().await.unwrap());

assert_ok!(client.disconnect().await);
}
32 changes: 31 additions & 1 deletion socketioxide/tests/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,23 @@ use std::{
};

use engineioxide::service::NotFoundService;
use futures::SinkExt;
use futures::{future::BoxFuture, SinkExt};
use http::Request;
use http_body_util::{BodyExt, Either, Empty, Full};
use hyper::server::conn::http1;
use hyper_util::{
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use rust_socketio::{
asynchronous::{Client as SocketIoClient, ClientBuilder},
Payload,
};
use serde::{Deserialize, Serialize};
use socketioxide::{adapter::LocalAdapter, service::SocketIoService, SocketIo};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};

/// An OpenPacket is used to initiate a connection
#[derive(Debug, Serialize, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -120,6 +125,31 @@ pub async fn create_server(port: u16) -> SocketIo {
io
}

pub async fn socketio_client_with_handler<F>(
port: u16,
event: &str,
callback: F,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error>
where
F: FnMut(Payload, SocketIoClient) -> BoxFuture<'static, ()> + Send + Sync + 'static,
{
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.on(event, callback)
.auth(auth)
.connect()
.await
}
pub async fn socketio_client(
port: u16,
auth: impl Into<serde_json::Value>,
) -> Result<SocketIoClient, rust_socketio::Error> {
ClientBuilder::new(format!("http://127.0.0.1:{}", port))
.auth(auth)
.connect()
.await
}

async fn spawn_server(port: u16, svc: SocketIoService<NotFoundService, LocalAdapter>) {
let addr = &SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let listener = TcpListener::bind(&addr).await.unwrap();
Expand Down
43 changes: 43 additions & 0 deletions socketioxide/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#![allow(dead_code)]

#[macro_export]
macro_rules! assert_ok {
($e:expr) => {
assert_ok!($e,)
};
($e:expr,) => {{
use std::result::Result::*;
match $e {
Ok(v) => v,
Err(e) => panic!("assertion failed: Err({:?})", e),
}
}};
($e:expr, $($arg:tt)+) => {{
use std::result::Result::*;
match $e {
Ok(v) => v,
Err(e) => panic!("assertion failed: Err({:?}): {}", e, format_args!($($arg)+)),
}
}};
}

#[macro_export]
macro_rules! assert_err {
($e:expr) => {
assert_err!($e,);
};
($e:expr,) => {{
use std::result::Result::*;
match $e {
Ok(v) => panic!("assertion failed: Ok({:?})", v),
Err(e) => e,
}
}};
($e:expr, $($arg:tt)+) => {{
use std::result::Result::*;
match $e {
Ok(v) => panic!("assertion failed: Ok({:?}): {}", v, format_args!($($arg)+)),
Err(e) => e,
}
}};
}

0 comments on commit 39bc2f9

Please sign in to comment.