diff --git a/socketioxide/tests/acknowledgement.rs b/socketioxide/tests/acknowledgement.rs index 6dee6b5b..b2df06cc 100644 --- a/socketioxide/tests/acknowledgement.rs +++ b/socketioxide/tests/acknowledgement.rs @@ -1,31 +1,110 @@ +//! Tests for acknowledgements +//! +//! TODO: switch to `rust_socketio` when it will support ack responses mod fixture; mod utils; use fixture::{create_server, create_ws_connection}; -use futures::{FutureExt, SinkExt}; +use futures::{SinkExt, StreamExt}; use socketioxide::extract::SocketRef; +use socketioxide::{Packet, PacketData}; use tokio::sync::mpsc; +use tokio::time::Duration; use tokio_tungstenite::tungstenite::Message; -use crate::fixture::{socketio_client, socketio_client_with_handler}; +#[tokio::test] +pub async fn emit_with_ack() { + const PORT: u16 = 2100; + use Message::*; + let io = create_server(PORT).await; + let (tx, mut rx) = mpsc::channel::<[String; 1]>(4); + + io.ns("/", move |socket: SocketRef| async move { + let res = assert_ok!(socket.emit_with_ack::<[String; 1]>("test", "foo")); + let ack = assert_ok!(res.await); + assert_ok!(tx.try_send(ack.data)); + + let res = assert_ok!(socket + .timeout(Duration::from_millis(500)) + .emit_with_ack::<[String; 1]>("test", "foo")); + let ack = assert_ok!(res.await); + assert_ok!(tx.try_send(ack.data)); + }); + + let (mut stx, mut srx) = create_ws_connection(PORT).await.split(); + assert_ok!(srx.next().await.unwrap()); + assert_ok!(srx.next().await.unwrap()); + + let msg = assert_ok!(srx.next().await.unwrap()); + assert_eq!(msg, Text("421[\"test\",\"foo\"]".to_string())); + assert_ok!(stx.send(Text("431[\"oof\"]".to_string())).await); + + let ack = rx.recv().await.unwrap(); + assert_eq!(ack[0], "oof"); + + let msg = assert_ok!(srx.next().await.unwrap()); + assert_eq!(msg, Text("422[\"test\",\"foo\"]".to_string())); + assert_ok!(stx.send(Text("432[\"oof\"]".to_string())).await); + + let ack = rx.recv().await.unwrap(); + assert_eq!(ack[0], "oof"); + + assert_ok!(stx.close().await); +} #[tokio::test] -pub async fn ack_emit_single_with_ack() { +pub async fn broadcast_with_ack() { const PORT: u16 = 2100; + use Message::*; let io = create_server(PORT).await; - let (tx, mut rx) = mpsc::channel::(4); + let (tx, mut rx) = mpsc::channel::<[String; 1]>(4); + let io2 = io.clone(); io.ns("/", move |socket: SocketRef| async move { - let res = assert_ok!(socket.emit_with_ack::("test", "test")); + let res = assert_ok!(io2.emit_with_ack::<[String; 1]>("test", "foo")); let ack = assert_ok!(res.await); - tx.try_send(ack.data).unwrap(); + assert_ok!(tx.try_send(ack.data)); + + let res = assert_ok!(io2 + .timeout(Duration::from_millis(500)) + .emit_with_ack::<[String; 1]>("test", "foo")); + let ack = assert_ok!(res.await); + assert_ok!(tx.try_send(ack.data)); + + let res = assert_ok!(socket + .broadcast() + .timeout(Duration::from_millis(500)) + .emit_with_ack::<[String; 1]>("test", "foo")); + let ack = assert_ok!(res.await); + assert_ok!(tx.try_send(ack.data)); }); - let handler = |_, _| Box::pin(async move {}); - let client = socketio_client_with_handler(PORT, "test", handler, ()).await; - let client = assert_ok!(client); + // Spawn 5 clients and make them echo the ack + for _ in 0..5 { + tokio::spawn(async move { + let (mut stx, mut srx) = create_ws_connection(PORT).await.split(); + assert_ok!(srx.next().await.unwrap()); + assert_ok!(srx.next().await.unwrap()); + + while let Some(msg) = srx.next().await { + let msg = match assert_ok!(msg) { + Text(msg) => msg, + _ => panic!("Unexpected message"), + }; + let ack = match assert_ok!(Packet::try_from(msg[1..].to_string())).inner { + PacketData::Event(_, _, Some(ack)) => ack, + _ => panic!("Unexpected packet"), + }; + assert_ok!( + stx.send(Text(format!("43{}[\"oof\"]", ack.to_string()))) + .await + ); + } + }); + } - assert_eq!(rx.recv().await.unwrap(), "test"); - assert_eq!(rx.recv().await.unwrap(), "test"); - assert_ok!(client.disconnect().await); + for _ in 0..(5 * 3) { + let msg = rx.recv().await.unwrap(); + assert_eq!(msg[0], "oof"); + } }