diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 03203fcade0..4a8d2b206d2 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -15,7 +15,8 @@ use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; -use libp2p::swarm::Stream; +use libp2p::swarm::{ConnectionId, Stream}; +use libp2p::PeerId; use slog::{crit, debug, trace}; use smallvec::SmallVec; use std::{ @@ -89,6 +90,12 @@ pub struct RPCHandler where E: EthSpec, { + /// This `ConnectionId`. + connection_id: ConnectionId, + + /// The matching `PeerId` of this connection. + peer_id: PeerId, + /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, ()>, @@ -138,7 +145,7 @@ where /// Logger for handling RPC streams log: slog::Logger, - /// Timeout that will me used for inbound and outbound responses. + /// Timeout that will be used for inbound and outbound responses. resp_timeout: Duration, } @@ -219,12 +226,16 @@ where E: EthSpec, { pub fn new( + connection_id: ConnectionId, + peer_id: PeerId, listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, log: &slog::Logger, resp_timeout: Duration, ) -> Self { RPCHandler { + connection_id, + peer_id, listen_protocol, events_out: SmallVec::new(), dial_queue: SmallVec::new(), @@ -302,6 +313,7 @@ where } return; }; + // If the response we are sending is an error, report back for handling if let RpcResponse::Error(ref code, ref reason) = response { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { @@ -314,9 +326,10 @@ where if matches!(self.state, HandlerState::Deactivated) { // we no longer send responses after the handler is deactivated debug!(self.log, "Response not sent. 
Deactivated handler"; - "response" => %response, "id" => inbound_id); + "response" => %response, "id" => inbound_id); return; } + inbound_info.pending_items.push_back(response); } } @@ -938,6 +951,8 @@ where self.events_out .push(HandlerEvent::Ok(RPCReceived::Request(Request { id: RequestId::next(), + peer_id: self.peer_id, + connection_id: self.connection_id, substream_id: self.current_inbound_substream_id, r#type: req, }))); diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 2f6200a836b..96111e3b39e 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -606,6 +606,20 @@ pub enum ResponseTermination { LightClientUpdatesByRange, } +impl ResponseTermination { + pub fn protocol(&self) -> Protocol { + match self { + ResponseTermination::BlocksByRange => Protocol::BlocksByRange, + ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlobsByRange => Protocol::BlobsByRange, + ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, + ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, + ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, + ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange, + } + } +} + /// The structured response containing a result/code indicating success or failure /// and the contents of the response #[derive(Debug, Clone)] diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 03f1395b8b5..8509e7b5dd2 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -4,7 +4,6 @@ //! direct peer-to-peer communication primarily for sending/receiving chain information for //! syncing. -use futures::future::FutureExt; use handler::RPCHandler; use libp2p::core::transport::PortUse; use libp2p::swarm::{ @@ -13,8 +12,8 @@ use libp2p::swarm::{ }; use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent}; use libp2p::PeerId; -use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}; -use slog::{crit, debug, o, trace}; +use slog::{debug, error, o, trace}; +use std::collections::HashMap; use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -28,6 +27,11 @@ pub(crate) use methods::{ }; pub use protocol::RequestType; +use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; +use self::protocol::RPCProtocol; +use self::self_limiter::SelfRateLimiter; +use crate::rpc::rate_limiter::RateLimiterItem; +use crate::rpc::response_limiter::ResponseLimiter; pub use handler::SubstreamId; pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest, @@ -35,10 +39,6 @@ pub use methods::{ }; pub use protocol::{max_rpc_size, Protocol, RPCError}; -use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; -use self::protocol::RPCProtocol; -use self::self_limiter::SelfRateLimiter; - pub(crate) mod codec; pub mod config; mod handler; @@ -46,10 +46,14 @@ pub mod methods; mod outbound; mod protocol; mod rate_limiter; +mod response_limiter; mod self_limiter; static NEXT_REQUEST_ID: AtomicUsize = AtomicUsize::new(1); +// Maximum number of concurrent requests per protocol ID that a client may issue. +const MAX_CONCURRENT_REQUESTS: usize = 2; + /// Composite trait for a request id. 
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} impl ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {} @@ -115,6 +119,8 @@ impl RequestId { #[derive(Debug, Clone)] pub struct Request { pub id: RequestId, + pub peer_id: PeerId, + pub connection_id: ConnectionId, pub substream_id: SubstreamId, pub r#type: RequestType, } @@ -151,10 +157,12 @@ pub struct NetworkParams { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { - /// Rate limiter - limiter: Option, + /// Rate limiter for our responses. + response_limiter: Option>, /// Rate limiter for our own requests. - self_limiter: Option>, + outbound_request_limiter: SelfRateLimiter, + /// Active inbound requests that are awaiting a response. + active_inbound_requests: HashMap>, /// Queue of events to be processed. events: Vec>, fork_context: Arc, @@ -179,20 +187,23 @@ impl RPC { ) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); - let inbound_limiter = inbound_rate_limiter_config.map(|config| { - debug!(log, "Using inbound rate limiting params"; "config" => ?config); - RateLimiter::new_with_config(config.0, fork_context.clone()) + let response_limiter = inbound_rate_limiter_config.clone().map(|config| { + debug!(log, "Using response rate limiting params"; "config" => ?config); + ResponseLimiter::new(config, fork_context.clone(), log.clone()) .expect("Inbound limiter configuration parameters are valid") }); - let self_limiter = outbound_rate_limiter_config.map(|config| { - SelfRateLimiter::new(config, fork_context.clone(), log.clone()) - .expect("Configuration parameters are valid") - }); + let outbound_request_limiter: SelfRateLimiter = SelfRateLimiter::new( + outbound_rate_limiter_config, + fork_context.clone(), + log.clone(), + ) + .expect("Outbound limiter configuration parameters are valid"); RPC { - limiter: inbound_limiter, - self_limiter, + response_limiter, + outbound_request_limiter, + active_inbound_requests: HashMap::new(), events: Vec::new(), fork_context, enable_light_client_server, @@ -208,14 +219,39 @@ impl RPC { pub fn send_response( &mut self, peer_id: PeerId, - id: (ConnectionId, SubstreamId), - _request_id: RequestId, + _id: (ConnectionId, SubstreamId), + request_id: RequestId, event: RpcResponse, ) { + let Some(request) = self.active_inbound_requests.remove(&request_id) else { + error!(self.log, "Request not found in active_inbound_requests. Response not sent"; "peer_id" => %peer_id, "request_id" => ?request_id, "response" => %event); + return; + }; + + // Add the request back to active requests if the response is not a stream termination. + if request.r#type.protocol().terminator().is_some() + && !matches!(event, RpcResponse::StreamTermination(_)) + { + self.active_inbound_requests + .insert(request_id, request.clone()); + } + + if let Some(response_limiter) = self.response_limiter.as_mut() { + if !response_limiter.allows( + peer_id, + request.r#type.protocol(), + request.connection_id, + request.substream_id, + event.clone(), + ) { + return; + } + } + self.events.push(ToSwarm::NotifyHandler { peer_id, - handler: NotifyHandler::One(id.0), - event: RPCSend::Response(id.1, event), + handler: NotifyHandler::One(request.connection_id), + event: RPCSend::Response(request.substream_id, event), }); } @@ -223,23 +259,15 @@ impl RPC { /// /// The peer must be connected for this to succeed. 
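+    /// Requests rejected by the self rate limiter are queued internally and sent once the limiter allows.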
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) { - let event = if let Some(self_limiter) = self.self_limiter.as_mut() { - match self_limiter.allows(peer_id, request_id, req) { - Ok(event) => event, - Err(_e) => { - // Request is logged and queued internally in the self rate limiter. - return; - } - } - } else { - ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: RPCSend::Request(request_id, req), + match self + .outbound_request_limiter + .allows(peer_id, request_id, req) + { + Ok(event) => self.events.push(event), + Err(_e) => { + // Request is logged and queued internally in the self rate limiter. } - }; - - self.events.push(event); + } } /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This @@ -295,6 +323,8 @@ where .log .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, @@ -328,6 +358,8 @@ where .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, @@ -355,20 +387,26 @@ where if remaining_established > 0 { return; } + // Get a list of pending requests from the self rate limiter - if let Some(limiter) = self.self_limiter.as_mut() { - for (id, proto) in limiter.peer_disconnected(peer_id) { - let error_msg = ToSwarm::GenerateEvent(RPCMessage { - peer_id, - conn_id: connection_id, - message: Err(HandlerErr::Outbound { - id, - proto, - error: RPCError::Disconnected, - }), - }); - self.events.push(error_msg); - } + for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) { + let error_msg = ToSwarm::GenerateEvent(RPCMessage { + peer_id, + conn_id: connection_id, + message: Err(HandlerErr::Outbound { + id, + proto, + error: RPCError::Disconnected, + }), + }); + self.events.push(error_msg); + } + + self.active_inbound_requests + .retain(|_request_id, request| request.peer_id != peer_id); + + if let Some(limiter) = self.response_limiter.as_mut() { + limiter.peer_disconnected(peer_id); } // Replace the pending Requests to the disconnected peer @@ -403,65 +441,52 @@ where match event { HandlerEvent::Ok(RPCReceived::Request(Request { id, + peer_id, + connection_id, substream_id, r#type, })) => { - if let Some(limiter) = self.limiter.as_mut() { - // check if the request is conformant to the quota - match limiter.allows(&peer_id, &r#type) { - Err(RateLimitedErr::TooLarge) => { - // we set the batch sizes, so this is a coding/config err for most protocols - let protocol = r#type.versioned_protocol().protocol(); - if matches!( - protocol, - Protocol::BlocksByRange - | Protocol::BlobsByRange - | Protocol::DataColumnsByRange - | Protocol::BlocksByRoot - | Protocol::BlobsByRoot - | Protocol::DataColumnsByRoot - ) { - debug!(self.log, "Request too large to process"; "request" => %r#type, "protocol" => %protocol); - } else { - // Other protocols shouldn't be sending large messages, we should flag the peer kind - crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol); - } - // send an error code to the peer. - // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - (conn_id, substream_id), - id, - RpcResponse::Error( - RpcErrorResponse::RateLimited, - "Rate limited. 
Request too large".into(), - ), - ); - return; - } - Err(RateLimitedErr::TooSoon(wait_time)) => { - debug!(self.log, "Request exceeds the rate limit"; - "request" => %r#type, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis()); - // send an error code to the peer. - // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - (conn_id, substream_id), - id, - RpcResponse::Error( - RpcErrorResponse::RateLimited, - format!("Wait {:?}", wait_time).into(), - ), - ); - return; - } - // No rate limiting, continue. - Ok(()) => {} - } + let request = Request { + id, + peer_id, + connection_id, + substream_id, + r#type, + }; + + let is_concurrent_request_limit_exceeded = self + .active_inbound_requests + .iter() + .filter(|(_request_id, active_request)| { + active_request.peer_id == peer_id + && active_request.r#type.protocol() == request.r#type.protocol() + }) + .count() + >= MAX_CONCURRENT_REQUESTS; + + // We need to insert the request regardless of whether it is allowed by the limiter, + // since we send an error response (RateLimited) if it is not allowed. + self.active_inbound_requests.insert(id, request.clone()); + + // Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer. + if is_concurrent_request_limit_exceeded { + // There is already an active request with the same protocol. Send an error code to the peer. + debug!(self.log, "There is an active request with the same protocol"; "peer_id" => peer_id.to_string(), "request" => %request.r#type, "protocol" => %request.r#type.versioned_protocol().protocol()); + self.send_response( + peer_id, + (conn_id, substream_id), + id, + RpcResponse::Error( + RpcErrorResponse::RateLimited, + "Rate limited. There is an active request with the same protocol" + .into(), + ), + ); + return; } // If we received a Ping, we queue a Pong response. - if let RequestType::Ping(_) = r#type { + if let RequestType::Ping(_) = request.r#type { trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %conn_id, "peer_id" => %peer_id); self.send_response( peer_id, @@ -476,14 +501,25 @@ where self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id, - message: Ok(RPCReceived::Request(Request { - id, - substream_id, - r#type, - })), + message: Ok(RPCReceived::Request(request)), })); } HandlerEvent::Ok(rpc) => { + // Inform the limiter that a response has been received. + match &rpc { + RPCReceived::Request(_) => unreachable!(), + RPCReceived::Response(_id, response) => { + if response.protocol().terminator().is_none() { + self.outbound_request_limiter + .request_completed(&peer_id, response.protocol()); + } + } + RPCReceived::EndOfStream(_id, response_termination) => { + self.outbound_request_limiter + .request_completed(&peer_id, response_termination.protocol()); + } + } + self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id, @@ -491,6 +527,14 @@ where })); } HandlerEvent::Err(err) => { + // Inform the limiter that the request has ended with an error. + let protocol = match err { + HandlerErr::Inbound { proto, .. } => proto, + HandlerErr::Outbound { proto, .. } => proto, + }; + self.outbound_request_limiter + .request_completed(&peer_id, protocol); + self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id, @@ -508,15 +552,20 @@ where } fn poll(&mut self, cx: &mut Context) -> Poll>> { - // let the rate limiter prune. 
- if let Some(limiter) = self.limiter.as_mut() { - let _ = limiter.poll_unpin(cx); + if let Some(response_limiter) = self.response_limiter.as_mut() { + if let Poll::Ready(responses) = response_limiter.poll_ready(cx) { + for response in responses { + self.events.push(ToSwarm::NotifyHandler { + peer_id: response.peer_id, + handler: NotifyHandler::One(response.connection_id), + event: RPCSend::Response(response.substream_id, response.response), + }); + } + } } - if let Some(self_limiter) = self.self_limiter.as_mut() { - if let Poll::Ready(event) = self_limiter.poll_ready(cx) { - self.events.push(event) - } + if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) { + self.events.push(event) } if !self.events.is_empty() { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index b9e82a5f1ee..f666c30d528 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder { lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. lc_optimistic_update_quota: Option, - /// Quota for the LightClientOptimisticUpdate protocol. + /// Quota for the LightClientFinalityUpdate protocol. lc_finality_update_quota: Option, /// Quota for the LightClientUpdatesByRange protocol. lc_updates_by_range_quota: Option, @@ -275,6 +275,17 @@ impl RateLimiterItem for super::RequestType { } } +impl RateLimiterItem for (super::RpcResponse, Protocol) { + fn protocol(&self) -> Protocol { + self.1 + } + + fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 { + // A response chunk consumes one token of the rate limiter. + 1 + } +} + impl RPCRateLimiter { pub fn new_with_config( config: RateLimiterConfig, diff --git a/beacon_node/lighthouse_network/src/rpc/response_limiter.rs b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs new file mode 100644 index 00000000000..fd6f886d9a6 --- /dev/null +++ b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs @@ -0,0 +1,177 @@ +use crate::rpc::config::InboundRateLimiterConfig; +use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr}; +use crate::rpc::{Protocol, RpcResponse, SubstreamId}; +use crate::PeerId; +use futures::FutureExt; +use libp2p::swarm::ConnectionId; +use slog::{crit, debug, Logger}; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio_util::time::DelayQueue; +use types::{EthSpec, ForkContext}; + +/// A response that was rate limited or waiting on rate limited responses for the same peer and +/// protocol. +#[derive(Clone)] +pub(super) struct QueuedResponse { + pub peer_id: PeerId, + pub connection_id: ConnectionId, + pub substream_id: SubstreamId, + pub response: RpcResponse, + pub protocol: Protocol, +} + +pub(super) struct ResponseLimiter { + /// Rate limiter for our responses. + limiter: RPCRateLimiter, + /// Responses queued for sending. These responses are stored when the response limiter rejects them. + delayed_responses: HashMap<(PeerId, Protocol), VecDeque>>, + /// The delay required to allow a peer's outbound response per protocol. + next_response: DelayQueue<(PeerId, Protocol)>, + /// Slog logger. + log: Logger, +} + +impl ResponseLimiter { + /// Creates a new [`ResponseLimiter`] based on configuration values. 
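+    /// Returns an error if a rate limiter cannot be built from the configured quotas.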
+    pub fn new(
+        config: InboundRateLimiterConfig,
+        fork_context: Arc<ForkContext>,
+        log: Logger,
+    ) -> Result<Self, &'static str> {
+        Ok(ResponseLimiter {
+            limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?,
+            delayed_responses: HashMap::new(),
+            next_response: DelayQueue::new(),
+            log,
+        })
+    }
+
+    /// Checks if the rate limiter allows the response. When not allowed, the response is delayed
+    /// until it can be sent.
+    pub fn allows(
+        &mut self,
+        peer_id: PeerId,
+        protocol: Protocol,
+        connection_id: ConnectionId,
+        substream_id: SubstreamId,
+        response: RpcResponse<E>,
+    ) -> bool {
+        // First check that there are not already other responses waiting to be sent.
+        if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) {
+            queue.push_back(QueuedResponse {
+                peer_id,
+                connection_id,
+                substream_id,
+                response,
+                protocol,
+            });
+            return false;
+        }
+
+        if let Err(wait_time) = Self::try_limiter(
+            &mut self.limiter,
+            peer_id,
+            response.clone(),
+            protocol,
+            &self.log,
+        ) {
+            self.delayed_responses
+                .entry((peer_id, protocol))
+                .or_default()
+                .push_back(QueuedResponse {
+                    peer_id,
+                    connection_id,
+                    substream_id,
+                    response,
+                    protocol,
+                });
+            self.next_response.insert((peer_id, protocol), wait_time);
+            return false;
+        }
+
+        true
+    }
+
+    /// Checks if the limiter allows the response. If the response should be delayed, the duration
+    /// to wait is returned.
+    fn try_limiter(
+        limiter: &mut RPCRateLimiter,
+        peer_id: PeerId,
+        response: RpcResponse<E>,
+        protocol: Protocol,
+        log: &Logger,
+    ) -> Result<(), Duration> {
+        match limiter.allows(&peer_id, &(response.clone(), protocol)) {
+            Ok(()) => Ok(()),
+            Err(e) => match e {
+                RateLimitedErr::TooLarge => {
+                    // This should never happen with default parameters. Let's just send the response.
+                    // Log a crit since this is a config issue.
+                    crit!(
+                        log,
+                        "Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters.";
+                        "protocol" => %protocol
+                    );
+                    Ok(())
+                }
+                RateLimitedErr::TooSoon(wait_time) => {
+                    debug!(log, "Response rate limiting"; "protocol" => %protocol, "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id);
+                    Err(wait_time)
+                }
+            },
+        }
+    }
+
+    /// Informs the limiter that a peer has disconnected. This removes any pending responses.
+    pub fn peer_disconnected(&mut self, peer_id: PeerId) {
+        self.delayed_responses
+            .retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id);
+    }
+
+    /// When a peer and protocol are allowed to send their next response, this function checks the
+    /// queued responses and marks as ready as many as the limiter allows.
+    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Vec<QueuedResponse<E>>> {
+        let mut responses = vec![];
+        while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
+            let (peer_id, protocol) = expired.into_inner();
+
+            if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) {
+                let queue = entry.get_mut();
+                // Take delayed responses from the queue, as long as the limiter allows it.
+                while let Some(response) = queue.pop_front() {
+                    match Self::try_limiter(
+                        &mut self.limiter,
+                        response.peer_id,
+                        response.response.clone(),
+                        response.protocol,
+                        &self.log,
+                    ) {
+                        Ok(()) => responses.push(response),
+                        Err(wait_time) => {
+                            // The response was taken from the queue, but the limiter didn't allow it.
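+                            // Requeue it at the front to preserve response order and retry once the new wait time expires.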
+ queue.push_front(response); + self.next_response.insert((peer_id, protocol), wait_time); + break; + } + } + } + if queue.is_empty() { + entry.remove(); + } + } + } + + // Prune the rate limiter. + let _ = self.limiter.poll_unpin(cx); + + if !responses.is_empty() { + return Poll::Ready(responses); + } + Poll::Pending + } +} diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index ae63e5cdb5a..3fbd1346afa 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -5,6 +5,12 @@ use std::{ time::Duration, }; +use super::{ + config::OutboundRateLimiterConfig, + rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, + BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS, +}; +use crate::rpc::rate_limiter::RateLimiterItem; use futures::FutureExt; use libp2p::{swarm::NotifyHandler, PeerId}; use slog::{crit, debug, Logger}; @@ -12,12 +18,6 @@ use smallvec::SmallVec; use tokio_util::time::DelayQueue; use types::{EthSpec, ForkContext}; -use super::{ - config::OutboundRateLimiterConfig, - rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, - BehaviourAction, Protocol, RPCSend, ReqId, RequestType, -}; - /// A request that was rate limited or waiting on rate limited requests for the same peer and /// protocol. struct QueuedRequest { @@ -25,15 +25,20 @@ struct QueuedRequest { request_id: Id, } +/// The number of milliseconds requests delayed due to the concurrent request limit stay in the queue. +const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100; + pub(crate) struct SelfRateLimiter { - /// Requests queued for sending per peer. This requests are stored when the self rate + /// Active requests that are awaiting a response. + active_requests: HashMap>, + /// Requests queued for sending per peer. These requests are stored when the self rate /// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore /// are stored in the same way. delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>, /// The delay required to allow a peer's outbound request per protocol. next_peer_request: DelayQueue<(PeerId, Protocol)>, /// Rate limiter for our own requests. - limiter: RateLimiter, + rate_limiter: Option, /// Requests that are ready to be sent. ready_requests: SmallVec<[BehaviourAction; 3]>, /// Slog logger. @@ -50,19 +55,24 @@ pub enum Error { } impl SelfRateLimiter { - /// Creates a new [`SelfRateLimiter`] based on configration values. + /// Creates a new [`SelfRateLimiter`] based on configuration values. pub fn new( - config: OutboundRateLimiterConfig, + config: Option, fork_context: Arc, log: Logger, ) -> Result { debug!(log, "Using self rate limiting params"; "config" => ?config); - let limiter = RateLimiter::new_with_config(config.0, fork_context)?; + let rate_limiter = if let Some(c) = config { + Some(RateLimiter::new_with_config(c.0, fork_context)?) + } else { + None + }; Ok(SelfRateLimiter { + active_requests: Default::default(), delayed_requests: Default::default(), next_peer_request: Default::default(), - limiter, + rate_limiter, ready_requests: Default::default(), log, }) @@ -80,11 +90,18 @@ impl SelfRateLimiter { let protocol = req.versioned_protocol().protocol(); // First check that there are not already other requests waiting to be sent. 
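+        // Queueing behind them preserves the order of outbound requests per peer and protocol.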
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) { + debug!(self.log, "Self rate limiting since there are already other requests waiting to be sent"; "protocol" => %req.protocol(), "peer_id" => %peer_id); queued_requests.push_back(QueuedRequest { req, request_id }); - return Err(Error::PendingRequests); } - match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) { + match Self::try_send_request( + &mut self.active_requests, + &mut self.rate_limiter, + peer_id, + request_id, + req, + &self.log, + ) { Err((rate_limited_req, wait_time)) => { let key = (peer_id, protocol); self.next_peer_request.insert(key, wait_time); @@ -103,42 +120,60 @@ impl SelfRateLimiter { /// request, the [`ToSwarm`] that should be emitted is returned. If the request /// should be delayed, it's returned with the duration to wait. fn try_send_request( - limiter: &mut RateLimiter, + active_requests: &mut HashMap>, + rate_limiter: &mut Option, peer_id: PeerId, request_id: Id, req: RequestType, log: &Logger, ) -> Result, (QueuedRequest, Duration)> { - match limiter.allows(&peer_id, &req) { - Ok(()) => Ok(BehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: RPCSend::Request(request_id, req), - }), - Err(e) => { - let protocol = req.versioned_protocol(); - match e { - RateLimitedErr::TooLarge => { - // this should never happen with default parameters. Let's just send the request. - // Log a crit since this is a config issue. - crit!( - log, - "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."; - "protocol" => %req.versioned_protocol().protocol() - ); - Ok(BehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: RPCSend::Request(request_id, req), - }) - } - RateLimitedErr::TooSoon(wait_time) => { - debug!(log, "Self rate limiting"; "protocol" => %protocol.protocol(), "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id); - Err((QueuedRequest { req, request_id }, wait_time)) + if let Some(active_request) = active_requests.get(&peer_id) { + if let Some(count) = active_request.get(&req.protocol()) { + if *count >= MAX_CONCURRENT_REQUESTS { + debug!(log, "Self rate limiting due to the number of concurrent requests"; "protocol" => %req.protocol(), "peer_id" => %peer_id); + return Err(( + QueuedRequest { req, request_id }, + Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS), + )); + } + } + } + + if let Some(limiter) = rate_limiter.as_mut() { + match limiter.allows(&peer_id, &req) { + Ok(()) => {} + Err(e) => { + let protocol = req.versioned_protocol(); + match e { + RateLimitedErr::TooLarge => { + // this should never happen with default parameters. Let's just send the request. + // Log a crit since this is a config issue. + crit!( + log, + "Self rate limiting error for a batch that will never fit. Sending request anyway. 
Check configuration parameters."; + "protocol" => %req.versioned_protocol().protocol() + ); + } + RateLimitedErr::TooSoon(wait_time) => { + debug!(log, "Self rate limiting"; "protocol" => %protocol.protocol(), "wait_time_ms" => wait_time.as_millis(), "peer_id" => %peer_id); + return Err((QueuedRequest { req, request_id }, wait_time)); + } } } } } + + *active_requests + .entry(peer_id) + .or_default() + .entry(req.protocol()) + .or_default() += 1; + + Ok(BehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: RPCSend::Request(request_id, req), + }) } /// When a peer and protocol are allowed to send a next request, this function checks the @@ -147,8 +182,14 @@ impl SelfRateLimiter { if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) { let queued_requests = entry.get_mut(); while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() { - match Self::try_send_request(&mut self.limiter, peer_id, request_id, req, &self.log) - { + match Self::try_send_request( + &mut self.active_requests, + &mut self.rate_limiter, + peer_id, + request_id, + req, + &self.log, + ) { Err((rate_limited_req, wait_time)) => { let key = (peer_id, protocol); self.next_peer_request.insert(key, wait_time); @@ -170,6 +211,8 @@ impl SelfRateLimiter { /// Informs the limiter that a peer has disconnected. This removes any pending requests and /// returns their IDs. pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { + self.active_requests.remove(&peer_id); + // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // should never really be large. So we iterate for simplicity let mut failed_requests = Vec::new(); @@ -191,16 +234,32 @@ impl SelfRateLimiter { failed_requests } + /// Informs the limiter that a response has been received. + pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) { + if let Some(active_requests) = self.active_requests.get_mut(peer_id) { + if let Entry::Occupied(mut entry) = active_requests.entry(protocol) { + if *entry.get() > 1 { + *entry.get_mut() -= 1; + } else { + entry.remove(); + } + } + } + } + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // First check the requests that were self rate limited, since those might add events to - // the queue. Also do this this before rate limiter prunning to avoid removing and + // the queue. Also do this before rate limiter pruning to avoid removing and // immediately adding rate limiting keys. if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) { let (peer_id, protocol) = expired.into_inner(); self.next_peer_request_ready(peer_id, protocol); } + // Prune the rate limiter. - let _ = self.limiter.poll_unpin(cx); + if let Some(limiter) = self.rate_limiter.as_mut() { + let _ = limiter.poll_unpin(cx); + } // Finally return any queued events. 
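+        // Ready requests are handed back to the behaviour one event per poll call.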
if !self.ready_requests.is_empty() { @@ -216,7 +275,7 @@ mod tests { use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig}; use crate::rpc::rate_limiter::Quota; use crate::rpc::self_limiter::SelfRateLimiter; - use crate::rpc::{Ping, Protocol, RequestType}; + use crate::rpc::{BehaviourAction, Ping, Protocol, RPCSend, RequestType}; use crate::service::api_types::{AppRequestId, RequestId, SingleLookupReqId, SyncRequestId}; use libp2p::PeerId; use std::time::Duration; @@ -236,7 +295,7 @@ mod tests { &MainnetEthSpec::default_spec(), )); let mut limiter: SelfRateLimiter = - SelfRateLimiter::new(config, fork_context, log).unwrap(); + SelfRateLimiter::new(Some(config), fork_context, log).unwrap(); let peer_id = PeerId::random(); let lookup_id = 0; @@ -299,4 +358,156 @@ mod tests { assert_eq!(limiter.ready_requests.len(), 1); } } + + /// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting. + #[tokio::test] + async fn test_next_peer_request_ready_concurrent_requests() { + let log = logging::test_logger(); + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(None, fork_context, log).unwrap(); + let peer_id = PeerId::random(); + + for i in 1..=5u32 { + let result = limiter.allows( + peer_id, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id: i, + req_id: i, + }, + })), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first two requests. + if i <= 2 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(3, queue.len()); + + // The delayed requests remain even after the next_peer_request_ready call because the responses have not been received. + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(3, queue.len()); + + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(2, queue.len()); + + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping)); + assert!(queue.is_none()); + + // Check that the three delayed requests have moved to ready_requests. + let mut it = limiter.ready_requests.iter(); + for i in 3..=5u32 { + let BehaviourAction::NotifyHandler { + peer_id: _, + handler: _, + event: RPCSend::Request(request_id, _), + } = it.next().unwrap() + else { + unreachable!() + }; + + assert!(matches!( + request_id, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. 
}, + })) if *req_id == i + )); + } + } + + #[tokio::test] + async fn test_peer_disconnected() { + let log = logging::test_logger(); + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(None, fork_context, log).unwrap(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + + for peer in [peer1, peer2] { + for i in 1..=5u32 { + let result = limiter.allows( + peer, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id: i, + req_id: i, + }, + })), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first two requests. + if i <= 2 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + } + + assert!(limiter.active_requests.contains_key(&peer1)); + assert!(limiter + .delayed_requests + .contains_key(&(peer1, Protocol::Ping))); + assert!(limiter.active_requests.contains_key(&peer2)); + assert!(limiter + .delayed_requests + .contains_key(&(peer2, Protocol::Ping))); + + // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly. + let mut failed_requests = limiter.peer_disconnected(peer1); + for i in 3..=5u32 { + let (request_id, _) = failed_requests.remove(0); + assert!(matches!( + request_id, + RequestId::Application(AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + })) if req_id == i + )); + } + + // Check that peer1’s active and delayed requests have been removed. + assert!(!limiter.active_requests.contains_key(&peer1)); + assert!(!limiter + .delayed_requests + .contains_key(&(peer1, Protocol::Ping))); + + assert!(limiter.active_requests.contains_key(&peer2)); + assert!(limiter + .delayed_requests + .contains_key(&(peer2, Protocol::Ping))); + } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index 6a3ec6dd322..2d061fa4a53 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -15,6 +15,7 @@ use types::{ type E = MinimalEthSpec; +use lighthouse_network::rpc::config::InboundRateLimiterConfig; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context @@ -79,7 +80,11 @@ pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { } } -pub fn build_config(mut boot_nodes: Vec) -> Arc { +pub fn build_config( + mut boot_nodes: Vec, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, +) -> Arc { let mut config = NetworkConfig::default(); // Find unused ports by using the 0 port. 
@@ -95,6 +100,8 @@ pub fn build_config(mut boot_nodes: Vec) -> Arc { config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None); config.boot_nodes_enr.append(&mut boot_nodes); config.network_dir = path.into_path(); + config.disable_peer_scoring = disable_peer_scoring; + config.inbound_rate_limiter_config = inbound_rate_limiter; Arc::new(config) } @@ -104,8 +111,10 @@ pub async fn build_libp2p_instance( log: slog::Logger, fork_name: ForkName, chain_spec: Arc, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, ) -> Libp2pInstance { - let config = build_config(boot_nodes); + let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter); // launch libp2p service let (signal, exit) = async_channel::bounded(1); @@ -147,14 +156,32 @@ pub async fn build_node_pair( fork_name: ForkName, spec: Arc, protocol: Protocol, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = - build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name, spec.clone()).await; - let mut receiver = - build_libp2p_instance(rt, vec![], receiver_log, fork_name, spec.clone()).await; + let mut sender = build_libp2p_instance( + rt.clone(), + vec![], + sender_log, + fork_name, + spec.clone(), + disable_peer_scoring, + inbound_rate_limiter.clone(), + ) + .await; + let mut receiver = build_libp2p_instance( + rt, + vec![], + receiver_log, + fork_name, + spec.clone(), + disable_peer_scoring, + inbound_rate_limiter, + ) + .await; // let the two nodes set up listeners let sender_fut = async { @@ -228,7 +255,16 @@ pub async fn build_linear( let mut nodes = Vec::with_capacity(n); for _ in 0..n { nodes.push( - build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name, spec.clone()).await, + build_libp2p_instance( + rt.clone(), + vec![], + log.clone(), + fork_name, + spec.clone(), + false, + None, + ) + .await, ); } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 4b54a24ddc8..89231ba9797 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -6,11 +6,11 @@ use common::Protocol; use lighthouse_network::rpc::{methods::*, RequestType}; use lighthouse_network::service::api_types::AppRequestId; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Response}; -use slog::{debug, warn, Level}; +use slog::{debug, error, warn, Level}; use ssz::Encode; use ssz_types::VariableList; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ @@ -71,6 +71,8 @@ fn test_tcp_status_rpc() { ForkName::Base, spec, Protocol::Tcp, + false, + None, ) .await; @@ -173,6 +175,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() { ForkName::Bellatrix, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -320,6 +324,8 @@ fn test_blobs_by_range_chunked_rpc() { ForkName::Deneb, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -443,6 +449,8 @@ fn test_tcp_blocks_by_range_over_limit() { ForkName::Bellatrix, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -550,6 +558,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { ForkName::Base, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -683,6 +693,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { ForkName::Base, 
             spec.clone(),
             Protocol::Tcp,
+            false,
+            None,
         )
         .await;
 @@ -806,6 +818,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
             ForkName::Bellatrix,
             spec.clone(),
             Protocol::Tcp,
+            false,
+            None,
         )
         .await;
 @@ -952,6 +966,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
             ForkName::Base,
             spec.clone(),
             Protocol::Tcp,
+            false,
+            None,
         )
         .await;
 @@ -1087,9 +1103,16 @@ fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
     // get sender/receiver
     rt.block_on(async {
-        let (mut sender, mut receiver) =
-            common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base, spec, protocol)
-                .await;
+        let (mut sender, mut receiver) = common::build_node_pair(
+            Arc::downgrade(&rt),
+            &log,
+            ForkName::Base,
+            spec,
+            protocol,
+            false,
+            None,
+        )
+        .await;
 
         // build the sender future
         let sender_future = async {
 @@ -1152,3 +1175,243 @@ fn quic_test_goodbye_rpc() {
     let enable_logging = false;
     goodbye_test(log_level, enable_logging, Protocol::Quic);
 }
+
+// Test that the receiver delays the responses during response rate-limiting.
+#[test]
+fn test_delayed_rpc_response() {
+    let rt = Arc::new(Runtime::new().unwrap());
+    let log = logging::test_logger();
+    let spec = Arc::new(E::default_spec());
+
+    // Allow 1 token to be used every 3 seconds.
+    const QUOTA_SEC: u64 = 3;
+
+    rt.block_on(async {
+        // get sender/receiver
+        let (mut sender, mut receiver) = common::build_node_pair(
+            Arc::downgrade(&rt),
+            &log,
+            ForkName::Base,
+            spec,
+            Protocol::Tcp,
+            false,
+            // Configure a quota for STATUS responses of 1 token every 3 seconds.
+            Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()),
+        )
+        .await;
+
+        // Dummy STATUS RPC request.
+        let rpc_request = RequestType::Status(StatusMessage {
+            fork_digest: [0; 4],
+            finalized_root: Hash256::from_low_u64_be(0),
+            finalized_epoch: Epoch::new(1),
+            head_root: Hash256::from_low_u64_be(0),
+            head_slot: Slot::new(1),
+        });
+
+        // Dummy STATUS RPC response.
+        let rpc_response = Response::Status(StatusMessage {
+            fork_digest: [0; 4],
+            finalized_root: Hash256::from_low_u64_be(0),
+            finalized_epoch: Epoch::new(1),
+            head_root: Hash256::from_low_u64_be(0),
+            head_slot: Slot::new(1),
+        });
+
+        // build the sender future
+        let sender_future = async {
+            let mut request_id = 1;
+            let mut request_sent_at = Instant::now();
+            loop {
+                match sender.next_event().await {
+                    NetworkEvent::PeerConnectedOutgoing(peer_id) => {
+                        debug!(log, "Sending RPC request"; "request_id" => request_id);
+                        sender
+                            .send_request(peer_id, AppRequestId::Router, rpc_request.clone())
+                            .unwrap();
+                        request_sent_at = Instant::now();
+                    }
+                    NetworkEvent::ResponseReceived {
+                        peer_id,
+                        id: _,
+                        response,
+                    } => {
+                        debug!(log, "Sender received"; "request_id" => request_id);
+                        assert_eq!(response, rpc_response);
+
+                        match request_id {
+                            1 => {
+                                // The first response is returned instantly.
+                                assert!(request_sent_at.elapsed() < Duration::from_millis(100));
+                            }
+                            2..=5 => {
+                                // The second and subsequent responses are delayed due to the response rate-limiter on the receiver side.
+                                // Adding a slight margin to the elapsed time check to account for potential timing issues caused by system
+                                // scheduling or execution delays during testing.
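+                                // e.g. with QUOTA_SEC = 3, each delayed response should arrive no sooner than ~2.9 seconds after its request.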
+ assert!( + request_sent_at.elapsed() + > (Duration::from_secs(QUOTA_SEC) + - Duration::from_millis(100)) + ); + if request_id == 5 { + // End the test + return; + } + } + _ => unreachable!(), + } + + request_id += 1; + debug!(log, "Sending RPC request"; "request_id" => request_id); + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + request_sent_at = Instant::now(); + } + NetworkEvent::RPCFailed { + id: _, + peer_id: _, + error, + } => { + error!(log, "RPC Failed"; "error" => ?error); + panic!("Rpc failed."); + } + _ => {} + } + } + }; + + // build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { + peer_id, + id, + request, + } = receiver.next_event().await + { + assert_eq!(request.r#type, rpc_request); + debug!(log, "Receiver received request"); + receiver.send_response(peer_id, id, request.id, rpc_response.clone()); + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +} + +// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at +// once, thanks to the self-limiter on the sender side. +#[test] +fn test_active_requests() { + let rt = Arc::new(Runtime::new().unwrap()); + let log = logging::test_logger(); + let spec = Arc::new(E::default_spec()); + + rt.block_on(async { + // Get sender/receiver. + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + spec, + Protocol::Tcp, + false, + None, + ) + .await; + + // Dummy STATUS RPC request. + let rpc_request = RequestType::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + // Dummy STATUS RPC response. + let rpc_response = Response::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::zero(), + finalized_epoch: Epoch::new(1), + head_root: Hash256::zero(), + head_slot: Slot::new(1), + }); + + // Number of requests. + const REQUESTS: u8 = 10; + + // Build the sender future. + let sender_future = async { + let mut response_received = 0; + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(log, "Sending RPC request"); + // Send requests in quick succession to intentionally trigger request queueing in the self-limiter. + for _ in 0..REQUESTS { + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + } + } + NetworkEvent::ResponseReceived { response, .. } => { + debug!(log, "Sender received response"; "response" => ?response); + if matches!(response, Response::Status(_)) { + response_received += 1; + } + } + NetworkEvent::RPCFailed { + id: _, + peer_id: _, + error, + } => panic!("RPC failed: {:?}", error), + _ => {} + } + + if response_received == REQUESTS { + return; + } + } + }; + + // Build the receiver future. + let receiver_future = async { + let mut received_requests = vec![]; + loop { + tokio::select! 
{ + event = receiver.next_event() => { + if let NetworkEvent::RequestReceived { peer_id, id, request } = event { + debug!(log, "Receiver received request"; "request" => ?request); + if matches!(request.r#type, RequestType::Status(_)) { + received_requests.push((peer_id, id, request.id)); + } + } + } + // Introduce a delay in sending responses to trigger request queueing on the sender side. + _ = sleep(Duration::from_secs(3)) => { + for (peer_id, id, request_id) in received_requests.drain(..) { + receiver.send_response(peer_id, id, request_id, rpc_response.clone()); + } + } + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +}