Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test: Concurrent clients for TokenRouter #1010

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 99 additions & 30 deletions tests/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,109 @@

use std::net::{Ipv6Addr, SocketAddr};

use tokio::time::{timeout, Duration};

use quilkin::{
config::Filter,
filters::{Capture, StaticFilter, TokenRouter},
net::endpoint::{metadata::MetadataView, Endpoint},
test::{AddressType, TestHelper},
};
use tokio::time::{timeout, Duration};

/// This test covers both token_router and capture filters,
/// since they work in concert together.
#[tokio::test]
async fn token_router() {
let mut t = TestHelper::default();

let local_addr = echo_server(&mut t).await;

// valid packet
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;

let msg = b"helloabc";
tracing::trace!(%local_addr, "sending echo packet");
socket.send_to(msg, &local_addr).await.unwrap();

tracing::trace!("awaiting echo packet");
assert_eq!(
"hello",
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
);

// send an invalid packet
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
}

// This test covers the scenario in https://github.com/googleforgames/quilkin/issues/988
// to make sure there are no issues with overlapping streams between clients.
#[tokio::test]
async fn multiple_clients() {
let limit = 10_000;
let mut t = TestHelper::default();
let local_addr = echo_server(&mut t).await;

let (mut a_rx, a_socket) = t.open_socket_and_recv_multiple_packets().await;
let (mut b_rx, b_socket) = t.open_socket_and_recv_multiple_packets().await;

tokio::spawn(async move {
// some room to breath
tokio::time::sleep(Duration::from_millis(50)).await;
for _ in 0..limit {
a_socket.send_to(b"Aabc", &local_addr).await.unwrap();
tokio::time::sleep(Duration::from_nanos(5)).await;
}
});
tokio::spawn(async move {
// some room to breath
tokio::time::sleep(Duration::from_millis(50)).await;
for _ in 0..limit {
b_socket.send_to(b"Babc", &local_addr).await.unwrap();
tokio::time::sleep(Duration::from_nanos(5)).await;
}
});

let mut success = 0;
let mut failed = 0;
for _ in 0..limit {
match timeout(Duration::from_millis(60), a_rx.recv()).await {
Ok(packet) => {
assert_eq!("A", packet.unwrap());
success += 1;
}
Err(_) => {
failed += 1;
}
}
match timeout(Duration::from_millis(60), b_rx.recv()).await {
Ok(packet) => {
assert_eq!("B", packet.unwrap());
success += 1;
}
Err(_) => {
failed += 1;
}
}
}

// allow for some dropped packets, since UDP.
let threshold = 0.95 * (2 * limit) as f64;
assert!(
success as f64 > threshold,
"Success: {}, Failed: {}",
success,
failed
);
}

// start an echo server and return what port it's on.
async fn echo_server(t: &mut TestHelper) -> SocketAddr {
let mut echo = t.run_echo_server(AddressType::Ipv6).await;
quilkin::test::map_to_localhost(&mut echo).await;

Expand All @@ -47,10 +136,13 @@ quilkin.dev:
let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
server_config.clusters.modify(|clusters| {
clusters.insert_default(
[Endpoint::with_metadata(
echo.clone(),
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
)]
[
Endpoint::with_metadata(
echo.clone(),
serde_yaml::from_str::<MetadataView<_>>(endpoint_metadata).unwrap(),
),
"127.0.0.2:5000".parse().unwrap(), // goes nowhere, so shouldn't do anything.
]
.into(),
)
});
Expand All @@ -73,28 +165,5 @@ quilkin.dev:
);

let server_port = t.run_server(server_config, None, None).await;

// valid packet
let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;

let local_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, server_port));
let msg = b"helloabc";
tracing::trace!(%local_addr, "sending echo packet");
socket.send_to(msg, &local_addr).await.unwrap();

tracing::trace!("awaiting echo packet");
assert_eq!(
"hello",
timeout(Duration::from_millis(500), recv_chan.recv())
.await
.expect("should have received a packet")
.unwrap()
);

// send an invalid packet
let msg = b"helloxyz";
socket.send_to(msg, &local_addr).await.unwrap();

let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
assert!(result.is_err(), "should not have received a packet");
SocketAddr::from((Ipv6Addr::LOCALHOST, server_port))
}
Loading