Skip to content

Commit

Permalink
Pass buffer ref to router
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Nov 26, 2024
1 parent bf0c825 commit b325b95
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
9 changes: 8 additions & 1 deletion src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub enum PacketProcessorCtx {
sessions: Arc<crate::components::proxy::SessionPool>,
error_acc: super::error::ErrorAccumulator,
worker_id: usize,
destinations: Vec<crate::net::EndpointAddress>,
},
SessionPool {
pool: Arc<crate::components::proxy::SessionPool>,
Expand Down Expand Up @@ -326,6 +327,7 @@ fn process_packet(
sessions,
worker_id,
error_acc,
destinations,
} => {
let received_at = UtcTimestamp::now();
if let Some(last_received_at) = last_received_at {
Expand All @@ -340,7 +342,12 @@ fn process_packet(
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet, *worker_id, config, sessions, error_acc,
ds_packet,
*worker_id,
config,
sessions,
error_acc,
destinations,
);

packet_processed_event.write(1);
Expand Down
13 changes: 6 additions & 7 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl DownstreamReceiveWorkerConfig {
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
) {
tracing::trace!(
id = worker_id,
Expand All @@ -70,7 +71,7 @@ impl DownstreamReceiveWorkerConfig {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions) {
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Expand All @@ -92,6 +93,7 @@ impl DownstreamReceiveWorkerConfig {
packet: DownstreamPacket,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
destinations: &mut Vec<crate::net::EndpointAddress>,
) -> Result<(), PipelineError> {
if !config.clusters.read().has_endpoints() {
tracing::trace!("no upstream endpoints");
Expand All @@ -103,21 +105,18 @@ impl DownstreamReceiveWorkerConfig {
config.clusters.clone_value(),
packet.source.into(),
packet.contents,
destinations,
);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext {
destinations,
contents,
..
} = context;
let ReadContext { contents, .. } = context;

// Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer
// into an immutable one with its own internal arc so it can be cloned
// cheaply and returned to the pool once all references are dropped
let contents = contents.freeze();

for epa in destinations {
for epa in destinations.drain(0..) {
let session_key = SessionKey {
source: packet.source,
dest: epa.to_socket_addr()?,
Expand Down
1 change: 1 addition & 0 deletions src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl super::DownstreamReceiveWorkerConfig {
sessions,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
destinations: Vec::with_capacity(1),
},
io_uring_shared::PacketReceiver::Router(upstream_receiver),
buffer_pool,
Expand Down
10 changes: 9 additions & 1 deletion src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl super::DownstreamReceiveWorkerConfig {

let mut error_acc =
crate::components::proxy::error::ErrorAccumulator::new(error_sender);
let mut destinations = Vec::with_capacity(1);

loop {
// Initialize a buffer for the UDP packet. We use the maximum size of a UDP
Expand All @@ -131,7 +132,14 @@ impl super::DownstreamReceiveWorkerConfig {
}
last_received_at = Some(received_at);

Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc);
Self::process_task(
packet,
worker_id,
&config,
&sessions,
&mut error_acc,
&mut destinations,
);
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand Down
2 changes: 2 additions & 0 deletions src/filters/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,15 @@ mod tests {
ctx.metadata
.insert(CAPTURED_BYTES.into(), Value::Bytes(b"123".to_vec().into()));
assert_read(&filter, ctx);
dest.clear();

// invalid key
let mut ctx = new_ctx(&mut dest);
ctx.metadata
.insert(CAPTURED_BYTES.into(), Value::Bytes(b"567".to_vec().into()));

assert!(filter.read(&mut ctx).is_err());
dest.clear();

// no key
let mut ctx = new_ctx(&mut dest);
Expand Down

0 comments on commit b325b95

Please sign in to comment.