diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs
index 8962d5ef..7d49a3a8 100644
--- a/limitador-server/src/main.rs
+++ b/limitador-server/src/main.rs
@@ -12,7 +12,6 @@ use crate::config::{
 use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
 use crate::http_api::server::run_http_server;
 use crate::metrics::MetricsLayer;
-use ::metrics::histogram;
 use clap::{value_parser, Arg, ArgAction, Command};
 use const_format::formatcp;
 use limitador::counter::Counter;
@@ -202,11 +201,17 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
         tracing_subscriber::fmt::layer()
     };
 
-    let metrics_layer = MetricsLayer::new().gather(
-        "should_rate_limit",
-        |timings| histogram!("counter_latency").record(Duration::from(timings).as_secs_f64()),
-        vec!["datastore"],
-    );
+    let metrics_layer = MetricsLayer::new()
+        .gather(
+            "should_rate_limit",
+            PrometheusMetrics::record_datastore_latency,
+            vec!["datastore"],
+        )
+        .gather(
+            "flush_batcher_and_update_counters",
+            PrometheusMetrics::record_datastore_latency,
+            vec!["datastore"],
+        );
 
     if !config.tracing_endpoint.is_empty() {
         global::set_text_map_propagator(TraceContextPropagator::new());
diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs
index b19119e6..d41df57f 100644
--- a/limitador-server/src/metrics.rs
+++ b/limitador-server/src/metrics.rs
@@ -11,6 +11,7 @@ pub struct Timings {
     idle: u64,
     busy: u64,
     last: Instant,
+    updated: bool,
 }
 
 impl Timings {
@@ -19,6 +20,7 @@
             idle: 0,
             busy: 0,
             last: Instant::now(),
+            updated: false,
         }
     }
 }
@@ -31,6 +33,7 @@ impl ops::Add for Timings {
             busy: self.busy + rhs.busy,
             idle: self.idle + rhs.idle,
             last: self.last.max(rhs.last),
+            updated: self.updated || rhs.updated,
         }
     }
 }
@@ -68,39 +71,39 @@ impl SpanState {
     }
 }
 
-pub struct MetricsGroup<F: Fn(Timings)> {
-    consumer: F,
+pub struct MetricsGroup {
+    consumer: Box<fn(Timings)>,
     records: Vec<String>,
 }
 
-impl<F: Fn(Timings)> MetricsGroup<F> {
-    pub fn new(consumer: F, records: Vec<String>) -> Self {
+impl MetricsGroup {
+    pub fn new(consumer: Box<fn(Timings)>, records: Vec<String>) -> Self {
         Self { consumer, records }
     }
 }
 
-pub struct MetricsLayer<F: Fn(Timings)> {
-    groups: HashMap<String, MetricsGroup<F>>,
+pub struct MetricsLayer {
+    groups: HashMap<String, MetricsGroup>,
 }
 
-impl<F: Fn(Timings)> MetricsLayer<F> {
+impl MetricsLayer {
     pub fn new() -> Self {
         Self {
             groups: HashMap::new(),
         }
     }
 
-    pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self {
+    pub fn gather(mut self, aggregate: &str, consumer: fn(Timings), records: Vec<&str>) -> Self {
         // TODO(adam-cattermole): does not handle case where aggregate already exists
         let rec = records.iter().map(|r| r.to_string()).collect();
         self.groups
             .entry(aggregate.to_string())
-            .or_insert(MetricsGroup::new(consumer, rec));
+            .or_insert(MetricsGroup::new(Box::new(consumer), rec));
         self
     }
 }
 
-impl<S, F: Fn(Timings) + 'static> Layer<S> for MetricsLayer<F>
+impl<S> Layer<S> for MetricsLayer
 where
     S: Subscriber,
     S: for<'lookup> LookupSpan<'lookup>,
@@ -160,6 +163,7 @@
             let now = Instant::now();
             timings.idle += (now - timings.last).as_nanos() as u64;
             timings.last = now;
+            timings.updated = true;
         }
     }
 
@@ -171,6 +175,7 @@
             let now = Instant::now();
             timings.busy += (now - timings.last).as_nanos() as u64;
             timings.last = now;
+            timings.updated = true;
         }
     }
 
@@ -210,7 +215,9 @@
             }
             // IF we are aggregator call consume function
            if let Some(metrics_group) = self.groups.get(name) {
-                (metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
+                if let Some(t) = span_state.group_times.get(name).filter(|&t| t.updated) {
+                    (metrics_group.consumer)(*t);
+                }
             }
         }
     }
