From aca220abf6216cca906a903236e0e5e358ad4656 Mon Sep 17 00:00:00 2001 From: boqiu <82121246@qq.com> Date: Thu, 12 Dec 2024 17:14:05 +0800 Subject: [PATCH] Supports rate limit for pubsub message --- node/network/src/rpc/mod.rs | 2 +- node/network/src/rpc/rate_limiter.rs | 32 +++++++----- node/router/src/lib.rs | 1 + node/router/src/libp2p_event_handler.rs | 2 +- node/router/src/rate_limit.rs | 68 +++++++++++++++++++++++++ node/router/src/service.rs | 47 ++++++++++++++--- node/src/client/builder.rs | 2 +- 7 files changed, 131 insertions(+), 23 deletions(-) create mode 100644 node/router/src/rate_limit.rs diff --git a/node/network/src/rpc/mod.rs b/node/network/src/rpc/mod.rs index d72b0da6..6a973f34 100644 --- a/node/network/src/rpc/mod.rs +++ b/node/network/src/rpc/mod.rs @@ -33,7 +33,7 @@ mod handler; pub mod methods; mod outbound; mod protocol; -mod rate_limiter; +pub mod rate_limiter; /// Composite trait for a request id. pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} diff --git a/node/network/src/rpc/rate_limiter.rs b/node/network/src/rpc/rate_limiter.rs index 75609581..bf3dc11b 100644 --- a/node/network/src/rpc/rate_limiter.rs +++ b/node/network/src/rpc/rate_limiter.rs @@ -54,6 +54,22 @@ pub struct Quota { max_tokens: u64, } +impl Quota { + pub fn one_every(period: Duration) -> Self { + Self { + replenish_all_every: period, + max_tokens: 1, + } + } + + pub fn n_every(n: u64, period: Duration) -> Self { + Self { + replenish_all_every: period, + max_tokens: n, + } + } +} + /// Manages rate limiting of requests per peer, with differentiated rates per protocol. pub struct RPCRateLimiter { /// Interval to prune peers for which their timer ran out. @@ -122,24 +138,12 @@ impl RPCRateLimiterBuilder { /// Allow one token every `time_period` to be used for this `protocol`. /// This produces a hard limit. pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self { - self.set_quota( - protocol, - Quota { - replenish_all_every: time_period, - max_tokens: 1, - }, - ) + self.set_quota(protocol, Quota::one_every(time_period)) } /// Allow `n` tokens to be use used every `time_period` for this `protocol`. pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self { - self.set_quota( - protocol, - Quota { - max_tokens: n, - replenish_all_every: time_period, - }, - ) + self.set_quota(protocol, Quota::n_every(n, time_period)) } pub fn build(self) -> Result { diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 4bb746c9..bcac33ef 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -5,6 +5,7 @@ mod batcher; mod libp2p_event_handler; mod metrics; mod peer_manager; +mod rate_limit; mod service; use duration_str::deserialize_duration; diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index f37f972d..c6251947 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -174,7 +174,7 @@ impl Libp2pEventHandler { } } - fn send_to_network(&self, message: NetworkMessage) { + pub fn send_to_network(&self, message: NetworkMessage) { self.network_send.send(message).unwrap_or_else(|err| { warn!(%err, "Could not send message to the network service"); }); diff --git a/node/router/src/rate_limit.rs b/node/router/src/rate_limit.rs new file mode 100644 index 00000000..ccdf26f0 --- /dev/null +++ b/node/router/src/rate_limit.rs @@ -0,0 +1,68 @@ +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +use network::{ + rpc::rate_limiter::{Limiter, Quota, RateLimitedErr}, + types::GossipKind, + PeerId, PubsubMessage, +}; + +pub struct PubsubRateLimiter { + init_time: Instant, + limiters: Limiter, + limiters_by_topic: HashMap>, +} + +impl PubsubRateLimiter { + pub fn new(n: u64, period: Duration) -> Result { + Ok(Self { + init_time: Instant::now(), + limiters: Limiter::from_quota(Quota::n_every(n, period))?, + limiters_by_topic: Default::default(), + }) + } + + pub fn limit_by_topic( + mut self, + kind: GossipKind, + n: u64, + period: Duration, + ) -> Result { + let limiter = Limiter::from_quota(Quota::n_every(n, period))?; + self.limiters_by_topic.insert(kind, limiter); + Ok(self) + } + + pub fn allows( + &mut self, + peer_id: &PeerId, + msg: &PubsubMessage, + ) -> Result<(), (Option, RateLimitedErr)> { + let time_since_start = self.init_time.elapsed(); + + if let Err(err) = self.limiters.allows(time_since_start, peer_id, 1) { + return Err((None, err)); + } + + let kind = msg.kind(); + if let Some(limiter) = self.limiters_by_topic.get_mut(&kind) { + if let Err(err) = limiter.allows(time_since_start, peer_id, 1) { + return Err((Some(kind), err)); + } + } + + Ok(()) + } + + pub fn prune(&mut self) { + let time_since_start = self.init_time.elapsed(); + + self.limiters.prune(time_since_start); + + for limiter in self.limiters_by_topic.values_mut() { + limiter.prune(time_since_start); + } + } +} diff --git a/node/router/src/service.rs b/node/router/src/service.rs index f9d4160e..8bd2ed2d 100644 --- a/node/router/src/service.rs +++ b/node/router/src/service.rs @@ -1,4 +1,5 @@ use crate::metrics; +use crate::rate_limit::PubsubRateLimiter; use crate::Config; use crate::{libp2p_event_handler::Libp2pEventHandler, peer_manager::PeerManager}; use chunk_pool::ChunkPoolMessage; @@ -6,14 +7,16 @@ use file_location_cache::FileLocationCache; use futures::{channel::mpsc::Sender, prelude::*}; use miner::MinerMessage; use network::rpc::GoodbyeReason; -use network::PeerId; +use network::types::GossipKind; use network::{ BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm, }; +use network::{MessageAcceptance, PeerAction, PeerId, ReportSource}; use pruner::PrunerMessage; use shared_types::ShardedFile; use std::sync::Arc; +use std::time::Duration; use storage::log_store::Store as LogStore; use storage_async::Store; use sync::{SyncMessage, SyncSender}; @@ -49,6 +52,8 @@ pub struct RouterService { upnp_mappings: (Option, Option), store: Arc, + + pubsub_rate_limiter: PubsubRateLimiter, } impl RouterService { @@ -67,9 +72,19 @@ impl RouterService { file_location_cache: Arc, local_keypair: Keypair, config: Config, - ) { + ) -> Result<(), String> { let peers = Arc::new(RwLock::new(PeerManager::new(config.clone()))); + let pubsub_rate_limiter = PubsubRateLimiter::new(100, Duration::from_secs(10))? + .limit_by_topic(GossipKind::Example, 10, Duration::from_secs(10))? + .limit_by_topic(GossipKind::NewFile, 50, Duration::from_secs(10))? + .limit_by_topic(GossipKind::AskFile, 50, Duration::from_secs(10))? + .limit_by_topic(GossipKind::FindFile, 10, Duration::from_secs(10))? + .limit_by_topic(GossipKind::AnnounceFile, 10, Duration::from_secs(10))? + .limit_by_topic(GossipKind::FindChunks, 10, Duration::from_secs(10))? + .limit_by_topic(GossipKind::AnnounceChunks, 10, Duration::from_secs(10))? + .limit_by_topic(GossipKind::AnnounceShardConfig, 50, Duration::from_secs(10))?; + // create the network service and spawn the task let router = RouterService { config: config.clone(), @@ -91,17 +106,21 @@ impl RouterService { ), upnp_mappings: (None, None), store, + pubsub_rate_limiter, }; // spawn service let shutdown_sender = executor.shutdown_sender(); executor.spawn(router.main(shutdown_sender), "router"); + + Ok(()) } async fn main(mut self, mut shutdown_sender: Sender) { let mut heartbeat_service = interval(self.config.heartbeat_interval); let mut heartbeat_batcher = interval(self.config.batcher_timeout); + let mut heartbeat_rate_limiter = interval(Duration::from_secs(30)); loop { tokio::select! { @@ -118,6 +137,8 @@ impl RouterService { // heartbeat for expire file batcher _ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await, + + _ = heartbeat_rate_limiter.tick() => self.pubsub_rate_limiter.prune(), } } } @@ -192,10 +213,24 @@ impl RouterService { message, .. } => { - let result = self - .libp2p_event_handler - .on_pubsub_message(propagation_source, source, &id, message) - .await; + let result = if let Err((rate_limit_kind, _)) = self + .pubsub_rate_limiter + .allows(&propagation_source, &message) + { + warn!(%propagation_source, kind=?message.kind(), ?rate_limit_kind, "Pubsub message rate limited"); + self.libp2p_event_handler + .send_to_network(NetworkMessage::ReportPeer { + peer_id: propagation_source, + action: PeerAction::LowToleranceError, + source: ReportSource::Gossipsub, + msg: "Pubsub message rate limited", + }); + MessageAcceptance::Reject + } else { + self.libp2p_event_handler + .on_pubsub_message(propagation_source, source, &id, message) + .await + }; self.libp2p .swarm diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 25442387..0298083c 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -265,7 +265,7 @@ impl ClientBuilder { file_location_cache, network.keypair.clone(), router_config, - ); + )?; Ok(self) }