update io-uring loop
XAMPPRocky committed Oct 7, 2024
1 parent ec0668f commit 74fe80b
Showing 4 changed files with 73 additions and 84 deletions.
144 changes: 68 additions & 76 deletions src/components/proxy/io_uring_shared.rs
@@ -252,33 +252,32 @@ pub enum PacketProcessorCtx {
Router {
config: Arc<crate::config::Config>,
sessions: Arc<crate::components::proxy::SessionPool>,
error_sender: super::error::ErrorSender,
/// Receiver for upstream packets being sent to this downstream
upstream_receiver: crate::components::proxy::sessions::DownstreamReceiver,
error_acc: super::error::ErrorAccumulator,
worker_id: usize,
},
SessionPool {
pool: Arc<crate::components::proxy::SessionPool>,
downstream_receiver: tokio::sync::mpsc::Receiver<proxy::SendPacket>,
port: u16,
},
}

pub enum PacketReceiver {
Router(crate::components::proxy::sessions::DownstreamReceiver),
SessionPool(tokio::sync::mpsc::Receiver<proxy::SendPacket>),
}

/// Spawns worker tasks
///
/// One task processes received packets, notifying the io-uring loop when a
/// packet finishes processing, the other receives packets to send and notifies
/// the io-uring loop when there are 1 or more packets available to be sent
fn spawn_workers(
rt: &tokio::runtime::Runtime,
ctx: PacketProcessorCtx,
receiver: PacketReceiver,
pending_sends: PendingSends,
packet_processed_event: EventFdWriter,
mut shutdown_rx: crate::ShutdownRx,
shutdown_event: EventFdWriter,
) -> tokio::sync::mpsc::Sender<RecvPacket> {
let (tx, mut rx) = tokio::sync::mpsc::channel::<RecvPacket>(1);

) {
// Spawn a task that just monitors the shutdown receiver to notify the io-uring loop to exit
rt.spawn(async move {
// The result is uninteresting, either a shutdown has been signalled, or all senders have been dropped
@@ -287,44 +286,8 @@ fn spawn_workers(
shutdown_event.write(1);
});

match ctx {
PacketProcessorCtx::Router {
config,
sessions,
error_sender,
worker_id,
upstream_receiver,
} => {
rt.spawn(async move {
let mut last_received_at = None;

let mut error_acc = super::error::ErrorAccumulator::new(error_sender);

while let Some(packet) = rx.recv().await {
let received_at = UtcTimestamp::now();
if let Some(last_received_at) = last_received_at {
metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
.set((received_at - last_received_at).nanos());
}
last_received_at = Some(received_at);

let ds_packet = proxy::packet_router::DownstreamPacket {
contents: packet.buffer,
source: packet.source,
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet,
worker_id,
&config,
&sessions,
&mut error_acc,
);

packet_processed_event.write(1);
}
});

match receiver {
PacketReceiver::Router(upstream_receiver) => {
rt.spawn(async move {
while let Ok(packet) = upstream_receiver.recv().await {
let packet = SendPacket {
@@ -336,27 +299,7 @@ }
}
});
}
PacketProcessorCtx::SessionPool {
pool,
port,
mut downstream_receiver,
} => {
rt.spawn(async move {
let mut last_received_at = None;

while let Some(packet) = rx.recv().await {
pool.process_received_upstream_packet(
packet.buffer,
packet.source,
port,
&mut last_received_at,
)
.await;

packet_processed_event.write(1);
}
});

PacketReceiver::SessionPool(mut downstream_receiver) => {
rt.spawn(async move {
while let Some(packet) = downstream_receiver.recv().await {
let packet = SendPacket {
@@ -369,8 +312,52 @@ }
});
}
}
}

tx
fn process_packet(
ctx: &mut PacketProcessorCtx,
packet_processed_event: &EventFdWriter,
packet: RecvPacket,
last_received_at: &mut Option<UtcTimestamp>,
) {
match ctx {
PacketProcessorCtx::Router {
config,
sessions,
worker_id,
error_acc,
} => {
let received_at = UtcTimestamp::now();
if let Some(last_received_at) = last_received_at {
metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
.set((received_at - *last_received_at).nanos());
}
*last_received_at = Some(received_at);

let ds_packet = proxy::packet_router::DownstreamPacket {
contents: packet.buffer,
source: packet.source,
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet, *worker_id, config, sessions, error_acc,
);

packet_processed_event.write(1);
}
PacketProcessorCtx::SessionPool { pool, port, .. } => {
let mut last_received_at = None;

pool.process_received_upstream_packet(
packet.buffer,
packet.source,
*port,
&mut last_received_at,
);

packet_processed_event.write(1);
}
}
}

#[inline]
@@ -542,7 +529,8 @@ impl IoUringLoop {
pub fn spawn(
self,
thread_name: String,
ctx: PacketProcessorCtx,
mut ctx: PacketProcessorCtx,
receiver: PacketReceiver,
buffer_pool: Arc<crate::pool::BufferPool>,
shutdown: crate::ShutdownRx,
) -> Result<std::sync::mpsc::Receiver<()>, PipelineError> {
Expand Down Expand Up @@ -588,11 +576,10 @@ impl IoUringLoop {

// Spawn the worker tasks that process in an async context unlike
// our io-uring loop below
let process_packet_tx = spawn_workers(
spawn_workers(
&rt,
ctx,
receiver,
pending_sends.clone(),
process_event.writer(),
shutdown,
shutdown_event.writer(),
);
@@ -618,6 +605,8 @@ impl IoUringLoop {

// Notify that we have set everything up
let _ = tx.send(());
let mut last_received_at = None;
let process_event_writer = process_event.writer();

// The core io uring loop
'io: loop {
Expand Down Expand Up @@ -659,9 +648,12 @@ impl IoUringLoop {
}

let packet = packet.finalize_recv(ret as usize);
if process_packet_tx.blocking_send(packet).is_err() {
unreachable!("packet process thread has a pending packet");
}
process_packet(
&mut ctx,
&process_event_writer,
packet,
&mut last_received_at,
);

// Queue the wait for the processing of the packet to finish
loop_ctx.push_with_token(
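
The `spawn_workers` doc comment above still describes the previous design, where one spawned task processed received packets and another forwarded packets to send. After this commit only the send half remains a task: received packets are handled by the new synchronous `process_packet` directly inside the io-uring completion loop, removing the `RecvPacket` channel hand-off to a spawned tokio task. Below is a minimal, self-contained sketch of that control-flow shift; `Packet` and `Ctx` are simplified hypothetical stand-ins, not the real Quilkin types.

// Sketch only: simplified stand-ins for the real buffer and context types.
struct Packet {
    buffer: Vec<u8>,
}

struct Ctx {
    bytes_processed: usize,
}

fn process_packet(ctx: &mut Ctx, packet: Packet) {
    // Stand-in for the real routing/session handling; runs inline on the
    // io-uring thread instead of inside a spawned tokio task.
    ctx.bytes_processed += packet.buffer.len();
}

fn completion_loop(completions: Vec<Packet>, ctx: &mut Ctx) {
    for packet in completions {
        // Before: the packet was handed off with
        // `process_packet_tx.blocking_send(packet)`; now it is handled here.
        process_packet(ctx, packet);
    }
}

fn main() {
    let mut ctx = Ctx { bytes_processed: 0 };
    completion_loop(
        vec![Packet { buffer: vec![0; 4] }, Packet { buffer: vec![0; 4] }],
        &mut ctx,
    );
    assert_eq!(ctx.bytes_processed, 8);
}
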
4 changes: 2 additions & 2 deletions src/components/proxy/packet_router/io_uring.rs
@@ -43,10 +43,10 @@ impl super::DownstreamReceiveWorkerConfig {
io_uring_shared::PacketProcessorCtx::Router {
config,
sessions,
error_sender,
upstream_receiver,
error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
},
io_uring_shared::PacketReceiver::Router(upstream_receiver),
buffer_pool,
shutdown,
)
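
This hunk also moves error accumulation: the worker no longer receives a raw `error_sender` and wraps it itself; the caller builds the `ErrorAccumulator` once and stores it in the ctx. A rough illustration of that shape, using hypothetical stand-ins rather than the real `super::error` types referenced in the diff:

// Hypothetical stand-ins for the ErrorSender/ErrorAccumulator referenced above.
struct ErrorSender;

struct ErrorAccumulator {
    _sender: ErrorSender,
    count: usize,
}

impl ErrorAccumulator {
    fn new(sender: ErrorSender) -> Self {
        Self { _sender: sender, count: 0 }
    }

    fn record(&mut self) {
        self.count += 1;
    }
}

struct RouterCtx {
    // Before: the ctx held `error_sender` and the spawned task called
    // `ErrorAccumulator::new(error_sender)` itself.
    // After: the accumulator is built at the call site and stored in the ctx.
    error_acc: ErrorAccumulator,
}

fn main() {
    let mut ctx = RouterCtx { error_acc: ErrorAccumulator::new(ErrorSender) };
    ctx.error_acc.record();
    println!("accumulated {} error(s)", ctx.error_acc.count);
}
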
2 changes: 1 addition & 1 deletion src/components/proxy/sessions.rs
@@ -159,7 +159,7 @@ impl SessionPool {
self.create_session_from_existing_socket(key, downstream_sender, port)
}

pub(crate) async fn process_received_upstream_packet(
pub(crate) fn process_received_upstream_packet(
self: &Arc<Self>,
packet: PoolBuffer,
mut recv_addr: SocketAddr,
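
`process_received_upstream_packet` drops its `async` qualifier here because the new `process_packet` calls it from the synchronous io-uring loop (the earlier hunk also drops the `.await` at the call site). A tiny sketch of that kind of conversion, with a hypothetical stand-in type and simplified parameters:

// Hypothetical stand-in; the real method takes a PoolBuffer, SocketAddr, etc.
struct SessionPool;

impl SessionPool {
    // Previously `pub(crate) async fn` and awaited at the call site; as a
    // plain fn it can be called directly from the synchronous io-uring loop.
    fn process_received_upstream_packet(&self, packet: Vec<u8>, port: u16) {
        println!("processing {} bytes received on port {}", packet.len(), port);
    }
}

fn main() {
    let pool = SessionPool;
    pool.process_received_upstream_packet(vec![0; 8], 7777);
}
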
7 changes: 2 additions & 5 deletions src/components/proxy/sessions/io_uring.rs
@@ -40,11 +40,8 @@ impl super::SessionPool {

io_loop.spawn(
format!("session-{id}"),
io_uring_shared::PacketProcessorCtx::SessionPool {
pool,
downstream_receiver,
port,
},
io_uring_shared::PacketProcessorCtx::SessionPool { pool, port },
io_uring_shared::PacketReceiver::SessionPool(downstream_receiver),
buffer_pool,
shutdown,
)
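
The session side now mirrors the router side: `PacketProcessorCtx` keeps only the processing state, while the receive half travels separately as a `PacketReceiver`, matching the extra `receiver` parameter added to `IoUringLoop::spawn`. A condensed sketch of that split, with placeholder payload and channel types instead of the real buffers and tokio receivers:

// Placeholder payload/channel types; the real code uses DownstreamReceiver
// and tokio mpsc receivers as shown in the diff above.
enum PacketProcessorCtx {
    Router { worker_id: usize },
    SessionPool { port: u16 },
}

enum PacketReceiver {
    Router(std::sync::mpsc::Receiver<Vec<u8>>),
    SessionPool(std::sync::mpsc::Receiver<Vec<u8>>),
}

fn spawn(ctx: PacketProcessorCtx, receiver: PacketReceiver) {
    // The ctx stays with the io-uring loop; the receiver goes to the async
    // worker that forwards packets to send.
    match (ctx, receiver) {
        (PacketProcessorCtx::Router { worker_id }, PacketReceiver::Router(_rx)) => {
            println!("router loop for worker {}", worker_id);
        }
        (PacketProcessorCtx::SessionPool { port }, PacketReceiver::SessionPool(_rx)) => {
            println!("session pool loop for port {}", port);
        }
        _ => unreachable!("ctx and receiver variants are created in matching pairs"),
    }
}

fn main() {
    let (_tx, rx) = std::sync::mpsc::channel();
    spawn(
        PacketProcessorCtx::SessionPool { port: 7777 },
        PacketReceiver::SessionPool(rx),
    );
}
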
