diff --git a/Cargo.lock b/Cargo.lock index 04221cb5..1b22a333 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -911,19 +911,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "env_logger" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" -dependencies = [ - "humantime", - "is-terminal", - "log", - "regex", - "termcolor", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -1261,12 +1248,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" version = "0.14.27" @@ -1549,6 +1530,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tracing", "ttl_cache", ] @@ -1560,7 +1542,6 @@ dependencies = [ "actix-web", "clap", "const_format", - "env_logger", "lazy_static", "limitador", "log", @@ -1577,6 +1558,9 @@ dependencies = [ "tonic", "tonic-build", "tonic-reflection", + "tracing", + "tracing-log", + "tracing-subscriber", "url", ] @@ -1644,6 +1628,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1792,6 +1785,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -1886,6 +1889,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "paperclip" version = "0.8.2" @@ -2376,8 +2385,17 @@ checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2388,9 +2406,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -2676,6 +2700,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.2.0" @@ -2864,15 +2897,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "termcolor" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "1.0.50" @@ -2893,6 +2917,16 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.30" @@ -3148,6 +3182,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3251,6 +3315,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/limitador-server/Cargo.toml b/limitador-server/Cargo.toml index cc96bd00..8c97e5ad 100644 --- a/limitador-server/Cargo.toml +++ b/limitador-server/Cargo.toml @@ -27,7 +27,9 @@ prost = "0.12" prost-types = "0.12" serde_yaml = "0.9" log = "0.4" -env_logger = "0.10.0" +tracing = "0.1.40" +tracing-log = "0.2.0" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2" actix-web = "4.1" actix-rt = "2" diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index 73ee74ed..e6a1481d 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -20,7 +20,7 @@ use crate::envoy_rls::server::RateLimitHeaders; use limitador::storage; -use log::LevelFilter; +use tracing::level_filters::LevelFilter; #[derive(Debug)] pub struct Configuration { diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index de6f8925..68394583 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -40,6 +40,7 @@ impl MyRateLimiter { #[tonic::async_trait] impl RateLimitService for MyRateLimiter { + #[tracing::instrument(skip_all)] async fn should_rate_limit( &self, request: Request, diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index b528d5ab..b32001ea 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -44,6 +44,7 @@ async fn status() -> web::Json<()> { Json(()) } 
+#[tracing::instrument(skip(data))] #[api_v2_operation] async fn metrics(data: web::Data>) -> String { match data.get_ref().as_ref() { @@ -53,6 +54,7 @@ async fn metrics(data: web::Data>) -> String { } #[api_v2_operation] +#[tracing::instrument(skip(data))] async fn get_limits( data: web::Data>, namespace: web::Path, @@ -66,6 +68,7 @@ async fn get_limits( Ok(Json(resp_limits)) } +#[tracing::instrument(skip(data))] #[api_v2_operation] async fn get_counters( data: web::Data>, @@ -89,6 +92,7 @@ async fn get_counters( } } +#[tracing::instrument(skip(state))] #[api_v2_operation] async fn check( state: web::Data>, @@ -117,6 +121,7 @@ async fn check( } } +#[tracing::instrument(skip(data))] #[api_v2_operation] async fn report( data: web::Data>, @@ -139,6 +144,7 @@ async fn report( } } +#[tracing::instrument(skip(data))] #[api_v2_operation] async fn check_and_report( data: web::Data>, diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 079e45fc..17205a73 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -14,7 +14,6 @@ use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; use clap::{value_parser, Arg, ArgAction, Command}; use const_format::formatcp; -use env_logger::Builder; use limitador::counter::Counter; use limitador::errors::LimitadorError; use limitador::limit::Limit; @@ -30,7 +29,6 @@ use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage}; use limitador::{ storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder, }; -use log::LevelFilter; use notify::event::{ModifyKind, RenameMode}; use notify::{Error, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use std::env::VarError; @@ -42,6 +40,8 @@ use std::{env, process}; use sysinfo::{RefreshKind, System, SystemExt}; use thiserror::Error; use tokio::runtime::Handle; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::fmt::format::FmtSpan; mod envoy_rls; mod http_api; @@ -285,13 +285,17 @@ async fn main() -> Result<(), Box> { let config = { let (config, version) = create_config(); println!("{LIMITADOR_HEADER} {version}"); - let mut builder = Builder::new(); - if let Some(level) = config.log_level { - builder.filter(None, level); + let level = config.log_level.unwrap_or_else(|| { + tracing_subscriber::filter::EnvFilter::from_default_env() + .max_level_hint() + .unwrap_or(LevelFilter::ERROR) + }); + let builder = if level >= LevelFilter::DEBUG { + tracing_subscriber::fmt().with_span_events(FmtSpan::CLOSE) } else { - builder.parse_default_env(); - } - builder.init(); + tracing_subscriber::fmt() + }; + builder.with_max_level(level).init(); info!("Version: {}", version); info!("Using config: {:?}", config); @@ -759,10 +763,10 @@ fn create_config() -> (Configuration, &'static str) { config.log_level = match matches.get_count("v") { 0 => None, - 1 => Some(LevelFilter::Warn), - 2 => Some(LevelFilter::Info), - 3 => Some(LevelFilter::Debug), - 4 => Some(LevelFilter::Trace), + 1 => Some(LevelFilter::WARN), + 2 => Some(LevelFilter::INFO), + 3 => Some(LevelFilter::DEBUG), + 4 => Some(LevelFilter::TRACE), _ => unreachable!("Verbosity should at most be 4!"), }; diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 6c15a85a..0f2336b7 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -34,6 +34,7 @@ async-trait = "0.1" cfg-if = "1" prometheus = "0.13" lazy_static = "1" +tracing = "0.1.40" # Optional dependencies rocksdb = { version = "0.21.0", optional = 
true, features = ["multi-threaded-cf"] } diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index 177ae1d0..64730b51 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -588,9 +588,10 @@ impl AsyncRateLimiter { }); } + let access = self.prometheus_metrics.counter_accesses(); let check_result = self .storage - .check_and_update(&mut counters, delta, load_counters) + .check_and_update(&mut counters, delta, load_counters, access) .await?; let counters = if load_counters { diff --git a/limitador/src/prometheus_metrics.rs b/limitador/src/prometheus_metrics.rs index 9704a7f3..4b2e0f51 100644 --- a/limitador/src/prometheus_metrics.rs +++ b/limitador/src/prometheus_metrics.rs @@ -1,5 +1,8 @@ use crate::limit::Namespace; -use prometheus::{Encoder, IntCounterVec, IntGauge, Opts, Registry, TextEncoder}; +use prometheus::{ + Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, Opts, Registry, TextEncoder, +}; +use std::time::Duration; const NAMESPACE_LABEL: &str = "limitador_namespace"; const LIMIT_NAME_LABEL: &str = "limit_name"; @@ -22,15 +25,26 @@ lazy_static! { name: "limitador_up".into(), description: "Limitador is running".into(), }; + static ref DATASTORE_LATENCY: Metric = Metric { + name: "counter_latency".into(), + description: "Latency to the underlying counter datastore".into(), + }; } pub struct PrometheusMetrics { registry: Registry, authorized_calls: IntCounterVec, limited_calls: IntCounterVec, + counter_latency: Histogram, use_limit_name_label: bool, } +impl Default for PrometheusMetrics { + fn default() -> Self { + Self::new() + } +} + impl PrometheusMetrics { pub fn new() -> Self { Self::new_with_options(false) @@ -65,6 +79,18 @@ impl PrometheusMetrics { self.limited_calls.with_label_values(&labels).inc(); } + pub fn counter_access(&self, duration: Duration) { + self.counter_latency.observe(duration.as_secs_f64()); + } + + #[must_use] + pub fn counter_accesses(&self) -> CounterAccess { + CounterAccess { + metrics: self, + duration: Duration::ZERO, + } + } + pub fn gather_metrics(&self) -> String { let mut buffer = Vec::new(); @@ -79,6 +105,7 @@ impl PrometheusMetrics { let authorized_calls_counter = Self::authorized_calls_counter(); let limited_calls_counter = Self::limited_calls_counter(use_limit_name_label); let limitador_up_gauge = Self::limitador_up_gauge(); + let counter_latency = Self::counter_latency(); let registry = Registry::new(); @@ -94,12 +121,17 @@ impl PrometheusMetrics { .register(Box::new(limitador_up_gauge.clone())) .unwrap(); + registry + .register(Box::new(counter_latency.clone())) + .unwrap(); + limitador_up_gauge.set(1); Self { registry, authorized_calls: authorized_calls_counter, limited_calls: limited_calls_counter, + counter_latency, use_limit_name_label, } } @@ -129,6 +161,33 @@ impl PrometheusMetrics { fn limitador_up_gauge() -> IntGauge { IntGauge::new(&LIMITADOR_UP.name, &LIMITADOR_UP.description).unwrap() } + + fn counter_latency() -> Histogram { + Histogram::with_opts(HistogramOpts::new( + &DATASTORE_LATENCY.name, + &DATASTORE_LATENCY.description, + )) + .unwrap() + } +} + +pub struct CounterAccess<'a> { + metrics: &'a PrometheusMetrics, + duration: Duration, +} + +impl CounterAccess<'_> { + pub fn observe(&mut self, duration: Duration) { + self.duration += duration; + } +} + +impl<'a> Drop for CounterAccess<'a> { + fn drop(&mut self) { + if self.duration > Duration::ZERO { + self.metrics.counter_access(self.duration); + } + } } #[cfg(test)] @@ -265,6 +324,39 @@ mod tests { ) } + #[test] + fn collects_latencies() { + let metrics 
= PrometheusMetrics::new(); + assert_eq!(metrics.counter_latency.get_sample_count(), 0); + { + let _access = metrics.counter_accesses(); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 0); + { + let mut access = metrics.counter_accesses(); + access.observe(Duration::from_millis(12)); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 1); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(12).as_secs_f64() + ); + { + let mut access = metrics.counter_accesses(); + access.observe(Duration::from_millis(5)); + assert_eq!(metrics.counter_latency.get_sample_count(), 1); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(12).as_secs_f64() + ); + } + assert_eq!(metrics.counter_latency.get_sample_count(), 2); + assert_eq!( + metrics.counter_latency.get_sample_sum(), + Duration::from_millis(17).as_secs_f64() + ); + } + fn formatted_counter_with_namespace_and_limit( metric_name: &str, count: i32, diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index f99ea49d..a57a610d 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -12,28 +12,33 @@ use rocksdb::{ }; use std::collections::{BTreeSet, HashSet}; use std::time::{Duration, SystemTime}; +use tracing::trace_span; pub struct RocksDbStorage { db: DBWithThreadMode, } impl CounterStorage for RocksDbStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let key = key_for_counter(counter); let value = self.insert_or_update(&key, counter, 0)?; Ok(counter.max_value() >= value.value() + delta) } + #[tracing::instrument(skip_all)] fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let key = key_for_counter(counter); self.insert_or_update(&key, counter, delta)?; Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -45,7 +50,12 @@ impl CounterStorage for RocksDbStorage { for counter in &mut *counters { let key = key_for_counter(counter); let slice: &[u8] = key.as_ref(); - let (val, ttl) = match self.db.get(slice)? { + let entry = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + self.db.get(slice)? 
+ }; + let (val, ttl) = match entry { None => (0, Duration::from_secs(counter.limit().seconds())), Some(raw) => { let slice: &[u8] = raw.as_ref(); @@ -75,44 +85,66 @@ impl CounterStorage for RocksDbStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut counters = HashSet::default(); let namepaces: BTreeSet<&str> = limits.iter().map(|l| l.namespace().as_ref()).collect(); for ns in namepaces { - for entry in self.db.prefix_iterator(prefix_for_namespace(ns)) { - let (key, value) = entry?; - let mut counter = partial_counter_from_counter_key(key.as_ref()); - if counter.namespace().as_ref() != ns { - break; - } - let value: ExpiringValue = value.as_ref().try_into()?; - for limit in limits { - if limit == counter.limit() { - counter.update_to_limit(limit); - let ttl = value.ttl(); - counter.set_expires_in(ttl); - counter.set_remaining(limit.max_value() - value.value()); - break; + let mut iterator = self.db.prefix_iterator(prefix_for_namespace(ns)); + loop { + let option = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + iterator.next() + }; + let next = option; + match next { + None => break, + Some(entry) => { + let (key, value) = entry?; + let mut counter = partial_counter_from_counter_key(key.as_ref()); + if counter.namespace().as_ref() != ns { + break; + } + let value: ExpiringValue = value.as_ref().try_into()?; + for limit in limits { + if limit == counter.limit() { + counter.update_to_limit(limit); + let ttl = value.ttl(); + counter.set_expires_in(ttl); + counter.set_remaining(limit.max_value() - value.value()); + break; + } + } + if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO + { + counters.insert(counter); + } } } - if counter.expires_in().expect("Duration needs to be set") > Duration::ZERO { - counters.insert(counter); - } } } Ok(counters) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { let counters = self.get_counters(&limits)?; for counter in &counters { + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db.delete(key_for_counter(counter))?; } Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { + let span = trace_span!("datastore"); + let _entered = span.enter(); for entry in self.db.iterator(IteratorMode::Start) { + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db.delete(entry?.0)? } Ok(()) @@ -163,7 +195,12 @@ impl RocksDbStorage { delta: i64, ) -> Result { let now = SystemTime::now(); - let value = match self.db.get(key)? { + let entry = { + let span = trace_span!("datastore"); + let _entered = span.enter(); + self.db.get(key)? 
+ }; + let value = match entry { None => ExpiringValue::default(), Some(raw) => { let slice: &[u8] = raw.as_ref(); @@ -173,6 +210,8 @@ impl RocksDbStorage { if value.value_at(now) + delta <= counter.max_value() { let expiring_value = ExpiringValue::new(delta, now + Duration::from_secs(counter.limit().seconds())); + let span = trace_span!("datastore"); + let _entered = span.enter(); self.db .merge(key, >>::into(expiring_value))?; return Ok(value.update(delta, counter.seconds(), now)); diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index a1b21dc5..35f2a681 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -17,6 +17,7 @@ pub struct InMemoryStorage { } impl CounterStorage for InMemoryStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); @@ -35,6 +36,7 @@ impl CounterStorage for InMemoryStorage { Ok(counter.max_value() >= value + delta) } + #[tracing::instrument(skip_all)] fn add_counter(&self, limit: &Limit) -> Result<(), StorageErr> { if limit.variables().is_empty() { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); @@ -47,6 +49,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut limits_by_namespace = self.limits_for_namespace.write().unwrap(); let now = SystemTime::now(); @@ -90,6 +93,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -175,6 +179,7 @@ impl CounterStorage for InMemoryStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -215,6 +220,7 @@ impl CounterStorage for InMemoryStorage { Ok(res) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { self.delete_counters_of_limit(&limit); @@ -222,6 +228,7 @@ impl CounterStorage for InMemoryStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { self.limits_for_namespace.write().unwrap().clear(); Ok(()) diff --git a/limitador/src/storage/infinispan/infinispan_storage.rs b/limitador/src/storage/infinispan/infinispan_storage.rs index f3829658..af790634 100644 --- a/limitador/src/storage/infinispan/infinispan_storage.rs +++ b/limitador/src/storage/infinispan/infinispan_storage.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::infinispan::counters::{Consistency, CounterOpts}; use crate::storage::infinispan::response::response_to_string; use crate::storage::infinispan::{ @@ -30,6 +31,7 @@ pub struct InfinispanStorageBuilder { #[async_trait] impl AsyncCounterStorage for InfinispanStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let counter_key = key_for_counter(counter); let counter_val = @@ -41,6 +43,7 @@ impl AsyncCounterStorage for InfinispanStorage { } } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let counter_key = key_for_counter(counter); @@ -65,11 +68,13 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(()) } - async 
fn check_and_update( + #[tracing::instrument(skip_all)] + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + _counter_access: CounterAccess<'a>, ) -> Result { let mut counter_keys = Vec::with_capacity(counters.len()); @@ -130,6 +135,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -158,6 +164,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(res) } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { self.delete_counters_associated_with_limit(&limit).await?; @@ -165,6 +172,7 @@ impl AsyncCounterStorage for InfinispanStorage { Ok(()) } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let _ = self .infinispan diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index fec8b013..c6cb1c92 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::{Limit, Namespace}; +use crate::prometheus_metrics::CounterAccess; use crate::InMemoryStorage; use async_trait::async_trait; use std::collections::{HashMap, HashSet}; @@ -237,14 +238,15 @@ impl AsyncStorage { self.counters.update_counter(counter, delta).await } - pub async fn check_and_update( + pub async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + counter_access: CounterAccess<'a>, ) -> Result { self.counters - .check_and_update(counters, delta, load_counters) + .check_and_update(counters, delta, load_counters, counter_access) .await } @@ -281,11 +283,12 @@ pub trait CounterStorage: Sync + Send { pub trait AsyncCounterStorage: Sync + Send { async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result; async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr>; - async fn check_and_update( + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + counter_access: CounterAccess<'a>, ) -> Result; async fn get_counters(&self, limits: HashSet) -> Result, StorageErr>; async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr>; diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 65659390..bb94e933 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -4,6 +4,7 @@ use self::redis::aio::ConnectionManager; use self::redis::ConnectionInfo; use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::is_limited; use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS}; @@ -12,7 +13,8 @@ use async_trait::async_trait; use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; -use std::time::Duration; +use std::time::{Duration, Instant}; +use tracing::{trace_span, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. 
This implementation @@ -30,38 +32,51 @@ pub struct AsyncRedisStorage { #[async_trait] impl AsyncCounterStorage for AsyncRedisStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_manager.clone(); - match con - .get::>(key_for_counter(counter)) - .await? + let span = trace_span!("datastore"); + match async move { + con.get::>(key_for_counter(counter)) + .await + } + .instrument(span) + .await? { Some(val) => Ok(val - delta >= 0), None => Ok(counter.max_value() - delta >= 0), } } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - redis::Script::new(SCRIPT_UPDATE_COUNTER) - .key(key_for_counter(counter)) - .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.max_value()) - .arg(counter.seconds()) - .arg(delta) - .invoke_async::<_, _>(&mut con) - .await?; + let span = trace_span!("datastore"); + async { + redis::Script::new(SCRIPT_UPDATE_COUNTER) + .key(key_for_counter(counter)) + .key(key_for_counters_of_limit(counter.limit())) + .arg(counter.max_value()) + .arg(counter.seconds()) + .arg(delta) + .invoke_async::<_, _>(&mut con) + .await + } + .instrument(span) + .await?; Ok(()) } - async fn check_and_update( + #[tracing::instrument(skip_all)] + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + mut counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.conn_manager.clone(); let counter_keys: Vec = counters.iter().map(key_for_counter).collect(); @@ -74,15 +89,35 @@ impl AsyncCounterStorage for AsyncRedisStorage { script_invocation.key(counter_key); } - let script_res: Vec> = script_invocation.invoke_async(&mut con).await?; + let script_res: Vec> = { + let span = trace_span!("datastore"); + async { + let start = Instant::now(); + let result = script_invocation.invoke_async(&mut con).await; + counter_access.observe(start.elapsed()); + result + } + .instrument(span) + .await? + }; if let Some(res) = is_limited(counters, delta, script_res) { return Ok(res); } } else { - let counter_vals: Vec> = redis::cmd("MGET") - .arg(counter_keys.clone()) - .query_async(&mut con) - .await?; + let counter_vals: Vec> = { + let span = trace_span!("datastore"); + async { + let start = Instant::now(); + let result = redis::cmd("MGET") + .arg(counter_keys.clone()) + .query_async(&mut con) + .await; + counter_access.observe(start.elapsed()); + result + } + .instrument(span) + .await? 
+ }; for (i, counter) in counters.iter().enumerate() { let remaining = counter_vals[i].unwrap_or(counter.max_value()) - delta; @@ -97,28 +132,43 @@ impl AsyncCounterStorage for AsyncRedisStorage { // TODO: this can be optimized by using pipelines with multiple updates for (counter_idx, key) in counter_keys.into_iter().enumerate() { let counter = &counters[counter_idx]; - redis::Script::new(SCRIPT_UPDATE_COUNTER) - .key(key) - .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.max_value()) - .arg(counter.seconds()) - .arg(delta) - .invoke_async::<_, _>(&mut con) - .await?; + let span = trace_span!("datastore"); + async { + let start = Instant::now(); + let result = redis::Script::new(SCRIPT_UPDATE_COUNTER) + .key(key) + .key(key_for_counters_of_limit(counter.limit())) + .arg(counter.max_value()) + .arg(counter.seconds()) + .arg(delta) + .invoke_async::<_, _>(&mut con) + .await; + counter_access.observe(start.elapsed()); + result + } + .instrument(span) + .await? } Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); let mut con = self.conn_manager.clone(); for limit in limits { - let counter_keys = con - .smembers::>(key_for_counters_of_limit(&limit)) - .await?; + let counter_keys = { + let span = trace_span!("datastore"); + async { + con.smembers::>(key_for_counters_of_limit(&limit)) + .await + } + .instrument(span) + .await? + }; for counter_key in counter_keys { let mut counter: Counter = counter_from_counter_key(&counter_key, &limit); @@ -130,9 +180,20 @@ impl AsyncCounterStorage for AsyncRedisStorage { // do the "get" + "delete if none" atomically. // This does not cause any bugs, but consumes memory // unnecessarily. - if let Some(val) = con.get::>(counter_key.clone()).await? { + let option = { + let span = trace_span!("datastore"); + async { con.get::>(counter_key.clone()).await } + .instrument(span) + .await? + }; + if let Some(val) = option { counter.set_remaining(val); - let ttl = con.ttl(&counter_key).await?; + let ttl = { + let span = trace_span!("datastore"); + async { con.ttl(&counter_key).await } + .instrument(span) + .await? + }; counter.set_expires_in(Duration::from_secs(ttl)); res.insert(counter); @@ -143,16 +204,24 @@ impl AsyncCounterStorage for AsyncRedisStorage { Ok(res) } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { for limit in limits { - self.delete_counters_associated_with_limit(&limit).await?; + let span = trace_span!("datastore"); + async { self.delete_counters_associated_with_limit(&limit).await } + .instrument(span) + .await? } Ok(()) } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - redis::cmd("FLUSHDB").query_async(&mut con).await?; + let span = trace_span!("datastore"); + async { redis::cmd("FLUSHDB").query_async(&mut con).await } + .instrument(span) + .await?; Ok(()) } } @@ -176,12 +245,21 @@ impl AsyncRedisStorage { async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); - let counter_keys = con - .smembers::>(key_for_counters_of_limit(limit)) - .await?; + let counter_keys = { + let span = trace_span!("datastore"); + async { + con.smembers::>(key_for_counters_of_limit(limit)) + .await + } + .instrument(span) + .await? 
+ }; for counter_key in counter_keys { - con.del(counter_key).await?; + let span = trace_span!("datastore"); + async { con.del(counter_key).await } + .instrument(span) + .await?; } Ok(()) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 34e05f3c..2f6e1fc1 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -1,5 +1,6 @@ use crate::counter::Counter; use crate::limit::Limit; +use crate::prometheus_metrics::CounterAccess; use crate::storage::keys::*; use crate::storage::redis::batcher::Batcher; use crate::storage::redis::counters_cache::{CountersCache, CountersCacheBuilder}; @@ -47,12 +48,14 @@ pub struct CachedRedisStorage { #[async_trait] impl AsyncCounterStorage for CachedRedisStorage { + #[tracing::instrument(skip_all)] async fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { self.async_redis_storage .is_within_limits(counter, delta) .await } + #[tracing::instrument(skip_all)] async fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { self.async_redis_storage .update_counter(counter, delta) @@ -63,11 +66,13 @@ impl AsyncCounterStorage for CachedRedisStorage { // limits. In order to do so, we'd need to run this whole function // atomically, but that'd be too slow. // This function trades accuracy for speed. - async fn check_and_update( + #[tracing::instrument(skip_all)] + async fn check_and_update<'a>( &self, counters: &mut Vec, delta: i64, load_counters: bool, + _counter_access: CounterAccess<'a>, ) -> Result { let mut con = self.redis_conn_manager.clone(); @@ -166,14 +171,17 @@ impl AsyncCounterStorage for CachedRedisStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] async fn get_counters(&self, limits: HashSet) -> Result, StorageErr> { self.async_redis_storage.get_counters(limits).await } + #[tracing::instrument(skip_all)] async fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { self.async_redis_storage.delete_counters(limits).await } + #[tracing::instrument(skip_all)] async fn clear(&self) -> Result<(), StorageErr> { self.async_redis_storage.clear().await } diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index 8d0861e5..d6dee2ec 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -23,6 +23,7 @@ pub struct RedisStorage { } impl CounterStorage for RedisStorage { + #[tracing::instrument(skip_all)] fn is_within_limits(&self, counter: &Counter, delta: i64) -> Result { let mut con = self.conn_pool.get()?; @@ -32,10 +33,12 @@ impl CounterStorage for RedisStorage { } } + #[tracing::instrument(skip_all)] fn add_counter(&self, _limit: &Limit) -> Result<(), StorageErr> { Ok(()) } + #[tracing::instrument(skip_all)] fn update_counter(&self, counter: &Counter, delta: i64) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; @@ -50,6 +53,7 @@ impl CounterStorage for RedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn check_and_update( &self, counters: &mut Vec, @@ -100,6 +104,7 @@ impl CounterStorage for RedisStorage { Ok(Authorization::Ok) } + #[tracing::instrument(skip_all)] fn get_counters(&self, limits: &HashSet) -> Result, StorageErr> { let mut res = HashSet::new(); @@ -132,6 +137,7 @@ impl CounterStorage for RedisStorage { Ok(res) } + #[tracing::instrument(skip_all)] fn delete_counters(&self, limits: HashSet) -> Result<(), StorageErr> { let mut con = 
self.conn_pool.get()?; @@ -147,6 +153,7 @@ impl CounterStorage for RedisStorage { Ok(()) } + #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; redis::cmd("FLUSHDB").execute(&mut *con);
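
A closing note on the latency accounting threaded through `check_and_update` above: the new `CounterAccess` guard accumulates the durations reported via `observe()` and records a single histogram sample when it is dropped with a non-zero total, so backends that never time anything (the Infinispan and cached Redis stores take `_counter_access` and ignore it) record nothing. The sketch below reproduces that drop-guard pattern with hypothetical names (`Metrics`, `LatencyGuard`, `datastore_latency`); it illustrates the technique rather than quoting code from this patch.

```rust
// Standalone sketch of the drop-guard latency pattern used by `CounterAccess`;
// the names below are illustrative, not from the Limitador codebase.
use prometheus::{Histogram, HistogramOpts, Registry};
use std::time::{Duration, Instant};

struct Metrics {
    datastore_latency: Histogram,
}

impl Metrics {
    fn new(registry: &Registry) -> Self {
        let datastore_latency = Histogram::with_opts(HistogramOpts::new(
            "counter_latency",
            "Latency to the underlying counter datastore",
        ))
        .unwrap();
        registry
            .register(Box::new(datastore_latency.clone()))
            .unwrap();
        Self { datastore_latency }
    }

    // Hand out a guard that a request threads through its datastore calls;
    // each timed call adds to the total, which is observed exactly once.
    fn latency_guard(&self) -> LatencyGuard<'_> {
        LatencyGuard {
            metrics: self,
            total: Duration::ZERO,
        }
    }
}

struct LatencyGuard<'a> {
    metrics: &'a Metrics,
    total: Duration,
}

impl LatencyGuard<'_> {
    fn observe(&mut self, elapsed: Duration) {
        self.total += elapsed;
    }
}

impl Drop for LatencyGuard<'_> {
    fn drop(&mut self) {
        // Record only if at least one call was timed, matching the
        // `Duration::ZERO` check in `CounterAccess::drop`.
        if self.total > Duration::ZERO {
            self.metrics
                .datastore_latency
                .observe(self.total.as_secs_f64());
        }
    }
}

fn main() {
    let registry = Registry::new();
    let metrics = Metrics::new(&registry);

    {
        let mut guard = metrics.latency_guard();
        let start = Instant::now();
        std::thread::sleep(Duration::from_millis(5)); // stand-in for a datastore round trip
        guard.observe(start.elapsed());
    } // guard dropped here -> one histogram sample recorded

    assert_eq!(metrics.datastore_latency.get_sample_count(), 1);
}
```

Passing the guard by value into `check_and_update` ties the sample to a single request, which matches the design in the async Redis backend above, where both the `MGET` read and the per-counter update scripts contribute to one recorded latency.
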