Skip to content

Commit

Permalink
Supports rate limit for pubsub message
Browse files Browse the repository at this point in the history
  • Loading branch information
boqiu committed Dec 12, 2024
1 parent 349e13e commit aca220a
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 23 deletions.
2 changes: 1 addition & 1 deletion node/network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod handler;
pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
pub mod rate_limiter;

/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
Expand Down
32 changes: 18 additions & 14 deletions node/network/src/rpc/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ pub struct Quota {
max_tokens: u64,
}

impl Quota {
pub fn one_every(period: Duration) -> Self {
Self {
replenish_all_every: period,
max_tokens: 1,
}
}

pub fn n_every(n: u64, period: Duration) -> Self {
Self {
replenish_all_every: period,
max_tokens: n,
}
}
}

/// Manages rate limiting of requests per peer, with differentiated rates per protocol.
pub struct RPCRateLimiter {
/// Interval to prune peers for which their timer ran out.
Expand Down Expand Up @@ -122,24 +138,12 @@ impl RPCRateLimiterBuilder {
/// Allow one token every `time_period` to be used for this `protocol`.
/// This produces a hard limit.
pub fn one_every(self, protocol: Protocol, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
replenish_all_every: time_period,
max_tokens: 1,
},
)
self.set_quota(protocol, Quota::one_every(time_period))
}

/// Allow `n` tokens to be use used every `time_period` for this `protocol`.
pub fn n_every(self, protocol: Protocol, n: u64, time_period: Duration) -> Self {
self.set_quota(
protocol,
Quota {
max_tokens: n,
replenish_all_every: time_period,
},
)
self.set_quota(protocol, Quota::n_every(n, time_period))
}

pub fn build(self) -> Result<RPCRateLimiter, &'static str> {
Expand Down
1 change: 1 addition & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod batcher;
mod libp2p_event_handler;
mod metrics;
mod peer_manager;
mod rate_limit;
mod service;

use duration_str::deserialize_duration;
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/libp2p_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl Libp2pEventHandler {
}
}

