perf: make filters synchronous (#1023)
* perf: make filters synchronous

* Make SessionPool sync

* update io-uring loop

* review feedback
XAMPPRocky authored Oct 8, 2024
1 parent a3fcf72 commit 1d6b455
Showing 33 changed files with 279 additions and 358 deletions.
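The core of the change, as the docs and example diffs below show, is that `Filter::read` and `Filter::write` drop `async` (and the `#[async_trait::async_trait]` attribute with it). A minimal sketch of an affected filter after this commit — it assumes `quilkin::filters::prelude::*` and mirrors the `Greet` example from the docs diff, not the full trait definition:

```rust
use quilkin::filters::prelude::*;

struct Greet;

// Before this commit both hooks were `async fn` behind
// #[async_trait::async_trait]; now they are plain synchronous fns.
impl Filter for Greet {
    fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
        ctx.contents.extend_from_slice(b"Hello"); // mutate the packet in place
        Ok(())
    }

    fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
        ctx.contents.extend_from_slice(b"Goodbye");
        Ok(())
    }
}
```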
4 changes: 2 additions & 2 deletions crates/test/tests/proxy.rs
@@ -134,7 +134,7 @@ trace_test!(uring_receiver, {
// Drop the socket, otherwise it can
drop(ws);

-let _ = sb.timeout(500, ready).await;
+let _ = ready.recv().unwrap();

let msg = "hello-downstream";
tracing::debug!("sending packet");
@@ -185,7 +185,7 @@ trace_test!(
.unwrap();

for wn in workers {
-let _ = sb.timeout(200, wn).await;
+let _ = wn.recv().unwrap();
}

let socket = std::sync::Arc::new(sb.client());
5 changes: 2 additions & 3 deletions docs/src/services/proxy/filters/writing_custom_filters.md
@@ -41,13 +41,12 @@ sent to a downstream client.
use quilkin::filters::prelude::*;
/// Appends data to each packet
-#[async_trait::async_trait]
impl Filter for Greet {
-async fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
+fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
ctx.contents.extend_from_slice(b"Hello");
Ok(())
}
-async fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
+fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
ctx.contents.extend_from_slice(b"Goodbye");
Ok(())
}
5 changes: 2 additions & 3 deletions examples/quilkin-filter-example/src/main.rs
@@ -54,14 +54,13 @@ struct Greet {
config: Config,
}

-#[async_trait::async_trait]
impl Filter for Greet {
-async fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
+fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
ctx.contents
.prepend_from_slice(format!("{} ", self.config.greeting).as_bytes());
Ok(())
}
-async fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
+fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
ctx.contents
.prepend_from_slice(format!("{} ", self.config.greeting).as_bytes());
Ok(())
2 changes: 1 addition & 1 deletion src/components/proxy.rs
@@ -311,7 +311,7 @@ impl Proxy {
)?;

for notification in worker_notifications {
-let _ = notification.await;
+let _ = notification.recv();
}

tracing::info!("Quilkin is ready");
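A side effect visible in this file and in the test diff above: worker-readiness notifications switch from awaiting a tokio oneshot to blocking on a `std::sync::mpsc` receiver, which needs no async runtime on the receiving side. A self-contained sketch of that pattern (illustrative names only, not the quilkin code):

```rust
use std::sync::mpsc;

fn main() {
    // The worker signals readiness over a std channel instead of a tokio
    // oneshot, so the caller can block on it from a plain thread.
    let (tx, notification) = mpsc::channel::<()>();

    std::thread::spawn(move || {
        // ... socket / ring set-up would happen here ...
        let _ = tx.send(()); // notify that this worker is ready
    });

    // Blocks until the worker has signalled; recv() errs only if the
    // sender was dropped without sending.
    let _ = notification.recv();
    println!("Quilkin is ready");
}
```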
160 changes: 71 additions & 89 deletions src/components/proxy/io_uring_shared.rs
@@ -252,33 +252,32 @@ pub enum PacketProcessorCtx {
Router {
config: Arc<crate::config::Config>,
sessions: Arc<crate::components::proxy::SessionPool>,
-error_sender: super::error::ErrorSender,
-/// Receiver for upstream packets being sent to this downstream
-upstream_receiver: crate::components::proxy::sessions::DownstreamReceiver,
+error_acc: super::error::ErrorAccumulator,
worker_id: usize,
},
SessionPool {
pool: Arc<crate::components::proxy::SessionPool>,
-downstream_receiver: tokio::sync::mpsc::Receiver<proxy::SendPacket>,
port: u16,
},
}

+pub enum PacketReceiver {
+Router(crate::components::proxy::sessions::DownstreamReceiver),
+SessionPool(tokio::sync::mpsc::Receiver<proxy::SendPacket>),
+}

/// Spawns worker tasks
///
/// One task processes received packets, notifying the io-uring loop when a
/// packet finishes processing, the other receives packets to send and notifies
/// the io-uring loop when there are 1 or more packets available to be sent
fn spawn_workers(
rt: &tokio::runtime::Runtime,
-ctx: PacketProcessorCtx,
+receiver: PacketReceiver,
pending_sends: PendingSends,
-packet_processed_event: EventFdWriter,
mut shutdown_rx: crate::ShutdownRx,
shutdown_event: EventFdWriter,
-) -> tokio::sync::mpsc::Sender<RecvPacket> {
-let (tx, mut rx) = tokio::sync::mpsc::channel::<RecvPacket>(1);
+) {
// Spawn a task that just monitors the shutdown receiver to notify the io-uring loop to exit
rt.spawn(async move {
// The result is uninteresting, either a shutdown has been signalled, or all senders have been dropped
@@ -287,45 +286,8 @@ fn spawn_workers(
shutdown_event.write(1);
});

-match ctx {
-PacketProcessorCtx::Router {
-config,
-sessions,
-error_sender,
-worker_id,
-upstream_receiver,
-} => {
-rt.spawn(async move {
-let mut last_received_at = None;
-
-let mut error_acc = super::error::ErrorAccumulator::new(error_sender);
-
-while let Some(packet) = rx.recv().await {
-let received_at = UtcTimestamp::now();
-if let Some(last_received_at) = last_received_at {
-metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
-.set((received_at - last_received_at).nanos());
-}
-last_received_at = Some(received_at);
-
-let ds_packet = proxy::packet_router::DownstreamPacket {
-contents: packet.buffer,
-source: packet.source,
-};
-
-crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
-ds_packet,
-worker_id,
-&config,
-&sessions,
-&mut error_acc,
-)
-.await;
-
-packet_processed_event.write(1);
-}
-});
-
+match receiver {
+PacketReceiver::Router(upstream_receiver) => {
rt.spawn(async move {
while let Ok(packet) = upstream_receiver.recv().await {
let packet = SendPacket {
@@ -337,27 +299,7 @@ fn spawn_workers(
}
});
}
-PacketProcessorCtx::SessionPool {
-pool,
-port,
-mut downstream_receiver,
-} => {
-rt.spawn(async move {
-let mut last_received_at = None;
-
-while let Some(packet) = rx.recv().await {
-pool.process_received_upstream_packet(
-packet.buffer,
-packet.source,
-port,
-&mut last_received_at,
-)
-.await;
-
-packet_processed_event.write(1);
-}
-});
-
+PacketReceiver::SessionPool(mut downstream_receiver) => {
rt.spawn(async move {
while let Some(packet) = downstream_receiver.recv().await {
let packet = SendPacket {
Expand All @@ -370,8 +312,52 @@ fn spawn_workers(
});
}
}
}

+fn process_packet(
+ctx: &mut PacketProcessorCtx,
+packet_processed_event: &EventFdWriter,
+packet: RecvPacket,
+last_received_at: &mut Option<UtcTimestamp>,
+) {
+match ctx {
+PacketProcessorCtx::Router {
+config,
+sessions,
+worker_id,
+error_acc,
+} => {
+let received_at = UtcTimestamp::now();
+if let Some(last_received_at) = last_received_at {
+metrics::packet_jitter(metrics::READ, &metrics::EMPTY)
+.set((received_at - *last_received_at).nanos());
+}
+*last_received_at = Some(received_at);
+
+let ds_packet = proxy::packet_router::DownstreamPacket {
+contents: packet.buffer,
+source: packet.source,
+};
+
+crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
+ds_packet, *worker_id, config, sessions, error_acc,
+);
+
+packet_processed_event.write(1);
+}
+PacketProcessorCtx::SessionPool { pool, port, .. } => {
+let mut last_received_at = None;
+
+pool.process_received_upstream_packet(
+packet.buffer,
+packet.source,
+*port,
+&mut last_received_at,
+);
+
+packet_processed_event.write(1);
+}
+}
+}

#[inline]
@@ -384,8 +370,6 @@ enum Token {
Recv { key: usize },
/// Packet sent
Send { key: usize },
-/// Recv packet processed
-RecvPacketProcessed,
/// One or more packets are ready to be sent
PendingsSends,
/// Loop shutdown requested
@@ -543,12 +527,13 @@ impl IoUringLoop {
pub fn spawn(
self,
thread_name: String,
-ctx: PacketProcessorCtx,
+mut ctx: PacketProcessorCtx,
+receiver: PacketReceiver,
buffer_pool: Arc<crate::pool::BufferPool>,
shutdown: crate::ShutdownRx,
-) -> Result<tokio::sync::oneshot::Receiver<()>, PipelineError> {
+) -> Result<std::sync::mpsc::Receiver<()>, PipelineError> {
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
-let (tx, rx) = tokio::sync::oneshot::channel();
+let (tx, rx) = std::sync::mpsc::channel();

let rt = self.runtime;
let socket = self.socket;
@@ -562,7 +547,7 @@ impl IoUringLoop {
// Used to notify the uring when a received packet has finished
// processing and we can perform another recv, as we (currently) only
// ever process a single packet at a time
-let mut process_event = EventFd::new()?;
+let process_event = EventFd::new()?;
// Used to notify the uring loop to shutdown
let mut shutdown_event = EventFd::new()?;

@@ -589,11 +574,10 @@

// Spawn the worker tasks that process in an async context unlike
// our io-uring loop below
-let process_packet_tx = spawn_workers(
+spawn_workers(
&rt,
-ctx,
+receiver,
pending_sends.clone(),
-process_event.writer(),
shutdown,
shutdown_event.writer(),
);
@@ -619,6 +603,8 @@

// Notify that we have set everything up
let _ = tx.send(());
+let mut last_received_at = None;
+let process_event_writer = process_event.writer();

// The core io uring loop
'io: loop {
@@ -660,17 +646,13 @@
}

let packet = packet.finalize_recv(ret as usize);
-if process_packet_tx.blocking_send(packet).is_err() {
-unreachable!("packet process thread has a pending packet");
-}
-
-// Queue the wait for the processing of the packet to finish
-loop_ctx.push_with_token(
-process_event.io_uring_entry(),
-Token::RecvPacketProcessed,
+process_packet(
+&mut ctx,
+&process_event_writer,
+packet,
+&mut last_received_at,
+);
-}
-Token::RecvPacketProcessed => {

loop_ctx.enqueue_recv(buffer_pool.clone().alloc());
}
Token::PendingsSends => {
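The net effect in this file: the io-uring loop no longer hands each received packet to a spawned task over a channel and parks on a `RecvPacketProcessed` completion; it calls `process_packet` inline on the loop thread and immediately re-arms the next recv. A toy, self-contained model of that control-flow change — stub types only, none of the real io-uring machinery:

```rust
// Stub completion tokens standing in for the ring's CQE dispatch.
enum Token {
    Recv(Vec<u8>), // a completed recv with its packet
    Shutdown,
}

fn process_packet(packet: Vec<u8>, last_len: &mut Option<usize>) {
    // Stand-in for the filter chain + session routing, now synchronous.
    *last_len = Some(packet.len());
}

fn main() {
    let completions = vec![Token::Recv(b"hi".to_vec()), Token::Shutdown];
    let mut last_len = None;

    for token in completions {
        match token {
            Token::Recv(packet) => {
                // Old: send over a channel, wait for RecvPacketProcessed.
                // New: process inline, then re-arm the next recv right away.
                process_packet(packet, &mut last_len);
            }
            Token::Shutdown => break,
        }
    }
}
```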
17 changes: 7 additions & 10 deletions src/components/proxy/packet_router.rs
@@ -55,7 +55,7 @@ pub struct DownstreamReceiveWorkerConfig {

impl DownstreamReceiveWorkerConfig {
#[inline]
-pub(crate) async fn process_task(
+pub(crate) fn process_task(
packet: DownstreamPacket,
worker_id: usize,
config: &Arc<Config>,
@@ -70,7 +70,7 @@ impl DownstreamReceiveWorkerConfig {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
-match Self::process_downstream_received_packet(packet, config, sessions).await {
+match Self::process_downstream_received_packet(packet, config, sessions) {
Ok(()) => {
error_acc.maybe_send();
}
@@ -88,7 +88,7 @@ impl DownstreamReceiveWorkerConfig {

/// Processes a packet by running it through the filter chain.
#[inline]
-async fn process_downstream_received_packet(
+fn process_downstream_received_packet(
packet: DownstreamPacket,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
@@ -104,10 +104,7 @@ impl DownstreamReceiveWorkerConfig {
packet.source.into(),
packet.contents,
);
-filters
-.read(&mut context)
-.await
-.map_err(PipelineError::Filter)?;
+filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext {
destinations,
@@ -123,10 +120,10 @@ impl DownstreamReceiveWorkerConfig {
for epa in destinations {
let session_key = SessionKey {
source: packet.source,
-dest: epa.to_socket_addr().await?,
+dest: epa.to_socket_addr()?,
};

-sessions.send(session_key, contents.clone()).await?;
+sessions.send(session_key, contents.clone())?;
}

Ok(())
@@ -146,7 +143,7 @@ pub async fn spawn_receivers(
upstream_receiver: DownstreamReceiver,
buffer_pool: Arc<crate::pool::BufferPool>,
shutdown: crate::ShutdownRx,
-) -> crate::Result<Vec<tokio::sync::oneshot::Receiver<()>>> {
+) -> crate::Result<Vec<std::sync::mpsc::Receiver<()>>> {
let (error_sender, mut error_receiver) = mpsc::channel(128);

let port = crate::net::socket_port(&socket);
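With `process_task`, the filter chain, `to_socket_addr`, and `sessions.send` all synchronous, nothing on the downstream hot path awaits anymore. A self-contained toy of that shape — the types here are stubs standing in for quilkin's `ReadContext`, `Filter`, and `SessionPool`:

```rust
use std::net::SocketAddr;

struct ReadContext {
    contents: Vec<u8>,
    destinations: Vec<SocketAddr>,
}

trait Filter {
    fn read(&self, ctx: &mut ReadContext) -> Result<(), String>;
}

fn process(filters: &[Box<dyn Filter>], mut ctx: ReadContext) -> Result<(), String> {
    for f in filters {
        f.read(&mut ctx)?; // plain call: no .await between recv and send
    }
    for dest in &ctx.destinations {
        // stand-in for sessions.send(session_key, contents.clone())
        println!("would send {} bytes to {dest}", ctx.contents.len());
    }
    Ok(())
}

fn main() {
    let ctx = ReadContext {
        contents: b"hello".to_vec(),
        destinations: vec!["127.0.0.1:7777".parse().unwrap()],
    };
    process(&[], ctx).unwrap();
}
```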
6 changes: 3 additions & 3 deletions src/components/proxy/packet_router/io_uring.rs
@@ -20,7 +20,7 @@ impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(
self,
shutdown: crate::ShutdownRx,
-) -> eyre::Result<tokio::sync::oneshot::Receiver<()>> {
+) -> eyre::Result<std::sync::mpsc::Receiver<()>> {
use crate::components::proxy::io_uring_shared;

let Self {
@@ -43,10 +43,10 @@ impl super::DownstreamReceiveWorkerConfig {
io_uring_shared::PacketProcessorCtx::Router {
config,
sessions,
-error_sender,
-upstream_receiver,
+error_acc: super::super::error::ErrorAccumulator::new(error_sender),
worker_id,
},
+io_uring_shared::PacketReceiver::Router(upstream_receiver),
buffer_pool,
shutdown,
)