@@ -228,11 +235,13 @@ mod tests {
             idle: 5,
             busy: 5,
             last: now,
+            updated: false,
         };
         let t2 = Timings {
             idle: 3,
             busy: 5,
             last: now,
+            updated: false,
         };
         let t3 = t1 + t2;
         assert_eq!(
@@ -240,7 +249,8 @@
             Timings {
                 idle: 8,
                 busy: 10,
-                last: now
+                last: now,
+                updated: false,
             }
         )
     }
@@ -252,11 +262,13 @@
             idle: 5,
             busy: 5,
             last: now,
+            updated: false,
         };
         let t2 = Timings {
             idle: 3,
             busy: 5,
             last: now,
+            updated: false,
         };
         t1 += t2;
         assert_eq!(
@@ -264,7 +276,8 @@
             Timings {
                 idle: 8,
                 busy: 10,
-                last: now
+                last: now,
+                updated: false,
             }
         )
     }
@@ -277,6 +290,7 @@
             idle: 5,
             busy: 5,
             last: Instant::now(),
+            updated: true,
         };
         span_state.increment(&group, t1);
         assert_eq!(span_state.group_times.get(&group).unwrap().idle, t1.idle);
diff --git a/limitador-server/src/prometheus_metrics.rs b/limitador-server/src/prometheus_metrics.rs
index 67dd60f5..3293ec22 100644
--- a/limitador-server/src/prometheus_metrics.rs
+++ b/limitador-server/src/prometheus_metrics.rs
@@ -1,7 +1,9 @@
-use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge};
+use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
 use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
 use std::sync::Arc;
+use std::time::Duration;
 
+use crate::metrics::Timings;
 use limitador::limit::Namespace;
 
 const NAMESPACE_LABEL: &str = "limitador_namespace";
@@ -40,13 +42,18 @@ impl PrometheusMetrics {
         prometheus_handle: Arc<PrometheusHandle>,
     ) -> Self {
         describe_histogram!(
-            "counter_latency",
+            "datastore_latency",
             "Latency to the underlying counter datastore"
         );
         describe_counter!("authorized_calls", "Authorized calls");
         describe_counter!("limited_calls", "Limited calls");
         describe_gauge!("limitador_up", "Limitador is running");
         gauge!("limitador_up").set(1);
+        describe_gauge!(
+            "datastore_partitioned",
+            "Limitador is partitioned from backing datastore"
+        );
+        gauge!("datastore_partitioned").set(0);
         Self {
             use_limit_name_label,
             prometheus_handle,
@@ -86,6 +93,10 @@
     pub fn gather_metrics(&self) -> String {
         self.prometheus_handle.render()
     }
+
+    pub fn record_datastore_latency(timings: Timings) {
+        histogram!("datastore_latency").record(Duration::from(timings).as_secs_f64())
+    }
 }
 
 #[cfg(test)]
diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index a88ef332..d527bd1f 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -3,6 +3,8 @@ use crate::storage::atomic_expiring_value::AtomicExpiringValue;
 use crate::storage::redis::DEFAULT_MAX_CACHED_COUNTERS;
 use dashmap::mapref::entry::Entry;
 use dashmap::DashMap;
+use metrics::{counter, gauge, histogram};
+use moka::notification::RemovalCause;
 use moka::sync::Cache;
 use std::collections::HashMap;
 use std::future::Future;
@@ -42,8 +44,11 @@ impl CachedCounterValue {
         }
     }
 
-    pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime) {
-        self.value.add_and_set_expiry(delta, expire_at);
+    pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime, max_value: u64) {
+        let new_val = self.value.add_and_set_expiry(delta, expire_at);
+        if new_val > max_value {
+            histogram!("counter_overshoot").record((new_val - max_value) as f64);
+        }
         self.initial_value.fetch_add(delta, Ordering::SeqCst);
         self.from_authority.store(true, Ordering::Release);
     }
@@ -150,6 +155,7 @@ impl Batcher {
             }
             Entry::Vacant(miss) => {
                 self.limiter.acquire().await.unwrap().forget();
+                gauge!("batcher_size").increment(1);
                 miss.insert_entry(value);
             }
         };
@@ -183,6 +189,7 @@ impl Batcher {
                 let value = self.updates.get(counter).unwrap().clone();
                 result.insert(counter.clone(), value);
             }
