Skip to content

Commit

Permalink
Wired distributed storage in
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 14, 2024
1 parent d364a01 commit 33d9762
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 18 deletions.
9 changes: 9 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub enum StorageConfiguration {
InMemory(InMemoryStorageConfiguration),
Disk(DiskStorageConfiguration),
Redis(RedisStorageConfiguration),
Distributed(DistributedStorageConfiguration),
#[cfg(feature = "infinispan")]
Infinispan(InfinispanStorageConfiguration),
}
Expand All @@ -150,6 +151,14 @@ pub struct InMemoryStorageConfiguration {
pub cache_size: Option<u64>,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DistributedStorageConfiguration {
pub name: String,
pub cache_size: Option<u64>,
pub local: String,
pub broadcast: String,
}

#[derive(PartialEq, Eq, Debug)]
pub struct DiskStorageConfiguration {
pub path: String,
Expand Down
63 changes: 61 additions & 2 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ extern crate clap;
#[cfg(feature = "infinispan")]
use crate::config::InfinispanStorageConfiguration;
use crate::config::{
Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration,
RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration,
Configuration, DiskStorageConfiguration, DistributedStorageConfiguration,
InMemoryStorageConfiguration, RedisStorageCacheConfiguration, RedisStorageConfiguration,
StorageConfiguration,
};
use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders};
use crate::http_api::server::run_http_server;
Expand All @@ -27,6 +28,7 @@ use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE,
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
};
use limitador::storage::DistributedInMemoryStorage;
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder,
Expand Down Expand Up @@ -93,6 +95,7 @@ impl Limiter {
#[cfg(feature = "infinispan")]
StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await,
StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg),
StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg),
StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg),
};

Expand Down Expand Up @@ -207,6 +210,19 @@ impl Limiter {
Self::Blocking(rate_limiter_builder.build())
}

fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self {
let storage = DistributedInMemoryStorage::new(
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
cfg.broadcast,
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));

Self::Blocking(rate_limiter_builder.build())
}

pub async fn load_limits_from_file<P: AsRef<Path>>(
&self,
path: &P,
Expand Down Expand Up @@ -631,6 +647,41 @@ fn create_config() -> (Configuration, &'static str) {
.display_order(6)
.help("Timeout for Redis commands in milliseconds"),
),
)
.subcommand(
Command::new("distributed")
.about("Replicates CRDT-based counters across multiple Limitador servers")
.display_order(5)
.arg(
Arg::new("NAME")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
)
.arg(
Arg::new("CACHE_SIZE")
.long("cache")
.short('c')
.action(ArgAction::Set)
.value_parser(value_parser!(u64))
.display_order(4)
.help("Sets the size of the cache for 'qualified counters'"),
),
);

#[cfg(feature = "infinispan")]
Expand Down Expand Up @@ -750,6 +801,14 @@ fn create_config() -> (Configuration, &'static str) {
Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration {
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
}),
Some(("distributed", sub)) => {
StorageConfiguration::Distributed(DistributedStorageConfiguration {
name: sub.get_one::<String>("NAME").unwrap().to_owned(),
local: sub.get_one::<String>("LOCAL").unwrap().to_owned(),
broadcast: sub.get_one::<String>("BROADCAST").unwrap().to_owned(),
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
})
}
None => match storage_config_from_env() {
Ok(storage_cfg) => storage_cfg,
Err(_) => {
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ use std::collections::{HashMap, HashSet};
use crate::counter::Counter;
use crate::errors::LimitadorError;
use crate::limit::{Limit, Namespace};
use crate::storage::distributed::CrInMemoryStorage as InMemoryStorage;
use crate::storage::in_memory::InMemoryStorage;
use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage};

#[macro_use]
Expand Down
5 changes: 1 addition & 4 deletions limitador/src/storage/distributed/cr_counter_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) {
if self
.expiry
.update_if_expired(time_window.as_secs(), when)
{
if self.expiry.update_if_expired(time_window.as_secs(), when) {
self.value.store(increment, Ordering::SeqCst);
} else {
self.value.fetch_add(increment, Ordering::SeqCst);
Expand Down
14 changes: 3 additions & 11 deletions limitador/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::limit::{Limit, Namespace};
use crate::InMemoryStorage;
use async_trait::async_trait;
use std::collections::{HashMap, HashSet};
use std::env;
use std::sync::RwLock;
use thiserror::Error;

Expand All @@ -13,6 +12,8 @@ pub mod distributed;
pub mod in_memory;
pub mod wasm;

pub use crate::storage::distributed::CrInMemoryStorage as DistributedInMemoryStorage;

#[cfg(feature = "redis_storage")]
pub mod redis;

Expand Down Expand Up @@ -44,18 +45,9 @@ pub struct AsyncStorage {

impl Storage {
pub fn new(cache_size: u64) -> Self {
let local =
env::var("LOCAL").expect("We need the env var LOCAL to be set to your local <IP>:port");
let broadcast = env::var("BROADCAST")
.expect("We need the env var BROADCAST to be set to your broadcast <IP>:port");
Self {
limits: RwLock::new(HashMap::new()),
counters: Box::new(InMemoryStorage::new(
local.to_owned(),
cache_size,
local,
broadcast,
)),
counters: Box::new(InMemoryStorage::new(cache_size)),
}
}

Expand Down

0 comments on commit 33d9762

Please sign in to comment.