-
-
Notifications
You must be signed in to change notification settings - Fork 58
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test(socketio/ack): switch to raw websocket for
ack
integration tests
- Loading branch information
Showing
1 changed file
with
91 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,110 @@ | ||
//! Tests for acknowledgements | ||
//! | ||
//! TODO: switch to `rust_socketio` when it will support ack responses | ||
mod fixture; | ||
mod utils; | ||
|
||
use fixture::{create_server, create_ws_connection}; | ||
use futures::{FutureExt, SinkExt}; | ||
use futures::{SinkExt, StreamExt}; | ||
use socketioxide::extract::SocketRef; | ||
use socketioxide::{Packet, PacketData}; | ||
use tokio::sync::mpsc; | ||
use tokio::time::Duration; | ||
use tokio_tungstenite::tungstenite::Message; | ||
|
||
use crate::fixture::{socketio_client, socketio_client_with_handler}; | ||
#[tokio::test] | ||
pub async fn emit_with_ack() { | ||
const PORT: u16 = 2100; | ||
use Message::*; | ||
let io = create_server(PORT).await; | ||
let (tx, mut rx) = mpsc::channel::<[String; 1]>(4); | ||
|
||
io.ns("/", move |socket: SocketRef| async move { | ||
let res = assert_ok!(socket.emit_with_ack::<[String; 1]>("test", "foo")); | ||
let ack = assert_ok!(res.await); | ||
assert_ok!(tx.try_send(ack.data)); | ||
|
||
let res = assert_ok!(socket | ||
.timeout(Duration::from_millis(500)) | ||
.emit_with_ack::<[String; 1]>("test", "foo")); | ||
let ack = assert_ok!(res.await); | ||
assert_ok!(tx.try_send(ack.data)); | ||
}); | ||
|
||
let (mut stx, mut srx) = create_ws_connection(PORT).await.split(); | ||
assert_ok!(srx.next().await.unwrap()); | ||
assert_ok!(srx.next().await.unwrap()); | ||
|
||
let msg = assert_ok!(srx.next().await.unwrap()); | ||
assert_eq!(msg, Text("421[\"test\",\"foo\"]".to_string())); | ||
assert_ok!(stx.send(Text("431[\"oof\"]".to_string())).await); | ||
|
||
let ack = rx.recv().await.unwrap(); | ||
assert_eq!(ack[0], "oof"); | ||
|
||
let msg = assert_ok!(srx.next().await.unwrap()); | ||
assert_eq!(msg, Text("422[\"test\",\"foo\"]".to_string())); | ||
assert_ok!(stx.send(Text("432[\"oof\"]".to_string())).await); | ||
|
||
let ack = rx.recv().await.unwrap(); | ||
assert_eq!(ack[0], "oof"); | ||
|
||
assert_ok!(stx.close().await); | ||
} | ||
|
||
#[tokio::test] | ||
pub async fn ack_emit_single_with_ack() { | ||
pub async fn broadcast_with_ack() { | ||
const PORT: u16 = 2100; | ||
use Message::*; | ||
let io = create_server(PORT).await; | ||
let (tx, mut rx) = mpsc::channel::<String>(4); | ||
let (tx, mut rx) = mpsc::channel::<[String; 1]>(4); | ||
|
||
let io2 = io.clone(); | ||
io.ns("/", move |socket: SocketRef| async move { | ||
let res = assert_ok!(socket.emit_with_ack::<String>("test", "test")); | ||
let res = assert_ok!(io2.emit_with_ack::<[String; 1]>("test", "foo")); | ||
let ack = assert_ok!(res.await); | ||
tx.try_send(ack.data).unwrap(); | ||
assert_ok!(tx.try_send(ack.data)); | ||
|
||
let res = assert_ok!(io2 | ||
.timeout(Duration::from_millis(500)) | ||
.emit_with_ack::<[String; 1]>("test", "foo")); | ||
let ack = assert_ok!(res.await); | ||
assert_ok!(tx.try_send(ack.data)); | ||
|
||
let res = assert_ok!(socket | ||
.broadcast() | ||
.timeout(Duration::from_millis(500)) | ||
.emit_with_ack::<[String; 1]>("test", "foo")); | ||
let ack = assert_ok!(res.await); | ||
assert_ok!(tx.try_send(ack.data)); | ||
}); | ||
|
||
let handler = |_, _| Box::pin(async move {}); | ||
let client = socketio_client_with_handler(PORT, "test", handler, ()).await; | ||
let client = assert_ok!(client); | ||
// Spawn 5 clients and make them echo the ack | ||
for _ in 0..5 { | ||
tokio::spawn(async move { | ||
let (mut stx, mut srx) = create_ws_connection(PORT).await.split(); | ||
assert_ok!(srx.next().await.unwrap()); | ||
assert_ok!(srx.next().await.unwrap()); | ||
|
||
while let Some(msg) = srx.next().await { | ||
let msg = match assert_ok!(msg) { | ||
Text(msg) => msg, | ||
_ => panic!("Unexpected message"), | ||
}; | ||
let ack = match assert_ok!(Packet::try_from(msg[1..].to_string())).inner { | ||
PacketData::Event(_, _, Some(ack)) => ack, | ||
_ => panic!("Unexpected packet"), | ||
}; | ||
assert_ok!( | ||
stx.send(Text(format!("43{}[\"oof\"]", ack.to_string()))) | ||
.await | ||
); | ||
} | ||
}); | ||
} | ||
|
||
assert_eq!(rx.recv().await.unwrap(), "test"); | ||
assert_eq!(rx.recv().await.unwrap(), "test"); | ||
assert_ok!(client.disconnect().await); | ||
for _ in 0..(5 * 3) { | ||
let msg = rx.recv().await.unwrap(); | ||
assert_eq!(msg[0], "oof"); | ||
} | ||
} |