+            histogram!("batcher_flush_size").record(result.len() as f64);
             let result = consumer(result).await;
             batch.iter().for_each(|counter| {
                 let prev = self
@@ -190,6 +197,7 @@ impl Batcher {
                     .remove_if(counter, |_, v| v.no_pending_writes());
                 if prev.is_some() {
                     self.limiter.add_permits(1);
+                    gauge!("batcher_size").decrement(1);
                 }
             });
             return result;
@@ -232,6 +240,7 @@ impl CountersCache {
         if option.is_none() {
             let from_queue = self.batcher.updates.get(counter);
             if let Some(entry) = from_queue {
+                gauge!("cache_size").increment(1);
                 self.cache.insert(counter.clone(), entry.value().clone());
                 return Some(entry.value().clone());
             }
@@ -255,17 +264,22 @@ impl CountersCache {
         if expiry_ts > SystemTime::now() {
             let mut from_cache = true;
             let cached = self.cache.get_with(counter.clone(), || {
+                gauge!("cache_size").increment(1);
                 from_cache = false;
                 if let Some(entry) = self.batcher.updates.get(&counter) {
                     let cached_value = entry.value();
-                    cached_value.add_from_authority(remote_deltas, expiry_ts);
+                    cached_value.add_from_authority(
+                        remote_deltas,
+                        expiry_ts,
+                        counter.max_value(),
+                    );
                     cached_value.clone()
                 } else {
                     Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
                 }
             });
             if from_cache {
-                cached.add_from_authority(remote_deltas, expiry_ts);
+                cached.add_from_authority(remote_deltas, expiry_ts, counter.max_value());
             }
             return cached;
         }
@@ -277,6 +291,7 @@
 
     pub async fn increase_by(&self, counter: &Counter, delta: u64) {
         let val = self.cache.get_with_by_ref(counter, || {
+            gauge!("cache_size").increment(1);
             if let Some(entry) = self.batcher.updates.get(counter) {
                 entry.value().clone()
             } else {
@@ -304,9 +319,23 @@ impl CountersCacheBuilder {
         self
     }
 
+    fn eviction_listener(
+        _key: Arc<Counter>,
+        value: Arc<CachedCounterValue>,
+        _removal_cause: RemovalCause,
+    ) {
+        gauge!("cache_size").decrement(1);
+        if value.no_pending_writes().not() {
+            counter!("evicted_pending_writes").increment(1);
+        }
+    }
+
     pub fn build(&self, period: Duration) -> CountersCache {
         CountersCache {
-            cache: Cache::new(self.max_cached_counters as u64),
+            cache: Cache::builder()
+                .max_capacity(self.max_cached_counters as u64)
+                .eviction_listener(Self::eviction_listener)
+                .build(),
             batcher: Batcher::new(period, self.max_cached_counters),
         }
     }
@@ -363,7 +392,11 @@ mod tests {
         let value = CachedCounterValue::from_authority(&counter, 0);
         value.delta(&counter, 5);
         assert!(value.no_pending_writes().not());
-        value.add_from_authority(6, SystemTime::now().add(Duration::from_secs(1)));
+        value.add_from_authority(
+            6,
+            SystemTime::now().add(Duration::from_secs(1)),
+            counter.max_value(),
+        );
         assert!(value.no_pending_writes().not());
         assert_eq!(value.pending_writes(), Ok(5));
     }
diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 3a353b72..d77a16bd 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -9,10 +9,11 @@ use crate::storage::redis::is_limited;
 use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
 use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
 use async_trait::async_trait;
+use metrics::histogram;
 use redis::{AsyncCommands, RedisError};
 use std::collections::HashSet;
 use std::str::FromStr;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use tracing::{debug_span, Instrument};
 
 // Note: this implementation does not guarantee exact limits. Ensuring that we
@@ -209,11 +210,14 @@ impl AsyncRedisStorage {
     }
 
     pub async fn is_alive(&self) -> bool {
+        let now = Instant::now();
         self.conn_manager
             .clone()
             .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
             .await
             .is_ok()
+            .then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64()))
+            .is_some()
     }
 
     async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index 38f5c831..992f2a81 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -307,11 +310,10 @@ async fn update_counters<C: ConnectionLike>(
         }
     }
 
-    let span = debug_span!("datastore");
     // The redis crate is not working with tables, thus the response will be a Vec of counter values
     let script_res: Vec<i64> = script_invocation
         .invoke_async(redis_conn)
-        .instrument(span)
+        .instrument(debug_span!("datastore"))
         .await?;
 
     // We need to update the values and ttls returned by redis
@@ -331,7 +330,7 @@
     Ok(res)
 }
 
-
+#[tracing::instrument(skip_all)]
 async fn flush_batcher_and_update_counters<C: ConnectionLike>(
     mut redis_conn: C,
     storage_is_alive: bool,