Skip to content

Commit

Permalink
perf: remove filter specific measurement
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Dec 3, 2024
1 parent 0b944af commit 5574beb
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 109 deletions.
20 changes: 15 additions & 5 deletions src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ impl IoUringLoop {
// Just double buffer the pending writes for simplicity
let mut double_pending_sends = Vec::with_capacity(pending_sends.capacity());

let processing_metrics = metrics::ProcessingMetrics::new();
let mut processing_metrics = metrics::ProcessingMetrics::new();

// When sending packets, this is the direction used when updating metrics
let send_dir = if matches!(ctx, PacketProcessorCtx::Router { .. }) {
Expand Down Expand Up @@ -567,7 +567,8 @@ impl IoUringLoop {
}
Token::Send { key } => {
let packet = loop_ctx.pop_packet(key).finalize_send();
let asn_info = packet.asn_info.as_ref().into();
let ip_metrics_entry = packet.asn_info;
let asn_info = ip_metrics_entry.as_ref().into();

if ret < 0 {
let source =
Expand All @@ -576,16 +577,25 @@ impl IoUringLoop {
metrics::packets_dropped_total(send_dir, &source, &asn_info)
.inc();
} else if ret as usize != packet.data.len() {
metrics::packets_total(send_dir, &asn_info).inc();
metrics::errors_total(
send_dir,
"sent bytes != packet length",
&asn_info,
)
.inc();
*processing_metrics
.packets_total
.entry((send_dir, ip_metrics_entry))
.or_default() += 1;
} else {
metrics::packets_total(send_dir, &asn_info).inc();
metrics::bytes_total(send_dir, &asn_info).inc_by(ret as u64);
*processing_metrics
.packets_total
.entry((send_dir, ip_metrics_entry.clone()))
.or_default() += 1;
*processing_metrics
.bytes_total
.entry((send_dir, ip_metrics_entry))
.or_default() += ret as usize;
}
}
}
Expand Down
33 changes: 19 additions & 14 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,26 @@ impl DownstreamReceiveWorkerConfig {
"received packet from downstream"
);

let timer = processing_time.start_timer();
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Err(error) => {
let discriminant = error.discriminant();
metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc();

error_acc.push_error(error);
}
}
processing_time.observe_closure_duration(
|| match Self::process_downstream_received_packet(
packet,
config,
sessions,
destinations,
) {
Ok(()) => {
error_acc.maybe_send();
}
Err(error) => {
let discriminant = error.discriminant();
metrics::errors_total(metrics::READ, discriminant, &metrics::EMPTY).inc();
metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY)
.inc();

timer.stop_and_record();
error_acc.push_error(error);
}
},
);
}

/// Processes a packet by running it through the filter chain.
Expand Down
75 changes: 3 additions & 72 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,11 @@
* limitations under the License.
*/

use prometheus::{exponential_buckets, Histogram};

use crate::{
config::Filter as FilterConfig,
filters::{prelude::*, FilterRegistry},
metrics::{histogram_opts, CollectorExt},
};

const FILTER_LABEL: &str = "filter";

/// Start the histogram bucket at an eighth of a millisecond, as we bucketed the full filter
/// chain processing starting at a quarter of a millisecond, so we will want finer granularity
/// here.
const BUCKET_START: f64 = 0.000125;

const BUCKET_FACTOR: f64 = 2.5;

/// At an exponential factor of 2.5 (BUCKET_FACTOR), 11 iterations gets us to just over half a
/// second. Any processing that occurs over half a second is far too long, so we end
/// the bucketing there as we don't care about granularity past this value.
const BUCKET_COUNT: usize = 11;

/// A chain of [`Filter`]s to be executed in order.
///
/// Executes each filter, passing the [`ReadContext`] and [`WriteContext`]
Expand All @@ -45,50 +28,11 @@ const BUCKET_COUNT: usize = 11;
#[derive(Clone, Default)]
pub struct FilterChain {
filters: Vec<(String, FilterInstance)>,
filter_read_duration_seconds: Vec<Histogram>,
filter_write_duration_seconds: Vec<Histogram>,
}

