diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000000..9dce589bb2 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,4 @@ +[target.'cfg(all())'] +rustflags = [ + "-Ctarget-feature=+aes,+avx2", +] diff --git a/Cargo.lock b/Cargo.lock index 7619e1ab01..138b75cc67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1280,7 +1280,7 @@ dependencies = [ "hyper", "hyper-util", "log", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", @@ -1707,7 +1707,7 @@ dependencies = [ "kube-core", "openssl", "pem", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pemfile 2.2.0", "secrecy", "serde", @@ -2750,9 +2750,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.17" +version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", @@ -3444,7 +3444,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pki-types", "tokio", ] @@ -3744,9 +3744,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.3" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna 1.0.3", diff --git a/benches/token_router.rs b/benches/token_router.rs index 7afda6b581..51dd0effd3 100644 --- a/benches/token_router.rs +++ b/benches/token_router.rs @@ -31,22 +31,31 @@ fn token_router(b: Bencher, token_kind: &str) { use rand::seq::SliceRandom as _; let tok = tokens.choose(&mut rand).unwrap(); - let mut rc = quilkin::filters::ReadContext::new( - cm.clone(), - quilkin::net::EndpointAddress::LOCALHOST, - pool.clone().alloc(), - ); - rc.metadata.insert( + let mut metadata = quilkin::net::endpoint::DynamicMetadata::default(); + metadata.insert( quilkin::net::endpoint::metadata::Key::from_static( quilkin::filters::capture::CAPTURED_BYTES, ), quilkin::net::endpoint::metadata::Value::Bytes((*tok).clone().into()), ); - rc + ( + cm.clone(), + pool.clone().alloc(), + Vec::with_capacity(1), + metadata, + ) }) .counter(divan::counter::BytesCount::new(total_token_size)) - .bench_local_values(|mut rc| { + .bench_local_values(|(cm, buffer, mut dest, metadata)| { + let mut rc = quilkin::filters::ReadContext { + endpoints: cm, + destinations: &mut dest, + source: quilkin::net::EndpointAddress::LOCALHOST, + contents: buffer, + metadata, + }; + let _ = divan::black_box(filter.sync_read(&mut rc)); }) } diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs index b9b28eb44b..3420eaa6ba 100644 --- a/src/components/proxy/io_uring_shared.rs +++ b/src/components/proxy/io_uring_shared.rs @@ -254,6 +254,7 @@ pub enum PacketProcessorCtx { sessions: Arc, error_acc: super::error::ErrorAccumulator, worker_id: usize, + destinations: Vec, }, SessionPool { pool: Arc, @@ -326,6 +327,7 @@ fn process_packet( sessions, worker_id, error_acc, + destinations, } => { let received_at = UtcTimestamp::now(); if let Some(last_received_at) = last_received_at { @@ -340,7 +342,12 @@ fn process_packet( }; crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task( - ds_packet, *worker_id, config, sessions, error_acc, + ds_packet, + *worker_id, + config, + sessions, + error_acc, + destinations, ); packet_processed_event.write(1); diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs index 4408017a02..0915cc0321 100644 --- a/src/components/proxy/packet_router.rs +++ b/src/components/proxy/packet_router.rs @@ -61,6 +61,7 @@ impl DownstreamReceiveWorkerConfig { config: &Arc, sessions: &Arc, error_acc: &mut super::error::ErrorAccumulator, + destinations: &mut Vec, ) { tracing::trace!( id = worker_id, @@ -70,7 +71,7 @@ impl DownstreamReceiveWorkerConfig { ); let timer = metrics::processing_time(metrics::READ).start_timer(); - match Self::process_downstream_received_packet(packet, config, sessions) { + match Self::process_downstream_received_packet(packet, config, sessions, destinations) { Ok(()) => { error_acc.maybe_send(); } @@ -92,6 +93,7 @@ impl DownstreamReceiveWorkerConfig { packet: DownstreamPacket, config: &Arc, sessions: &Arc, + destinations: &mut Vec, ) -> Result<(), PipelineError> { if !config.clusters.read().has_endpoints() { tracing::trace!("no upstream endpoints"); @@ -103,21 +105,18 @@ impl DownstreamReceiveWorkerConfig { config.clusters.clone_value(), packet.source.into(), packet.contents, + destinations, ); filters.read(&mut context).map_err(PipelineError::Filter)?; - let ReadContext { - destinations, - contents, - .. - } = context; + let ReadContext { contents, .. } = context; // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer // into an immutable one with its own internal arc so it can be cloned // cheaply and returned to the pool once all references are dropped let contents = contents.freeze(); - for epa in destinations { + for epa in destinations.drain(0..) { let session_key = SessionKey { source: packet.source, dest: epa.to_socket_addr()?, diff --git a/src/components/proxy/packet_router/io_uring.rs b/src/components/proxy/packet_router/io_uring.rs index 22244b4fd6..2b535f41fd 100644 --- a/src/components/proxy/packet_router/io_uring.rs +++ b/src/components/proxy/packet_router/io_uring.rs @@ -45,6 +45,7 @@ impl super::DownstreamReceiveWorkerConfig { sessions, error_acc: super::super::error::ErrorAccumulator::new(error_sender), worker_id, + destinations: Vec::with_capacity(1), }, io_uring_shared::PacketReceiver::Router(upstream_receiver), buffer_pool, diff --git a/src/components/proxy/packet_router/reference.rs b/src/components/proxy/packet_router/reference.rs index 0269e4b072..30b5360964 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/components/proxy/packet_router/reference.rs @@ -108,6 +108,7 @@ impl super::DownstreamReceiveWorkerConfig { let mut error_acc = crate::components::proxy::error::ErrorAccumulator::new(error_sender); + let mut destinations = Vec::with_capacity(1); loop { // Initialize a buffer for the UDP packet. We use the maximum size of a UDP @@ -131,7 +132,14 @@ impl super::DownstreamReceiveWorkerConfig { } last_received_at = Some(received_at); - Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc); + Self::process_task( + packet, + worker_id, + &config, + &sessions, + &mut error_acc, + &mut destinations, + ); } Err(error) => { tracing::error!(%error, "error receiving packet"); diff --git a/src/config/slot.rs b/src/config/slot.rs index cd2c34531a..3615cb458c 100644 --- a/src/config/slot.rs +++ b/src/config/slot.rs @@ -194,7 +194,7 @@ impl JsonSchema for Slot { } impl crate::filters::Filter for Slot { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { self.load().read(ctx) } diff --git a/src/filters.rs b/src/filters.rs index 586010e546..7f191a50cb 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -208,7 +208,7 @@ pub trait Filter: Send + Sync { /// This function should return an `Some` if the packet processing should /// proceed. If the packet should be rejected, it will return [`None`] /// instead. By default, the context passes through unchanged. - fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> { Ok(()) } diff --git a/src/filters/capture.rs b/src/filters/capture.rs index 28adde6353..90feb352d5 100644 --- a/src/filters/capture.rs +++ b/src/filters/capture.rs @@ -58,7 +58,7 @@ impl Capture { impl Filter for Capture { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { let capture = self.capture.capture(&mut ctx.contents); ctx.metadata.insert( self.is_present_key, @@ -160,11 +160,13 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); assert!(filter .read(&mut ReadContext::new( endpoints.into(), (std::net::Ipv4Addr::LOCALHOST, 80).into(), alloc_buffer(b"abc"), + &mut dest, )) .is_err()); } @@ -237,10 +239,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut context = ReadContext::new( endpoints.into(), "127.0.0.1:80".parse().unwrap(), alloc_buffer(b"helloabc"), + &mut dest, ); filter.read(&mut context).unwrap(); diff --git a/src/filters/chain.rs b/src/filters/chain.rs index 8d9ad21b5b..c57dc892b4 100644 --- a/src/filters/chain.rs +++ b/src/filters/chain.rs @@ -273,7 +273,7 @@ impl schemars::JsonSchema for FilterChain { } impl Filter for FilterChain { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { for ((id, instance), histogram) in self .filters .iter() @@ -296,12 +296,8 @@ impl Filter for FilterChain { // has rejected, and the destinations is empty, we passthrough to all. // Which mimics the old behaviour while avoid clones in most cases. if ctx.destinations.is_empty() { - ctx.destinations = ctx - .endpoints - .endpoints() - .into_iter() - .map(|ep| ep.address) - .collect(); + ctx.destinations + .extend(ctx.endpoints.endpoints().into_iter().map(|ep| ep.address)); } Ok(()) @@ -382,10 +378,12 @@ mod tests { crate::test::load_test_filters(); let config = TestConfig::new(); let endpoints_fixture = endpoints(); + let mut dest = Vec::new(); let mut context = ReadContext::new( endpoints_fixture.clone(), "127.0.0.1:70".parse().unwrap(), alloc_buffer(b"hello"), + &mut dest, ); config.filters.read(&mut context).unwrap(); @@ -435,22 +433,24 @@ mod tests { .unwrap(); let endpoints_fixture = endpoints(); - let mut context = ReadContext::new( - endpoints_fixture.clone(), - "127.0.0.1:70".parse().unwrap(), - alloc_buffer(b"hello"), - ); - - chain.read(&mut context).unwrap(); + let mut dest = Vec::new(); + + let (contents, metadata) = { + let mut context = ReadContext::new( + endpoints_fixture.clone(), + "127.0.0.1:70".parse().unwrap(), + alloc_buffer(b"hello"), + &mut dest, + ); + chain.read(&mut context).unwrap(); + (context.contents, context.metadata) + }; let expected = endpoints_fixture.clone(); - assert_eq!(expected.endpoints(), context.destinations); - assert_eq!( - b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70", - &*context.contents - ); + assert_eq!(expected.endpoints(), dest); + assert_eq!(b"hello:odr:127.0.0.1:70:odr:127.0.0.1:70", &*contents); assert_eq!( "receive:receive", - context.metadata[&"downstream".into()].as_string().unwrap() + metadata[&"downstream".into()].as_string().unwrap() ); let mut context = WriteContext::new( diff --git a/src/filters/compress.rs b/src/filters/compress.rs index 4e07e88c8f..1b334dcc45 100644 --- a/src/filters/compress.rs +++ b/src/filters/compress.rs @@ -56,7 +56,7 @@ impl Compress { impl Filter for Compress { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { let original_size = ctx.contents.len(); match self.on_read { @@ -296,10 +296,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut read_context = ReadContext::new( endpoints.into(), "127.0.0.1:8080".parse().unwrap(), alloc_buffer(&expected), + &mut dest, ); compress.read(&mut read_context).expect("should compress"); @@ -356,11 +358,13 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); assert!(compression .read(&mut ReadContext::new( endpoints.into(), "127.0.0.1:8080".parse().unwrap(), alloc_buffer(b"hello"), + &mut dest, )) .is_err()); } @@ -379,10 +383,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut read_context = ReadContext::new( endpoints.into(), "127.0.0.1:8080".parse().unwrap(), alloc_buffer(b"hello"), + &mut dest, ); compression.read(&mut read_context).unwrap(); assert_eq!(b"hello", &*read_context.contents); @@ -474,10 +480,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut read_context = ReadContext::new( endpoints.into(), "127.0.0.1:8080".parse().unwrap(), write_context.contents, + &mut dest, ); filter.read(&mut read_context).expect("should decompress"); diff --git a/src/filters/concatenate.rs b/src/filters/concatenate.rs index 9bf1c7b612..c54f7abc26 100644 --- a/src/filters/concatenate.rs +++ b/src/filters/concatenate.rs @@ -43,7 +43,7 @@ impl Concatenate { } impl Filter for Concatenate { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { match self.on_read { Strategy::Append => { ctx.contents.extend_from_slice(&self.bytes); diff --git a/src/filters/debug.rs b/src/filters/debug.rs index 676bab6146..a26f11e6f0 100644 --- a/src/filters/debug.rs +++ b/src/filters/debug.rs @@ -38,7 +38,7 @@ impl Debug { impl Filter for Debug { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { info!(id = ?self.config.id, source = ?&ctx.source, contents = ?String::from_utf8_lossy(&ctx.contents), "Read filter event"); Ok(()) } diff --git a/src/filters/drop.rs b/src/filters/drop.rs index 2b561b3f71..21d46e70cd 100644 --- a/src/filters/drop.rs +++ b/src/filters/drop.rs @@ -32,7 +32,7 @@ impl Drop { impl Filter for Drop { #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))] - fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> { Err(FilterError::Dropped) } diff --git a/src/filters/firewall.rs b/src/filters/firewall.rs index 29a3f83fc7..769ae911cf 100644 --- a/src/filters/firewall.rs +++ b/src/filters/firewall.rs @@ -50,7 +50,7 @@ impl StaticFilter for Firewall { impl Filter for Firewall { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { for rule in &self.on_read { if rule.contains(ctx.source.to_socket_addr()?) { return match rule.action { @@ -134,13 +134,25 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())].into(), ); - let mut ctx = ReadContext::new(endpoints.into(), (local_ip, 80).into(), alloc_buffer([])); + let mut dest = Vec::new(); + let mut ctx = ReadContext::new( + endpoints.into(), + (local_ip, 80).into(), + alloc_buffer([]), + &mut dest, + ); assert!(firewall.read(&mut ctx).is_ok()); let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new((Ipv4Addr::LOCALHOST, 8080).into())].into(), ); - let mut ctx = ReadContext::new(endpoints.into(), (local_ip, 2000).into(), alloc_buffer([])); + let mut dest = Vec::new(); + let mut ctx = ReadContext::new( + endpoints.into(), + (local_ip, 2000).into(), + alloc_buffer([]), + &mut dest, + ); assert!(logs_contain("quilkin::filters::firewall")); // the given name to the the logger by tracing assert!(logs_contain("Allow")); diff --git a/src/filters/load_balancer.rs b/src/filters/load_balancer.rs index d093055515..8a537c13b2 100644 --- a/src/filters/load_balancer.rs +++ b/src/filters/load_balancer.rs @@ -38,7 +38,7 @@ impl LoadBalancer { } impl Filter for LoadBalancer { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { self.endpoint_chooser.choose_endpoints(ctx); Ok(()) } @@ -75,11 +75,14 @@ mod tests { .map(Endpoint::new) .collect::>(); let endpoints = crate::net::cluster::ClusterMap::new_default(endpoints); - let mut context = ReadContext::new(endpoints.into(), source, alloc_buffer([])); - - filter.read(&mut context).unwrap(); + let mut dest = Vec::new(); + { + let mut context = + ReadContext::new(endpoints.into(), source, alloc_buffer([]), &mut dest); + filter.read(&mut context).unwrap(); + } - context.destinations + dest } #[tokio::test] diff --git a/src/filters/load_balancer/endpoint_chooser.rs b/src/filters/load_balancer/endpoint_chooser.rs index 14426dc3f3..d033f99ef2 100644 --- a/src/filters/load_balancer/endpoint_chooser.rs +++ b/src/filters/load_balancer/endpoint_chooser.rs @@ -28,7 +28,7 @@ use crate::filters::ReadContext; /// EndpointChooser chooses from a set of endpoints that a proxy is connected to. pub trait EndpointChooser: Send + Sync { /// choose_endpoints asks for the next endpoint(s) to use. - fn choose_endpoints(&self, endpoints: &mut ReadContext); + fn choose_endpoints(&self, endpoints: &mut ReadContext<'_>); } /// RoundRobinEndpointChooser chooses endpoints in round-robin order. @@ -45,15 +45,16 @@ impl RoundRobinEndpointChooser { } impl EndpointChooser for RoundRobinEndpointChooser { - fn choose_endpoints(&self, ctx: &mut ReadContext) { + fn choose_endpoints(&self, ctx: &mut ReadContext<'_>) { let count = self.next_endpoint.fetch_add(1, Ordering::Relaxed); // Note: The index is guaranteed to be in range. - ctx.destinations = vec![ctx - .endpoints - .nth_endpoint(count % ctx.endpoints.num_of_endpoints()) - .unwrap() - .address - .clone()]; + ctx.destinations.push( + ctx.endpoints + .nth_endpoint(count % ctx.endpoints.num_of_endpoints()) + .unwrap() + .address + .clone(), + ); } } @@ -61,10 +62,11 @@ impl EndpointChooser for RoundRobinEndpointChooser { pub struct RandomEndpointChooser; impl EndpointChooser for RandomEndpointChooser { - fn choose_endpoints(&self, ctx: &mut ReadContext) { + fn choose_endpoints(&self, ctx: &mut ReadContext<'_>) { // The index is guaranteed to be in range. let index = thread_rng().gen_range(0..ctx.endpoints.num_of_endpoints()); - ctx.destinations = vec![ctx.endpoints.nth_endpoint(index).unwrap().address.clone()]; + ctx.destinations + .push(ctx.endpoints.nth_endpoint(index).unwrap().address.clone()); } } @@ -72,14 +74,15 @@ impl EndpointChooser for RandomEndpointChooser { pub struct HashEndpointChooser; impl EndpointChooser for HashEndpointChooser { - fn choose_endpoints(&self, ctx: &mut ReadContext) { + fn choose_endpoints(&self, ctx: &mut ReadContext<'_>) { let mut hasher = DefaultHasher::new(); ctx.source.hash(&mut hasher); - ctx.destinations = vec![ctx - .endpoints - .nth_endpoint(hasher.finish() as usize % ctx.endpoints.num_of_endpoints()) - .unwrap() - .address - .clone()]; + ctx.destinations.push( + ctx.endpoints + .nth_endpoint(hasher.finish() as usize % ctx.endpoints.num_of_endpoints()) + .unwrap() + .address + .clone(), + ); } } diff --git a/src/filters/local_rate_limit.rs b/src/filters/local_rate_limit.rs index 05087c70d8..1c4462b5dd 100644 --- a/src/filters/local_rate_limit.rs +++ b/src/filters/local_rate_limit.rs @@ -148,7 +148,7 @@ impl LocalRateLimit { } impl Filter for LocalRateLimit { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { if self.acquire_token(&ctx.source) { Ok(()) } else { @@ -235,7 +235,13 @@ mod tests { .into(), ); - let mut context = ReadContext::new(endpoints.into(), address.clone(), alloc_buffer([9])); + let mut dest = Vec::new(); + let mut context = ReadContext::new( + endpoints.into(), + address.clone(), + alloc_buffer([9]), + &mut dest, + ); let result = r.read(&mut context); if should_succeed { diff --git a/src/filters/match.rs b/src/filters/match.rs index 74bfd81cfa..b5f7c77da3 100644 --- a/src/filters/match.rs +++ b/src/filters/match.rs @@ -119,7 +119,7 @@ fn match_filter<'config, 'ctx, Ctx>( impl Filter for Match { #[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))] - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { tracing::trace!(metadata=?ctx.metadata); match_filter( &self.on_read_filters, @@ -197,10 +197,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut ctx = ReadContext::new( endpoints.into(), ([127, 0, 0, 1], 7000).into(), alloc_buffer(contents), + &mut dest, ); ctx.metadata.insert(key, "abc".into()); @@ -211,10 +213,12 @@ mod tests { let endpoints = crate::net::cluster::ClusterMap::new_default( [Endpoint::new("127.0.0.1:81".parse().unwrap())].into(), ); + let mut dest = Vec::new(); let mut ctx = ReadContext::new( endpoints.into(), ([127, 0, 0, 1], 7000).into(), alloc_buffer(contents), + &mut dest, ); ctx.metadata.insert(key, "xyz".into()); diff --git a/src/filters/pass.rs b/src/filters/pass.rs index 4b639538fc..b0cf00d118 100644 --- a/src/filters/pass.rs +++ b/src/filters/pass.rs @@ -31,7 +31,7 @@ impl Pass { impl Filter for Pass { #[cfg_attr(feature = "instrument", tracing::instrument(skip_all))] - fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> { Ok(()) } diff --git a/src/filters/read.rs b/src/filters/read.rs index ca412289db..2eab8f6a92 100644 --- a/src/filters/read.rs +++ b/src/filters/read.rs @@ -27,12 +27,11 @@ use crate::{ }; /// The input arguments to [`Filter::read`]. -#[non_exhaustive] -pub struct ReadContext { +pub struct ReadContext<'ctx> { /// The upstream endpoints that the packet will be forwarded to. pub endpoints: Arc, /// The upstream endpoints that the packet will be forwarded to. - pub destinations: Vec, + pub destinations: &'ctx mut Vec, /// The source of the received packet. pub source: EndpointAddress, /// Contents of the received packet. @@ -41,13 +40,18 @@ pub struct ReadContext { pub metadata: DynamicMetadata, } -impl ReadContext { +impl<'ctx> ReadContext<'ctx> { /// Creates a new [`ReadContext`]. #[inline] - pub fn new(endpoints: Arc, source: EndpointAddress, contents: PoolBuffer) -> Self { + pub fn new( + endpoints: Arc, + source: EndpointAddress, + contents: PoolBuffer, + destinations: &'ctx mut Vec, + ) -> Self { Self { endpoints, - destinations: Vec::new(), + destinations, source, contents, metadata: <_>::default(), diff --git a/src/filters/registry.rs b/src/filters/registry.rs index 7628453a2f..ed9fc6c00e 100644 --- a/src/filters/registry.rs +++ b/src/filters/registry.rs @@ -74,7 +74,7 @@ mod tests { struct TestFilter {} impl Filter for TestFilter { - fn read(&self, _: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, _: &mut ReadContext<'_>) -> Result<(), FilterError> { Err(FilterError::Custom("test error")) } @@ -105,11 +105,13 @@ mod tests { let endpoint = Endpoint::new(addr.clone()); let endpoints = crate::net::cluster::ClusterMap::new_default([endpoint.clone()].into()); + let mut dest = Vec::new(); assert!(filter .read(&mut ReadContext::new( endpoints.into(), addr.clone(), alloc_buffer([]), + &mut dest, )) .is_ok()); assert!(filter diff --git a/src/filters/timestamp.rs b/src/filters/timestamp.rs index 62d86db158..ec50696021 100644 --- a/src/filters/timestamp.rs +++ b/src/filters/timestamp.rs @@ -87,7 +87,7 @@ impl TryFrom for Timestamp { } impl Filter for Timestamp { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { self.observe(&ctx.metadata, Direction::Read); Ok(()) } @@ -157,10 +157,12 @@ mod tests { async fn basic() { const TIMESTAMP_KEY: &str = "BASIC"; let filter = Timestamp::from_config(Config::new(TIMESTAMP_KEY).into()); + let mut dest = Vec::new(); let mut ctx = ReadContext::new( <_>::default(), (std::net::Ipv4Addr::UNSPECIFIED, 0).into(), alloc_buffer(b"hello"), + &mut dest, ); ctx.metadata.insert( TIMESTAMP_KEY.into(), @@ -188,10 +190,12 @@ mod tests { ); let timestamp = Timestamp::from_config(Config::new(TIMESTAMP_KEY).into()); let source = (std::net::Ipv4Addr::UNSPECIFIED, 0); + let mut dest = Vec::new(); let mut ctx = ReadContext::new( <_>::default(), source.into(), alloc_buffer([0, 0, 0, 0, 99, 81, 55, 181]), + &mut dest, ); capture.read(&mut ctx).unwrap(); diff --git a/src/filters/token_router.rs b/src/filters/token_router.rs index cc7ffae47f..8efd2e1a98 100644 --- a/src/filters/token_router.rs +++ b/src/filters/token_router.rs @@ -33,12 +33,12 @@ pub struct TokenRouter { impl TokenRouter { /// Non-async version of [`Filter::read`], as this filter does no actual async /// operations. Used in benchmarking. - pub fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + pub fn sync_read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { match ctx.metadata.get(&self.config.metadata_key) { Some(metadata::Value::Bytes(token)) => { let tok = crate::net::cluster::Token::new(token); - ctx.destinations = ctx.endpoints.addresses_for_token(tok); + ctx.endpoints.addresses_for_token(tok, ctx.destinations); if ctx.destinations.is_empty() { Err(FilterError::TokenRouter(RouterError::NoEndpointMatch { @@ -69,7 +69,7 @@ impl StaticFilter for TokenRouter { } impl Filter for TokenRouter { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { self.sync_read(ctx) } } @@ -89,7 +89,7 @@ impl StaticFilter for HashedTokenRouter { } impl Filter for HashedTokenRouter { - fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> { + fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> { self.0.sync_read(ctx) } } @@ -256,7 +256,8 @@ mod tests { } .into(), ); - let mut ctx = new_ctx(); + let mut dest = Vec::new(); + let mut ctx = new_ctx(&mut dest); ctx.metadata .insert(TOKEN_KEY.into(), Value::Bytes(b"123".to_vec().into())); assert_read(&filter, ctx); @@ -265,7 +266,8 @@ mod tests { #[tokio::test] async fn factory_empty_config() { let filter = TokenRouter::from_config(None); - let mut ctx = new_ctx(); + let mut dest = Vec::new(); + let mut ctx = new_ctx(&mut dest); ctx.metadata .insert(CAPTURED_BYTES.into(), Value::Bytes(b"123".to_vec().into())); assert_read(&filter, ctx); @@ -278,21 +280,24 @@ mod tests { metadata_key: CAPTURED_BYTES.into(), }; let filter = TokenRouter::from_config(config.into()); + let mut dest = Vec::new(); - let mut ctx = new_ctx(); + let mut ctx = new_ctx(&mut dest); ctx.metadata .insert(CAPTURED_BYTES.into(), Value::Bytes(b"123".to_vec().into())); assert_read(&filter, ctx); + dest.clear(); // invalid key - let mut ctx = new_ctx(); + let mut ctx = new_ctx(&mut dest); ctx.metadata .insert(CAPTURED_BYTES.into(), Value::Bytes(b"567".to_vec().into())); assert!(filter.read(&mut ctx).is_err()); + dest.clear(); // no key - let mut ctx = new_ctx(); + let mut ctx = new_ctx(&mut dest); assert!(filter.read(&mut ctx).is_err()); } @@ -305,7 +310,7 @@ mod tests { assert_write_no_change(&filter); } - fn new_ctx() -> ReadContext { + fn new_ctx(dest: &mut Vec) -> ReadContext<'_> { let endpoint1 = Endpoint::with_metadata( "127.0.0.1:80".parse().unwrap(), Metadata { @@ -327,10 +332,11 @@ mod tests { endpoints.into(), "127.0.0.1:100".parse().unwrap(), pool.alloc_slice(b"hello"), + dest, ) } - fn assert_read(filter: &F, mut ctx: ReadContext) + fn assert_read(filter: &F, mut ctx: ReadContext<'_>) where F: Filter + ?Sized, { diff --git a/src/net/cluster.rs b/src/net/cluster.rs index 3bf9f6750b..92fdc86023 100644 --- a/src/net/cluster.rs +++ b/src/net/cluster.rs @@ -505,10 +505,10 @@ where ret } - pub fn addresses_for_token(&self, token: Token) -> Vec { - self.token_map - .get(&token.0) - .map_or(Vec::new(), |addrs| addrs.value().to_vec()) + pub fn addresses_for_token(&self, token: Token, addrs: &mut Vec) { + if let Some(ma) = self.token_map.get(&token.0) { + addrs.extend(ma.value().iter().cloned()); + } } } diff --git a/src/test.rs b/src/test.rs index 171d31e1ed..2195a73def 100644 --- a/src/test.rs +++ b/src/test.rs @@ -370,7 +370,9 @@ where .unwrap()])); let source = "127.0.0.1:90".parse().unwrap(); let contents = b"hello"; - let mut context = ReadContext::new(endpoints.clone(), source, alloc_buffer(contents)); + let mut dest = Vec::new(); + let mut context = + ReadContext::new(endpoints.clone(), source, alloc_buffer(contents), &mut dest); filter.read(&mut context).unwrap(); assert!(context.destinations.is_empty());