Skip to content

Commit

Permalink
Distributed storage to use the new sigs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 21, 2024
1 parent 6ca0978 commit ee25596
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
13 changes: 5 additions & 8 deletions limitador/src/storage/distributed/cr_counter_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<A: Ord> CrCounterValue<A> {
ourselves: actor,
value: Default::default(),
others: RwLock::default(),
expiry: AtomicExpiryTime::from_now(time_window),
expiry: AtomicExpiryTime::new(SystemTime::now() + time_window),
}
}

Expand All @@ -43,7 +43,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) {
if self.expiry.update_if_expired(time_window.as_secs(), when) {
if self.expiry.update_if_expired(time_window, when) {
self.value.store(increment, Ordering::SeqCst);
} else {
self.value.fetch_add(increment, Ordering::SeqCst);
Expand All @@ -59,10 +59,7 @@ impl<A: Ord> CrCounterValue<A> {
self.inc_at(increment, time_window, when);
} else {
let mut guard = self.others.write().unwrap();
if self
.expiry
.update_if_expired(time_window.as_micros() as u64, when)
{
if self.expiry.update_if_expired(time_window, when) {
guard.insert(actor, increment);
} else {
*guard.entry(actor).or_insert(0) += increment;
Expand Down Expand Up @@ -109,7 +106,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}

pub fn expiry(&self) -> SystemTime {
Expand Down Expand Up @@ -282,6 +279,6 @@ mod tests {
a.inc(3, later);
b.inc(2, later);
a.merge(b);
assert!(a.expiry.duration() < sooner);
assert!(a.expiry.ttl() < sooner);
}
}
16 changes: 8 additions & 8 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand All @@ -82,16 +82,16 @@ impl CounterStorage for CrInMemoryStorage {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
let duration = Duration::from_secs(counter.seconds());
let counter_val = CrCounterValue::new(self.identifier.clone(), duration);
let counter_val =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_val, delta, now);
limits.insert(counter.limit().clone(), counter_val);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
let duration = Duration::from_secs(counter.seconds());
let counter_value = CrCounterValue::new(self.identifier.clone(), duration);
let counter_value =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_value, delta, now);
v.insert(counter_value);
}
Expand Down Expand Up @@ -158,7 +158,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand Down Expand Up @@ -394,7 +394,7 @@ impl CrInMemoryStorage {
delta: u64,
when: SystemTime,
) {
counter.inc_at(delta, Duration::from_secs(key.seconds()), when);
counter.inc_at(delta, key.window(), when);
let sender = self.sender.clone();
let counter = counter.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -429,7 +429,7 @@ impl From<Counter> for CounterKey {
fn from(value: Counter) -> Self {
Self {
namespace: value.namespace().clone(),
seconds: value.seconds(),
seconds: value.window().as_secs(),
variables: value.limit().variables(),
conditions: value.limit().conditions(),
vars: value.set_variables().clone(),
Expand Down

0 comments on commit ee25596

Please sign in to comment.