Skip to content

Commit

Permalink
All ttls as Durations
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 21, 2024
1 parent 2813eb3 commit 18fa6e7
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 58 deletions.
2 changes: 1 addition & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub fn to_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
2 changes: 1 addition & 1 deletion limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn add_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
4 changes: 2 additions & 2 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl Counter {
false
}

pub fn seconds(&self) -> u64 {
self.limit.seconds()
pub fn window(&self) -> Duration {
Duration::from_secs(self.limit.seconds())
}

pub fn namespace(&self) -> &Namespace {
Expand Down
22 changes: 13 additions & 9 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl AtomicExpiringValue {
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn update(&self, delta: u64, ttl: u64, when: SystemTime) -> u64 {
pub fn update(&self, delta: u64, ttl: Duration, when: SystemTime) -> u64 {
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
return delta;
Expand All @@ -42,7 +42,7 @@ impl AtomicExpiringValue {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}
}

Expand Down Expand Up @@ -70,7 +70,7 @@ impl AtomicExpiryTime {
.as_micros() as u64
}

pub fn duration(&self) -> Duration {
pub fn ttl(&self) -> Duration {
let expiry =
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst));
expiry
Expand All @@ -89,8 +89,8 @@ impl AtomicExpiryTime {
.store(Self::since_epoch(expiry), Ordering::SeqCst);
}

pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
let ttl_micros = ttl * 1_000_000;
pub fn update_if_expired(&self, ttl: Duration, when: SystemTime) -> bool {
let ttl_micros = u64::try_from(ttl.as_micros()).expect("Wow! The future is here!");
let when_micros = Self::since_epoch(when);
let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when_micros {
Expand Down Expand Up @@ -208,7 +208,7 @@ mod tests {
fn updates_when_valid() {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now + Duration::from_secs(1));
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -217,7 +217,7 @@ mod tests {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand All @@ -228,10 +228,14 @@ mod tests {

thread::scope(|s| {
s.spawn(|| {
atomic_expiring_value.update(1, 1, now);
atomic_expiring_value.update(1, Duration::from_secs(1), now);
});
s.spawn(|| {
atomic_expiring_value.update(2, 1, now + Duration::from_secs(11));
atomic_expiring_value.update(
2,
Duration::from_secs(1),
now + Duration::from_secs(11),
);
});
});
assert!([2u64, 3u64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst)));
Expand Down
12 changes: 8 additions & 4 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ impl ExpiringValue {
}

#[must_use]
pub fn update(self, delta: u64, ttl: u64, now: SystemTime) -> Self {
pub fn update(self, delta: u64, ttl: Duration, now: SystemTime) -> Self {
let expiry = if self.expiry <= now {
now + Duration::from_secs(ttl)
now + ttl
} else {
self.expiry
};
Expand Down Expand Up @@ -132,7 +132,11 @@ mod tests {
#[test]
fn updates_when_valid() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(
3,
Duration::from_secs(10),
now,
);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -141,7 +145,7 @@ mod tests {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
let val = val.update(3, 10, now);
let val = val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl RocksDbStorage {
let _entered = span.enter();
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
return Ok(value.update(delta, counter.window(), now));
}
Ok(value)
}
Expand Down
32 changes: 10 additions & 22 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,27 @@ impl CounterStorage for InMemoryStorage {
if counter.is_qualified() {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
value.update(delta, counter.seconds(), now);
value.update(delta, counter.window(), now);
} else {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
limits.insert(
counter.limit().clone(),
AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
),
AtomicExpiringValue::new(delta, now + counter.window()),
);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
v.insert(AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
));
v.insert(AtomicExpiringValue::new(delta, now + counter.window()));
}
Entry::Occupied(o) => {
o.get().update(delta, counter.seconds(), now);
o.get().update(delta, counter.window(), now);
}
},
}
Expand All @@ -102,8 +93,8 @@ impl CounterStorage for InMemoryStorage {
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.limits_for_namespace.read().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, Duration)> =
Vec::new();
let now = SystemTime::now();

Expand Down Expand Up @@ -138,17 +129,14 @@ impl CounterStorage for InMemoryStorage {
return Ok(limited);
}
}
counter_values_to_update.push((atomic_expiring_value, counter.seconds()));
counter_values_to_update.push((atomic_expiring_value, counter.window()));
}

// Process qualified counters
for counter in counters.iter_mut().filter(|c| c.is_qualified()) {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
Expand All @@ -159,7 +147,7 @@ impl CounterStorage for InMemoryStorage {
}
}

qualified_counter_values_to_updated.push((value, counter.seconds()));
qualified_counter_values_to_updated.push((value, counter.window()));
}

if let Some(limited) = first_limited {
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub mod bin {

CounterKey {
ns: counter.namespace().as_ref(),
seconds: counter.seconds(),
seconds: counter.window().as_secs(),
conditions,
variables: counter.variables_for_key(),
}
Expand Down
13 changes: 5 additions & 8 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl CachedCounterValue {
pub fn from_authority(counter: &Counter, value: u64) -> Self {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())),
value: AtomicExpiringValue::new(value, now + counter.window()),
initial_value: AtomicU64::new(value),
from_authority: AtomicBool::new(true),
}
Expand All @@ -35,10 +35,7 @@ impl CachedCounterValue {
pub fn load_from_authority_asap(counter: &Counter, temp_value: u64) -> Self {
let now = SystemTime::now();
Self {
value: AtomicExpiringValue::new(
temp_value,
now + Duration::from_secs(counter.seconds()),
),
value: AtomicExpiringValue::new(temp_value, now + counter.window()),
initial_value: AtomicU64::new(0),
from_authority: AtomicBool::new(false),
}
Expand All @@ -56,7 +53,7 @@ impl CachedCounterValue {
pub fn delta(&self, counter: &Counter, delta: u64) -> u64 {
let value = self
.value
.update(delta, counter.seconds(), SystemTime::now());
.update(delta, counter.window(), SystemTime::now());
if value == delta {
// new window, invalidate initial value
// which happens _after_ the self.value was reset, see `pending_writes`
Expand Down Expand Up @@ -132,7 +129,7 @@ impl CachedCounterValue {
self.hits(counter) as i128 + delta as i128 > counter.max_value() as i128
}

pub fn to_next_window(&self) -> Duration {
pub fn ttl(&self) -> Duration {
self.value.ttl()
}

Expand Down Expand Up @@ -472,7 +469,7 @@ mod tests {
let counter = test_counter(10, None);
let value = CachedCounterValue::from_authority(&counter, 0);
value.delta(&counter, hits);
assert!(value.to_next_window() > Duration::from_millis(59999));
assert!(value.ttl() > Duration::from_millis(59999));
assert_eq!(value.hits(&counter), hits);
let remaining = counter.max_value() - hits;
assert_eq!(value.remaining(&counter), remaining);
Expand Down
4 changes: 2 additions & 2 deletions limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ pub fn is_limited(
if x >= 0 {
Duration::from_millis(x as u64)
} else {
Duration::from_secs(counter.seconds())
counter.window()
}
})
.unwrap_or(Duration::from_secs(counter.seconds()));
.unwrap_or(counter.window());

counter.set_expires_in(expires_in);
if first_limited.is_none() && remaining.is_none() {
Expand Down
4 changes: 2 additions & 2 deletions limitador/src/storage/redis/redis_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key_for_counter(counter))
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.seconds())
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, _>(&mut con)
.instrument(debug_span!("datastore"))
Expand Down Expand Up @@ -117,7 +117,7 @@ impl AsyncCounterStorage for AsyncRedisStorage {
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.seconds())
.arg(counter.window().as_secs())
.arg(delta)
.invoke_async::<_, _>(&mut con)
.instrument(debug_span!("datastore"))
Expand Down
6 changes: 3 additions & 3 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
.checked_sub(delta)
.unwrap_or_default(),
);
counter.set_expires_in(val.to_next_window());
counter.set_expires_in(val.ttl());
}
}
_ => {
Expand All @@ -114,7 +114,7 @@ impl AsyncCounterStorage for CachedRedisStorage {
}
if load_counters {
counter.set_remaining(remaining - delta);
counter.set_expires_in(fake.to_next_window()); // todo: this is a plain lie!
counter.set_expires_in(fake.ttl()); // todo: this is a plain lie!
}
}
}
Expand Down Expand Up @@ -300,7 +300,7 @@ async fn update_counters<C: ConnectionLike>(
if delta > 0 {
script_invocation.key(key_for_counter(&counter));
script_invocation.key(key_for_counters_of_limit(counter.limit()));
script_invocation.arg(counter.seconds());
script_invocation.arg(counter.window().as_secs());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, last_value_from_redis, delta, 0));
Expand Down
4 changes: 2 additions & 2 deletions limitador/src/storage/redis/redis_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl CounterStorage for RedisStorage {
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key_for_counter(counter))
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.seconds())
.arg(counter.window().as_secs())
.arg(delta)
.invoke(&mut *con)?;

Expand Down Expand Up @@ -97,7 +97,7 @@ impl CounterStorage for RedisStorage {
redis::Script::new(SCRIPT_UPDATE_COUNTER)
.key(key)
.key(key_for_counters_of_limit(counter.limit()))
.arg(counter.seconds())
.arg(counter.window().as_secs())
.arg(delta)
.invoke(&mut *con)?;
}
Expand Down

0 comments on commit 18fa6e7

Please sign in to comment.