diff --git a/Cargo.lock b/Cargo.lock
index 30a471da..26330f77 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -821,46 +821,37 @@ checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216"
 
 [[package]]
 name = "crossbeam-channel"
-version = "0.5.8"
+version = "0.5.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
+checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95"
 dependencies = [
- "cfg-if",
  "crossbeam-utils",
 ]
 
 [[package]]
 name = "crossbeam-deque"
-version = "0.8.3"
+version = "0.8.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
+checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
 dependencies = [
- "cfg-if",
  "crossbeam-epoch",
  "crossbeam-utils",
 ]
 
 [[package]]
 name = "crossbeam-epoch"
-version = "0.9.15"
+version = "0.9.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
 dependencies = [
- "autocfg",
- "cfg-if",
  "crossbeam-utils",
- "memoffset",
- "scopeguard",
 ]
 
 [[package]]
 name = "crossbeam-utils"
-version = "0.8.16"
+version = "0.8.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
-dependencies = [
- "cfg-if",
-]
+checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
 
 [[package]]
 name = "crypto-common"
@@ -1567,7 +1558,6 @@ dependencies = [
  "moka",
  "paste",
  "postcard",
- "prometheus",
  "r2d2",
  "rand",
  "redis",
@@ -1602,6 +1592,7 @@ dependencies = [
  "opentelemetry-stdout",
  "opentelemetry_sdk",
  "paperclip",
+ "prometheus",
  "prost 0.12.1",
  "prost-types",
  "serde",
@@ -1704,15 +1695,6 @@ version = "2.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
 
-[[package]]
-name = "memoffset"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c"
-dependencies = [
- "autocfg",
-]
-
 [[package]]
 name = "mime"
 version = "0.3.17"
diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml
index 07907a04..b0cbaf0c 100644
--- a/limitador-server/Cargo.toml
+++ b/limitador-server/Cargo.toml
@@ -46,6 +46,7 @@ lazy_static = "1.4.0"
 clap = "4.3"
 sysinfo = "0.29.7"
 openssl = { version = "0.10.57", features = ["vendored"] }
+prometheus = "0.13.3"
 
 [build-dependencies]
 
diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs
index d5b1747f..f9a57139 100644
--- a/limitador-server/src/envoy_rls/server.rs
+++ b/limitador-server/src/envoy_rls/server.rs
@@ -19,7 +19,7 @@ use crate::envoy_rls::server::envoy::service::ratelimit::v3::rate_limit_service_
 use crate::envoy_rls::server::envoy::service::ratelimit::v3::{
     RateLimitRequest, RateLimitResponse,
 };
-use crate::Limiter;
+use crate::{Limiter, PROMETHEUS_METRICS};
 
 include!("envoy_types.rs");
 
@@ -124,8 +124,11 @@ impl RateLimitService for MyRateLimiter {
         let mut rate_limited_resp = rate_limited_resp.unwrap();
         let resp_code = if rate_limited_resp.limited {
+            PROMETHEUS_METRICS
+                .incr_limited_calls(&namespace, rate_limited_resp.limit_name.as_deref());
             Code::OverLimit
         } else {
+            PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
             Code::Ok
         };
 
diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs
index b32001ea..f86863dc 100644
--- a/limitador-server/src/http_api/server.rs
+++ b/limitador-server/src/http_api/server.rs
@@ -1,5 +1,5 @@
 use crate::http_api::request_types::{CheckAndReportInfo, Counter, Limit};
-use crate::Limiter;
+use crate::{Limiter, PROMETHEUS_METRICS};
 use actix_web::{http::StatusCode, ResponseError};
 use actix_web::{App, HttpServer};
 use paperclip::actix::{
@@ -44,13 +44,10 @@ async fn status() -> web::Json<()> {
     Json(())
 }
 
-#[tracing::instrument(skip(data))]
+#[tracing::instrument(skip(_data))]
 #[api_v2_operation]
-async fn metrics(data: web::Data<Arc<Limiter>>) -> String {
-    match data.get_ref().as_ref() {
-        Limiter::Blocking(limiter) => limiter.gather_prometheus_metrics(),
-        Limiter::Async(limiter) => limiter.gather_prometheus_metrics(),
-    }
+async fn metrics(_data: web::Data<Arc<Limiter>>) -> String {
+    PROMETHEUS_METRICS.gather_metrics()
 }
 
 #[api_v2_operation]
@@ -170,8 +167,11 @@ async fn check_and_report(
     match rate_limited_and_update_result {
         Ok(is_rate_limited) => {
             if is_rate_limited.limited {
+                PROMETHEUS_METRICS
+                    .incr_limited_calls(&namespace, is_rate_limited.limit_name.as_deref());
                 Err(ErrorResponse::TooManyRequests)
             } else {
+                PROMETHEUS_METRICS.incr_authorized_calls(&namespace);
                 Ok(Json(()))
             }
         }
diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs
index 8aff7275..5a8004f7 100644
--- a/limitador-server/src/main.rs
+++ b/limitador-server/src/main.rs
@@ -13,8 +13,10 @@ use crate::config::{
 };
 use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
 use crate::http_api::server::run_http_server;
+use crate::metrics::MetricsLayer;
 use clap::{value_parser, Arg, ArgAction, Command};
 use const_format::formatcp;
+use lazy_static::lazy_static;
 use limitador::counter::Counter;
 use limitador::errors::LimitadorError;
 use limitador::limit::Limit;
@@ -36,6 +38,7 @@ use opentelemetry::{global, KeyValue};
 use opentelemetry_otlp::WithExportConfig;
 use opentelemetry_sdk::propagation::TraceContextPropagator;
 use opentelemetry_sdk::{trace, Resource};
+use prometheus_metrics::PrometheusMetrics;
 use std::env::VarError;
 use std::fmt::Display;
 use std::fs;
@@ -55,6 +58,8 @@ mod envoy_rls;
 mod http_api;
 
 mod config;
+mod metrics;
+pub mod prometheus_metrics;
 
 const LIMITADOR_VERSION: &str = env!("CARGO_PKG_VERSION");
 const LIMITADOR_PROFILE: &str = env!("LIMITADOR_PROFILE");
@@ -71,6 +76,10 @@ pub enum LimitadorServerError {
     Internal(LimitadorError),
 }
 
+lazy_static! {
+    pub static ref PROMETHEUS_METRICS: PrometheusMetrics = PrometheusMetrics::default();
+}
+
 pub enum Limiter {
     Blocking(RateLimiter),
     Async(AsyncRateLimiter),
 }
@@ -85,29 +94,19 @@ impl From<LimitadorError> for LimitadorServerError {
 impl Limiter {
     pub async fn new(config: Configuration) -> Result<Self, LimitadorServerError> {
         let rate_limiter = match config.storage {
-            StorageConfiguration::Redis(cfg) => {
-                Self::redis_limiter(cfg, config.limit_name_in_labels).await
-            }
+            StorageConfiguration::Redis(cfg) => Self::redis_limiter(cfg).await,
             #[cfg(feature = "infinispan")]
-            StorageConfiguration::Infinispan(cfg) => {
-                Self::infinispan_limiter(cfg, config.limit_name_in_labels).await
-            }
-            StorageConfiguration::InMemory(cfg) => {
-                Self::in_memory_limiter(cfg, config.limit_name_in_labels)
-            }
-            StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg, config.limit_name_in_labels),
+            StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await,
+            StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
+            StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
         };
 
         Ok(rate_limiter)
     }
 
-    async fn redis_limiter(cfg: RedisStorageConfiguration, limit_name_labels: bool) -> Self {
+    async fn redis_limiter(cfg: RedisStorageConfiguration) -> Self {
         let storage = Self::storage_using_redis(cfg).await;
-        let mut rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);
-
-        if limit_name_labels {
-            rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
-        }
+        let rate_limiter_builder = AsyncRateLimiterBuilder::new(storage);
 
         Self::Async(rate_limiter_builder.build())
     }
@@ -164,10 +163,7 @@ impl Limiter {
     }
 
     #[cfg(feature = "infinispan")]
-    async fn infinispan_limiter(
-        cfg: InfinispanStorageConfiguration,
-        limit_name_labels: bool,
-    ) -> Self {
+    async fn infinispan_limiter(cfg: InfinispanStorageConfiguration) -> Self {
         use url::Url;
 
         let parsed_url = Url::parse(&cfg.url).unwrap();
@@ -203,17 +199,13 @@
             None => builder.build().await,
         };
 
-        let mut rate_limiter_builder =
+        let rate_limiter_builder =
             AsyncRateLimiterBuilder::new(AsyncStorage::with_counter_storage(Box::new(storage)));
 
-        if limit_name_labels {
-            rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
-        }
-
         Self::Async(rate_limiter_builder.build())
     }
 
-    fn disk_limiter(cfg: DiskStorageConfiguration, limit_name_in_labels: bool) -> Self {
+    fn disk_limiter(cfg: DiskStorageConfiguration) -> Self {
         let storage = match DiskStorage::open(cfg.path.as_str(), cfg.optimization) {
             Ok(storage) => storage,
             Err(err) => {
@@ -221,24 +213,16 @@
                 process::exit(1)
             }
         };
-        let mut rate_limiter_builder =
+        let rate_limiter_builder =
             RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));
 
-        if limit_name_in_labels {
-            rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
-        }
-
         Self::Blocking(rate_limiter_builder.build())
     }
 
-    fn in_memory_limiter(cfg: InMemoryStorageConfiguration, limit_name_in_labels: bool) -> Self {
-        let mut rate_limiter_builder =
+    fn in_memory_limiter(cfg: InMemoryStorageConfiguration) -> Self {
+        let rate_limiter_builder =
             RateLimiterBuilder::new(cfg.cache_size.or_else(guess_cache_size).unwrap());
 
-        if limit_name_in_labels {
-            rate_limiter_builder = rate_limiter_builder.with_prometheus_limit_name_labels()
-        }
-
         Self::Blocking(rate_limiter_builder.build())
     }
@@ -304,6 +288,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         tracing_subscriber::fmt::layer()
     };
 
+    let metrics_layer = MetricsLayer::new().gather(
+        "should_rate_limit",
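+        // consumer: fold the aggregated "datastore" time collected under each
+        // "should_rate_limit" span into the Prometheus counter-access histogram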
+        |timings| PROMETHEUS_METRICS.counter_access(Duration::from(timings)),
+        vec!["datastore"],
+    );
+
     if !config.tracing_endpoint.is_empty() {
         global::set_text_map_propagator(TraceContextPropagator::new());
         let tracer = opentelemetry_otlp::new_pipeline()
@@ -320,16 +310,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
         tracing_subscriber::registry()
             .with(level)
+            .with(metrics_layer)
             .with(fmt_layer)
             .with(telemetry_layer)
             .init();
     } else {
         tracing_subscriber::registry()
             .with(level)
+            .with(metrics_layer)
             .with(fmt_layer)
             .init();
     };
 
+    PROMETHEUS_METRICS.set_use_limit_name_in_label(config.limit_name_in_labels);
+
     info!("Version: {}", version);
     info!("Using config: {:?}", config);
     config
diff --git a/limitador-server/src/metrics.rs b/limitador-server/src/metrics.rs
new file mode 100644
index 00000000..b19119e6
--- /dev/null
+++ b/limitador-server/src/metrics.rs
@@ -0,0 +1,292 @@
+use std::collections::HashMap;
+use std::ops;
+use std::time::{Duration, Instant};
+use tracing::span::{Attributes, Id};
+use tracing::Subscriber;
+use tracing_subscriber::layer::{Context, Layer};
+use tracing_subscriber::registry::LookupSpan;
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct Timings {
+    idle: u64,
+    busy: u64,
+    last: Instant,
+}
+
+impl Timings {
+    fn new() -> Self {
+        Self {
+            idle: 0,
+            busy: 0,
+            last: Instant::now(),
+        }
+    }
+}
+
+impl ops::Add for Timings {
+    type Output = Self;
+
+    fn add(self, rhs: Self) -> Self {
+        Self {
+            busy: self.busy + rhs.busy,
+            idle: self.idle + rhs.idle,
+            last: self.last.max(rhs.last),
+        }
+    }
+}
+
+impl ops::AddAssign for Timings {
+    fn add_assign(&mut self, rhs: Self) {
+        *self = *self + rhs
+    }
+}
+
+impl From<Timings> for Duration {
+    fn from(timings: Timings) -> Self {
+        Duration::from_nanos(timings.idle + timings.busy)
+    }
+}
+
+#[derive(Debug, Clone)]
+struct SpanState {
+    group_times: HashMap<String, Timings>,
+}
+
+impl SpanState {
+    fn new(group: String) -> Self {
+        Self {
+            group_times: HashMap::from([(group, Timings::new())]),
+        }
+    }
+
+    fn increment(&mut self, group: &String, timings: Timings) -> &mut Self {
+        self.group_times
+            .entry(group.to_string())
+            .and_modify(|x| *x += timings)
+            .or_insert(timings);
+        self
+    }
+}
+
+pub struct MetricsGroup<F: Fn(Timings)> {
+    consumer: F,
+    records: Vec<String>,
+}
+
+impl<F: Fn(Timings)> MetricsGroup<F> {
+    pub fn new(consumer: F, records: Vec<String>) -> Self {
+        Self { consumer, records }
+    }
+}
+
+pub struct MetricsLayer<F: Fn(Timings)> {
+    groups: HashMap<String, MetricsGroup<F>>,
+}
+
+impl<F: Fn(Timings)> MetricsLayer<F> {
+    pub fn new() -> Self {
+        Self {
+            groups: HashMap::new(),
+        }
+    }
+
+    pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self {
+        // TODO(adam-cattermole): does not handle case where aggregate already exists
+        let rec = records.iter().map(|r| r.to_string()).collect();
+        self.groups
+            .entry(aggregate.to_string())
+            .or_insert(MetricsGroup::new(consumer, rec));
+        self
+    }
+}
+
+impl<S, F: Fn(Timings) + 'static> Layer<S> for MetricsLayer<F>
+where
+    S: Subscriber,
+    S: for<'lookup> LookupSpan<'lookup>,
+{
+    fn on_new_span(&self, _attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+        let name = span.name();
+
+        // if there's a parent
+        if let Some(parent) = span.parent() {
+            // if the parent has a SpanState, propagate it to this span
+            if let Some(parent_state) = parent.extensions_mut().get_mut::<SpanState>() {
+                extensions.insert(parent_state.clone());
+            }
+        }
+
+        // if we are an aggregator
+        if self.groups.contains_key(name) {
+            if let Some(span_state) = extensions.get_mut::<SpanState>() {
+                // the SpanState came from our parent, so append ourselves
+                // (we are a second-level aggregator)
+                span_state
+                    .group_times
+                    .entry(name.to_string())
+                    .or_insert(Timings::new());
+            } else {
+                // otherwise create a new SpanState with ourselves
+                extensions.insert(SpanState::new(name.to_string()))
+            }
+        }
+
+        if let Some(span_state) = extensions.get_mut::<SpanState>() {
+            // either we are an aggregator or nested within one
+            for group in span_state.group_times.keys() {
+                for record in &self
+                    .groups
+                    .get(group)
+                    .expect("Span state contains group times for an unconfigured group")
+                    .records
+                {
+                    if name == record {
+                        extensions.insert(Timings::new());
+                        return;
+                    }
+                }
+            }
+            // if we reach here, we are an intermediate span that should not be recorded
+        }
+    }
+
+    fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+
+        if let Some(timings) = extensions.get_mut::<Timings>() {
+            let now = Instant::now();
+            timings.idle += (now - timings.last).as_nanos() as u64;
+            timings.last = now;
+        }
+    }
+
+    fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
+        let span = ctx.span(id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+
+        if let Some(timings) = extensions.get_mut::<Timings>() {
+            let now = Instant::now();
+            timings.busy += (now - timings.last).as_nanos() as u64;
+            timings.last = now;
+        }
+    }
+
+    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
+        let span = ctx.span(&id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+        let name = span.name();
+        let mut t: Option<Timings> = None;
+
+        if let Some(timing) = extensions.get_mut::<Timings>() {
+            let mut time = *timing;
+            time.idle += (Instant::now() - time.last).as_nanos() as u64;
+            t = Some(time);
+        }
+
+        if let Some(span_state) = extensions.get_mut::<SpanState>() {
+            if let Some(timing) = t {
+                let group_times = span_state.group_times.clone();
+                // iterate over the groups this span belongs to
+                'aggregate: for group in group_times.keys() {
+                    // find the set of records related to these groups in the layer
+                    for record in &self.groups.get(group).unwrap().records {
+                        // if we are a record for this group then increment the relevant
+                        // span-local timing and continue to the next group
+                        if name == record {
+                            span_state.increment(group, timing);
+                            continue 'aggregate;
+                        }
+                    }
+                }
+            }
+            // we have updated the local span_state, but we still need to
+            // bubble it back up through the parents
+            // NOTE: this propagates the timings, ready to be cloned by the next new span!
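+            // (each later child span clones this updated state on creation, so
+            // the group totals keep accumulating across sequentially created children)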
+            if let Some(parent) = span.parent() {
+                parent.extensions_mut().replace(span_state.clone());
+            }
+            // if we are an aggregator, call the consumer function
+            if let Some(metrics_group) = self.groups.get(name) {
+                (metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{MetricsLayer, SpanState, Timings};
+    use std::time::Instant;
+
+    #[test]
+    fn timings_add() {
+        let now = Instant::now();
+        let t1 = Timings {
+            idle: 5,
+            busy: 5,
+            last: now,
+        };
+        let t2 = Timings {
+            idle: 3,
+            busy: 5,
+            last: now,
+        };
+        let t3 = t1 + t2;
+        assert_eq!(
+            t3,
+            Timings {
+                idle: 8,
+                busy: 10,
+                last: now
+            }
+        )
+    }
+
+    #[test]
+    fn timings_add_assign() {
+        let now = Instant::now();
+        let mut t1 = Timings {
+            idle: 5,
+            busy: 5,
+            last: now,
+        };
+        let t2 = Timings {
+            idle: 3,
+            busy: 5,
+            last: now,
+        };
+        t1 += t2;
+        assert_eq!(
+            t1,
+            Timings {
+                idle: 8,
+                busy: 10,
+                last: now
+            }
+        )
+    }
+
+    #[test]
+    fn span_state_increment() {
+        let group = String::from("group");
+        let mut span_state = SpanState::new(group.clone());
+        let t1 = Timings {
+            idle: 5,
+            busy: 5,
+            last: Instant::now(),
+        };
+        span_state.increment(&group, t1);
+        assert_eq!(span_state.group_times.get(&group).unwrap().idle, t1.idle);
+        assert_eq!(span_state.group_times.get(&group).unwrap().busy, t1.busy);
+    }
+
+    #[test]
+    fn metrics_layer() {
+        let consumer = |_| println!("group/record");
+        let ml = MetricsLayer::new().gather("group", consumer, vec!["record"]);
+        assert_eq!(ml.groups.get("group").unwrap().records, vec!["record"]);
+    }
+}
diff --git a/limitador/src/prometheus_metrics.rs b/limitador-server/src/prometheus_metrics.rs
similarity index 82%
rename from limitador/src/prometheus_metrics.rs
rename to limitador-server/src/prometheus_metrics.rs
index 4b2e0f51..34f9537d 100644
--- a/limitador/src/prometheus_metrics.rs
+++ b/limitador-server/src/prometheus_metrics.rs
@@ -1,7 +1,9 @@
-use crate::limit::Namespace;
+use lazy_static::lazy_static;
+use limitador::limit::Namespace;
 use prometheus::{
     Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder,
 };
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 const NAMESPACE_LABEL: &str = "limitador_namespace";
@@ -36,7 +38,7 @@ pub struct PrometheusMetrics {
     authorized_calls: IntCounterVec,
     limited_calls: IntCounterVec,
     counter_latency: Histogram,
-    use_limit_name_label: bool,
+    use_limit_name_label: AtomicBool,
 }
 
 impl Default for PrometheusMetrics {
@@ -58,6 +60,11 @@ impl PrometheusMetrics {
         Self::new_with_options(true)
     }
 
+    pub fn set_use_limit_name_in_label(&self, use_limit_name_in_label: bool) {
+        self.use_limit_name_label
+            .store(use_limit_name_in_label, Ordering::SeqCst)
+    }
+
     pub fn incr_authorized_calls(&self, namespace: &Namespace) {
         self.authorized_calls
             .with_label_values(&[namespace.as_ref()])
@@ -70,7 +77,7 @@ impl PrometheusMetrics {
     {
         let mut labels = vec![namespace.as_ref()];
 
-        if self.use_limit_name_label {
+        if self.use_limit_name_label.load(Ordering::Relaxed) {
             // If we have configured the metric to accept 2 labels we need to
             // set values for them.
             labels.push(limit_name.into().unwrap_or(""));
@@ -83,14 +90,6 @@ impl PrometheusMetrics {
         self.counter_latency.observe(duration.as_secs_f64());
     }
 
-    #[must_use]
-    pub fn counter_accesses(&self) -> CounterAccess {
-        CounterAccess {
-            metrics: self,
-            duration: Duration::ZERO,
-        }
-    }
-
     pub fn gather_metrics(&self) -> String {
         let mut buffer = Vec::new();
 
@@ -132,7 +131,7 @@ impl PrometheusMetrics {
             authorized_calls: authorized_calls_counter,
             limited_calls: limited_calls_counter,
             counter_latency,
-            use_limit_name_label,
+            use_limit_name_label: AtomicBool::new(use_limit_name_label),
         }
     }
 
@@ -171,25 +170,6 @@ impl PrometheusMetrics {
     }
 }
 
-pub struct CounterAccess<'a> {
-    metrics: &'a PrometheusMetrics,
-    duration: Duration,
-}
-
-impl CounterAccess<'_> {
-    pub fn observe(&mut self, duration: Duration) {
-        self.duration += duration;
-    }
-}
-
-impl<'a> Drop for CounterAccess<'a> {
-    fn drop(&mut self) {
-        if self.duration > Duration::ZERO {
-            self.metrics.counter_access(self.duration);
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -324,39 +304,6 @@ mod tests {
         )
     }
 
-    #[test]
-    fn collects_latencies() {
-        let metrics = PrometheusMetrics::new();
-        assert_eq!(metrics.counter_latency.get_sample_count(), 0);
-        {
-            let _access = metrics.counter_accesses();
-        }
-        assert_eq!(metrics.counter_latency.get_sample_count(), 0);
-        {
-            let mut access = metrics.counter_accesses();
-            access.observe(Duration::from_millis(12));
-        }
-        assert_eq!(metrics.counter_latency.get_sample_count(), 1);
-        assert_eq!(
-            metrics.counter_latency.get_sample_sum(),
-            Duration::from_millis(12).as_secs_f64()
-        );
-        {
-            let mut access = metrics.counter_accesses();
-            access.observe(Duration::from_millis(5));
-            assert_eq!(metrics.counter_latency.get_sample_count(), 1);
-            assert_eq!(
-                metrics.counter_latency.get_sample_sum(),
-                Duration::from_millis(12).as_secs_f64()
-            );
-        }
-        assert_eq!(metrics.counter_latency.get_sample_count(), 2);
-        assert_eq!(
-            metrics.counter_latency.get_sample_sum(),
-            Duration::from_millis(17).as_secs_f64()
-        );
-    }
-
     fn formatted_counter_with_namespace_and_limit(
         metric_name: &str,
         count: i32,
diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 0f2336b7..682a9295 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -32,7 +32,6 @@ thiserror = "1"
 futures = "0.3"
 async-trait = "0.1"
 cfg-if = "1"
-prometheus = "0.13"
 lazy_static = "1"
 tracing = "0.1.40"
 
diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs
index 64730b51..25010829 100644
--- a/limitador/src/lib.rs
+++ b/limitador/src/lib.rs
@@ -197,38 +197,33 @@ use std::collections::{HashMap, HashSet};
 
 use crate::counter::Counter;
 use crate::errors::LimitadorError;
 use crate::limit::{Limit, Namespace};
-use crate::prometheus_metrics::PrometheusMetrics;
 use crate::storage::in_memory::InMemoryStorage;
 use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage};
 
 #[macro_use]
-extern crate lazy_static;
 extern crate core;
 
 pub mod counter;
 pub mod errors;
 pub mod limit;
-mod prometheus_metrics;
 pub mod storage;
 
 pub struct RateLimiter {
     storage: Storage,
-    prometheus_metrics: PrometheusMetrics,
 }
 
 pub struct AsyncRateLimiter {
     storage: AsyncStorage,
-    prometheus_metrics: PrometheusMetrics,
 }
 
 pub struct RateLimiterBuilder {
     storage: Storage,
-    prometheus_limit_name_labels_enabled: bool,
 }
 
 pub struct CheckResult {
     pub limited: bool,
     pub counters: Vec<Counter>,
+    pub limit_name: Option<String>,
 }
 
 impl From<CheckResult> for bool {
@@ -239,16 +234,12 @@ impl RateLimiterBuilder {
     pub fn with_storage(storage: Storage) -> Self {
-        Self {
-            storage,
-            prometheus_limit_name_labels_enabled: false,
-        }
+        Self { storage }
     }
 
     pub fn new(cache_size: u64) -> Self {
         Self {
             storage: Storage::new(cache_size),
-            prometheus_limit_name_labels_enabled: false,
         }
     }
 
@@ -257,53 +248,25 @@ impl RateLimiterBuilder {
         self
     }
 
-    pub fn with_prometheus_limit_name_labels(mut self) -> Self {
-        self.prometheus_limit_name_labels_enabled = true;
-        self
-    }
-
     pub fn build(self) -> RateLimiter {
-        let prometheus_metrics = if self.prometheus_limit_name_labels_enabled {
-            PrometheusMetrics::new_with_counters_by_limit_name()
-        } else {
-            PrometheusMetrics::new()
-        };
-
         RateLimiter {
             storage: self.storage,
-            prometheus_metrics,
         }
     }
 }
 
 pub struct AsyncRateLimiterBuilder {
     storage: AsyncStorage,
-    prometheus_limit_name_labels_enabled: bool,
 }
 
 impl AsyncRateLimiterBuilder {
     pub fn new(storage: AsyncStorage) -> Self {
-        Self {
-            storage,
-            prometheus_limit_name_labels_enabled: false,
-        }
-    }
-
-    pub fn with_prometheus_limit_name_labels(mut self) -> Self {
-        self.prometheus_limit_name_labels_enabled = true;
-        self
+        Self { storage }
     }
 
     pub fn build(self) -> AsyncRateLimiter {
-        let prometheus_metrics = if self.prometheus_limit_name_labels_enabled {
-            PrometheusMetrics::new_with_counters_by_limit_name()
-        } else {
-            PrometheusMetrics::new()
-        };
-
         AsyncRateLimiter {
             storage: self.storage,
-            prometheus_metrics,
         }
     }
 }
 
@@ -312,14 +275,12 @@ impl RateLimiter {
     pub fn new(cache_size: u64) -> Self {
         Self {
             storage: Storage::new(cache_size),
-            prometheus_metrics: PrometheusMetrics::new(),
         }
     }
 
     pub fn new_with_storage(counters: Box<dyn CounterStorage>) -> Self {
         Self {
             storage: Storage::with_counter_storage(counters),
-            prometheus_metrics: PrometheusMetrics::new(),
         }
     }
 
@@ -357,8 +318,6 @@ impl RateLimiter {
             match self.storage.is_within_limits(&counter, delta) {
                 Ok(within_limits) => {
                     if !within_limits {
-                        self.prometheus_metrics
-                            .incr_limited_calls(namespace, counter.limit().name());
                         return Ok(true);
                     }
                 }
@@ -366,7 +325,6 @@ impl RateLimiter {
             }
         }
 
-        self.prometheus_metrics.incr_authorized_calls(namespace);
         Ok(false)
     }
 
@@ -394,10 +352,10 @@ impl RateLimiter {
         let mut counters = self.counters_that_apply(namespace, values)?;
 
         if counters.is_empty() {
-            self.prometheus_metrics.incr_authorized_calls(namespace);
             return Ok(CheckResult {
                 limited: false,
                 counters,
+                limit_name: None,
             });
         }
 
@@ -412,21 +370,16 @@ impl RateLimiter {
         };
 
         match check_result {
-            Authorization::Ok => {
-                self.prometheus_metrics.incr_authorized_calls(namespace);
-                Ok(CheckResult {
-                    limited: false,
-                    counters,
-                })
-            }
-            Authorization::Limited(name) => {
-                self.prometheus_metrics
-                    .incr_limited_calls(namespace, name.as_deref());
-                Ok(CheckResult {
-                    limited: true,
-                    counters,
-                })
-            }
+            Authorization::Ok => Ok(CheckResult {
+                limited: false,
+                counters,
+                limit_name: None,
+            }),
+            Authorization::Limited(name) => Ok(CheckResult {
+                limited: true,
+                counters,
+                limit_name: name,
+            }),
         }
     }
 
@@ -474,10 +427,6 @@ impl RateLimiter {
         Ok(())
     }
 
-    pub fn gather_prometheus_metrics(&self) -> String {
-        self.prometheus_metrics.gather_metrics()
-    }
-
     fn counters_that_apply(
         &self,
         namespace: &Namespace,
@@ -504,7 +453,6 @@ impl AsyncRateLimiter {
     pub fn new_with_storage(storage: Box<dyn AsyncCounterStorage>) -> Self {
         Self {
             storage: AsyncStorage::with_counter_storage(storage),
-            prometheus_metrics: PrometheusMetrics::new(),
        }
     }
 
@@ -542,16 +490,12 @@ impl AsyncRateLimiter {
         match self.storage.is_within_limits(&counter, delta).await {
             Ok(within_limits) => {
                 if !within_limits {
-                    self.prometheus_metrics
-                        .incr_limited_calls(namespace, counter.limit().name());
                     return Ok(true);
                 }
             }
             Err(e) => return Err(e.into()),
         }
     }
-
-        self.prometheus_metrics.incr_authorized_calls(namespace);
         Ok(false)
     }
 
@@ -581,17 +525,16 @@ impl AsyncRateLimiter {
         let mut counters = self.counters_that_apply(namespace, values).await?;
 
         if counters.is_empty() {
-            self.prometheus_metrics.incr_authorized_calls(namespace);
             return Ok(CheckResult {
                 limited: false,
                 counters,
+                limit_name: None,
             });
         }
 
-        let access = self.prometheus_metrics.counter_accesses();
         let check_result = self
             .storage
-            .check_and_update(&mut counters, delta, load_counters, access)
+            .check_and_update(&mut counters, delta, load_counters)
             .await?;
 
         let counters = if load_counters {
@@ -601,22 +544,16 @@ impl AsyncRateLimiter {
         };
 
         match check_result {
-            Authorization::Ok => {
-                self.prometheus_metrics.incr_authorized_calls(namespace);
-
-                Ok(CheckResult {
-                    limited: false,
-                    counters,
-                })
-            }
-            Authorization::Limited(name) => {
-                self.prometheus_metrics
-                    .incr_limited_calls(namespace, name.as_deref());
-                Ok(CheckResult {
-                    limited: true,
-                    counters,
-                })
-            }
+            Authorization::Ok => Ok(CheckResult {
+                limited: false,
+                counters,
+                limit_name: None,
+            }),
+            Authorization::Limited(name) => Ok(CheckResult {
+                limited: true,
+                counters,
+                limit_name: name,
+            }),
         }
     }
 
@@ -668,10 +605,6 @@ impl AsyncRateLimiter {
         Ok(())
     }
 
-    pub fn gather_prometheus_metrics(&self) -> String {
-        self.prometheus_metrics.gather_metrics()
-    }
-
     async fn counters_that_apply(
         &self,
         namespace: &Namespace,
diff --git a/limitador/src/storage/infinispan/infinispan_storage.rs b/limitador/src/storage/infinispan/infinispan_storage.rs
index af790634..7e521d8a 100644
--- a/limitador/src/storage/infinispan/infinispan_storage.rs
+++ b/limitador/src/storage/infinispan/infinispan_storage.rs
@@ -1,6 +1,5 @@
 use crate::counter::Counter;
 use crate::limit::Limit;
-use crate::prometheus_metrics::CounterAccess;
 use crate::storage::infinispan::counters::{Consistency, CounterOpts};
 use crate::storage::infinispan::response::response_to_string;
 use crate::storage::infinispan::{
@@ -74,7 +73,6 @@ impl AsyncCounterStorage for InfinispanStorage {
         counters: &mut Vec<Counter>,
         delta: i64,
         load_counters: bool,
-        _counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr> {
         let mut counter_keys = Vec::with_capacity(counters.len());
 
diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs
index c6cb1c92..72e0346b 100644
--- a/limitador/src/storage/mod.rs
+++ b/limitador/src/storage/mod.rs
@@ -1,6 +1,5 @@
 use crate::counter::Counter;
 use crate::limit::{Limit, Namespace};
-use crate::prometheus_metrics::CounterAccess;
 use crate::InMemoryStorage;
 use async_trait::async_trait;
 use std::collections::{HashMap, HashSet};
@@ -243,10 +242,9 @@ impl AsyncStorage {
         counters: &mut Vec<Counter>,
         delta: i64,
         load_counters: bool,
-        counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr> {
         self.counters
-            .check_and_update(counters, delta, load_counters, counter_access)
+            .check_and_update(counters, delta, load_counters)
             .await
     }
 
@@ -288,7 +286,6 @@ pub trait AsyncCounterStorage: Sync + Send {
         counters: &mut Vec<Counter>,
         delta: i64,
         load_counters: bool,
-        counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr>;
     async fn get_counters(&self, limits: HashSet<Limit>) -> Result<HashSet<Counter>, StorageErr>;
     async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr>;
diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 23782254..d180b11b 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -4,7 +4,6 @@ use self::redis::aio::ConnectionManager;
 use self::redis::ConnectionInfo;
 use crate::counter::Counter;
 use crate::limit::Limit;
-use crate::prometheus_metrics::CounterAccess;
 use crate::storage::keys::*;
 use crate::storage::redis::is_limited;
 use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
@@ -13,8 +12,8 @@
 use async_trait::async_trait;
 use redis::{AsyncCommands, RedisError};
 use std::collections::HashSet;
 use std::str::FromStr;
-use std::time::{Duration, Instant};
-use tracing::{trace_span, Instrument};
+use std::time::Duration;
+use tracing::{debug_span, Instrument};
 
 // Note: this implementation does not guarantee exact limits. Ensuring that we
 // never go over the limits would hurt performance. This implementation
@@ -36,13 +35,10 @@ impl AsyncCounterStorage for AsyncRedisStorage {
     async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result<bool, StorageErr> {
         let mut con = self.conn_manager.clone();
 
-        let span = trace_span!("datastore");
-        match async move {
-            con.get::<String, Option<i64>>(key_for_counter(counter))
-                .await
-        }
-        .instrument(span)
-        .await?
+        match con
+            .get::<String, Option<i64>>(key_for_counter(counter))
+            .instrument(debug_span!("datastore"))
+            .await?
         {
             Some(val) => Ok(val + delta <= counter.max_value()),
             None => Ok(counter.max_value() - delta >= 0),
@@ -53,18 +49,14 @@ impl AsyncCounterStorage for AsyncRedisStorage {
     async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> {
         let mut con = self.conn_manager.clone();
 
-        let span = trace_span!("datastore");
-        async {
-            redis::Script::new(SCRIPT_UPDATE_COUNTER)
-                .key(key_for_counter(counter))
-                .key(key_for_counters_of_limit(counter.limit()))
-                .arg(counter.seconds())
-                .arg(delta)
-                .invoke_async::<_, _>(&mut con)
-                .await
-        }
-        .instrument(span)
-        .await?;
+        redis::Script::new(SCRIPT_UPDATE_COUNTER)
+            .key(key_for_counter(counter))
+            .key(key_for_counters_of_limit(counter.limit()))
+            .arg(counter.seconds())
+            .arg(delta)
+            .invoke_async::<_, _>(&mut con)
+            .instrument(debug_span!("datastore"))
+            .await?;
 
         Ok(())
     }
 
@@ -75,7 +67,6 @@ impl AsyncCounterStorage for AsyncRedisStorage {
         counters: &mut Vec<Counter>,
         delta: i64,
         load_counters: bool,
-        mut counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr> {
         let mut con = self.conn_manager.clone();
         let counter_keys: Vec<String> = counters.iter().map(key_for_counter).collect();
@@ -89,33 +80,21 @@
             }
 
             let script_res: Vec<Option<i64>> = {
-                let span = trace_span!("datastore");
-                async {
-                    let start = Instant::now();
-                    let result = script_invocation.invoke_async(&mut con).await;
-                    counter_access.observe(start.elapsed());
-                    result
-                }
-                .instrument(span)
-                .await?
+                script_invocation
+                    .invoke_async(&mut con)
+                    .instrument(debug_span!("datastore"))
+                    .await?
             };
             if let Some(res) = is_limited(counters, delta, script_res) {
                 return Ok(res);
             }
         } else {
             let counter_vals: Vec<Option<i64>> = {
-                let span = trace_span!("datastore");
-                async {
-                    let start = Instant::now();
-                    let result = redis::cmd("MGET")
-                        .arg(counter_keys.clone())
-                        .query_async(&mut con)
-                        .await;
-                    counter_access.observe(start.elapsed());
-                    result
-                }
-                .instrument(span)
-                .await?
+                redis::cmd("MGET")
+                    .arg(counter_keys.clone())
+                    .query_async(&mut con)
+                    .instrument(debug_span!("datastore"))
+                    .await?
             };
 
             for (i, counter) in counters.iter().enumerate() {
@@ -132,21 +111,14 @@
         // TODO: this can be optimized by using pipelines with multiple updates
         for (counter_idx, key) in counter_keys.into_iter().enumerate() {
             let counter = &counters[counter_idx];
-            let span = trace_span!("datastore");
-            async {
-                let start = Instant::now();
-                let result = redis::Script::new(SCRIPT_UPDATE_COUNTER)
-                    .key(key)
-                    .key(key_for_counters_of_limit(counter.limit()))
-                    .arg(counter.seconds())
-                    .arg(delta)
-                    .invoke_async::<_, _>(&mut con)
-                    .await;
-                counter_access.observe(start.elapsed());
-                result
-            }
-            .instrument(span)
-            .await?
+            redis::Script::new(SCRIPT_UPDATE_COUNTER)
+                .key(key)
+                .key(key_for_counters_of_limit(counter.limit()))
+                .arg(counter.seconds())
+                .arg(delta)
+                .invoke_async::<_, _>(&mut con)
+                .instrument(debug_span!("datastore"))
+                .await?
         }
 
         Ok(Authorization::Ok)
     }
 
@@ -160,13 +132,9 @@
         for limit in limits {
             let counter_keys = {
-                let span = trace_span!("datastore");
-                async {
-                    con.smembers::<HashSet<String>>(key_for_counters_of_limit(&limit))
-                        .await
-                }
-                .instrument(span)
-                .await?
+                con.smembers::<HashSet<String>>(key_for_counters_of_limit(&limit))
+                    .instrument(debug_span!("datastore"))
+                    .await?
             };
 
             for counter_key in counter_keys {
@@ -180,17 +148,15 @@
                 // This does not cause any bugs, but consumes memory
                 // unnecessarily.
                 let option = {
-                    let span = trace_span!("datastore");
-                    async { con.get::<String, Option<i64>>(counter_key.clone()).await }
-                        .instrument(span)
+                    con.get::<String, Option<i64>>(counter_key.clone())
+                        .instrument(debug_span!("datastore"))
                         .await?
                 };
                 if let Some(val) = option {
                     counter.set_remaining(limit.max_value() - val);
                     let ttl = {
-                        let span = trace_span!("datastore");
-                        async { con.ttl(&counter_key).await }
-                            .instrument(span)
+                        con.ttl(&counter_key)
+                            .instrument(debug_span!("datastore"))
                             .await?
                     };
                     counter.set_expires_in(Duration::from_secs(ttl));
@@ -206,9 +172,8 @@
     #[tracing::instrument(skip_all)]
     async fn delete_counters(&self, limits: HashSet<Limit>) -> Result<(), StorageErr> {
         for limit in limits {
-            let span = trace_span!("datastore");
-            async { self.delete_counters_associated_with_limit(&limit).await }
-                .instrument(span)
+            self.delete_counters_associated_with_limit(&limit)
+                .instrument(debug_span!("datastore"))
                 .await?
         }
         Ok(())
     }
 
@@ -217,9 +182,9 @@
     #[tracing::instrument(skip_all)]
     async fn clear(&self) -> Result<(), StorageErr> {
         let mut con = self.conn_manager.clone();
-        let span = trace_span!("datastore");
-        async { redis::cmd("FLUSHDB").query_async(&mut con).await }
-            .instrument(span)
+        redis::cmd("FLUSHDB")
+            .query_async(&mut con)
+            .instrument(debug_span!("datastore"))
             .await?;
         Ok(())
     }
 
@@ -245,19 +210,14 @@ impl AsyncRedisStorage {
         let mut con = self.conn_manager.clone();
 
         let counter_keys = {
-            let span = trace_span!("datastore");
-            async {
-                con.smembers::<HashSet<String>>(key_for_counters_of_limit(limit))
-                    .await
-            }
-            .instrument(span)
-            .await?
+            con.smembers::<HashSet<String>>(key_for_counters_of_limit(limit))
+                .instrument(debug_span!("datastore"))
+                .await?
         };
 
         for counter_key in counter_keys {
-            let span = trace_span!("datastore");
-            async { con.del(counter_key).await }
-                .instrument(span)
+            con.del(counter_key)
+                .instrument(debug_span!("datastore"))
                 .await?;
         }
 
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index a3e329c3..f29ccab4 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -1,6 +1,5 @@
 use crate::counter::Counter;
 use crate::limit::Limit;
-use crate::prometheus_metrics::CounterAccess;
 use crate::storage::keys::*;
 use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder};
 use crate::storage::redis::redis_async::AsyncRedisStorage;
@@ -70,7 +69,6 @@ impl AsyncCounterStorage for CachedRedisStorage {
         counters: &mut Vec<Counter>,
         delta: i64,
         load_counters: bool,
-        _counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr> {
         let mut con = self.redis_conn_manager.clone();