Skip to content

Commit

Permalink
Make SessionPool sync
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Oct 7, 2024
1 parent 42361a5 commit ec0668f
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 62 deletions.
4 changes: 2 additions & 2 deletions crates/test/tests/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ trace_test!(uring_receiver, {
// Drop the socket, otherwise it can
drop(ws);

let _ = sb.timeout(500, ready).await;
let _ = ready.recv().unwrap();

let msg = "hello-downstream";
tracing::debug!("sending packet");
Expand Down Expand Up @@ -185,7 +185,7 @@ trace_test!(
.unwrap();

for wn in workers {
let _ = sb.timeout(200, wn).await;
let _ = wn.recv().unwrap();
}

let socket = std::sync::Arc::new(sb.client());
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ impl Proxy {
)?;

for notification in worker_notifications {
let _ = notification.await;
let _ = notification.recv();
}

tracing::info!("Quilkin is ready");
Expand Down
7 changes: 3 additions & 4 deletions src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ fn spawn_workers(
&config,
&sessions,
&mut error_acc,
)
.await;
);

packet_processed_event.write(1);
}
Expand Down Expand Up @@ -546,9 +545,9 @@ impl IoUringLoop {
ctx: PacketProcessorCtx,
buffer_pool: Arc<crate::pool::BufferPool>,
shutdown: crate::ShutdownRx,
) -> Result<tokio::sync::oneshot::Receiver<()>, PipelineError> {
) -> Result<std::sync::mpsc::Receiver<()>, PipelineError> {
let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = std::sync::mpsc::channel();

let rt = self.runtime;
let socket = self.socket;
Expand Down
10 changes: 5 additions & 5 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct DownstreamReceiveWorkerConfig {

impl DownstreamReceiveWorkerConfig {
#[inline]
pub(crate) async fn process_task(
pub(crate) fn process_task(
packet: DownstreamPacket,
worker_id: usize,
config: &Arc<Config>,
Expand All @@ -70,7 +70,7 @@ impl DownstreamReceiveWorkerConfig {
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions).await {
match Self::process_downstream_received_packet(packet, config, sessions) {
Ok(()) => {
error_acc.maybe_send();
}
Expand All @@ -88,7 +88,7 @@ impl DownstreamReceiveWorkerConfig {

/// Processes a packet by running it through the filter chain.
#[inline]
async fn process_downstream_received_packet(
fn process_downstream_received_packet(
packet: DownstreamPacket,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
Expand Down Expand Up @@ -123,7 +123,7 @@ impl DownstreamReceiveWorkerConfig {
dest: epa.to_socket_addr()?,
};

sessions.send(session_key, contents.clone()).await?;
sessions.send(session_key, contents.clone())?;
}

Ok(())
Expand All @@ -143,7 +143,7 @@ pub async fn spawn_receivers(
upstream_receiver: DownstreamReceiver,
buffer_pool: Arc<crate::pool::BufferPool>,
shutdown: crate::ShutdownRx,
) -> crate::Result<Vec<tokio::sync::oneshot::Receiver<()>>> {
) -> crate::Result<Vec<std::sync::mpsc::Receiver<()>>> {
let (error_sender, mut error_receiver) = mpsc::channel(128);

let port = crate::net::socket_port(&socket);
Expand Down
2 changes: 1 addition & 1 deletion src/components/proxy/packet_router/io_uring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(
self,
shutdown: crate::ShutdownRx,
) -> eyre::Result<tokio::sync::oneshot::Receiver<()>> {
) -> eyre::Result<std::sync::mpsc::Receiver<()>> {
use crate::components::proxy::io_uring_shared;

let Self {
Expand Down
9 changes: 4 additions & 5 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl super::DownstreamReceiveWorkerConfig {
pub async fn spawn(
self,
_shutdown: crate::ShutdownRx,
) -> eyre::Result<tokio::sync::oneshot::Receiver<()>> {
) -> eyre::Result<std::sync::mpsc::Receiver<()>> {
let Self {
worker_id,
upstream_receiver,
Expand All @@ -31,7 +31,7 @@ impl super::DownstreamReceiveWorkerConfig {
buffer_pool,
} = self;

let (tx, rx) = tokio::sync::oneshot::channel();
let (tx, rx) = std::sync::mpsc::channel();

let thread_span =
uring_span!(tracing::debug_span!("receiver", id = worker_id).or_current());
Expand Down Expand Up @@ -131,8 +131,7 @@ impl super::DownstreamReceiveWorkerConfig {
}
last_received_at = Some(received_at);

Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc)
.await;
Self::process_task(packet, worker_id, &config, &sessions, &mut error_acc);
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand All @@ -143,7 +142,7 @@ impl super::DownstreamReceiveWorkerConfig {
});

use eyre::WrapErr as _;
worker.await.context("failed to spawn receiver task")?;
worker.recv().context("failed to spawn receiver task")?;
Ok(rx)
}
}
Loading

0 comments on commit ec0668f

Please sign in to comment.