Skip to content

Commit

Permalink
actually fix it
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Oct 15, 2023
1 parent e38356e commit 97e2f04
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 103 deletions.
17 changes: 16 additions & 1 deletion benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ static THROUGHPUT_SERVER_INIT: Lazy<()> = Lazy::new(|| {
static FEEDBACK_LOOP: Lazy<()> = Lazy::new(|| {
std::thread::spawn(|| {
let socket = UdpSocket::bind(FEEDBACK_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();

loop {
let mut packet = [0; MESSAGE_SIZE];
Expand All @@ -74,6 +77,9 @@ fn throughput_benchmark(c: &mut Criterion) {
// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
let socket = UdpSocket::bind(BENCH_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];

let mut group = c.benchmark_group("throughput");
Expand Down Expand Up @@ -125,6 +131,9 @@ fn write_feedback(addr: SocketAddr) -> mpsc::Sender<Vec<u8>> {
let (write_tx, write_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(addr).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];
let (_, source) = socket.recv_from(&mut packet).unwrap();
while let Ok(packet) = write_rx.recv() {
Expand All @@ -142,6 +151,9 @@ fn readwrite_benchmark(c: &mut Criterion) {
let (read_tx, read_rx) = mpsc::channel::<Vec<u8>>();
std::thread::spawn(move || {
let socket = UdpSocket::bind(READ_LOOP_ADDR).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();
let mut packet = [0; MESSAGE_SIZE];
loop {
let (length, _) = socket.recv_from(&mut packet).unwrap();
Expand All @@ -164,9 +176,12 @@ fn readwrite_benchmark(c: &mut Criterion) {
Lazy::force(&WRITE_SERVER_INIT);

// Sleep to give the servers some time to warm-up.
std::thread::sleep(std::time::Duration::from_millis(500));
std::thread::sleep(std::time::Duration::from_millis(150));

let socket = UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
socket
.set_read_timeout(Some(std::time::Duration::from_millis(500)))
.unwrap();

// prime the direct write connection
socket.send_to(PACKETS[0], direct_write_addr).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {

assert_eq!(
"hello",
timeout(Duration::from_secs(5), rx.recv())
timeout(Duration::from_millis(500), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -449,7 +449,7 @@ mod tests {
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &proxy_address).await.unwrap();

let result = timeout(Duration::from_secs(3), rx.recv()).await;
let result = timeout(Duration::from_millis(500), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
Expand Down
58 changes: 37 additions & 21 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,8 @@ impl Proxy {
tracing::info!(port = self.port, proxy_id = &*id, "Starting");

let runtime_config = mode.unwrap_proxy();
let sessions = SessionPool::new(
config.clone(),
DualStackLocalSocket::new(self.port)?,
shutdown_rx.clone(),
);
let shared_socket = Arc::new(DualStackLocalSocket::new(self.port)?);
let sessions = SessionPool::new(config.clone(), shared_socket.clone(), shutdown_rx.clone());

let _xds_stream = if !self.management_server.is_empty() {
{
Expand Down Expand Up @@ -153,7 +150,7 @@ impl Proxy {
None
};

self.run_recv_from(&config, sessions.clone())?;
self.run_recv_from(&config, &sessions, shared_socket)?;
crate::protocol::spawn(self.qcmp_port).await?;
tracing::info!("Quilkin is ready");

Expand All @@ -177,18 +174,29 @@ impl Proxy {
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
fn run_recv_from(&self, config: &Arc<Config>, sessions: Arc<SessionPool>) -> Result<()> {
fn run_recv_from(
&self,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
shared_socket: Arc<DualStackLocalSocket>,
) -> Result<()> {
// The number of worker tasks to spawn. Each task gets a dedicated queue to
// consume packets off.
let num_workers = num_cpus::get();

// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
for worker_id in 0..num_workers {
let socket = Arc::new(DualStackLocalSocket::new(self.port)?);
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id: 0,
socket: shared_socket,
config: config.clone(),
sessions: sessions.clone(),
});

for worker_id in 1..num_workers {
workers.push(crate::proxy::DownstreamReceiveWorkerConfig {
worker_id,
socket: socket.clone(),
socket: Arc::new(DualStackLocalSocket::new(self.port)?),
config: config.clone(),
sessions: sessions.clone(),
})
Expand Down Expand Up @@ -242,6 +250,7 @@ mod tests {

t.run_server(config, proxy, None);

tracing::trace!(%local_addr, "sending hello");
let msg = "hello";
endpoint1
.socket
Expand All @@ -250,14 +259,14 @@ mod tests {
.unwrap();
assert_eq!(
msg,
timeout(Duration::from_secs(1), endpoint1.packet_rx)
timeout(Duration::from_secs(100), endpoint1.packet_rx)
.await
.expect("should get a packet")
.unwrap()
);
assert_eq!(
msg,
timeout(Duration::from_secs(1), endpoint2.packet_rx)
timeout(Duration::from_secs(100), endpoint2.packet_rx)
.await
.expect("should get a packet")
.unwrap()
Expand Down Expand Up @@ -373,12 +382,14 @@ mod tests {
config: config.clone(),
sessions: SessionPool::new(
config,
DualStackLocalSocket::new(
crate::test_utils::available_addr(&AddressType::Random)
.await
.port(),
)
.unwrap(),
Arc::new(
DualStackLocalSocket::new(
crate::test_utils::available_addr(&AddressType::Random)
.await
.port(),
)
.unwrap(),
),
tokio::sync::watch::channel(()).1,
),
}
Expand Down Expand Up @@ -418,18 +429,23 @@ mod tests {
)
});

let sessions = SessionPool::new(
config.clone(),
let shared_socket = Arc::new(
DualStackLocalSocket::new(
crate::test_utils::available_addr(&AddressType::Random)
.await
.port(),
)
.unwrap(),
);
let sessions = SessionPool::new(
config.clone(),
shared_socket.clone(),
tokio::sync::watch::channel(()).1,
);

proxy.run_recv_from(&config, sessions).unwrap();
proxy
.run_recv_from(&config, &sessions, shared_socket)
.unwrap();

let socket = create_socket().await;
socket.send_to(msg.as_bytes(), &local_addr).await.unwrap();
Expand Down
Loading

0 comments on commit 97e2f04

Please sign in to comment.