Skip to content

Commit

Permalink
Remove spawn task for each packet. (#842)
Browse files Browse the repository at this point in the history
When measured it seems we wasting more time with moving memory into the
task than we gained from having it be its own task.
  • Loading branch information
XAMPPRocky authored Nov 1, 2023
1 parent 80552ce commit e99a276
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl DownstreamReceiveWorkerConfig {
}
last_received_at = Some(packet.received_at);

Self::spawn_process_task(packet, source, worker_id, &config, &sessions)
Self::process_task(packet, source, worker_id, &config, &sessions).await;
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand All @@ -312,7 +312,7 @@ impl DownstreamReceiveWorkerConfig {
}

#[inline]
fn spawn_process_task(
async fn process_task(
packet: DownstreamPacket,
source: std::net::SocketAddr,
worker_id: usize,
Expand All @@ -327,43 +327,32 @@ impl DownstreamReceiveWorkerConfig {
"received packet from downstream"
);

tokio::spawn({
let config = config.clone();
let sessions = sessions.clone();
let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();

async move {
let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();

let asn_info = packet.asn_info.clone();
let asn_info = asn_info.as_ref();
match Self::process_downstream_received_packet(packet, config, sessions).await {
Ok(size) => {
crate::metrics::packets_total(crate::metrics::READ, asn_info).inc();
crate::metrics::bytes_total(crate::metrics::READ, asn_info)
.inc_by(size as u64);
}
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc();
crate::metrics::packets_dropped_total(
crate::metrics::READ,
&source,
asn_info,
)
.inc();
}
}

timer.stop_and_record();
let asn_info = packet.asn_info.clone();
let asn_info = asn_info.as_ref();
match Self::process_downstream_received_packet(packet, config, sessions).await {
Ok(size) => {
crate::metrics::packets_total(crate::metrics::READ, asn_info).inc();
crate::metrics::bytes_total(crate::metrics::READ, asn_info).inc_by(size as u64);
}
});
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc();
crate::metrics::packets_dropped_total(crate::metrics::READ, &source, asn_info)
.inc();
}
}

timer.stop_and_record();
}

/// Processes a packet by running it through the filter chain.
#[inline]
async fn process_downstream_received_packet(
packet: DownstreamPacket,
config: Arc<Config>,
sessions: Arc<SessionPool>,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
) -> Result<usize, PipelineError> {
let endpoints: Vec<_> = config.clusters.read().endpoints().collect();
if endpoints.is_empty() {
Expand Down

0 comments on commit e99a276

Please sign in to comment.