Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadtest for the gossipnet endpoint #132

Merged
merged 13 commits into from
Jun 27, 2024
Prev Previous commit
Next Next commit
random block, less logs
  • Loading branch information
pompon0 committed Jun 24, 2024
commit 83b6eb8aefbd54254ac5bdca3e0ddfd4e930492e
51 changes: 31 additions & 20 deletions node/actors/network/src/gossip/loadtest/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! Loadtest of the gossip endpoint of a node.
use crate::gossip;
use crate::mux;
use crate::noise;
use crate::preface;
use crate::rpc;
use crate::GossipConfig;
use anyhow::Context as _;
use async_trait::async_trait;
use rand::Rng;
use zksync_concurrency::{ctx, error::Wrap as _, limiter, net, scope, sync, time};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStoreState;
Expand All @@ -21,25 +23,27 @@ impl<'a> PushBlockStoreStateServer {
Self(sync::watch::channel(None).0)
}

/// Waits until peer tells us which blocks it has and constructs a `get_block()` request for
/// one of these blocks.
async fn build_request(&self, ctx: &ctx::Ctx) -> ctx::OrCanceled<rpc::get_block::Req> {
/// Waits until the peer tells us which blocks it has and constructs a `get_block()` request for
/// a random one from these blocks.
///
/// Requesting a recent would hit the peer's in-memory cache.
/// Fetching random block is potentially more resource consuming.
/// It is also more realistic because nodes will generate the largest load
/// when being out-of-sync.
async fn build_request(
&self,
ctx: &ctx::Ctx,
rng: &mut impl Rng,
) -> ctx::OrCanceled<rpc::get_block::Req> {
let sub = &mut self.0.subscribe();
// unwraps are safe, because we have checked that in `wait_for`.
let block_number = sync::wait_for(ctx, sub, |s| (|| s.as_ref()?.last.as_ref())().is_some())
.await?
.as_ref()
.unwrap()
.last
.as_ref()
.unwrap()
.header()
.number;
// NOTE: requesting latest block will hit the peer's in-memory cache.
// More resource-consuming will be fetching a random block.
// It will be also more realistic because nodes will generate the largest load
// when being out-of-sync.
Ok(rpc::get_block::Req(block_number))
let state =
sync::wait_for(ctx, sub, |s| (|| s.as_ref()?.last.as_ref())().is_some()).await?;
let state = state.as_ref().unwrap();
let range = state.first.0..state.last.as_ref().unwrap().header().number.0;
Ok(rpc::get_block::Req(validator::BlockNumber(
rng.gen_range(range),
)))
}
}

Expand Down Expand Up @@ -98,6 +102,7 @@ impl Loadtest {
async fn spam(&self, ctx: &ctx::Ctx, stream: noise::Stream) -> ctx::Result<()> {
let push_block_store_state_server = PushBlockStoreStateServer::new();
let get_block_client = rpc::Client::<rpc::get_block::Rpc>::new(ctx, limiter::Rate::INF);
let rng = &mut ctx.rng();
scope::run!(ctx, |ctx, s| async {
let service = rpc::Service::new()
.add_client(&get_block_client)
Expand All @@ -106,12 +111,18 @@ impl Loadtest {
// We respond to pings, so that peer considers connection healthy.
.add_server(ctx, rpc::ping::Server, limiter::Rate::INF);
s.spawn(async {
service.run(ctx, stream).await.context("service.run()")?;
Ok(())
match service.run(ctx, stream).await {
// Peer didn't want to talk with us.
// This allows to avoid "EarlyEof" errors from being logged all the time.
Ok(()) | Err(mux::RunError::Protocol(_)) => Ok(()),
Err(err) => Err(err.into()),
}
});
loop {
let call = get_block_client.reserve(ctx).await?;
let req = push_block_store_state_server.build_request(ctx).await?;
let req = push_block_store_state_server
.build_request(ctx, rng)
.await?;
s.spawn(async {
let req = req;
let resp = call.call(ctx, &req, usize::MAX).await.wrap("call")?;
Expand Down
Loading