Skip to content

Commit

Permalink
Base Cr(dt)CounterValue
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 11, 2024
1 parent 00bc5c1 commit c177adb
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 0 deletions.
34 changes: 34 additions & 0 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,40 @@ impl AtomicExpiryTime {
}
false
}

pub fn merge(&self, other: Self) {
let mut other = other;
loop {
let now = SystemTime::now();
other = match self.merge_at(other, now) {
Ok(_) => return,
Err(other) => other,
};
}
}

pub fn merge_at(&self, other: Self, when: SystemTime) -> Result<(), Self> {
let other_exp = other.expiry.load(Ordering::SeqCst);
let expiry = self.expiry.load(Ordering::SeqCst);
if other_exp < expiry && other_exp > Self::since_epoch(when) {
// if our expiry changed, some thread observed the time window as elapsed...
// `other` can't be in the future anymore! Safely ignoring the failure scenario
return match self.expiry.compare_exchange(
expiry,
other_exp,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => Ok(()),
Err(_) => Err(other),
};
}
Ok(())
}

pub fn into_inner(self) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst))
}
}

impl Clone for AtomicExpiryTime {
Expand Down
209 changes: 209 additions & 0 deletions limitador/src/storage/distributed/cr_counter_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
use crate::storage::atomic_expiring_value::AtomicExpiryTime;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::ops::Not;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::{Duration, SystemTime};

struct CrCounterValue<A: Ord> {
ourselves: A,
value: AtomicU64,
others: RwLock<BTreeMap<A, u64>>,
expiry: AtomicExpiryTime,
}

impl<A: Ord> CrCounterValue<A> {
pub fn new(actor: A, time_window: Duration) -> Self {
Self {
ourselves: actor,
value: Default::default(),
others: RwLock::default(),
expiry: AtomicExpiryTime::from_now(time_window),
}
}

pub fn read(&self) -> u64 {
self.read_at(SystemTime::now())
}

pub fn read_at(&self, when: SystemTime) -> u64 {
if self.expiry.expired_at(when) {
0
} else {
let guard = self.others.read().unwrap();
let others: u64 = guard.values().sum();
others + self.value.load(Ordering::Relaxed)
}
}

pub fn inc(&self, increment: u64, time_window: Duration) {
self.inc_at(increment, time_window, SystemTime::now())
}

pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) {
if self
.expiry
.update_if_expired(time_window.as_micros() as u64, when)
{
self.value.store(increment, Ordering::SeqCst);
} else {
self.value.fetch_add(increment, Ordering::SeqCst);
}
}

pub fn inc_actor(&self, actor: A, increment: u64, time_window: Duration) {
self.inc_actor_at(actor, increment, time_window, SystemTime::now());
}

pub fn inc_actor_at(&self, actor: A, increment: u64, time_window: Duration, when: SystemTime) {
if actor == self.ourselves {
self.inc_at(increment, time_window, when);
} else {
let mut guard = self.others.write().unwrap();
if self
.expiry
.update_if_expired(time_window.as_micros() as u64, when)
{
guard.insert(actor, increment);
} else {
*guard.entry(actor).or_insert(0) += increment;
}
}
}

pub fn merge(&self, other: Self) {
self.merge_at(other, SystemTime::now());
}

pub fn merge_at(&self, other: Self, when: SystemTime) {
if self.expiry.expired_at(when).not() && other.expiry.expired_at(when).not() {
let (expiry, other_values) = other.into_inner();
let _ = self.expiry.merge_at(AtomicExpiryTime::new(expiry), when);
let ourselves = self.value.load(Ordering::SeqCst);
let mut others = self.others.write().unwrap();
for (actor, other_value) in other_values {
if actor == self.ourselves {
if other_value > ourselves {
self.value
.fetch_add(other_value - ourselves, Ordering::SeqCst);
}
} else {
match others.entry(actor) {
Entry::Vacant(entry) => {
entry.insert(other_value);
}
Entry::Occupied(mut known) => {
let local = known.get_mut();
if other_value > *local {
*local = other_value;
}
}
}
}
}
}
}

fn into_inner(self) -> (SystemTime, BTreeMap<A, u64>) {
let Self {
ourselves,
value,
others,
expiry,
} = self;
let mut map = others.into_inner().unwrap();
map.insert(ourselves, value.into_inner());
(expiry.into_inner(), map)
}
}

#[cfg(test)]
mod tests {
use crate::storage::distributed::cr_counter_value::CrCounterValue;
use std::time::{Duration, SystemTime};

#[test]
fn local_increments_are_readable() {
let window = Duration::from_secs(1);
let a = CrCounterValue::new('A', window);
a.inc(3, window);
assert_eq!(3, a.read());
a.inc(2, window);
assert_eq!(5, a.read());
}

#[test]
fn local_increments_expire() {
let window = Duration::from_secs(1);
let a = CrCounterValue::new('A', window);
let now = SystemTime::now();
a.inc_at(3, window, now);
assert_eq!(3, a.read());
a.inc_at(2, window, now + window);
assert_eq!(2, a.read());
}

#[test]
fn other_increments_are_readable() {
let window = Duration::from_secs(1);
let a = CrCounterValue::new('A', window);
a.inc_actor('B', 3, window);
assert_eq!(3, a.read());
a.inc_actor('B', 2, window);
assert_eq!(5, a.read());
}

#[test]
fn other_increments_expire() {
let window = Duration::from_secs(1);
let a = CrCounterValue::new('A', window);
let now = SystemTime::now();
a.inc_actor_at('B', 3, window, now);
assert_eq!(3, a.read());
a.inc_actor_at('B', 2, window, now + window);
assert_eq!(2, a.read());
}

#[test]
fn merges() {
let window = Duration::from_secs(1);
{
let a = CrCounterValue::new('A', window);
let b = CrCounterValue::new('B', window);
a.inc(3, window);
b.inc(2, window);
a.merge(b);
assert_eq!(a.read(), 5);
}

{
let a = CrCounterValue::new('A', window);
let b = CrCounterValue::new('B', window);
a.inc(3, window);
b.inc(2, window);
b.merge(a);
assert_eq!(b.read(), 5);
}

{
let a = CrCounterValue::new('A', window);
let b = CrCounterValue::new('B', window);
a.inc(3, window);
b.inc(2, window);
b.inc_actor('A', 2, window); // older value!
b.merge(a); // merges the 3
assert_eq!(b.read(), 5);
}

{
let a = CrCounterValue::new('A', window);
let b = CrCounterValue::new('B', window);
a.inc(3, window);
b.inc(2, window);
b.inc_actor('A', 5, window); // newer value!
b.merge(a); // ignores the 3 and keeps its own 5 for a
assert_eq!(b.read(), 7);
}
}
}
1 change: 1 addition & 0 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod cr_counter_value;
1 change: 1 addition & 0 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use thiserror::Error;

#[cfg(feature = "disk_storage")]
pub mod disk;
pub mod distributed;
pub mod in_memory;
pub mod wasm;

Expand Down

0 comments on commit c177adb

Please sign in to comment.