Additional metrics #321

Merged · 8 commits · May 17, 2024
17 changes: 11 additions & 6 deletions limitador-server/src/main.rs
@@ -12,7 +12,6 @@ use crate::config::{
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
use crate::metrics::MetricsLayer;
use ::metrics::histogram;
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use limitador::counter::Counter;
@@ -202,11 +201,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::layer()
};

let metrics_layer = MetricsLayer::new().gather(
"should_rate_limit",
|timings| histogram!("counter_latency").record(Duration::from(timings).as_secs_f64()),
vec!["datastore"],
);
let metrics_layer = MetricsLayer::new()
.gather(
"should_rate_limit",
PrometheusMetrics::record_datastore_latency,
vec!["datastore"],
)
.gather(
"flush_batcher_and_update_counters",
PrometheusMetrics::record_datastore_latency,
vec!["datastore"],
);

if !config.tracing_endpoint.is_empty() {
global::set_text_map_propagator(TraceContextPropagator::new());
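
Note on the main.rs change above: `gather` now takes a plain `fn(Timings)` instead of a closure, so the same consumer can be registered for several aggregate spans. A minimal sketch of that pattern, assuming the crate-internal `MetricsLayer` and `Timings` types; the `log_latency` consumer is hypothetical and stands in for `PrometheusMetrics::record_datastore_latency`:

```rust
use std::time::Duration;

use crate::metrics::{MetricsLayer, Timings};

// Hypothetical consumer; the PR itself passes PrometheusMetrics::record_datastore_latency.
fn log_latency(timings: Timings) {
    // The crate provides a `From<Timings>` conversion into `Duration` (used above).
    println!("datastore time: {:?}", Duration::from(timings));
}

fn build_layer() -> MetricsLayer {
    MetricsLayer::new()
        .gather("should_rate_limit", log_latency, vec!["datastore"])
        .gather(
            "flush_batcher_and_update_counters",
            log_latency,
            vec!["datastore"],
        )
}
```
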
40 changes: 27 additions & 13 deletions limitador-server/src/metrics.rs
@@ -11,6 +11,7 @@ pub struct Timings {
idle: u64,
busy: u64,
last: Instant,
updated: bool,
}

impl Timings {
@@ -19,6 +20,7 @@ impl Timings {
idle: 0,
busy: 0,
last: Instant::now(),
updated: false,
}
}
}
@@ -31,6 +33,7 @@ impl ops::Add for Timings {
busy: self.busy + rhs.busy,
idle: self.idle + rhs.idle,
last: self.last.max(rhs.last),
updated: self.updated || rhs.updated,
}
}
}
@@ -68,39 +71,39 @@ impl SpanState {
}
}

pub struct MetricsGroup<F: Fn(Timings)> {
consumer: F,
pub struct MetricsGroup {
consumer: Box<fn(Timings)>,
records: Vec<String>,
}

impl<F: Fn(Timings)> MetricsGroup<F> {
pub fn new(consumer: F, records: Vec<String>) -> Self {
impl MetricsGroup {
pub fn new(consumer: Box<fn(Timings)>, records: Vec<String>) -> Self {
Self { consumer, records }
}
}

pub struct MetricsLayer<F: Fn(Timings)> {
groups: HashMap<String, MetricsGroup<F>>,
pub struct MetricsLayer {
groups: HashMap<String, MetricsGroup>,
}

impl<F: Fn(Timings)> MetricsLayer<F> {
impl MetricsLayer {
pub fn new() -> Self {
Self {
groups: HashMap::new(),
}
}

pub fn gather(mut self, aggregate: &str, consumer: F, records: Vec<&str>) -> Self {
pub fn gather(mut self, aggregate: &str, consumer: fn(Timings), records: Vec<&str>) -> Self {
// TODO(adam-cattermole): does not handle case where aggregate already exists
let rec = records.iter().map(|r| r.to_string()).collect();
self.groups
.entry(aggregate.to_string())
.or_insert(MetricsGroup::new(consumer, rec));
.or_insert(MetricsGroup::new(Box::new(consumer), rec));
self
}
}

impl<S, F: Fn(Timings) + 'static> Layer<S> for MetricsLayer<F>
impl<S> Layer<S> for MetricsLayer
where
S: Subscriber,
S: for<'lookup> LookupSpan<'lookup>,
@@ -160,6 +163,7 @@ where
let now = Instant::now();
timings.idle += (now - timings.last).as_nanos() as u64;
timings.last = now;
timings.updated = true;
}
}

@@ -171,6 +175,7 @@ where
let now = Instant::now();
timings.busy += (now - timings.last).as_nanos() as u64;
timings.last = now;
timings.updated = true;
}
}

@@ -210,7 +215,9 @@ where
}
// IF we are aggregator call consume function
if let Some(metrics_group) = self.groups.get(name) {
(metrics_group.consumer)(*span_state.group_times.get(name).unwrap())
if let Some(t) = span_state.group_times.get(name).filter(|&t| t.updated) {
(metrics_group.consumer)(*t);
}
}
}
}
@@ -228,19 +235,22 @@ mod tests {
idle: 5,
busy: 5,
last: now,
updated: false,
};
let t2 = Timings {
idle: 3,
busy: 5,
last: now,
updated: false,
};
let t3 = t1 + t2;
assert_eq!(
t3,
Timings {
idle: 8,
busy: 10,
last: now
last: now,
updated: false,
}
)
}
@@ -252,19 +262,22 @@ mod tests {
idle: 5,
busy: 5,
last: now,
updated: false,
};
let t2 = Timings {
idle: 3,
busy: 5,
last: now,
updated: false,
};
t1 += t2;
assert_eq!(
t1,
Timings {
idle: 8,
busy: 10,
last: now
last: now,
updated: false,
}
)
}
@@ -277,6 +290,7 @@ mod tests {
idle: 5,
busy: 5,
last: Instant::now(),
updated: true,
};
span_state.increment(&group, t1);
assert_eq!(span_state.group_times.get(&group).unwrap().idle, t1.idle);
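
Note on the metrics.rs change above: the new `updated` flag is what gates the consumer call in `on_close`, so an aggregate span whose recorded child spans never ran no longer reports a spurious zero latency. A small in-module test sketch (not part of the PR) of how the flag propagates through `Add`:

```rust
#[test]
fn updated_flag_propagates_through_add() {
    // Assumes the `mod tests` imports used by the tests above (Timings, Instant).
    let now = Instant::now();
    let ran = Timings { idle: 1, busy: 2, last: now, updated: true };
    let never_ran = Timings { idle: 0, busy: 0, last: now, updated: false };
    // An aggregate that saw at least one recorded child stays marked as updated...
    assert!((ran + never_ran).updated);
    // ...while one that saw none keeps updated == false and is skipped in on_close.
    assert!(!(never_ran + never_ran).updated);
}
```
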
15 changes: 13 additions & 2 deletions limitador-server/src/prometheus_metrics.rs
@@ -1,7 +1,9 @@
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge};
use metrics::{counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::sync::Arc;
use std::time::Duration;

use crate::metrics::Timings;
use limitador::limit::Namespace;

const NAMESPACE_LABEL: &str = "limitador_namespace";
@@ -40,13 +42,18 @@ impl PrometheusMetrics {
prometheus_handle: Arc<PrometheusHandle>,
) -> Self {
describe_histogram!(
"counter_latency",
"datastore_latency",
"Latency to the underlying counter datastore"
);
describe_counter!("authorized_calls", "Authorized calls");
describe_counter!("limited_calls", "Limited calls");
describe_gauge!("limitador_up", "Limitador is running");
gauge!("limitador_up").set(1);
describe_gauge!(
"datastore_partitioned",
"Limitador is partitioned from backing datastore"
);
gauge!("datastore_partitioned").set(0);
Self {
use_limit_name_label,
prometheus_handle,
@@ -86,6 +93,10 @@ impl PrometheusMetrics {
pub fn gather_metrics(&self) -> String {
self.prometheus_handle.render()
}

pub fn record_datastore_latency(timings: Timings) {
histogram!("datastore_latency").record(Duration::from(timings).as_secs_f64())
}
}

#[cfg(test)]
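
Note on the prometheus_metrics.rs change above: the histogram is renamed from `counter_latency` to `datastore_latency`, and a `datastore_partitioned` gauge is registered and zeroed here; this hunk does not show where that gauge gets flipped. A hedged sketch of a helper that could do so (the function name and call sites are assumptions, not part of this diff):

```rust
use metrics::gauge;

// Hypothetical helper, not shown in this diff: flip the partition gauge when
// connectivity to the backing datastore is lost or restored.
pub fn set_datastore_partitioned(partitioned: bool) {
    gauge!("datastore_partitioned").set(if partitioned { 1.0 } else { 0.0 });
}
```
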
45 changes: 39 additions & 6 deletions limitador/src/storage/redis/counters_cache.rs
@@ -3,6 +3,8 @@ use crate::storage::atomic_expiring_value::AtomicExpiringValue;
use crate::storage::redis::DEFAULT_MAX_CACHED_COUNTERS;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use metrics::{counter, gauge, histogram};
use moka::notification::RemovalCause;
use moka::sync::Cache;
use std::collections::HashMap;
use std::future::Future;
@@ -42,8 +44,11 @@ impl CachedCounterValue {
}
}

pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime) {
self.value.add_and_set_expiry(delta, expire_at);
pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime, max_value: u64) {
let new_val = self.value.add_and_set_expiry(delta, expire_at);
if new_val > max_value {
histogram!("counter_overshoot").record((new_val - max_value) as f64);
}
self.initial_value.fetch_add(delta, Ordering::SeqCst);
self.from_authority.store(true, Ordering::Release);
}
@@ -150,6 +155,7 @@ impl Batcher {
}
Entry::Vacant(miss) => {
self.limiter.acquire().await.unwrap().forget();
gauge!("batcher_size").increment(1);
miss.insert_entry(value);
}
};
@@ -183,13 +189,15 @@ impl Batcher {
let value = self.updates.get(counter).unwrap().clone();
result.insert(counter.clone(), value);
}
histogram!("batcher_flush_size").record(result.len() as f64);
let result = consumer(result).await;
batch.iter().for_each(|counter| {
let prev = self
.updates
.remove_if(counter, |_, v| v.no_pending_writes());
if prev.is_some() {
self.limiter.add_permits(1);
gauge!("batcher_size").decrement(1);
}
});
return result;
@@ -232,6 +240,7 @@ impl CountersCache {
if option.is_none() {
let from_queue = self.batcher.updates.get(counter);
if let Some(entry) = from_queue {
gauge!("cache_size").increment(1);
self.cache.insert(counter.clone(), entry.value().clone());
return Some(entry.value().clone());
}
@@ -255,17 +264,22 @@ impl CountersCache {
if expiry_ts > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
gauge!("cache_size").increment(1);
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, expiry_ts);
cached_value.add_from_authority(
remote_deltas,
expiry_ts,
counter.max_value(),
);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
}
});
if from_cache {
cached.add_from_authority(remote_deltas, expiry_ts);
cached.add_from_authority(remote_deltas, expiry_ts, counter.max_value());
}
return cached;
}
@@ -277,6 +291,7 @@ impl CountersCache {

pub async fn increase_by(&self, counter: &Counter, delta: u64) {
let val = self.cache.get_with_by_ref(counter, || {
gauge!("cache_size").increment(1);
if let Some(entry) = self.batcher.updates.get(counter) {
entry.value().clone()
} else {
@@ -304,9 +319,23 @@ impl CountersCacheBuilder {
self
}

fn eviction_listener(
_key: Arc<Counter>,
value: Arc<CachedCounterValue>,
_removal_cause: RemovalCause,
) {
gauge!("cache_size").decrement(1);
if value.no_pending_writes().not() {
counter!("evicted_pending_writes").increment(1);
}
}

pub fn build(&self, period: Duration) -> CountersCache {
CountersCache {
cache: Cache::new(self.max_cached_counters as u64),
cache: Cache::builder()
.max_capacity(self.max_cached_counters as u64)
.eviction_listener(Self::eviction_listener)
.build(),
batcher: Batcher::new(period, self.max_cached_counters),
}
}
@@ -363,7 +392,11 @@ mod tests {
let value = CachedCounterValue::from_authority(&counter, 0);
value.delta(&counter, 5);
assert!(value.no_pending_writes().not());
value.add_from_authority(6, SystemTime::now().add(Duration::from_secs(1)));
value.add_from_authority(
6,
SystemTime::now().add(Duration::from_secs(1)),
counter.max_value(),
);
assert!(value.no_pending_writes().not());
assert_eq!(value.pending_writes(), Ok(5));
}
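
Note on the counters_cache.rs change above: `build()` now goes through moka's builder so an eviction listener can maintain the `cache_size` gauge and count evicted entries that still had pending writes. A standalone sketch of that moka pattern with simplified key/value types (the metric names mirror the ones above; the size-based condition is illustrative, the PR checks `no_pending_writes()` instead):

```rust
use std::sync::Arc;

use metrics::{counter, gauge};
use moka::{notification::RemovalCause, sync::Cache};

// Simplified listener: keep a size gauge in sync and count size-driven
// evictions (the PR instead counts evicted values with pending writes).
fn eviction_listener(_key: Arc<String>, _value: String, cause: RemovalCause) {
    gauge!("cache_size").decrement(1.0);
    if cause == RemovalCause::Size {
        counter!("evicted_pending_writes").increment(1);
    }
}

fn build_cache(max_entries: u64) -> Cache<String, String> {
    Cache::builder()
        .max_capacity(max_entries)
        .eviction_listener(eviction_listener)
        .build()
}
```
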
6 changes: 5 additions & 1 deletion limitador/src/storage/redis/redis_async.rs
@@ -9,10 +9,11 @@ use crate::storage::redis::is_limited;
use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
use metrics::histogram;
use redis::{AsyncCommands, RedisError};
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use tracing::{debug_span, Instrument};

// Note: this implementation does not guarantee exact limits. Ensuring that we
@@ -209,11 +210,14 @@ impl AsyncRedisStorage {
}

pub async fn is_alive(&self) -> bool {
let now = Instant::now();
self.conn_manager
.clone()
.incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
.await
.is_ok()
.then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64()))
.is_some()
}

async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
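
Note on the redis_async.rs change above: `is_alive` now times the liveness `INCR` and records the `liveness_latency` histogram only when the call succeeds. A generic sketch of the same time-then-record-on-success shape (the helper and its name are illustrative):

```rust
use std::future::Future;
use std::time::Instant;

use metrics::histogram;

// Run a fallible async check, record its latency only on success, and
// collapse the outcome into a bool, mirroring `is_alive` above.
async fn timed_check<F, T, E>(op: F) -> bool
where
    F: Future<Output = Result<T, E>>,
{
    let started = Instant::now();
    op.await
        .is_ok()
        .then(|| histogram!("liveness_latency").record(started.elapsed().as_secs_f64()))
        .is_some()
}
```
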