diff --git a/Cargo.lock b/Cargo.lock
index 30a471da..2c588c6b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2515,9 +2515,9 @@ dependencies = [
 
 [[package]]
 name = "redis"
-version = "0.23.3"
+version = "0.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba"
+checksum = "14c442de91f2a085154b1e1b374d5d5edf5bc49d2ebbfdf47e67edd6c2df568d"
 dependencies = [
  "arc-swap",
  "async-trait",
@@ -2531,7 +2531,7 @@ dependencies = [
  "pin-project-lite",
  "ryu",
  "sha1_smol",
- "socket2 0.4.9",
+ "socket2 0.5.4",
  "tokio",
  "tokio-native-tls",
  "tokio-retry",
diff --git a/limitador-server/examples/limits.yaml b/limitador-server/examples/limits.yaml
index f0ea815b..4178b413 100644
--- a/limitador-server/examples/limits.yaml
+++ b/limitador-server/examples/limits.yaml
@@ -14,4 +14,4 @@
   conditions:
     - "req.method == 'POST'"
   variables:
-    - user_id
+    - user_id
\ No newline at end of file
diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs
index 53037cf2..c659d8cc 100644
--- a/limitador-server/src/config.rs
+++ b/limitador-server/src/config.rs
@@ -166,6 +166,7 @@ pub struct RedisStorageCacheConfiguration {
     pub max_ttl: u64,
     pub ttl_ratio: u64,
     pub max_counters: usize,
+    pub response_timeout: u64,
 }
 
 #[derive(PartialEq, Eq, Debug)]
diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs
index 6081b324..dcc44569 100644
--- a/limitador-server/src/main.rs
+++ b/limitador-server/src/main.rs
@@ -23,7 +23,7 @@ use limitador::storage::disk::DiskStorage;
 use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
 use limitador::storage::redis::{
     AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC,
-    DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
+    DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC, DEFAULT_RESPONSE_TIMEOUT_MS,
     DEFAULT_TTL_RATIO_CACHED_COUNTERS,
 };
 use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
@@ -137,29 +137,17 @@ impl Limiter {
     ) -> CachedRedisStorage {
         // TODO: Not all the options are configurable via ENV. Add them as needed.
 
-        let mut cached_redis_storage = CachedRedisStorageBuilder::new(redis_url);
+        let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
+            .flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
+            .max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl))
+            .ttl_ratio_cached_counters(cache_cfg.ttl_ratio)
+            .max_cached_counters(cache_cfg.max_counters)
+            .response_timeout(Duration::from_millis(cache_cfg.response_timeout));
 
-        if cache_cfg.flushing_period < 0 {
-            cached_redis_storage = cached_redis_storage.flushing_period(None)
-        } else {
-            cached_redis_storage = cached_redis_storage.flushing_period(Some(
-                Duration::from_millis(cache_cfg.flushing_period as u64),
-            ))
-        }
-
-        cached_redis_storage =
-            cached_redis_storage.max_ttl_cached_counters(Duration::from_millis(cache_cfg.max_ttl));
-
-        cached_redis_storage = cached_redis_storage.ttl_ratio_cached_counters(cache_cfg.ttl_ratio);
-        cached_redis_storage = cached_redis_storage.max_cached_counters(cache_cfg.max_counters);
-
-        match cached_redis_storage.build().await {
-            Ok(storage) => storage,
-            Err(err) => {
-                eprintln!("Failed to connect to Redis at {redis_url}: {err}");
-                process::exit(1)
-            }
-        }
+        cached_redis_storage.build().await.unwrap_or_else(|err| {
+            eprintln!("Failed to connect to Redis at {redis_url}: {err}");
+            process::exit(1)
+        })
     }
 
     #[cfg(feature = "infinispan")]
@@ -653,6 +641,15 @@ fn create_config() -> (Configuration, &'static str) {
                         .default_value(leak(DEFAULT_MAX_CACHED_COUNTERS))
                         .display_order(5)
                         .help("Maximum amount of counters cached"),
+                )
+                .arg(
+                    Arg::new("timeout")
+                        .long("response-timeout")
+                        .action(ArgAction::Set)
+                        .value_parser(clap::value_parser!(u64))
+                        .default_value(leak(DEFAULT_RESPONSE_TIMEOUT_MS))
+                        .display_order(6)
+                        .help("Timeout for Redis commands in milliseconds"),
                 ),
         );
 
@@ -760,6 +757,7 @@ fn create_config() -> (Configuration, &'static str) {
                 max_ttl: *sub.get_one("TTL").unwrap(),
                 ttl_ratio: *sub.get_one("ratio").unwrap(),
                 max_counters: *sub.get_one("max").unwrap(),
+                response_timeout: *sub.get_one("timeout").unwrap(),
             }),
         }),
         #[cfg(feature = "infinispan")]
@@ -851,6 +849,7 @@ fn storage_config_from_env() -> Result {
                     .parse()
                     .expect("Expected an u64"),
                 max_counters: DEFAULT_MAX_CACHED_COUNTERS,
+                response_timeout: DEFAULT_RESPONSE_TIMEOUT_MS,
             })
         } else {
             None
diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml
index 0f2336b7..6cbf3db4 100644
--- a/limitador/Cargo.toml
+++ b/limitador/Cargo.toml
@@ -38,7 +38,7 @@ tracing = "0.1.40"
 
 # Optional dependencies
 rocksdb = { version = "0.21.0", optional = true, features = ["multi-threaded-cf"] }
-redis = { version = "0.23.1", optional = true, features = [
+redis = { version = "0.25.1", optional = true, features = [
     "connection-manager",
     "tokio-comp",
     "tls-native-tls",
diff --git a/limitador/src/storage/disk/expiring_value.rs b/limitador/src/storage/disk/expiring_value.rs
index c01de782..b7c0cc81 100644
--- a/limitador/src/storage/disk/expiring_value.rs
+++ b/limitador/src/storage/disk/expiring_value.rs
@@ -98,6 +98,7 @@ impl From<TryFromSliceError> for StorageErr {
     fn from(_: TryFromSliceError) -> Self {
         Self {
             msg: "Corrupted byte sequence while reading 8 bytes for 64-bit integer".to_owned(),
+            transient: false,
         }
     }
 }
diff --git a/limitador/src/storage/disk/mod.rs b/limitador/src/storage/disk/mod.rs
index 011f6e2a..1203313a 100644
--- a/limitador/src/storage/disk/mod.rs
+++ b/limitador/src/storage/disk/mod.rs
@@ -1,4 +1,5 @@
 use crate::storage::StorageErr;
+use rocksdb::ErrorKind;
 
 mod expiring_value;
 mod rocksdb_storage;
@@ -9,6 +10,7 @@ impl From<rocksdb::Error> for StorageErr {
     fn from(error: rocksdb::Error) -> Self {
         Self {
             msg: format!("Underlying storage error: {error}"),
+            transient: error.kind() == ErrorKind::TimedOut || error.kind() == ErrorKind::TryAgain,
         }
     }
 }
diff --git a/limitador/src/storage/infinispan/dist_lock.rs b/limitador/src/storage/infinispan/dist_lock.rs
index aae86ade..f35d700e 100644
--- a/limitador/src/storage/infinispan/dist_lock.rs
+++ b/limitador/src/storage/infinispan/dist_lock.rs
@@ -49,6 +49,7 @@ pub async fn lock(
         if retries >= RETRIES {
             return Err(StorageErr {
                 msg: "can't acquire lock".into(),
+                transient: false,
             });
         }
 
diff --git a/limitador/src/storage/infinispan/mod.rs b/limitador/src/storage/infinispan/mod.rs
index 549bc559..8289415e 100644
--- a/limitador/src/storage/infinispan/mod.rs
+++ b/limitador/src/storage/infinispan/mod.rs
@@ -12,13 +12,19 @@ pub use infinispan_storage::InfinispanStorageBuilder;
 
 impl From<reqwest::Error> for StorageErr {
     fn from(e: reqwest::Error) -> Self {
-        Self { msg: e.to_string() }
+        Self {
+            msg: e.to_string(),
+            transient: false,
+        }
     }
 }
 
 impl From<InfinispanError> for StorageErr {
     fn from(e: InfinispanError) -> Self {
-        Self { msg: e.to_string() }
+        Self {
+            msg: e.to_string(),
+            transient: false,
+        }
     }
 }
 
diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs
index c6cb1c92..3e941d4a 100644
--- a/limitador/src/storage/mod.rs
+++ b/limitador/src/storage/mod.rs
@@ -299,10 +299,15 @@ pub trait AsyncCounterStorage: Sync + Send {
 #[error("error while accessing the limits storage: {msg}")]
 pub struct StorageErr {
     msg: String,
+    transient: bool,
 }
 
 impl StorageErr {
     pub fn msg(&self) -> &str {
         &self.msg
     }
+
+    pub fn is_transient(&self) -> bool {
+        self.transient
+    }
 }
diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs
index 0ac73b98..01f8a146 100644
--- a/limitador/src/storage/redis/counters_cache.rs
+++ b/limitador/src/storage/redis/counters_cache.rs
@@ -8,7 +8,7 @@ use ttl_cache::TtlCache;
 
 pub struct CountersCache {
     max_ttl_cached_counters: Duration,
-    ttl_ratio_cached_counters: u64,
+    pub ttl_ratio_cached_counters: u64,
     cache: TtlCache<Counter, i64>,
 }
 
@@ -204,7 +204,7 @@ mod tests {
     }
 
     #[test]
-    fn insert_saves_0_when_redis_val_is_none() {
+    fn insert_saves_zero_when_redis_val_is_none() {
         let max_val = 10;
         let mut values = HashMap::new();
         values.insert("app_id".to_string(), "1".to_string());
diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs
index 2c2f3d1f..487b5cb9 100644
--- a/limitador/src/storage/redis/mod.rs
+++ b/limitador/src/storage/redis/mod.rs
@@ -11,6 +11,7 @@ pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
 pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
 pub const DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC: u64 = 5;
 pub const DEFAULT_TTL_RATIO_CACHED_COUNTERS: u64 = 10;
+pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350;
 
 use crate::counter::Counter;
 use crate::storage::{Authorization, StorageErr};
@@ -21,13 +22,10 @@ pub use redis_sync::RedisStorage;
 
 impl From<RedisError> for StorageErr {
     fn from(e: RedisError) -> Self {
-        Self { msg: e.to_string() }
-    }
-}
-
-impl From<::r2d2::Error> for StorageErr {
-    fn from(e: ::r2d2::Error) -> Self {
-        Self { msg: e.to_string() }
+        Self {
+            msg: e.to_string(),
+            transient: e.is_timeout() || e.is_connection_dropped() || e.is_cluster_error(),
+        }
     }
 }
 
diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs
index 23782254..e59b1e24 100644
--- a/limitador/src/storage/redis/redis_async.rs
+++ b/limitador/src/storage/redis/redis_async.rs
@@ -241,6 +241,14 @@ impl AsyncRedisStorage {
         Self { conn_manager }
     }
 
+    pub async fn is_alive(&self) -> bool {
+        self.conn_manager
+            .clone()
+            .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1)
+            .await
+            .is_ok()
+    }
+
     async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> {
         let mut con = self.conn_manager.clone();
 
diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs
index a3e329c3..f0123bf3 100644
--- a/limitador/src/storage/redis/redis_cached.rs
+++ b/limitador/src/storage/redis/redis_cached.rs
@@ -7,7 +7,7 @@ use crate::storage::redis::redis_async::AsyncRedisStorage;
 use crate::storage::redis::scripts::VALUES_AND_TTLS;
 use crate::storage::redis::{
     DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC,
-    DEFAULT_TTL_RATIO_CACHED_COUNTERS,
+    DEFAULT_RESPONSE_TIMEOUT_MS, DEFAULT_TTL_RATIO_CACHED_COUNTERS,
 };
 use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
 use async_trait::async_trait;
@@ -15,8 +15,10 @@ use redis::aio::ConnectionManager;
 use redis::{ConnectionInfo, RedisError};
 use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex, MutexGuard};
 use std::time::{Duration, Instant};
+use tracing::{error, warn};
 
 // This is just a first version.
 //
@@ -41,7 +43,7 @@ pub struct CachedRedisStorage {
     batcher_counter_updates: Arc<Mutex<HashMap<Counter, i64>>>,
     async_redis_storage: AsyncRedisStorage,
     redis_conn_manager: ConnectionManager,
-    batching_is_enabled: bool,
+    partitioned: Arc<AtomicBool>,
 }
 
 #[async_trait]
@@ -72,8 +74,6 @@ impl AsyncCounterStorage for CachedRedisStorage {
         load_counters: bool,
         _counter_access: CounterAccess<'a>,
     ) -> Result<Authorization, StorageErr> {
-        let mut con = self.redis_conn_manager.clone();
-
         let mut not_cached: Vec<&mut Counter> = vec![];
         let mut first_limited = None;
 
@@ -109,8 +109,18 @@
         if !not_cached.is_empty() {
             let time_start_get_ttl = Instant::now();
 
-            let (counter_vals, counter_ttls_msecs) =
-                Self::values_with_ttls(&not_cached, &mut con).await?;
+            let (counter_vals, counter_ttls_msecs) = if self.is_partitioned() {
+                self.fallback_vals_ttls(&not_cached)
+            } else {
+                self.values_with_ttls(&not_cached).await.or_else(|err| {
+                    if err.is_transient() {
+                        self.partitioned(true);
+                        Ok(self.fallback_vals_ttls(&not_cached))
+                    } else {
+                        Err(err)
+                    }
+                })?
+            };
 
             // Some time could have passed from the moment we got the TTL from Redis.
             // This margin is not exact, because we don't know exactly the
@@ -161,22 +171,9 @@
         }
         // Batch or update depending on configuration
-        if self.batching_is_enabled {
-            let mut batcher = self.batcher_counter_updates.lock().unwrap();
-            for counter in counters.iter() {
-                match batcher.get_mut(counter) {
-                    Some(val) => {
-                        *val += delta;
-                    }
-                    None => {
-                        batcher.insert(counter.clone(), delta);
-                    }
-                }
-            }
-        } else {
-            for counter in counters.iter() {
-                self.update_counter(counter, delta).await?
-            }
+        let mut batcher = self.batcher_counter_updates.lock().unwrap();
+        for counter in counters.iter() {
+            Self::batch_counter(delta, &mut batcher, counter);
         }
 
         Ok(Authorization::Ok)
     }
@@ -202,49 +199,74 @@ impl CachedRedisStorage {
     pub async fn new(redis_url: &str) -> Result<Self, RedisError> {
         Self::new_with_options(
             redis_url,
-            Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)),
+            Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
             DEFAULT_MAX_CACHED_COUNTERS,
             Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
             DEFAULT_TTL_RATIO_CACHED_COUNTERS,
+            Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
         )
         .await
     }
 
     async fn new_with_options(
         redis_url: &str,
-        flushing_period: Option<Duration>,
+        flushing_period: Duration,
         max_cached_counters: usize,
         ttl_cached_counters: Duration,
         ttl_ratio_cached_counters: u64,
+        response_timeout: Duration,
     ) -> Result<Self, RedisError> {
         let info = ConnectionInfo::from_str(redis_url)?;
-        let redis_conn_manager = ConnectionManager::new(
+        let redis_conn_manager = ConnectionManager::new_with_backoff_and_timeouts(
             redis::Client::open(info)
                 .expect("This couldn't fail in the past, yet now it did somehow!"),
+            2,
+            100,
+            6,
+            response_timeout,
+            Duration::from_secs(5),
         )
         .await?;
 
+        let partitioned = Arc::new(AtomicBool::new(false));
         let async_redis_storage =
             AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone());
 
         let storage = async_redis_storage.clone();
-        let batcher = Arc::new(Mutex::new(Default::default()));
-        if let Some(flushing_period) = flushing_period {
-            let batcher_flusher = batcher.clone();
-            let mut interval = tokio::time::interval(flushing_period);
-            tokio::spawn(async move {
-                loop {
+        let batcher: Arc<Mutex<HashMap<Counter, i64>>> = Arc::new(Mutex::new(Default::default()));
+        let p = Arc::clone(&partitioned);
+        let batcher_flusher = batcher.clone();
+        let mut interval = tokio::time::interval(flushing_period);
+        tokio::spawn(async move {
+            loop {
+                if p.load(Ordering::Acquire) {
+                    if storage.is_alive().await {
+                        warn!("Partition to Redis resolved!");
+                        p.store(false, Ordering::Release);
+                    }
+                } else {
                     let counters = {
                         let mut batch = batcher_flusher.lock().unwrap();
                         std::mem::take(&mut *batch)
                     };
                     for (counter, delta) in counters {
-                        storage.update_counter(&counter, delta).await.unwrap();
+                        storage
+                            .update_counter(&counter, delta)
+                            .await
+                            .or_else(|err| {
+                                if err.is_transient() {
+                                    p.store(true, Ordering::Release);
+                                    Ok(())
+                                } else {
+                                    Err(err)
+                                }
+                            })
+                            .expect("Unrecoverable Redis error!");
                     }
-                    interval.tick().await;
                 }
-            });
-        }
+                interval.tick().await;
+            }
+        });
 
         let cached_counters = CountersCacheBuilder::new()
             .max_cached_counters(max_cached_counters)
@@ -257,14 +279,39 @@
             batcher_counter_updates: batcher,
             redis_conn_manager,
             async_redis_storage,
-            batching_is_enabled: flushing_period.is_some(),
+            partitioned,
         })
     }
 
+    fn is_partitioned(&self) -> bool {
+        self.partitioned.load(Ordering::Acquire)
+    }
+
+    fn partitioned(&self, partition: bool) -> bool {
+        if partition {
+            error!("Partition to Redis detected!")
+        }
+        self.partitioned
+            .compare_exchange(!partition, partition, Ordering::Release, Ordering::Acquire)
+            .is_ok()
+    }
+
+    fn fallback_vals_ttls(&self, counters: &Vec<&mut Counter>) -> (Vec<Option<i64>>, Vec<i64>) {
+        let mut vals = Vec::with_capacity(counters.len());
+        let mut ttls = Vec::with_capacity(counters.len());
+        for counter in counters {
+            vals.push(Some(0i64));
+            ttls.push(counter.limit().seconds() as i64 * 1000);
+        }
+        (vals, ttls)
+    }
+
     async fn values_with_ttls(
+        &self,
         counters: &[&mut Counter],
-        redis_con: &mut ConnectionManager,
-    ) -> Result<(Vec<Option<i64>>, Vec<i64>), StorageErr> {
+    ) -> Result<(Vec<Option<i64>>, Vec<i64>), StorageErr> {
+        let mut redis_con = self.redis_conn_manager.clone();
+
         let counter_keys: Vec<String> = counters
             .iter()
             .map(|counter| key_for_counter(counter))
@@ -278,7 +325,7 @@
         }
 
         let script_res: Vec<Option<i64>> = script_invocation
-            .invoke_async::<_, _>(&mut redis_con.clone())
+            .invoke_async::<_, _>(&mut redis_con)
             .await?;
 
         let mut counter_vals: Vec<Option<i64>> = vec![];
@@ -291,28 +338,45 @@
 
         Ok((counter_vals, counter_ttls_msecs))
     }
+
+    fn batch_counter(
+        delta: i64,
+        batcher: &mut MutexGuard<HashMap<Counter, i64>>,
+        counter: &Counter,
+    ) {
+        match batcher.get_mut(counter) {
+            Some(val) => {
+                *val += delta;
+            }
+            None => {
+                batcher.insert(counter.clone(), delta);
+            }
+        }
+    }
 }
 
 pub struct CachedRedisStorageBuilder {
     redis_url: String,
-    flushing_period: Option<Duration>,
+    flushing_period: Duration,
     max_cached_counters: usize,
     max_ttl_cached_counters: Duration,
     ttl_ratio_cached_counters: u64,
+    response_timeout: Duration,
 }
 
 impl CachedRedisStorageBuilder {
     pub fn new(redis_url: &str) -> Self {
         Self {
             redis_url: redis_url.to_string(),
-            flushing_period: Some(Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC)),
+            flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
             max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
             max_ttl_cached_counters: Duration::from_secs(DEFAULT_MAX_TTL_CACHED_COUNTERS_SEC),
             ttl_ratio_cached_counters: DEFAULT_TTL_RATIO_CACHED_COUNTERS,
+            response_timeout: Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
         }
     }
 
-    pub fn flushing_period(mut self, flushing_period: Option<Duration>) -> Self {
+    pub fn flushing_period(mut self, flushing_period: Duration) -> Self {
        self.flushing_period = flushing_period;
        self
     }
@@ -332,6 +396,11 @@ impl CachedRedisStorageBuilder {
         self
     }
 
+    pub fn response_timeout(mut self, response_timeout: Duration) -> Self {
+        self.response_timeout = response_timeout;
+        self
+    }
+
     pub async fn build(self) -> Result<CachedRedisStorage, RedisError> {
         CachedRedisStorage::new_with_options(
             &self.redis_url,
@@ -339,6 +408,7 @@
             self.max_cached_counters,
             self.max_ttl_cached_counters,
             self.ttl_ratio_cached_counters,
+            self.response_timeout,
         )
         .await
     }
diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs
index adc9ba3a..cf7088f6 100644
--- a/limitador/src/storage/redis/redis_sync.rs
+++ b/limitador/src/storage/redis/redis_sync.rs
@@ -222,6 +222,15 @@ impl Default for RedisStorage {
     }
 }
 
+impl From<::r2d2::Error> for StorageErr {
+    fn from(e: ::r2d2::Error) -> Self {
+        Self {
+            msg: e.to_string(),
+            transient: false,
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use crate::storage::redis::RedisStorage;
diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs
index a6ccf1a5..5cdb6b88 100644
--- a/limitador/tests/integration_tests.rs
+++ b/limitador/tests/integration_tests.rs
@@ -59,7 +59,7 @@ macro_rules! test_with_all_storage_impls {
             #[serial]
             async fn [<$function _with_async_redis_and_local_cache>]() {
                 let storage_builder = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379").
-                    flushing_period(Some(Duration::from_millis(200))).
+                    flushing_period(Duration::from_millis(200)).
                     max_ttl_cached_counters(Duration::from_secs(3600)).
                     ttl_ratio_cached_counters(1).
                     max_cached_counters(10000);
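
Note (not part of the patch): the new `response_timeout` knob is exposed through `CachedRedisStorageBuilder`, whose other setters now take plain `Duration` values instead of `Option<Duration>`. A minimal usage sketch, assuming a Tokio runtime and a Redis instance at localhost (both assumptions, not shown in this diff):

    use std::time::Duration;
    use limitador::storage::redis::CachedRedisStorageBuilder;

    #[tokio::main]
    async fn main() {
        // Flush batched counter updates every second and bound each Redis
        // command by the 350 ms default introduced in this change.
        let storage = CachedRedisStorageBuilder::new("redis://127.0.0.1:6379")
            .flushing_period(Duration::from_secs(1))
            .response_timeout(Duration::from_millis(350))
            .build()
            .await
            .expect("failed to connect to Redis");
        let _ = storage;
    }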