diff --git a/capture/src/partition_limits.rs b/capture/src/partition_limits.rs
index 3866657..cd0148f 100644
--- a/capture/src/partition_limits.rs
+++ b/capture/src/partition_limits.rs
@@ -11,6 +11,7 @@ use std::num::NonZeroU32;
 use std::sync::Arc;
 
 use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
+use metrics::gauge;
 
 // See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
 #[derive(Clone)]
@@ -38,6 +39,16 @@ impl PartitionLimiter {
     pub fn is_limited(&self, key: &String) -> bool {
         self.forced_keys.contains(key) || self.limiter.check_key(key).is_err()
     }
+
+    /// Reports the number of tracked keys to Prometheus every 10 seconds.
+    /// Needs to be spawned in a separate task.
+    pub async fn report_metrics(&self) {
+        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
+        loop {
+            interval.tick().await;
+            gauge!("partition_limits_key_count", self.limiter.len() as f64);
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/capture/src/server.rs b/capture/src/server.rs
index 32bafa8..3eca676 100644
--- a/capture/src/server.rs
+++ b/capture/src/server.rs
@@ -49,6 +49,12 @@ where
         config.burst_limit,
         config.overflow_forced_keys,
     );
+    if config.export_prometheus {
+        let partition = partition.clone();
+        tokio::spawn(async move {
+            partition.report_metrics().await;
+        });
+    }
 
     let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
         .expect("failed to start Kafka sink");