fn send_to_network(&self, message: NetworkMessage) {
pub fn send_to_network(&self, message: NetworkMessage) {
self.network_send.send(message).unwrap_or_else(|err| {
warn!(%err, "Could not send message to the network service");
});
Expand Down
68 changes: 68 additions & 0 deletions node/router/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::{
collections::HashMap,
time::{Duration, Instant},
};

use network::{
rpc::rate_limiter::{Limiter, Quota, RateLimitedErr},
types::GossipKind,
PeerId, PubsubMessage,
};

pub struct PubsubRateLimiter {
init_time: Instant,
limiters: Limiter<PeerId>,
limiters_by_topic: HashMap<GossipKind, Limiter<PeerId>>,
}

impl PubsubRateLimiter {
pub fn new(n: u64, period: Duration) -> Result<Self, String> {
Ok(Self {
init_time: Instant::now(),
limiters: Limiter::from_quota(Quota::n_every(n, period))?,
limiters_by_topic: Default::default(),
})
}

pub fn limit_by_topic(
mut self,
kind: GossipKind,
n: u64,
period: Duration,
) -> Result<Self, String> {
let limiter = Limiter::from_quota(Quota::n_every(n, period))?;
self.limiters_by_topic.insert(kind, limiter);
Ok(self)
}

pub fn allows(
&mut self,
peer_id: &PeerId,
msg: &PubsubMessage,
) -> Result<(), (Option<GossipKind>, RateLimitedErr)> {
let time_since_start = self.init_time.elapsed();

if let Err(err) = self.limiters.allows(time_since_start, peer_id, 1) {
return Err((None, err));
}

let kind = msg.kind();
if let Some(limiter) = self.limiters_by_topic.get_mut(&kind) {
if let Err(err) = limiter.allows(time_since_start, peer_id, 1) {
return Err((Some(kind), err));
}
}

Ok(())
}

pub fn prune(&mut self) {
let time_since_start = self.init_time.elapsed();

self.limiters.prune(time_since_start);

for limiter in self.limiters_by_topic.values_mut() {
limiter.prune(time_since_start);
}
}
}
47 changes: 41 additions & 6 deletions node/router/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::metrics;
use crate::rate_limit::PubsubRateLimiter;
use crate::Config;
use crate::{libp2p_event_handler::Libp2pEventHandler, peer_manager::PeerManager};
use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::rpc::GoodbyeReason;
use network::PeerId;
use network::types::GossipKind;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, NetworkReceiver,
NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
};
use network::{MessageAcceptance, PeerAction, PeerId, ReportSource};
use pruner::PrunerMessage;
use shared_types::ShardedFile;
use std::sync::Arc;
use std::time::Duration;
use storage::log_store::Store as LogStore;
use storage_async::Store;
use sync::{SyncMessage, SyncSender};
Expand Down Expand Up @@ -49,6 +52,8 @@ pub struct RouterService {
upnp_mappings: (Option<u16>, Option<u16>),

store: Arc<dyn LogStore>,

pubsub_rate_limiter: PubsubRateLimiter,
}

impl RouterService {
Expand All @@ -67,9 +72,19 @@ impl RouterService {
file_location_cache: Arc<FileLocationCache>,
local_keypair: Keypair,
config: Config,
) {
) -> Result<(), String> {
let peers = Arc::new(RwLock::new(PeerManager::new(config.clone())));

let pubsub_rate_limiter = PubsubRateLimiter::new(100, Duration::from_secs(10))?
.limit_by_topic(GossipKind::Example, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::NewFile, 50, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AskFile, 50, Duration::from_secs(10))?
.limit_by_topic(GossipKind::FindFile, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceFile, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::FindChunks, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceChunks, 10, Duration::from_secs(10))?
.limit_by_topic(GossipKind::AnnounceShardConfig, 50, Duration::from_secs(10))?;

// create the network service and spawn the task
let router = RouterService {
config: config.clone(),
Expand All @@ -91,17 +106,21 @@ impl RouterService {
),
upnp_mappings: (None, None),
store,
pubsub_rate_limiter,
};

// spawn service
let shutdown_sender = executor.shutdown_sender();

executor.spawn(router.main(shutdown_sender), "router");

Ok(())
}

async fn main(mut self, mut shutdown_sender: Sender<ShutdownReason>) {
let mut heartbeat_service = interval(self.config.heartbeat_interval);
let mut heartbeat_batcher = interval(self.config.batcher_timeout);
let mut heartbeat_rate_limiter = interval(Duration::from_secs(30));

loop {
tokio::select! {
Expand All @@ -118,6 +137,8 @@ impl RouterService {

// heartbeat for expire file batcher
_ = heartbeat_batcher.tick() => self.libp2p_event_handler.expire_batcher().await,

_ = heartbeat_rate_limiter.tick() => self.pubsub_rate_limiter.prune(),
}
}
}
Expand Down Expand Up @@ -192,10 +213,24 @@ impl RouterService {
message,
..
} => {
let result = self
.libp2p_event_handler
.on_pubsub_message(propagation_source, source, &id, message)
.await;
let result = if let Err((rate_limit_kind, _)) = self
.pubsub_rate_limiter
.allows(&propagation_source, &message)
{
warn!(%propagation_source, kind=?message.kind(), ?rate_limit_kind, "Pubsub message rate limited");
self.libp2p_event_handler
.send_to_network(NetworkMessage::ReportPeer {
peer_id: propagation_source,
action: PeerAction::LowToleranceError,
source: ReportSource::Gossipsub,
msg: "Pubsub message rate limited",
});
MessageAcceptance::Reject
} else {
self.libp2p_event_handler
.on_pubsub_message(propagation_source, source, &id, message)
.await
};

self.libp2p
.swarm
Expand Down
2 changes: 1 addition & 1 deletion node/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ impl ClientBuilder {
file_location_cache,
network.keypair.clone(),
router_config,
);
)?;

Ok(self)
}
Expand Down

0 comments on commit aca220a

Please sign in to comment.