diff --git a/src/components/proxy/io_uring_shared.rs b/src/components/proxy/io_uring_shared.rs
index 0a4c02389..6e23d54d6 100644
--- a/src/components/proxy/io_uring_shared.rs
+++ b/src/components/proxy/io_uring_shared.rs
@@ -455,7 +455,7 @@ impl IoUringLoop {
             // Just double buffer the pending writes for simplicity
             let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());

-            let processing_metrics = metrics::ProcessingMetrics::new();
+            let mut processing_metrics = metrics::ProcessingMetrics::new();

             // When sending packets, this is the direction used when updating metrics
             let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
@@ -567,7 +567,8 @@ impl IoUringLoop {
                         }
                         Token::Send { key } => {
                             let packet = loop_ctx.pop_packet(key).finalize_send();
-                            let asn_info = packet.asn_info.as_ref().into();
+                            let ip_metrics_entry = packet.asn_info;
+                            let asn_info = ip_metrics_entry.as_ref().into();

                             if ret < 0 {
                                 let source =
@@ -576,16 +577,25 @@ impl IoUringLoop {
                                 metrics::packets_dropped_total(send_dir, &source, &asn_info)
                                     .inc();
                             } else if ret as usize != packet.data.len() {
-                                metrics::packets_total(send_dir, &asn_info).inc();
                                 metrics::errors_total(
                                     send_dir,
                                     "sent bytes != packet length",
                                     &asn_info,
                                 )
                                 .inc();
+                                *processing_metrics
+                                    .packets_total
+                                    .entry((send_dir, ip_metrics_entry))
+                                    .or_default() += 1;
                             } else {
-                                metrics::packets_total(send_dir, &asn_info).inc();
-                                metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64);
+                                *processing_metrics
+                                    .packets_total
+                                    .entry((send_dir, ip_metrics_entry.clone()))
+                                    .or_default() += 1;
+                                *processing_metrics
+                                    .bytes_total
+                                    .entry((send_dir, ip_metrics_entry))
+                                    .or_default() += ret as usize;
                             }
                         }
                     }
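Note on the hunk above: the hot path stops touching the shared atomic Prometheus counters on every send completion and instead bumps plain integers in the loop-local `ProcessingMetrics` maps, deferring the shared-counter updates to `flush`. A minimal, self-contained sketch of that batching pattern, with `Option<String>` standing in for `Option<MetricsIpNetEntry>` and a hypothetical `record_send` helper (neither exists in the diff):

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    enum Direction {
        Read,
        Write,
    }

    // Hot-path accumulator: plain HashMap bumps instead of one atomic
    // counter update per packet.
    #[derive(Default)]
    struct LocalSendMetrics {
        packets_total: HashMap<(Direction, Option<String>), usize>,
        bytes_total: HashMap<(Direction, Option<String>), usize>,
    }

    impl LocalSendMetrics {
        fn record_send(&mut self, dir: Direction, asn: Option<String>, bytes: usize) {
            *self.packets_total.entry((dir, asn.clone())).or_default() += 1;
            *self.bytes_total.entry((dir, asn)).or_default() += bytes;
        }
    }

    fn main() {
        let mut metrics = LocalSendMetrics::default();
        let key = (Direction::Write, Some("AS64512".to_string()));
        // Two sends for the same (direction, ASN) pair collapse into one
        // entry, so a later flush performs one shared update, not two.
        metrics.record_send(Direction::Write, key.1.clone(), 1200);
        metrics.record_send(Direction::Write, key.1.clone(), 800);
        assert_eq!(metrics.packets_total[&key], 2);
        assert_eq!(metrics.bytes_total[&key], 2000);
    }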
diff --git a/src/filters/chain.rs b/src/filters/chain.rs
index c57dc892b..75baaef5a 100644
--- a/src/filters/chain.rs
+++ b/src/filters/chain.rs
@@ -14,28 +14,11 @@
  * limitations under the License.
  */

-use prometheus::{exponential_buckets, Histogram};
-
 use crate::{
     config::Filter as FilterConfig,
     filters::{prelude::*, FilterRegistry},
-    metrics::{histogram_opts, CollectorExt},
 };

-const FILTER_LABEL: &str = "filter";
-
-/// Start the histogram bucket at an eighth of a millisecond, as we bucketed the full filter
-/// chain processing starting at a quarter of a millisecond, so we we will want finer granularity
-/// here.
-const BUCKET_START: f64 = 0.000125;
-
-const BUCKET_FACTOR: f64 = 2.5;
-
-/// At an exponential factor of 2.5 (BUCKET_FACTOR), 11 iterations gets us to just over half a
-/// second. Any processing that occurs over half a second is far too long, so we end
-/// the bucketing there as we don't care about granularity past this value.
-const BUCKET_COUNT: usize = 11;
-
 /// A chain of [`Filter`]s to be executed in order.
 ///
 /// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
@@ -45,50 +28,11 @@ const BUCKET_COUNT: usize = 11;
 #[derive(Clone, Default)]
 pub struct FilterChain {
     filters: Vec<(String, FilterInstance)>,
-    filter_read_duration_seconds: Vec<Histogram>,
-    filter_write_duration_seconds: Vec<Histogram>,
 }

 impl FilterChain {
     pub fn new(filters: Vec<(String, FilterInstance)>) -> Result<Self, CreationError> {
-        let subsystem = "filter";
-
-        Ok(Self {
-            filter_read_duration_seconds: filters
-                .iter()
-                .map(|(name, _)| {
-                    Histogram::with_opts(
-                        histogram_opts(
-                            "read_duration_seconds",
-                            subsystem,
-                            "Seconds taken to execute a given filter's `read`.",
-                            Some(
-                                exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT)
-                                    .unwrap(),
-                            ),
-                        )
-                        .const_label(FILTER_LABEL, name),
-                    )
-                    .and_then(|histogram| histogram.register_if_not_exists())
-                })
-                .collect::<prometheus::Result<_>>()?,
-            filter_write_duration_seconds: filters
-                .iter()
-                .map(|(name, _)| {
-                    Histogram::with_opts(
-                        histogram_opts(
-                            "write_duration_seconds",
-                            subsystem,
-                            "Seconds taken to execute a given filter's `write`.",
-                            Some(exponential_buckets(0.000125, 2.5, 11).unwrap()),
-                        )
-                        .const_label(FILTER_LABEL, name),
-                    )
-                    .and_then(|histogram| histogram.register_if_not_exists())
-                })
-                .collect::<prometheus::Result<_>>()?,
-            filters,
-        })
+        Ok(Self { filters })
     }

     #[inline]
@@ -274,15 +218,9 @@ impl schemars::JsonSchema for FilterChain {

 impl Filter for FilterChain {
     fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
-        for ((id, instance), histogram) in self
-            .filters
-            .iter()
-            .zip(self.filter_read_duration_seconds.iter())
-        {
+        for (id, instance) in self.filters.iter() {
             tracing::trace!(%id, "read filtering packet");
-            let timer = histogram.start_timer();
             let result = instance.filter().read(ctx);
-            timer.stop_and_record();
             match result {
                 Ok(()) => tracing::trace!(%id, "read passing packet"),
                 Err(error) => {
@@ -304,16 +242,9 @@ impl Filter for FilterChain {
     }

     fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
-        for ((id, instance), histogram) in self
-            .filters
-            .iter()
-            .rev()
-            .zip(self.filter_write_duration_seconds.iter().rev())
-        {
+        for (id, instance) in self.filters.iter().rev() {
             tracing::trace!(%id, "write filtering packet");
-            let timer = histogram.start_timer();
             let result = instance.filter().write(ctx);
-            timer.stop_and_record();
             match result {
                 Ok(()) => tracing::trace!(%id, "write passing packet"),
                 Err(error) => {
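With the per-filter `read_duration_seconds`/`write_duration_seconds` histograms removed, timing granularity drops to whole-chain measurements such as the `processing_time` histogram that survives in `src/metrics.rs`. For reference, the timer pattern the removed code used is the standard `prometheus` crate API; a standalone sketch (the histogram name and the commented-out filter call are illustrative, not from the diff):

    use prometheus::{Histogram, HistogramOpts};

    fn main() {
        // Stand-in for the remaining whole-chain histogram; the removed
        // code created one of these per filter and timed each call.
        let histogram = Histogram::with_opts(HistogramOpts::new(
            "read_processing_time_seconds",
            "Time taken to process a read",
        ))
        .unwrap();

        // `local()` gives an unsynchronized buffer, the same mechanism
        // ProcessingMetrics uses for read_processing_time.
        let local = histogram.local();
        let timer = local.start_timer();
        // ... instance.filter().read(ctx) would run here ...
        timer.stop_and_record();

        // Samples only reach the shared histogram on flush.
        local.flush();
        assert_eq!(histogram.get_sample_count(), 1);
    }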
diff --git a/src/metrics.rs b/src/metrics.rs
index a028ee2f3..704b1a3e6 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -14,6 +14,8 @@
  * limitations under the License.
  */

+use std::collections::HashMap;
+
 use crate::net::maxmind_db::MetricsIpNetEntry;
 use once_cell::sync::Lazy;
 use prometheus::{
@@ -21,8 +23,6 @@ use prometheus::{
     IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
 };

-pub use prometheus::Result;
-
 /// "event" is used as a label for Metrics that can apply to both Filter
 /// `read` and `write` executions.
 pub const DIRECTION_LABEL: &str = "event";
@@ -58,7 +58,7 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0;
 /// care about granularity past 1 second.
 pub(crate) const BUCKET_COUNT: usize = 13;

-#[derive(Clone, Copy, Debug)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 pub enum Direction {
     Read,
     Write,
@@ -269,34 +269,33 @@ pub fn register<T: Collector + Sized + Clone + 'static>(collector: T) -> T {
         .unwrap()
 }

-pub trait CollectorExt: Collector + Clone + Sized + 'static {
-    /// Registers the current metric collector with the provided registry
-    /// if not already registered.
-    fn register_if_not_exists(self) -> Result<Self> {
-        match registry().register(Box::from(self.clone())) {
-            Ok(_) | Err(prometheus::Error::AlreadyReg) => Ok(self),
-            Err(err) => Err(err),
-        }
-    }
-}
-
-impl<C: Collector + Clone + 'static> CollectorExt for C {}
-
 /// A local instance of all of the metrics related to packet processing.
 pub struct ProcessingMetrics {
     pub read_processing_time: LocalHistogram,
+    pub packets_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
+    pub bytes_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
 }

 impl ProcessingMetrics {
     pub fn new() -> Self {
         Self {
             read_processing_time: processing_time(READ).local(),
+            packets_total: <_>::default(),
+            bytes_total: <_>::default(),
         }
     }

     #[inline]
-    pub fn flush(&self) {
+    pub fn flush(&mut self) {
         self.read_processing_time.flush();
+
+        for ((send_dir, asn_info), amount) in self.packets_total.drain() {
+            packets_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
+        }
+
+        for ((send_dir, asn_info), amount) in self.bytes_total.drain() {
+            bytes_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
+        }
     }
 }

diff --git a/src/net/maxmind_db.rs b/src/net/maxmind_db.rs
index 734841f51..27089bc27 100644
--- a/src/net/maxmind_db.rs
+++ b/src/net/maxmind_db.rs
@@ -171,7 +171,7 @@ pub struct IpNetEntry {
     pub prefix: String,
 }

-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq, Hash)]
 pub struct MetricsIpNetEntry {
     pub prefix: String,
     pub id: u64,
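The new derives on `MetricsIpNetEntry` exist solely so it can participate in the `(Direction, Option<MetricsIpNetEntry>)` keys of the maps added to `ProcessingMetrics`: `HashMap` keys must be `Eq + Hash`. Also worth noting about the `drain`-based `flush`: draining empties the maps but keeps their allocations, so a long-lived loop reuses capacity rather than reallocating on every batch. A toy sketch (the struct is reduced to the two fields shown in the diff; the prefix and id values are made up):

    use std::collections::HashMap;

    // Mirrors the fields in the diff; Eq + Hash are what HashMap requires.
    #[derive(Clone, PartialEq, Eq, Hash)]
    struct MetricsIpNetEntry {
        prefix: String,
        id: u64,
    }

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    enum Direction {
        Read,
        Write,
    }

    fn main() {
        let mut packets_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize> =
            HashMap::new();
        let entry = MetricsIpNetEntry {
            prefix: "203.0.113.0/24".into(),
            id: 64512,
        };
        *packets_total.entry((Direction::Write, Some(entry))).or_default() += 1;

        // drain() yields the accumulated pairs and empties the map, but the
        // map's allocation is retained for the next batch.
        let capacity = packets_total.capacity();
        for ((_dir, _asn), count) in packets_total.drain() {
            assert_eq!(count, 1);
        }
        assert!(packets_total.is_empty());
        assert!(packets_total.capacity() >= capacity);
    }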