impl FilterChain {
pub fn new(filters: Vec<(String, FilterInstance)>) -> Result<Self, CreationError> {
let subsystem = "filter";

Ok(Self {
filter_read_duration_seconds: filters
.iter()
.map(|(name, _)| {
Histogram::with_opts(
histogram_opts(
"read_duration_seconds",
subsystem,
"Seconds taken to execute a given filter's `read`.",
Some(
exponential_buckets(BUCKET_START, BUCKET_FACTOR, BUCKET_COUNT)
.unwrap(),
),
)
.const_label(FILTER_LABEL, name),
)
.and_then(|histogram| histogram.register_if_not_exists())
})
.collect::<Result<_, prometheus::Error>>()?,
filter_write_duration_seconds: filters
.iter()
.map(|(name, _)| {
Histogram::with_opts(
histogram_opts(
"write_duration_seconds",
subsystem,
"Seconds taken to execute a given filter's `write`.",
Some(exponential_buckets(0.000125, 2.5, 11).unwrap()),
)
.const_label(FILTER_LABEL, name),
)
.and_then(|histogram| histogram.register_if_not_exists())
})
.collect::<Result<_, prometheus::Error>>()?,
filters,
})
Ok(Self { filters })
}

#[inline]
Expand Down Expand Up @@ -274,15 +218,9 @@ impl schemars::JsonSchema for FilterChain {

impl Filter for FilterChain {
fn read(&self, ctx: &mut ReadContext<'_>) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
.zip(self.filter_read_duration_seconds.iter())
{
for (id, instance) in self.filters.iter() {
tracing::trace!(%id, "read filtering packet");
let timer = histogram.start_timer();
let result = instance.filter().read(ctx);
timer.stop_and_record();
match result {
Ok(()) => tracing::trace!(%id, "read passing packet"),
Err(error) => {
Expand All @@ -304,16 +242,9 @@ impl Filter for FilterChain {
}

fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
.rev()
.zip(self.filter_write_duration_seconds.iter().rev())
{
for (id, instance) in self.filters.iter().rev() {
tracing::trace!(%id, "write filtering packet");
let timer = histogram.start_timer();
let result = instance.filter().write(ctx);
timer.stop_and_record();
match result {
Ok(()) => tracing::trace!(%id, "write passing packet"),
Err(error) => {
Expand Down
33 changes: 16 additions & 17 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

use std::collections::HashMap;

use crate::net::maxmind_db::MetricsIpNetEntry;
use once_cell::sync::Lazy;
use prometheus::{
core::Collector, local::LocalHistogram, Histogram, HistogramOpts, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, DEFAULT_BUCKETS,
};

pub use prometheus::Result;

/// "event" is used as a label for Metrics that can apply to both Filter
/// `read` and `write` executions.
pub const DIRECTION_LABEL: &str = "event";
Expand Down Expand Up @@ -58,7 +58,7 @@ pub(crate) const BUCKET_FACTOR: f64 = 2.0;
/// care about granularity past 1 second.
pub(crate) const BUCKET_COUNT: usize = 13;

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Direction {
Read,
Write,
Expand Down Expand Up @@ -269,34 +269,33 @@ pub fn register<T: Collector + Sized + Clone + 'static>(collector: T) -> T {
.unwrap()
}

pub trait CollectorExt: Collector + Clone + Sized + 'static {
/// Registers the current metric collector with the provided registry
/// if not already registered.
fn register_if_not_exists(self) -> Result<Self> {
match registry().register(Box::from(self.clone())) {
Ok(_) | Err(prometheus::Error::AlreadyReg) => Ok(self),
Err(err) => Err(err),
}
}
}

impl<C: Collector + Clone + 'static> CollectorExt for C {}

/// A local instance of all of the metrics related to packet processing.
///
/// Counts are accumulated locally and later folded into the shared
/// prometheus metrics in one batch (see `flush`), avoiding per-packet
/// registry updates.
pub struct ProcessingMetrics {
    // Thread-local histogram of read-path processing time; flushed to the
    // shared histogram in `flush`.
    pub read_processing_time: LocalHistogram,
    // Packet counts keyed by direction plus the packet's optional
    // MaxMind IP-net entry (used as the ASN label on the global counter).
    pub packets_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
    // Byte counts, keyed identically to `packets_total`.
    pub bytes_total: HashMap<(Direction, Option<MetricsIpNetEntry>), usize>,
}

impl ProcessingMetrics {
pub fn new() -> Self {
Self {
read_processing_time: processing_time(READ).local(),
packets_total: <_>::default(),
bytes_total: <_>::default(),
}
}

#[inline]
pub fn flush(&self) {
pub fn flush(&mut self) {
self.read_processing_time.flush();

for ((send_dir, asn_info), amount) in self.packets_total.drain() {
packets_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
}

for ((send_dir, asn_info), amount) in self.bytes_total.drain() {
bytes_total(send_dir, &asn_info.as_ref().into()).inc_by(amount as _);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/maxmind_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub struct IpNetEntry {
pub prefix: String,
}

#[derive(Clone)]
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct MetricsIpNetEntry {
pub prefix: String,
pub id: u64,
Expand Down

0 comments on commit 5574beb

Please sign in to comment.