From b325b95326ced633c1260420514eb3c8f1a0a571 Mon Sep 17 00:00:00 2001 From: Jake Shadle Date: Tue, 26 Nov 2024 09:29:50 +0100 Subject: [PATCH] Pass buffer ref to router --- src/components/proxy/io_uring_shared.rs | 9 ++++++++- src/components/proxy/packet_router.rs | 13 ++++++------- src/components/proxy/packet_router/io_uring.rs | 1 + src/components/proxy/packet_router/reference.rs | 10 +++++++++- src/filters/token_router.rs | 2 ++ 5 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index b9b28eb44..3420eaa6b 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -254,6 +254,7 @@ pub enum PacketProcessorCtx { sessions: Arc, error_acc: super::error::ErrorAccumulator, worker_id: usize, + destinations: Vec, }, SessionPool { pool: Arc, @@ -326,6 +327,7 @@ fn process_packet( sessions, worker_id, error_acc, + destinations, } => { let received_at = UtcTimestamp::now(); if let Some(last_received_at) = last_received_at { @@ -340,7 +342,12 @@ fn process_packet( }; crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task( - ds_packet, *worker_id, config, sessions, error_acc, + ds_packet, + *worker_id, + config, + sessions, + error_acc, + destinations, ); packet_processed_event.write(1); diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 4408017a0..0915cc032 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -61,6 +61,7 @@ impl DownstreamReceiveWorkerConfig { config: &Arc, sessions: &Arc, error_acc: &mut super::error::ErrorAccumulator, + destinations: &mut Vec, ) { tracing::trace!( id = worker_id, @@ -70,7 +71,7 @@ impl DownstreamReceiveWorkerConfig { ); let timer = metrics::processing_time(metrics::READ).start_timer(); - match Self::process_downstream_received_packet(packet, config, sessions) { + match Self::process_downstream_received_packet(packet, config, sessions, destinations) { Ok(()) => { error_acc.maybe_send(); } @@ -92,6 +93,7 @@ impl DownstreamReceiveWorkerConfig { packet: DownstreamPacket, config: &Arc, sessions: &Arc, + destinations: &mut Vec, ) -> Result<(), PipelineError> { if !config.clusters.read().has_endpoints() { tracing::trace!("no upstream endpoints"); @@ -103,21 +105,18 @@ impl DownstreamReceiveWorkerConfig { config.clusters.clone_value(), packet.source.into(), packet.contents, + destinations, ); filters.read(&mut context).map_err(PipelineError::Filter)?; - let ReadContext { - destinations, - contents, - .. - } = context; + let ReadContext { contents, .. } = context; // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer // into an immutable one with its own internal arc so it can be cloned // cheaply and returned to the pool once all references are dropped let contents = contents.freeze(); - for epa in destinations { + for epa in destinations.drain(0..) { let session_key = SessionKey { source: packet.source, dest: epa.to_socket_addr()?, diff --git a/src/components/proxy/packet_router/io_uring.rs b/src/components/proxy/packet_router/io_uring.rs index 22244b4fd..2b535f41f 100644 --- a/src/components/proxy/packet_router/io_uring.rs +++ b/src/components/proxy/packet_router/io_uring.rs @@ -45,6 +45,7 @@ impl super::DownstreamReceiveWorkerConfig { sessions, error_acc: super::super::error::ErrorAccumulator::new(error_sender), worker_id, + destinations: Vec::with_capacity(1), }, io_uring_shared::PacketReceiver::Router(upstream_receiver), buffer_pool, diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs index 0269e4b07..30b536096 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/components/proxy/packet_router/reference.rs @@ -108,6 +108,7 @@ impl super::DownstreamReceiveWorkerConfig { let mut error_acc = crate::components::proxy::error::ErrorAccumulator::new(error_sender); + let mut destinations = Vec::with_capacity(1); loop { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP @@ -131,7 +132,14 @@ impl super::DownstreamReceiveWorkerConfig { } last_received_at = Some(received_at); - Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc); + Self::process_task( + packet, + worker_id, + &config, + &sessions, + &mut error_acc, + &mut destinations, + ); } Err(error) => { tracing::error!(%error, "error receiving packet"); diff --git a/src/filters/token_router.rs b/src/filters/token_router.rs index 711fa68f5..8def1b3c4 100644 --- a/src/filters/token_router.rs +++ b/src/filters/token_router.rs @@ -286,6 +286,7 @@ mod tests { ctx.metadata .insert(CAPTURED_BYTES.into(), Value::Bytes(b"123".to_vec().into())); assert_read(&filter, ctx); + dest.clear(); // invalid key let mut ctx = new_ctx(&mut dest); @@ -293,6 +294,7 @@ mod tests { .insert(CAPTURED_BYTES.into(), Value::Bytes(b"567".to_vec().into())); assert!(filter.read(&mut ctx).is_err()); + dest.clear(); // no key let mut ctx = new_ctx(&mut dest);