From c18f603500c5bb5d1a1b4aff96f2fae13641d3fe Mon Sep 17 00:00:00 2001
From: pompon0
Date: Thu, 27 Jun 2024 18:08:46 +0200
Subject: [PATCH] loadtest for the gossipnet endpoint (#132)

Implemented a utility for saturating the unauthenticated gossip throughput
of a node. It should be executed against a real node, so that the load on
the system can be observed.
---
 node/actors/executor/src/lib.rs                |   8 +-
 node/actors/executor/src/tests.rs              |   1 +
 .../actors/network/src/gossip/loadtest/mod.rs  | 183 ++++++++++++++++++
 .../network/src/gossip/loadtest/tests.rs       |  77 ++++++++
 node/actors/network/src/gossip/mod.rs          |   1 +
 node/actors/network/src/mux/mod.rs             |   9 +
 node/actors/network/src/rpc/mod.rs             |  10 +-
 node/tools/src/config.rs                       |   1 +
 8 files changed, 282 insertions(+), 8 deletions(-)
 create mode 100644 node/actors/network/src/gossip/loadtest/mod.rs
 create mode 100644 node/actors/network/src/gossip/loadtest/tests.rs

diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index aaf8c5a7..bfa5be6a 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -14,6 +14,8 @@ use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
 use zksync_consensus_utils::pipe;
 use zksync_protobuf::kB;
 
+pub use network::RpcConfig;
+
 mod io;
 #[cfg(test)]
 mod tests;
@@ -58,6 +60,10 @@ pub struct Config {
     /// Outbound connections that the node should actively try to
     /// establish and maintain.
     pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
+    /// RPC rate limits config.
+    /// Use `RpcConfig::default()` for defaults.
+    pub rpc: RpcConfig,
+
     /// Http debug page configuration.
     /// If None, debug page is disabled
     pub debug_page: Option<http::DebugPageConfig>,
@@ -106,7 +112,7 @@ impl Executor {
                 burst: 10,
                 refresh: time::Duration::milliseconds(100),
             },
-            rpc: network::RpcConfig::default(),
+            rpc: self.config.rpc.clone(),
         }
     }
 
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index d0245e3a..2781e613 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -20,6 +20,7 @@ fn config(cfg: &network::Config) -> Config {
         gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
         gossip_static_inbound: cfg.gossip.static_inbound.clone(),
         gossip_static_outbound: cfg.gossip.static_outbound.clone(),
+        rpc: cfg.rpc.clone(),
         debug_page: None,
     }
 }
diff --git a/node/actors/network/src/gossip/loadtest/mod.rs b/node/actors/network/src/gossip/loadtest/mod.rs
new file mode 100644
index 00000000..b54098ce
--- /dev/null
+++ b/node/actors/network/src/gossip/loadtest/mod.rs
@@ -0,0 +1,183 @@
+//! Loadtest of the gossip endpoint of a node.
+use crate::{gossip, mux, noise, preface, rpc, GossipConfig};
+use anyhow::Context as _;
+use async_trait::async_trait;
+use rand::Rng;
+use zksync_concurrency::{ctx, error::Wrap as _, limiter, net, scope, sync, time};
+use zksync_consensus_roles::{node, validator};
+use zksync_consensus_storage::BlockStoreState;
+use zksync_protobuf::kB;
+
+#[cfg(test)]
+mod tests;
+
+struct PushBlockStoreStateServer(sync::watch::Sender<Option<BlockStoreState>>);
+
+impl PushBlockStoreStateServer {
+    fn new() -> Self {
+        Self(sync::watch::channel(None).0)
+    }
+
+    /// Waits until the peer tells us which blocks it has.
+    /// Guaranteed to return a nonempty range of `BlockNumber`s.
+    async fn wait_for_peer(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::OrCanceled<std::ops::Range<validator::BlockNumber>> {
+        let sub = &mut self.0.subscribe();
+        // unwraps are safe, because we have checked that in `wait_for`.
+        let state =
+            sync::wait_for(ctx, sub, |s| (|| s.as_ref()?.last.as_ref())().is_some()).await?;
+        let state = state.as_ref().unwrap();
+        Ok(state.first..state.last.as_ref().unwrap().header().number + 1)
+    }
+}
+
+#[async_trait]
+impl rpc::Handler<rpc::push_block_store_state::Rpc> for &PushBlockStoreStateServer {
+    fn max_req_size(&self) -> usize {
+        10 * kB
+    }
+    async fn handle(
+        &self,
+        _ctx: &ctx::Ctx,
+        req: rpc::push_block_store_state::Req,
+    ) -> anyhow::Result<()> {
+        self.0.send_replace(Some(req.0));
+        Ok(())
+    }
+}
+
+/// Traffic pattern to generate.
+pub enum TrafficPattern {
+    /// Always fetch a random available block.
+    /// Bypasses all the caches that a node can have.
+    /// This is an adversarial traffic pattern.
+    Random,
+    /// Fetch blocks sequentially, starting from a random one.
+    /// Bypasses the era-consensus cache, but is potentially DB-friendly.
+    /// This is the expected traffic pattern for nodes that are syncing up.
+    Sequential,
+    /// Always fetch the latest available block.
+    /// Hits the era-consensus cache - it is an in-memory query.
+    /// This is the expected traffic pattern for nodes that are up to date.
+    Latest,
+}
+
+/// Loadtest saturating the unauthenticated gossip connections of a peer
+/// and spamming it with maximal get_block throughput.
+pub struct Loadtest {
+    /// Address of the peer to spam.
+    pub addr: net::Host,
+    /// Key of the peer to spam.
+    pub peer: node::PublicKey,
+    /// Genesis of the chain.
+    pub genesis: validator::Genesis,
+    /// Traffic pattern to generate.
+    pub traffic_pattern: TrafficPattern,
+    /// Channel to send the received responses to.
+    pub output: Option<ctx::channel::Sender<Option<validator::FinalBlock>>>,
+}
+
+impl Loadtest {
+    async fn connect(&self, ctx: &ctx::Ctx) -> ctx::Result<noise::Stream> {
+        let addr = *self
+            .addr
+            .resolve(ctx)
+            .await?
+            .context("resolve()")?
+            .first()
+            .context("resolution failed")?;
+        let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet)
+            .await
+            .context("connect()")?;
+        let cfg = GossipConfig {
+            key: node::SecretKey::generate(),
+            dynamic_inbound_limit: 0,
+            static_inbound: [].into(),
+            static_outbound: [].into(),
+        };
+        gossip::handshake::outbound(ctx, &cfg, self.genesis.hash(), &mut stream, &self.peer)
+            .await
+            .context("handshake")?;
+        Ok(stream)
+    }
+
+    async fn spam(&self, ctx: &ctx::Ctx, stream: noise::Stream) -> ctx::Result<()> {
+        let push_block_store_state_server = PushBlockStoreStateServer::new();
+        let get_block_client = rpc::Client::<rpc::get_block::Rpc>::new(ctx, limiter::Rate::INF);
+        let mut rng = ctx.rng();
+        scope::run!(ctx, |ctx, s| async {
+            let service = rpc::Service::new()
+                .add_client(&get_block_client)
+                // We listen to PushBlockStoreState messages to know which blocks we can fetch.
+                .add_server(ctx, &push_block_store_state_server, limiter::Rate::INF)
+                // We respond to pings, so that the peer considers the connection healthy.
+                .add_server(ctx, rpc::ping::Server, limiter::Rate::INF);
+            s.spawn(async {
+                match service.run(ctx, stream).await {
+                    // The peer didn't want to talk with us.
+                    // This avoids "EarlyEof" errors being logged all the time.
+                    Ok(()) | Err(mux::RunError::Protocol(_)) => Ok(()),
+                    Err(err) => Err(err.into()),
+                }
+            });
+            let mut next = validator::BlockNumber(0);
+            loop {
+                let call = get_block_client.reserve(ctx).await?;
+                let range = push_block_store_state_server.wait_for_peer(ctx).await?;
+                let mut sample =
+                    || validator::BlockNumber(rng.gen_range(range.start.0..range.end.0));
+                match self.traffic_pattern {
+                    // unwrap is safe, because the range is guaranteed to be non-empty by
+                    // `wait_for_peer`.
+                    TrafficPattern::Latest => next = range.end.prev().unwrap(),
+                    TrafficPattern::Random => next = sample(),
+                    TrafficPattern::Sequential => {
+                        next = next + 1;
+                        if !range.contains(&next) {
+                            next = sample();
+                        }
+                    }
+                }
+                let req = rpc::get_block::Req(next);
+                s.spawn(async {
+                    let req = req;
+                    let resp = call.call(ctx, &req, usize::MAX).await.wrap("call")?;
+                    if let Some(send) = &self.output {
+                        send.send(ctx, resp.0).await?;
+                    }
+                    Ok(())
+                });
+            }
+        })
+        .await
+    }
+
+    /// Run the loadtest.
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        let res = scope::run!(ctx, |ctx, s| async {
+            loop {
+                match self.connect(ctx).await {
+                    Ok(stream) => {
+                        s.spawn(async {
+                            if let Err(err) = self.spam(ctx, stream).await {
+                                tracing::warn!("spam(): {err:#}");
+                            }
+                            Ok(())
+                        });
+                    }
+                    Err(err) => {
+                        tracing::warn!("connect(): {err:#}");
+                        ctx.sleep(time::Duration::seconds(10)).await?;
+                    }
+                }
+            }
+        })
+        .await;
+        match res {
+            Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
+            Err(ctx::Error::Internal(err)) => Err(err),
+        }
+    }
+}
diff --git a/node/actors/network/src/gossip/loadtest/tests.rs b/node/actors/network/src/gossip/loadtest/tests.rs
new file mode 100644
index 00000000..b4c40be9
--- /dev/null
+++ b/node/actors/network/src/gossip/loadtest/tests.rs
@@ -0,0 +1,77 @@
+use super::*;
+use crate::testonly;
+use zksync_concurrency::{ctx, scope, sync, testonly::abort_on_panic};
+use zksync_consensus_roles::validator;
+use zksync_consensus_storage::testonly::TestMemoryStorage;
+
+#[tokio::test]
+async fn test_loadtest() {
+    abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+
+    let mut setup = validator::testonly::Setup::new(rng, 1);
+    setup.push_blocks(rng, 10);
+    let mut cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
+    cfg.gossip.dynamic_inbound_limit = 7;
+
+    scope::run!(ctx, |ctx, s| async {
+        // Spawn the node.
+        let stores = TestMemoryStorage::new(ctx, &setup.genesis).await;
+        s.spawn_bg(stores.runner.run(ctx));
+        let (node, runner) =
+            testonly::Instance::new(cfg.clone(), stores.blocks.clone(), stores.batches.clone());
+        s.spawn_bg(runner.run(ctx));
+
+        // Fill the storage with some blocks.
+        for b in &setup.blocks {
+            stores
+                .blocks
+                .queue_block(ctx, b.clone())
+                .await
+                .context("queue_block()")?;
+        }
+
+        let (send, recv) = ctx::channel::bounded(10);
+
+        // Run the loadtest.
+        s.spawn_bg(async {
+            Loadtest {
+                addr: cfg.public_addr.clone(),
+                peer: cfg.gossip.key.public(),
+                genesis: setup.genesis.clone(),
+                traffic_pattern: TrafficPattern::Random,
+                output: Some(send),
+            }
+            .run(ctx)
+            .await?;
+            Ok(())
+        });
+
+        s.spawn(async {
+            // Wait for a bunch of blocks to be received.
+            let mut recv = recv;
+            let mut count = 0;
+            while count < 100 {
+                // Count only responses with actual blocks.
+                if recv.recv(ctx).await?.is_some() {
+                    count += 1;
+                }
+            }
+            Ok(())
+        });
+        s.spawn(async {
+            // Wait for the inbound connections to get saturated.
+            let node = node;
+            let sub = &mut node.net.gossip.inbound.subscribe();
+            sync::wait_for(ctx, sub, |pool| {
+                pool.current().len() == cfg.gossip.dynamic_inbound_limit
+            })
+            .await?;
+            Ok(())
+        });
+        Ok(())
+    })
+    .await
+    .unwrap();
+}
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index b7fe09e3..905c7806 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -27,6 +27,7 @@ use zksync_consensus_storage::{BatchStore, BlockStore};
 mod batch_votes;
 mod fetch;
 mod handshake;
+pub mod loadtest;
 mod runner;
 #[cfg(test)]
 mod testonly;
diff --git a/node/actors/network/src/mux/mod.rs b/node/actors/network/src/mux/mod.rs
index b8b4d0c3..9256045c 100644
--- a/node/actors/network/src/mux/mod.rs
+++ b/node/actors/network/src/mux/mod.rs
@@ -158,6 +158,15 @@ pub(crate) enum RunError {
     IO(#[from] io::Error),
 }
 
+impl From<RunError> for ctx::Error {
+    fn from(err: RunError) -> Self {
+        match err {
+            RunError::Canceled(err) => Self::Canceled(err),
+            err => Self::Internal(err.into()),
+        }
+    }
+}
+
 impl Mux {
     async fn spawn_streams<'env>(
         &self,
diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs
index 8b28bf73..8c4d6d7f 100644
--- a/node/actors/network/src/rpc/mod.rs
+++ b/node/actors/network/src/rpc/mod.rs
@@ -88,9 +88,9 @@ impl<R: Rpc> ReservedCall<R> {
         ctx: &ctx::Ctx,
         req: &R::Req,
         max_resp_size: usize,
-    ) -> anyhow::Result<R::Resp> {
+    ) -> ctx::Result<R::Resp> {
         let send_time = ctx.now();
-        let mut stream = self.stream.open(ctx).await??;
+        let mut stream = self.stream.open(ctx).await?.context("open()")?;
         let res = async {
             let metric_labels = CallType::Client.to_labels::<R>(req);
             let _guard = RPC_METRICS.inflight[&metric_labels].inc_guard(1);
@@ -148,11 +148,7 @@ impl<R: Rpc> Client<R> {
         req: &R::Req,
         max_resp_size: usize,
     ) -> ctx::Result<R::Resp> {
-        Ok(self
-            .reserve(ctx)
-            .await?
-            .call(ctx, req, max_resp_size)
-            .await?)
+        self.reserve(ctx).await?.call(ctx, req, max_resp_size).await
     }
 }
 
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 511361f0..3ed82d7c 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -257,6 +257,7 @@ impl Configs {
             gossip_static_inbound: self.app.gossip_static_inbound.clone(),
             gossip_static_outbound: self.app.gossip_static_outbound.clone(),
             max_payload_size: self.app.max_payload_size,
+            rpc: executor::RpcConfig::default(),
             debug_page: self.app.debug_page.as_ref().map(|debug_page_config| {
                 http::DebugPageConfig {
                     addr: debug_page_config.addr,
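Usage note (not part of the patch): a minimal sketch of how the utility could be
driven from a standalone binary. It assumes the network crate is imported as
`zksync_consensus_network` and that the peer's address, public key, and genesis
come from the target deployment's config; `run_loadtest` is a hypothetical
helper introduced here for illustration only.

use zksync_concurrency::{ctx, net};
use zksync_consensus_network::gossip::loadtest::{Loadtest, TrafficPattern};
use zksync_consensus_roles::{node, validator};

// Hypothetical driver: spams `peer` at `addr` with get_block requests until
// the context gets canceled.
async fn run_loadtest(
    addr: net::Host,
    peer: node::PublicKey,
    genesis: validator::Genesis,
) -> anyhow::Result<()> {
    let ctx = &ctx::root();
    Loadtest {
        addr,
        peer,
        genesis,
        // Random is the adversarial pattern: it bypasses the peer's caches.
        traffic_pattern: TrafficPattern::Random,
        // None: only generate load, don't inspect the fetched blocks.
        output: None,
    }
    .run(ctx)
    .await
}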