diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index e08a6d1033..dde8edf207 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -25,6 +25,7 @@ use std::{ time::Duration, }; +use tokio::sync::mpsc; use tonic::transport::Endpoint; use super::Admin; @@ -190,6 +191,7 @@ impl Proxy { // The number of worker tasks to spawn. Each task gets a dedicated queue to // consume packets off. let num_workers = num_cpus::get(); + let (error_sender, mut error_receiver) = mpsc::unbounded_channel(); // Contains config for each worker task. let mut workers = Vec::with_capacity(num_workers); @@ -198,6 +200,7 @@ impl Proxy { socket: shared_socket, config: config.clone(), sessions: sessions.clone(), + error_sender: error_sender.clone(), }); for worker_id in 1..num_workers { @@ -206,6 +209,7 @@ impl Proxy { socket: Arc::new(DualStackLocalSocket::new(self.port)?), config: config.clone(), sessions: sessions.clone(), + error_sender: error_sender.clone(), }) } @@ -215,6 +219,31 @@ impl Proxy { worker.spawn(); } + tokio::spawn(async move { + let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5)); + + let mut pipeline_errors = std::collections::HashMap::::new(); + loop { + tokio::select! { + _ = log_task.tick() => { + for (error, instances) in &pipeline_errors { + tracing::info!(%error, %instances, "pipeline report"); + } + pipeline_errors.clear(); + } + received = error_receiver.recv() => { + let Some(error) = received else { + tracing::info!("pipeline reporting task closed"); + return; + }; + + let entry = pipeline_errors.entry(error.to_string()).or_default(); + *entry += 1; + } + } + } + }); + Ok(()) } } @@ -254,6 +283,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig { pub socket: Arc, pub config: Arc, pub sessions: Arc, + pub error_sender: mpsc::UnboundedSender, } impl DownstreamReceiveWorkerConfig { @@ -263,6 +293,7 @@ impl DownstreamReceiveWorkerConfig { socket, config, sessions, + error_sender, } = self; tokio::spawn(async move { @@ -298,7 +329,7 @@ impl DownstreamReceiveWorkerConfig { } last_received_at = Some(packet.received_at); - Self::process_task(packet, source, worker_id, &config, &sessions).await; + Self::process_task(packet, source, worker_id, &config, &sessions, &error_sender).await; } Err(error) => { tracing::error!(%error, "error receiving packet"); @@ -318,6 +349,7 @@ impl DownstreamReceiveWorkerConfig { worker_id: usize, config: &Arc, sessions: &Arc, + error_sender: &mpsc::UnboundedSender, ) { tracing::trace!( id = worker_id, @@ -337,10 +369,15 @@ impl DownstreamReceiveWorkerConfig { crate::metrics::bytes_total(crate::metrics::READ, asn_info).inc_by(size as u64); } Err(error) => { - let source = error.to_string(); - crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc(); - crate::metrics::packets_dropped_total(crate::metrics::READ, &source, asn_info) - .inc(); + let discriminant = PipelineErrorDiscriminants::from(&error).to_string(); + crate::metrics::errors_total(crate::metrics::READ, &discriminant, asn_info).inc(); + crate::metrics::packets_dropped_total( + crate::metrics::READ, + &discriminant, + asn_info, + ) + .inc(); + let _ = error_sender.send(error); } } @@ -379,14 +416,13 @@ impl DownstreamReceiveWorkerConfig { } } -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, strum_macros::EnumDiscriminants)] +#[strum_discriminants(derive(strum_macros::Display))] pub enum PipelineError { #[error("No upstream endpoints available")] NoUpstreamEndpoints, #[error("filter {0}")] Filter(#[from] crate::filters::FilterError), - #[error("qcmp: {0}")] - Qcmp(#[from] crate::codec::qcmp::Error), #[error("OS level error: {0}")] Io(#[from] std::io::Error), } @@ -541,6 +577,7 @@ mod tests { async fn spawn_downstream_receive_workers() { let t = TestHelper::default(); + let (error_sender, _error_receiver) = mpsc::unbounded_channel(); let socket = Arc::new(create_socket().await); let addr = socket.local_ipv6_addr().unwrap(); let endpoint = t.open_socket_and_recv_single_packet().await; @@ -555,6 +592,7 @@ mod tests { worker_id: 1, socket: socket.clone(), config: config.clone(), + error_sender, sessions: SessionPool::new( config, Arc::new(