Skip to content

Commit

Permalink
Move pipeline errors from metrics to a fixed interval report (#843)
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky authored Nov 1, 2023
1 parent f1b1ef0 commit 334c75f
Showing 1 changed file with 46 additions and 8 deletions.
54 changes: 46 additions & 8 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
time::Duration,
};

use tokio::sync::mpsc;
use tonic::transport::Endpoint;

use super::Admin;
Expand Down Expand Up @@ -190,6 +191,7 @@ impl Proxy {
// The number of worker tasks to spawn. Each task gets a dedicated queue to
// consume packets off.
let num_workers = num_cpus::get();
let (error_sender, mut error_receiver) = mpsc::unbounded_channel();

// Contains config for each worker task.
let mut workers = Vec::with_capacity(num_workers);
Expand All @@ -198,6 +200,7 @@ impl Proxy {
socket: shared_socket,
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
});

for worker_id in 1..num_workers {
Expand All @@ -206,6 +209,7 @@ impl Proxy {
socket: Arc::new(DualStackLocalSocket::new(self.port)?),
config: config.clone(),
sessions: sessions.clone(),
error_sender: error_sender.clone(),
})
}

Expand All @@ -215,6 +219,31 @@ impl Proxy {
worker.spawn();
}

tokio::spawn(async move {
let mut log_task = tokio::time::interval(std::time::Duration::from_secs(5));

let mut pipeline_errors = std::collections::HashMap::<String, u64>::new();
loop {
tokio::select! {
_ = log_task.tick() => {
for (error, instances) in &pipeline_errors {
tracing::info!(%error, %instances, "pipeline report");
}
pipeline_errors.clear();
}
received = error_receiver.recv() => {
let Some(error) = received else {
tracing::info!("pipeline reporting task closed");
return;
};

let entry = pipeline_errors.entry(error.to_string()).or_default();
*entry += 1;
}
}
}
});

Ok(())
}
}
Expand Down Expand Up @@ -254,6 +283,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig {
pub socket: Arc<DualStackLocalSocket>,
pub config: Arc<Config>,
pub sessions: Arc<SessionPool>,
pub error_sender: mpsc::UnboundedSender<PipelineError>,
}

impl DownstreamReceiveWorkerConfig {
Expand All @@ -263,6 +293,7 @@ impl DownstreamReceiveWorkerConfig {
socket,
config,
sessions,
error_sender,
} = self;

tokio::spawn(async move {
Expand Down Expand Up @@ -298,7 +329,7 @@ impl DownstreamReceiveWorkerConfig {
}
last_received_at = Some(packet.received_at);

Self::process_task(packet, source, worker_id, &config, &sessions).await;
Self::process_task(packet, source, worker_id, &config, &sessions, &error_sender).await;
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
Expand All @@ -318,6 +349,7 @@ impl DownstreamReceiveWorkerConfig {
worker_id: usize,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
error_sender: &mpsc::UnboundedSender<PipelineError>,
) {
tracing::trace!(
id = worker_id,
Expand All @@ -337,10 +369,15 @@ impl DownstreamReceiveWorkerConfig {
crate::metrics::bytes_total(crate::metrics::READ, asn_info).inc_by(size as u64);
}
Err(error) => {
let source = error.to_string();
crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc();
crate::metrics::packets_dropped_total(crate::metrics::READ, &source, asn_info)
.inc();
let discriminant = PipelineErrorDiscriminants::from(&error).to_string();
crate::metrics::errors_total(crate::metrics::READ, &discriminant, asn_info).inc();
crate::metrics::packets_dropped_total(
crate::metrics::READ,
&discriminant,
asn_info,
)
.inc();
let _ = error_sender.send(error);
}
}

Expand Down Expand Up @@ -379,14 +416,13 @@ impl DownstreamReceiveWorkerConfig {
}
}

#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, strum_macros::EnumDiscriminants)]
#[strum_discriminants(derive(strum_macros::Display))]
pub enum PipelineError {
#[error("No upstream endpoints available")]
NoUpstreamEndpoints,
#[error("filter {0}")]
Filter(#[from] crate::filters::FilterError),
#[error("qcmp: {0}")]
Qcmp(#[from] crate::codec::qcmp::Error),
#[error("OS level error: {0}")]
Io(#[from] std::io::Error),
}
Expand Down Expand Up @@ -541,6 +577,7 @@ mod tests {
async fn spawn_downstream_receive_workers() {
let t = TestHelper::default();

let (error_sender, _error_receiver) = mpsc::unbounded_channel();
let socket = Arc::new(create_socket().await);
let addr = socket.local_ipv6_addr().unwrap();
let endpoint = t.open_socket_and_recv_single_packet().await;
Expand All @@ -555,6 +592,7 @@ mod tests {
worker_id: 1,
socket: socket.clone(),
config: config.clone(),
error_sender,
sessions: SessionPool::new(
config,
Arc::new(
Expand Down

0 comments on commit 334c75f

Please sign in to comment.