From 18fa6e72a593f2a50856fe874c6204b431480513 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 16 May 2024 10:08:59 -0400 Subject: [PATCH] All ttls as Durations --- limitador-server/src/envoy_rls/server.rs | 2 +- limitador-server/src/http_api/server.rs | 2 +- limitador/src/counter.rs | 4 +-- .../src/storage/atomic_expiring_value.rs | 22 +++++++------ limitador/src/storage/disk/expiring_value.rs | 12 ++++--- limitador/src/storage/disk/rocksdb_storage.rs | 2 +- limitador/src/storage/in_memory.rs | 32 ++++++------------- limitador/src/storage/keys.rs | 2 +- limitador/src/storage/redis/counters_cache.rs | 13 +++----- limitador/src/storage/redis/mod.rs | 4 +-- limitador/src/storage/redis/redis_async.rs | 4 +-- limitador/src/storage/redis/redis_cached.rs | 6 ++-- limitador/src/storage/redis/redis_sync.rs | 4 +-- 13 files changed, 51 insertions(+), 58 deletions(-) diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index 7735f8f6..e6d5f08f 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -175,7 +175,7 @@ pub fn to_response_header( let mut all_limits_text = String::with_capacity(20 * counters.len()); counters.iter_mut().for_each(|counter| { all_limits_text.push_str( - format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(), + format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(), ); if let Some(name) = counter.limit().name() { all_limits_text diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index ec0606b6..97937d69 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -253,7 +253,7 @@ pub fn add_response_header( let mut all_limits_text = String::with_capacity(20 * counters.len()); counters.iter_mut().for_each(|counter| { all_limits_text.push_str( - format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(), + format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(), ); if let Some(name) = counter.limit().name() { all_limits_text diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs index 702d0c70..efd486e7 100644 --- a/limitador/src/counter.rs +++ b/limitador/src/counter.rs @@ -68,8 +68,8 @@ impl Counter { false } - pub fn seconds(&self) -> u64 { - self.limit.seconds() + pub fn window(&self) -> Duration { + Duration::from_secs(self.limit.seconds()) } pub fn namespace(&self) -> &Namespace { diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index 8b00c7bd..0353e041 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -33,7 +33,7 @@ impl AtomicExpiringValue { self.value.fetch_add(delta, Ordering::SeqCst) + delta } - pub fn update(&self, delta: u64, ttl: u64, when: SystemTime) -> u64 { + pub fn update(&self, delta: u64, ttl: Duration, when: SystemTime) -> u64 { if self.expiry.update_if_expired(ttl, when) { self.value.store(delta, Ordering::SeqCst); return delta; @@ -42,7 +42,7 @@ impl AtomicExpiringValue { } pub fn ttl(&self) -> Duration { - self.expiry.duration() + self.expiry.ttl() } } @@ -70,7 +70,7 @@ impl AtomicExpiryTime { .as_micros() as u64 } - pub fn duration(&self) -> Duration { + pub fn ttl(&self) -> Duration { let expiry = SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst)); expiry @@ -89,8 +89,8 @@ impl AtomicExpiryTime { .store(Self::since_epoch(expiry), Ordering::SeqCst); } - pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool { - let ttl_micros = ttl * 1_000_000; + pub fn update_if_expired(&self, ttl: Duration, when: SystemTime) -> bool { + let ttl_micros = u64::try_from(ttl.as_micros()).expect("Wow! The future is here!"); let when_micros = Self::since_epoch(when); let expiry = self.expiry.load(Ordering::SeqCst); if expiry <= when_micros { @@ -208,7 +208,7 @@ mod tests { fn updates_when_valid() { let now = SystemTime::now(); let val = AtomicExpiringValue::new(42, now + Duration::from_secs(1)); - val.update(3, 10, now); + val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); } @@ -217,7 +217,7 @@ mod tests { let now = SystemTime::now(); let val = AtomicExpiringValue::new(42, now); assert_eq!(val.ttl(), Duration::ZERO); - val.update(3, 10, now); + val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); } @@ -228,10 +228,14 @@ mod tests { thread::scope(|s| { s.spawn(|| { - atomic_expiring_value.update(1, 1, now); + atomic_expiring_value.update(1, Duration::from_secs(1), now); }); s.spawn(|| { - atomic_expiring_value.update(2, 1, now + Duration::from_secs(11)); + atomic_expiring_value.update( + 2, + Duration::from_secs(1), + now + Duration::from_secs(11), + ); }); }); assert!([2u64, 3u64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst))); diff --git a/limitador/src/storage/disk/expiring_value.rs b/limitador/src/storage/disk/expiring_value.rs index 948a85db..2932d6db 100644 --- a/limitador/src/storage/disk/expiring_value.rs +++ b/limitador/src/storage/disk/expiring_value.rs @@ -25,9 +25,9 @@ impl ExpiringValue { } #[must_use] - pub fn update(self, delta: u64, ttl: u64, now: SystemTime) -> Self { + pub fn update(self, delta: u64, ttl: Duration, now: SystemTime) -> Self { let expiry = if self.expiry <= now { - now + Duration::from_secs(ttl) + now + ttl } else { self.expiry }; @@ -132,7 +132,11 @@ mod tests { #[test] fn updates_when_valid() { let now = SystemTime::now(); - let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now); + let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update( + 3, + Duration::from_secs(10), + now, + ); assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); } @@ -141,7 +145,7 @@ mod tests { let now = SystemTime::now(); let val = ExpiringValue::new(42, now); assert_eq!(val.ttl(), Duration::ZERO); - let val = val.update(3, 10, now); + let val = val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); } diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index 4304af09..1e19c2c6 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -219,7 +219,7 @@ impl RocksDbStorage { let _entered = span.enter(); self.db .merge(key, >>::into(expiring_value))?; - return Ok(value.update(delta, counter.seconds(), now)); + return Ok(value.update(delta, counter.window(), now)); } Ok(value) } diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index b26d44d5..f32e2a22 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -56,36 +56,27 @@ impl CounterStorage for InMemoryStorage { if counter.is_qualified() { let value = match self.qualified_counters.get(counter) { None => self.qualified_counters.get_with(counter.clone(), || { - Arc::new(AtomicExpiringValue::new( - 0, - now + Duration::from_secs(counter.seconds()), - )) + Arc::new(AtomicExpiringValue::new(0, now + counter.window())) }), Some(counter) => counter, }; - value.update(delta, counter.seconds(), now); + value.update(delta, counter.window(), now); } else { match limits_by_namespace.entry(counter.limit().namespace().clone()) { Entry::Vacant(v) => { let mut limits = HashMap::new(); limits.insert( counter.limit().clone(), - AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - ), + AtomicExpiringValue::new(delta, now + counter.window()), ); v.insert(limits); } Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { Entry::Vacant(v) => { - v.insert(AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - )); + v.insert(AtomicExpiringValue::new(delta, now + counter.window())); } Entry::Occupied(o) => { - o.get().update(delta, counter.seconds(), now); + o.get().update(delta, counter.window(), now); } }, } @@ -102,8 +93,8 @@ impl CounterStorage for InMemoryStorage { ) -> Result { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); let mut first_limited = None; - let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new(); - let mut qualified_counter_values_to_updated: Vec<(Arc, u64)> = + let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new(); + let mut qualified_counter_values_to_updated: Vec<(Arc, Duration)> = Vec::new(); let now = SystemTime::now(); @@ -138,17 +129,14 @@ impl CounterStorage for InMemoryStorage { return Ok(limited); } } - counter_values_to_update.push((atomic_expiring_value, counter.seconds())); + counter_values_to_update.push((atomic_expiring_value, counter.window())); } // Process qualified counters for counter in counters.iter_mut().filter(|c| c.is_qualified()) { let value = match self.qualified_counters.get(counter) { None => self.qualified_counters.get_with(counter.clone(), || { - Arc::new(AtomicExpiringValue::new( - 0, - now + Duration::from_secs(counter.seconds()), - )) + Arc::new(AtomicExpiringValue::new(0, now + counter.window())) }), Some(counter) => counter, }; @@ -159,7 +147,7 @@ impl CounterStorage for InMemoryStorage { } } - qualified_counter_values_to_updated.push((value, counter.seconds())); + qualified_counter_values_to_updated.push((value, counter.window())); } if let Some(limited) = first_limited { diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs index 7b8a3596..6d32977c 100644 --- a/limitador/src/storage/keys.rs +++ b/limitador/src/storage/keys.rs @@ -153,7 +153,7 @@ pub mod bin { CounterKey { ns: counter.namespace().as_ref(), - seconds: counter.seconds(), + seconds: counter.window().as_secs(), conditions, variables: counter.variables_for_key(), } diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 67591f44..2d6e31f9 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -26,7 +26,7 @@ impl CachedCounterValue { pub fn from_authority(counter: &Counter, value: u64) -> Self { let now = SystemTime::now(); Self { - value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())), + value: AtomicExpiringValue::new(value, now + counter.window()), initial_value: AtomicU64::new(value), from_authority: AtomicBool::new(true), } @@ -35,10 +35,7 @@ impl CachedCounterValue { pub fn load_from_authority_asap(counter: &Counter, temp_value: u64) -> Self { let now = SystemTime::now(); Self { - value: AtomicExpiringValue::new( - temp_value, - now + Duration::from_secs(counter.seconds()), - ), + value: AtomicExpiringValue::new(temp_value, now + counter.window()), initial_value: AtomicU64::new(0), from_authority: AtomicBool::new(false), } @@ -56,7 +53,7 @@ impl CachedCounterValue { pub fn delta(&self, counter: &Counter, delta: u64) -> u64 { let value = self .value - .update(delta, counter.seconds(), SystemTime::now()); + .update(delta, counter.window(), SystemTime::now()); if value == delta { // new window, invalidate initial value // which happens _after_ the self.value was reset, see `pending_writes` @@ -132,7 +129,7 @@ impl CachedCounterValue { self.hits(counter) as i128 + delta as i128 > counter.max_value() as i128 } - pub fn to_next_window(&self) -> Duration { + pub fn ttl(&self) -> Duration { self.value.ttl() } @@ -472,7 +469,7 @@ mod tests { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, hits); - assert!(value.to_next_window() > Duration::from_millis(59999)); + assert!(value.ttl() > Duration::from_millis(59999)); assert_eq!(value.hits(&counter), hits); let remaining = counter.max_value() - hits; assert_eq!(value.remaining(&counter), remaining); diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index 2b5c14e2..93167908 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -56,10 +56,10 @@ pub fn is_limited( if x >= 0 { Duration::from_millis(x as u64) } else { - Duration::from_secs(counter.seconds()) + counter.window() } }) - .unwrap_or(Duration::from_secs(counter.seconds())); + .unwrap_or(counter.window()); counter.set_expires_in(expires_in); if first_limited.is_none() && remaining.is_none() { diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index d77a16bd..dc12404c 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -53,7 +53,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key_for_counter(counter)) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) @@ -117,7 +117,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 1a5e036b..3e1125bf 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -93,7 +93,7 @@ impl AsyncCounterStorage for CachedRedisStorage { .checked_sub(delta) .unwrap_or_default(), ); - counter.set_expires_in(val.to_next_window()); + counter.set_expires_in(val.ttl()); } } _ => { @@ -114,7 +114,7 @@ impl AsyncCounterStorage for CachedRedisStorage { } if load_counters { counter.set_remaining(remaining - delta); - counter.set_expires_in(fake.to_next_window()); // todo: this is a plain lie! + counter.set_expires_in(fake.ttl()); // todo: this is a plain lie! } } } @@ -300,7 +300,7 @@ async fn update_counters( if delta > 0 { script_invocation.key(key_for_counter(&counter)); script_invocation.key(key_for_counters_of_limit(counter.limit())); - script_invocation.arg(counter.seconds()); + script_invocation.arg(counter.window().as_secs()); script_invocation.arg(delta); // We need to store the counter in the actual order we are sending it to the script res.push((counter, last_value_from_redis, delta, 0)); diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index 11113de1..002e9444 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -45,7 +45,7 @@ impl CounterStorage for RedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key_for_counter(counter)) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke(&mut *con)?; @@ -97,7 +97,7 @@ impl CounterStorage for RedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke(&mut *con)?; }