loadtest for the gossipnet endpoint (#132)
Implemented a utility for saturating the unauthenticated gossip throughput of
a node. It should be executed against a real node, so that the load on the
system can be observed.
pompon0 authored Jun 27, 2024
1 parent 4fa8121 commit c18f603
Showing 8 changed files with 282 additions and 8 deletions.
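For orientation, here is a minimal sketch of how the new utility might be driven from a binary, closely modeled on the test added in this commit. The crate names (zksync_concurrency, zksync_consensus_network, zksync_consensus_roles) are assumed from the repository layout, and obtaining the peer's address, key, and genesis is outside the scope of this diff:

use zksync_concurrency::{ctx, net, scope};
use zksync_consensus_network::gossip::loadtest::{Loadtest, TrafficPattern};
use zksync_consensus_roles::{node, validator};

/// Sketch only: spam `addr` until 1000 block responses have been observed,
/// then cancel the loadtest by leaving the scope.
async fn run_loadtest(
    ctx: &ctx::Ctx,
    addr: net::Host,
    peer: node::PublicKey,
    genesis: validator::Genesis,
) -> anyhow::Result<()> {
    scope::run!(ctx, |ctx, s| async {
        let (send, mut recv) = ctx::channel::bounded(100);
        // Run the spammer in the background; it reconnects on failure.
        s.spawn_bg(async {
            Loadtest {
                addr,
                peer,
                genesis,
                traffic_pattern: TrafficPattern::Sequential,
                output: Some(send),
            }
            .run(ctx)
            .await
        });
        // Count only the responses that actually carried a block.
        let mut fetched = 0;
        while fetched < 1000 {
            if recv.recv(ctx).await?.is_some() {
                fetched += 1;
            }
        }
        Ok(())
    })
    .await
}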
8 changes: 7 additions & 1 deletion node/actors/executor/src/lib.rs
@@ -14,6 +14,8 @@ use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
use zksync_protobuf::kB;

pub use network::RpcConfig;

mod io;
#[cfg(test)]
mod tests;
@@ -58,6 +60,10 @@ pub struct Config {
/// Outbound connections that the node should actively try to
/// establish and maintain.
pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
/// RPC rate limits config.
/// Use `RpcConfig::default()` for defaults.
pub rpc: RpcConfig,

/// Http debug page configuration.
/// If None, debug page is disabled
pub debug_page: Option<http::DebugPageConfig>,
@@ -106,7 +112,7 @@ impl Executor {
burst: 10,
refresh: time::Duration::milliseconds(100),
},
- rpc: network::RpcConfig::default(),
+ rpc: self.config.rpc.clone(),
}
}

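Downstream code now has to populate the new field when building the executor config. A minimal sketch, assuming only what this diff shows (RpcConfig is re-exported and implements Default and Clone) plus a crate name, zksync_consensus_executor, inferred from the repository layout:

use zksync_consensus_executor::{Config, RpcConfig};

/// Sketch: reset a config's RPC rate limits to the defaults,
/// mirroring what node/tools/src/config.rs does below.
fn with_default_rpc_limits(mut config: Config) -> Config {
    config.rpc = RpcConfig::default();
    config
}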
1 change: 1 addition & 0 deletions node/actors/executor/src/tests.rs
@@ -20,6 +20,7 @@ fn config(cfg: &network::Config) -> Config {
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
rpc: cfg.rpc.clone(),
debug_page: None,
}
}
183 changes: 183 additions & 0 deletions node/actors/network/src/gossip/loadtest/mod.rs
@@ -0,0 +1,183 @@
//! Loadtest of the gossip endpoint of a node.
use crate::{gossip, mux, noise, preface, rpc, GossipConfig};
use anyhow::Context as _;
use async_trait::async_trait;
use rand::Rng;
use zksync_concurrency::{ctx, error::Wrap as _, limiter, net, scope, sync, time};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStoreState;
use zksync_protobuf::kB;

#[cfg(test)]
mod tests;

struct PushBlockStoreStateServer(sync::watch::Sender<Option<BlockStoreState>>);

impl PushBlockStoreStateServer {
fn new() -> Self {
Self(sync::watch::channel(None).0)
}

/// Waits until the peer tells us which blocks it has.
/// Guaranteed to return a nonempty range of `BlockNumber`s.
async fn wait_for_peer(
&self,
ctx: &ctx::Ctx,
) -> ctx::OrCanceled<std::ops::Range<validator::BlockNumber>> {
let sub = &mut self.0.subscribe();
// unwraps are safe, because we have checked that in `wait_for`.
let state =
sync::wait_for(ctx, sub, |s| (|| s.as_ref()?.last.as_ref())().is_some()).await?;
let state = state.as_ref().unwrap();
Ok(state.first..state.last.as_ref().unwrap().header().number + 1)
}
}

#[async_trait]
impl rpc::Handler<rpc::push_block_store_state::Rpc> for &PushBlockStoreStateServer {
fn max_req_size(&self) -> usize {
10 * kB
}
async fn handle(
&self,
_ctx: &ctx::Ctx,
req: rpc::push_block_store_state::Req,
) -> anyhow::Result<()> {
self.0.send_replace(Some(req.0));
Ok(())
}
}

/// Traffic pattern to generate.
pub enum TrafficPattern {
/// Always fetch a random available block.
/// Bypasses all the caches that a node can have.
/// This is an adversarial traffic pattern.
Random,
/// Fetch blocks sequentially, starting from a random one.
/// Bypasses the era-consensus cache, but is potentially DB-friendly.
/// This is the expected traffic pattern for nodes that are syncing up.
Sequential,
/// Always fetch the latest available block.
/// Hits the era-consensus cache - it is an in-memory query.
/// This is the expected traffic pattern for nodes that are up to date.
Latest,
}

/// Loadtest saturating the unauthenticated gossip connections of a peer
/// and spamming it with maximal `get_block` throughput.
pub struct Loadtest {
/// Address of the peer to spam.
pub addr: net::Host,
/// Key of the peer to spam.
pub peer: node::PublicKey,
/// Genesis of the chain.
pub genesis: validator::Genesis,
/// Traffic pattern to generate.
pub traffic_pattern: TrafficPattern,
/// Channel to send the received responses to.
pub output: Option<ctx::channel::Sender<Option<validator::FinalBlock>>>,
}

impl Loadtest {
async fn connect(&self, ctx: &ctx::Ctx) -> ctx::Result<noise::Stream> {
let addr = *self
.addr
.resolve(ctx)
.await?
.context("resolve()")?
.first()
.context("resolution failed")?;
let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet)
.await
.context("connect()")?;
let cfg = GossipConfig {
key: node::SecretKey::generate(),
dynamic_inbound_limit: 0,
static_inbound: [].into(),
static_outbound: [].into(),
};
gossip::handshake::outbound(ctx, &cfg, self.genesis.hash(), &mut stream, &self.peer)
.await
.context("handshake")?;
Ok(stream)
}

async fn spam(&self, ctx: &ctx::Ctx, stream: noise::Stream) -> ctx::Result<()> {
let push_block_store_state_server = PushBlockStoreStateServer::new();
let get_block_client = rpc::Client::<rpc::get_block::Rpc>::new(ctx, limiter::Rate::INF);
let mut rng = ctx.rng();
scope::run!(ctx, |ctx, s| async {
let service = rpc::Service::new()
.add_client(&get_block_client)
// We listen to PushBlockStoreState messages to know which blocks we can fetch.
.add_server(ctx, &push_block_store_state_server, limiter::Rate::INF)
// We respond to pings, so that the peer considers the connection healthy.
.add_server(ctx, rpc::ping::Server, limiter::Rate::INF);
s.spawn(async {
match service.run(ctx, stream).await {
// The peer didn't want to talk to us.
// This prevents "EarlyEof" errors from being logged all the time.
Ok(()) | Err(mux::RunError::Protocol(_)) => Ok(()),
Err(err) => Err(err.into()),
}
});
let mut next = validator::BlockNumber(0);
loop {
let call = get_block_client.reserve(ctx).await?;
let range = push_block_store_state_server.wait_for_peer(ctx).await?;
let mut sample =
|| validator::BlockNumber(rng.gen_range(range.start.0..range.end.0));
match self.traffic_pattern {
// unwrap is safe, because the range is guaranteed to be non-empty by
// `wait_for_peer`.
TrafficPattern::Latest => next = range.end.prev().unwrap(),
TrafficPattern::Random => next = sample(),
TrafficPattern::Sequential => {
next = next + 1;
if !range.contains(&next) {
next = sample();
}
}
}
let req = rpc::get_block::Req(next);
s.spawn(async {
let req = req;
let resp = call.call(ctx, &req, usize::MAX).await.wrap("call")?;
if let Some(send) = &self.output {
send.send(ctx, resp.0).await?;
}
Ok(())
});
}
})
.await
}

/// Run the loadtest.
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let res = scope::run!(ctx, |ctx, s| async {
loop {
match self.connect(ctx).await {
Ok(stream) => {
s.spawn(async {
if let Err(err) = self.spam(ctx, stream).await {
tracing::warn!("spam(): {err:#}");
}
Ok(())
});
}
Err(err) => {
tracing::warn!("connect(): {err:#}");
ctx.sleep(time::Duration::seconds(10)).await?;
}
}
}
})
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}
}
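As a usage note, the three TrafficPattern variants above map to distinct benchmark goals, so a binary wrapping this module would likely expose them as a flag. A hedged sketch; the flag and its string values are hypothetical, only the enum variants come from this diff:

use zksync_consensus_network::gossip::loadtest::TrafficPattern;

/// Sketch: map a hypothetical --pattern CLI value onto TrafficPattern.
fn parse_pattern(flag: &str) -> anyhow::Result<TrafficPattern> {
    match flag {
        // Adversarial: random blocks, defeats every cache.
        "random" => Ok(TrafficPattern::Random),
        // Syncing-node simulation: sequential fetches, DB-friendly.
        "sequential" => Ok(TrafficPattern::Sequential),
        // Up-to-date-node simulation: in-memory cache hits.
        "latest" => Ok(TrafficPattern::Latest),
        other => anyhow::bail!("unknown traffic pattern: {other}"),
    }
}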
77 changes: 77 additions & 0 deletions node/actors/network/src/gossip/loadtest/tests.rs
@@ -0,0 +1,77 @@
use super::*;
use crate::testonly;
use zksync_concurrency::{ctx, scope, sync, testonly::abort_on_panic};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::testonly::TestMemoryStorage;

#[tokio::test]
async fn test_loadtest() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();

let mut setup = validator::testonly::Setup::new(rng, 1);
setup.push_blocks(rng, 10);
let mut cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
cfg.gossip.dynamic_inbound_limit = 7;

scope::run!(ctx, |ctx, s| async {
// Spawn the node.
let stores = TestMemoryStorage::new(ctx, &setup.genesis).await;
s.spawn_bg(stores.runner.run(ctx));
let (node, runner) =
testonly::Instance::new(cfg.clone(), stores.blocks.clone(), stores.batches.clone());
s.spawn_bg(runner.run(ctx));

// Fill the storage with some blocks.
for b in &setup.blocks {
stores
.blocks
.queue_block(ctx, b.clone())
.await
.context("queue_block()")?;
}

let (send, recv) = ctx::channel::bounded(10);

// Run the loadtest.
s.spawn_bg(async {
Loadtest {
addr: cfg.public_addr.clone(),
peer: cfg.gossip.key.public(),
genesis: setup.genesis.clone(),
traffic_pattern: TrafficPattern::Random,
output: Some(send),
}
.run(ctx)
.await?;
Ok(())
});

s.spawn(async {
// Wait for a bunch of blocks to be received.
let mut recv = recv;
let mut count = 0;
while count < 100 {
// Count only responses with actual blocks.
if recv.recv(ctx).await?.is_some() {
count += 1;
}
}
Ok(())
});
s.spawn(async {
// Wait for the inbound connections to get saturated.
let node = node;
let sub = &mut node.net.gossip.inbound.subscribe();
sync::wait_for(ctx, sub, |pool| {
pool.current().len() == cfg.gossip.dynamic_inbound_limit
})
.await?;
Ok(())
});
Ok(())
})
.await
.unwrap();
}
1 change: 1 addition & 0 deletions node/actors/network/src/gossip/mod.rs
@@ -27,6 +27,7 @@ use zksync_consensus_storage::{BatchStore, BlockStore};
mod batch_votes;
mod fetch;
mod handshake;
pub mod loadtest;
mod runner;
#[cfg(test)]
mod testonly;
9 changes: 9 additions & 0 deletions node/actors/network/src/mux/mod.rs
@@ -158,6 +158,15 @@ pub(crate) enum RunError {
IO(#[from] io::Error),
}

impl From<RunError> for ctx::Error {
fn from(err: RunError) -> Self {
match err {
RunError::Canceled(err) => Self::Canceled(err),
err => Self::Internal(err.into()),
}
}
}

impl Mux {
async fn spawn_streams<'env>(
&self,
10 changes: 3 additions & 7 deletions node/actors/network/src/rpc/mod.rs
@@ -88,9 +88,9 @@ impl<R: Rpc> ReservedCall<R> {
ctx: &ctx::Ctx,
req: &R::Req,
max_resp_size: usize,
- ) -> anyhow::Result<R::Resp> {
+ ) -> ctx::Result<R::Resp> {
let send_time = ctx.now();
- let mut stream = self.stream.open(ctx).await??;
+ let mut stream = self.stream.open(ctx).await?.context("open()")?;
let res = async {
let metric_labels = CallType::Client.to_labels::<R>(req);
let _guard = RPC_METRICS.inflight[&metric_labels].inc_guard(1);
@@ -148,11 +148,7 @@ impl<R: Rpc> Client<R> {
req: &R::Req,
max_resp_size: usize,
) -> ctx::Result<R::Resp> {
- Ok(self
-     .reserve(ctx)
-     .await?
-     .call(ctx, req, max_resp_size)
-     .await?)
+ self.reserve(ctx).await?.call(ctx, req, max_resp_size).await
}
}

1 change: 1 addition & 0 deletions node/tools/src/config.rs
@@ -257,6 +257,7 @@ impl Configs {
gossip_static_inbound: self.app.gossip_static_inbound.clone(),
gossip_static_outbound: self.app.gossip_static_outbound.clone(),
max_payload_size: self.app.max_payload_size,
rpc: executor::RpcConfig::default(),
debug_page: self.app.debug_page.as_ref().map(|debug_page_config| {
http::DebugPageConfig {
addr: debug_page_config.addr,
