diff --git a/Cargo.lock b/Cargo.lock index 1c361111cea12e..2362c0e8de3801 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2370,6 +2370,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -2542,6 +2548,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if 1.0.0", + "dashmap", + "futures 0.3.30", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.3", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.3.26" @@ -3573,6 +3599,12 @@ dependencies = [ "memoffset 0.9.1", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.0.0" @@ -3584,6 +3616,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -4120,9 +4158,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.3.2" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc59d1bcc64fc5d021d67521f818db868368028108d37f0e98d74e33f68297b5" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" [[package]] name = "ppv-lite86" @@ -4344,6 +4382,21 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -4539,6 +4592,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -7585,6 +7647,7 @@ dependencies = [ "dashmap", "futures 0.3.30", "futures-util", + "governor", "histogram", "indexmap 2.3.0", "itertools 0.12.1", @@ -8227,6 +8290,15 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spl-associated-token-account" version = "4.0.0" diff --git a/Cargo.toml b/Cargo.toml index 482ef6a0c4e21e..ae808b2ae00012 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -237,6 +237,7 @@ generic-array = { version = "0.14.7", default-features = false } gethostname = "0.2.3" getrandom = "0.2.10" goauth = "0.13.1" +governor = "0.6.3" hex = "0.4.3" hidapi = { version = "2.6.3", default-features = false } histogram = "0.6.9" diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 251f3565969319..d83fab753de125 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -1794,6 +1794,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -1903,6 +1909,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if 1.0.0", + "dashmap", + "futures 0.3.30", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.2", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.3.26" @@ -2942,6 +2968,12 @@ dependencies = [ "memoffset 0.9.0", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -2952,6 +2984,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -3424,9 +3462,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.3.2" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc59d1bcc64fc5d021d67521f818db868368028108d37f0e98d74e33f68297b5" +checksum = "da544ee218f0d287a911e9c99a39a8c9bc8fcad3cb8db5959940044ecfc67265" [[package]] name = "powerfmt" @@ -3626,6 +3664,21 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quinn" version = "0.10.2" @@ -3763,6 +3816,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "rayon" version = "1.10.0" @@ -6331,6 +6393,7 @@ dependencies = [ "dashmap", "futures 0.3.30", "futures-util", + "governor", "histogram", "indexmap 2.3.0", "itertools 0.12.1", @@ -6775,6 +6838,15 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spl-associated-token-account" version = "4.0.0" diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index b6051bc604451f..89ce80c910c80f 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -16,6 +16,7 @@ crossbeam-channel = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index b14b88f6ee3af0..fa781f8d6e0d44 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -1,25 +1,27 @@ use { - crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter}, - std::{net::IpAddr, time::Duration}, + governor::{DefaultDirectRateLimiter, DefaultKeyedRateLimiter, Quota, RateLimiter}, + std::{net::IpAddr, num::NonZeroU32}, }; pub struct ConnectionRateLimiter { - limiter: KeyedRateLimiter, + limiter: DefaultKeyedRateLimiter, } impl ConnectionRateLimiter { /// Create a new rate limiter per IpAddr. The rate is specified as the count per minute to allow for /// less frequent connections. pub fn new(limit_per_minute: u64) -> Self { + let quota = + Quota::per_minute(NonZeroU32::new(u32::try_from(limit_per_minute).unwrap()).unwrap()); Self { - limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)), + limiter: DefaultKeyedRateLimiter::keyed(quota), } } /// Check if the connection from the said `ip` is allowed. pub fn is_allowed(&self, ip: &IpAddr) -> bool { // Acquire a permit from the rate limiter for the given IP address - if self.limiter.check_and_update(*ip) { + if self.limiter.check_key(ip).is_ok() { debug!("Request from IP {:?} allowed", ip); true // Request allowed } else { @@ -48,20 +50,26 @@ impl ConnectionRateLimiter { /// Connection rate limiter for enforcing connection rates from /// all clients. pub struct TotalConnectionRateLimiter { - limiter: RateLimiter, + limiter: DefaultDirectRateLimiter, } impl TotalConnectionRateLimiter { /// Create a new rate limiter. The rate is specified as the count per second. pub fn new(limit_per_second: u64) -> Self { + let quota = + Quota::per_second(NonZeroU32::new(u32::try_from(limit_per_second).unwrap()).unwrap()); Self { - limiter: RateLimiter::new(limit_per_second, Duration::from_secs(1)), + limiter: RateLimiter::direct(quota), } } /// Check if a connection is allowed. - pub fn is_allowed(&mut self) -> bool { - self.limiter.check_and_update() + pub fn is_allowed(&self) -> bool { + if self.limiter.check().is_ok() { + true // Request allowed + } else { + false // Request blocked + } } } @@ -71,7 +79,7 @@ pub mod test { #[tokio::test] async fn test_total_connection_rate_limiter() { - let mut limiter = TotalConnectionRateLimiter::new(2); + let limiter = TotalConnectionRateLimiter::new(2); assert!(limiter.is_allowed()); assert!(limiter.is_allowed()); assert!(!limiter.is_allowed()); diff --git a/streamer/src/nonblocking/keyed_rate_limiter.rs b/streamer/src/nonblocking/keyed_rate_limiter.rs deleted file mode 100644 index c73682c8add542..00000000000000 --- a/streamer/src/nonblocking/keyed_rate_limiter.rs +++ /dev/null @@ -1,103 +0,0 @@ -use { - crate::nonblocking::rate_limiter::RateLimiter, - dashmap::DashMap, - std::{hash::Hash, time::Duration}, -}; - -pub struct KeyedRateLimiter { - limiters: DashMap, - interval: Duration, - limit: u64, -} - -impl KeyedRateLimiter -where - K: Eq + Hash, -{ - /// Create a keyed rate limiter with `limit` count with a rate limit `interval` - pub fn new(limit: u64, interval: Duration) -> Self { - Self { - limiters: DashMap::default(), - interval, - limit, - } - } - - /// Check if the connection from the said `key` is allowed to pass through the rate limiter. - /// When it is allowed, the rate limiter state is updated to reflect it has been - /// allowed. For a unique request, the caller should call it only once when it is allowed. - pub fn check_and_update(&self, key: K) -> bool { - let allowed = match self.limiters.entry(key) { - dashmap::mapref::entry::Entry::Occupied(mut entry) => { - let limiter = entry.get_mut(); - limiter.check_and_update() - } - dashmap::mapref::entry::Entry::Vacant(entry) => entry - .insert(RateLimiter::new(self.limit, self.interval)) - .value_mut() - .check_and_update(), - }; - allowed - } - - /// retain only keys whose rate-limiting start date is within the set up interval. - /// Otherwise drop them as inactive - pub fn retain_recent(&self) { - let now = tokio::time::Instant::now(); - self.limiters - .retain(|_key, limiter| now.duration_since(*limiter.start_instant()) <= self.interval); - } - - /// Returns the number of "live" keys in the rate limiter. - pub fn len(&self) -> usize { - self.limiters.len() - } - - /// Returns `true` if the rate limiter has no keys in it. - pub fn is_empty(&self) -> bool { - self.limiters.is_empty() - } -} - -#[cfg(test)] -pub mod test { - use {super::*, tokio::time::sleep}; - - #[allow(clippy::len_zero)] - #[tokio::test] - async fn test_rate_limiter() { - let limiter = KeyedRateLimiter::::new(2, Duration::from_millis(100)); - assert!(limiter.len() == 0); - assert!(limiter.is_empty()); - assert!(limiter.check_and_update(1)); - assert!(limiter.check_and_update(1)); - assert!(!limiter.check_and_update(1)); - assert!(limiter.len() == 1); - assert!(limiter.check_and_update(2)); - assert!(limiter.check_and_update(2)); - assert!(!limiter.check_and_update(2)); - assert!(limiter.len() == 2); - - // sleep 150 ms, the rate-limiting parameters should have been reset. - sleep(Duration::from_millis(150)).await; - assert!(limiter.len() == 2); - - assert!(limiter.check_and_update(1)); - assert!(limiter.check_and_update(1)); - assert!(!limiter.check_and_update(1)); - - assert!(limiter.check_and_update(2)); - assert!(limiter.check_and_update(2)); - assert!(!limiter.check_and_update(2)); - assert!(limiter.len() == 2); - - // sleep another 150 and clean outdatated, key 2 will be removed - sleep(Duration::from_millis(150)).await; - assert!(limiter.check_and_update(1)); - assert!(limiter.check_and_update(1)); - assert!(!limiter.check_and_update(1)); - - limiter.retain_recent(); - assert!(limiter.len() == 1); - } -} diff --git a/streamer/src/nonblocking/mod.rs b/streamer/src/nonblocking/mod.rs index d7205e42468235..61bd021ae0651e 100644 --- a/streamer/src/nonblocking/mod.rs +++ b/streamer/src/nonblocking/mod.rs @@ -1,7 +1,5 @@ pub mod connection_rate_limiter; -pub mod keyed_rate_limiter; pub mod quic; -pub mod rate_limiter; pub mod recvmmsg; pub mod sendmmsg; mod stream_throttle; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 4d5c2326f5a0a6..f1b0a5a7efd5ed 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -253,7 +253,7 @@ async fn run_server( coalesce: Duration, ) { let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min); - let mut overall_connection_rate_limiter = + let overall_connection_rate_limiter = TotalConnectionRateLimiter::new(TOTAL_CONNECTIONS_PER_SECOND); const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); @@ -340,9 +340,9 @@ async fn run_server( stats .connection_rate_limiter_length .store(rate_limiter.len(), Ordering::Relaxed); - info!("Got a connection {remote_address:?}"); + debug!("Got a connection {remote_address:?}"); if !rate_limiter.is_allowed(&remote_address.ip()) { - info!( + debug!( "Reject connection from {:?} -- rate limiting exceeded", remote_address ); @@ -351,6 +351,7 @@ async fn run_server( .fetch_add(1, Ordering::Relaxed); continue; } + stats .outstanding_incoming_connection_attempts .fetch_add(1, Ordering::Relaxed); diff --git a/streamer/src/nonblocking/rate_limiter.rs b/streamer/src/nonblocking/rate_limiter.rs deleted file mode 100644 index 96ce89391fa1ac..00000000000000 --- a/streamer/src/nonblocking/rate_limiter.rs +++ /dev/null @@ -1,74 +0,0 @@ -use {std::time::Duration, tokio::time::Instant}; - -#[derive(Debug)] -pub struct RateLimiter { - /// count of requests in an interval - pub(crate) count: u64, - - /// Rate limit start time - start_instant: Instant, - interval: Duration, - limit: u64, -} - -/// A naive rate limiter, to be replaced by using governor which has more even -/// distribution of requests passing through using GCRA algorithm. -impl RateLimiter { - pub fn new(limit: u64, interval: Duration) -> Self { - Self { - count: 0, - start_instant: Instant::now(), - interval, - limit, - } - } - - /// Reset the counter and start instant if needed. - pub fn reset_params_if_needed(&mut self) { - if Instant::now().duration_since(self.start_instant) > self.interval { - self.start_instant = Instant::now(); - self.count = 0; - } - } - - /// Check if a single request should be allowed to pass through the rate limiter - /// When it is allowed, the rate limiter state is updated to reflect it has been - /// allowed. For a unique request, the caller should call it only once when it is allowed. - pub fn check_and_update(&mut self) -> bool { - self.reset_params_if_needed(); - if self.count >= self.limit { - return false; - } - - self.count = self.count.saturating_add(1); - true - } - - /// Return the start instant for the current rate-limiting interval. - pub fn start_instant(&self) -> &Instant { - &self.start_instant - } -} - -#[cfg(test)] -pub mod test { - use {super::*, tokio::time::sleep}; - - #[tokio::test] - async fn test_rate_limiter() { - let mut limiter = RateLimiter::new(2, Duration::from_millis(100)); - assert!(limiter.check_and_update()); - assert!(limiter.check_and_update()); - assert!(!limiter.check_and_update()); - let instant1 = *limiter.start_instant(); - - // sleep 150 ms, the rate-limiting parameters should have been reset. - sleep(Duration::from_millis(150)).await; - assert!(limiter.check_and_update()); - assert!(limiter.check_and_update()); - assert!(!limiter.check_and_update()); - - let instant2 = *limiter.start_instant(); - assert!(instant2 > instant1); - } -}