Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: loadtest improvements #223

Merged
merged 7 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infrastructure/loadtests/ansible/playbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
mode: "0755"

- name: Start executor binary in background
shell: "nohup ./executor > /var/log/zksync-bft.log 2>&1 &"
shell: "nohup ./executor --database=./database > /var/log/zksync-bft.log 2>&1 &"
args:
chdir: /etc/zksync-bft/
environment:
Expand Down
38 changes: 28 additions & 10 deletions node/components/bft/src/chonky_bft/new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,25 @@ impl StateMachine {
let message = &signed_message.msg;
let author = &signed_message.key;

// If the message is from a past view, or the replica is already in this view and the
// message is NOT from the view leader, then ignore it.
// See `start_new_view()` for an explanation of why a message from the leader of the
// current view still needs to be processed.
if message.view().number < self.view_number
|| (message.view().number == self.view_number
&& author != &self.config.genesis().view_leader(self.view_number))
{
return Err(Error::Old {
current_view: self.view_number,
});
}

// Check that the message signer is in the validator committee.
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone().into(),
});
}

// If the message is from a past view, ignore it.
if message.view().number < self.view_number {
return Err(Error::Old {
current_view: self.view_number,
});
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
Expand Down Expand Up @@ -118,9 +123,23 @@ impl StateMachine {
) -> ctx::Result<()> {
// Update the state machine.
self.view_number = view;
metrics::METRICS.replica_view_number.set(self.view_number.0);
self.phase = validator::Phase::Prepare;
// It is important that the proposal and new_view messages from the leader
// contain the same justification.
// Proposal cannot be produced before previous block is processed,
// therefore leader needs to make sure that the high_commit_qc is delivered
// to all replicas, so that the finalized block is distributed over the network.
// In particular it is not guaranteed that the leader has the finalized block when
// sending the NewView, so it might need to wait for the finalized block.
//
// Note that for this process to work e2e, the replicas should NOT ignore the NewView from
// the leader, even if they already advanced to the given view.
// Note that the order of NewView and proposal messages doesn't matter, because
// proposal is a superset of NewView message.
let justification = self.get_justification();
self.proposer_sender
.send(Some(self.get_justification()))
.send(Some(justification.clone()))
.expect("justification_watch.send() failed");

// Clear the block proposal cache.
Expand All @@ -139,15 +158,14 @@ impl StateMachine {
.secret_key
.sign_msg(validator::ConsensusMsg::ReplicaNewView(
validator::ReplicaNewView {
justification: self.get_justification(),
justification: justification.clone(),
},
)),
};
self.outbound_channel.send(output_message);

// Log the event and update the metrics.
tracing::info!("Starting view {}", self.view_number);
metrics::METRICS.replica_view_number.set(self.view_number.0);
let now = ctx.now();
metrics::METRICS
.view_latency
Expand Down
33 changes: 17 additions & 16 deletions node/components/bft/src/chonky_bft/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ impl UTHarness {
self.try_recv().unwrap().msg
}

pub(crate) async fn new_replica_new_view(&self) -> validator::ReplicaNewView {
let justification = self.replica.get_justification();
validator::ReplicaNewView { justification }
/// Builds a `ReplicaNewView` message whose justification is a timeout QC
/// obtained from `new_timeout_qc`.
pub(crate) async fn new_replica_new_view(
    &mut self,
    ctx: &ctx::Ctx,
) -> validator::ReplicaNewView {
    let timeout_qc = self.new_timeout_qc(ctx).await;
    let justification = validator::ProposalJustification::Timeout(timeout_qc);
    validator::ReplicaNewView { justification }
}

pub(crate) async fn new_commit_qc(
Expand All @@ -169,19 +175,14 @@ impl UTHarness {
qc
}

// #[allow(dead_code)]
// pub(crate) fn new_timeout_qc(
// &mut self,
// mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout),
// ) -> validator::TimeoutQC {
// let mut msg = self.new_replica_timeout();
// mutate_fn(&mut msg);
// let mut qc = validator::TimeoutQC::new(msg.view.clone());
// for key in &self.keys {
// qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap();
// }
// qc
// }
/// Builds a `TimeoutQC` from the message returned by `new_replica_timeout`,
/// signed by every key in `self.keys`.
///
/// # Panics
/// Panics if adding any of the signed messages to the QC fails.
pub(crate) async fn new_timeout_qc(&mut self, ctx: &ctx::Ctx) -> validator::TimeoutQC {
    let timeout = self.new_replica_timeout(ctx).await;
    let mut timeout_qc = validator::TimeoutQC::new(timeout.view);
    self.keys.iter().for_each(|key| {
        let signed = key.sign_msg(timeout.clone());
        timeout_qc.add(&signed, self.genesis()).unwrap();
    });
    timeout_qc
}

pub(crate) async fn process_leader_proposal(
&mut self,
Expand Down
18 changes: 12 additions & 6 deletions node/components/bft/src/chonky_bft/tests/new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async fn new_view_non_validator_signer() {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let replica_new_view = util.new_replica_new_view().await;
let replica_new_view = util.new_replica_new_view(ctx).await;
let non_validator_key: validator::SecretKey = ctx.rng().gen();
let res = util
.process_replica_new_view(ctx, non_validator_key.sign_msg(replica_new_view))
Expand All @@ -141,13 +141,19 @@ async fn replica_new_view_old() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
let (mut util, runner) = UTHarness::new(ctx, 2).await;
s.spawn_bg(runner.run(ctx));

let replica_new_view = util.new_replica_new_view().await;
util.produce_block(ctx).await;
let replica_new_view = util.new_replica_new_view(ctx).await;
// Sign the messages with non-leader key.
let replica_new_view = util.keys[1].sign_msg(replica_new_view);

// Process new_view twice. The second time it shouldn't be accepted.
util.process_replica_new_view(ctx, replica_new_view.clone())
.await
.unwrap();
let res = util
.process_replica_new_view(ctx, util.owner_key().sign_msg(replica_new_view))
.process_replica_new_view(ctx, replica_new_view.clone())
.await;

assert_matches!(
Expand All @@ -171,7 +177,7 @@ async fn new_view_invalid_sig() {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let msg = util.new_replica_new_view().await;
let msg = util.new_replica_new_view(ctx).await;
let mut replica_new_view = util.owner_key().sign_msg(msg);
replica_new_view.sig = ctx.rng().gen();

Expand Down
3 changes: 1 addition & 2 deletions node/components/bft/src/testonly/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::{testonly, FromNetworkMessage, PayloadManager, ToNetworkMessage};
use anyhow::Context as _;
use std::sync::Arc;
use zksync_concurrency::time;
use zksync_concurrency::{ctx, ctx::channel, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, scope, sync, time};
use zksync_consensus_network as network;
use zksync_consensus_storage as storage;
use zksync_consensus_storage::testonly::in_memory;
Expand Down
17 changes: 1 addition & 16 deletions node/components/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,10 @@ pub struct RpcConfig {
pub push_validator_addrs_rate: limiter::Rate,
/// Max rate of sending/receiving push_block_store_state messages.
pub push_block_store_state_rate: limiter::Rate,
/// Max rate of sending/receiving push_batch_store_state messages.
pub push_batch_store_state_rate: limiter::Rate,
/// Max rate of sending/receiving `get_block` RPCs.
pub get_block_rate: limiter::Rate,
/// Max rate of sending/receiving `get_batch` RPCs.
pub get_batch_rate: limiter::Rate,
/// Timeout for the `get_block` RPC.
pub get_block_timeout: Option<time::Duration>,
/// Timeout for the `get_batch` RPC.
pub get_batch_timeout: Option<time::Duration>,
/// Max rate of sending/receiving consensus messages.
pub consensus_rate: limiter::Rate,
/// Max rate of sending/receiving PushBatchVotes RPCs.
Expand All @@ -39,22 +33,13 @@ impl Default for RpcConfig {
},
push_block_store_state_rate: limiter::Rate {
burst: 2,
refresh: time::Duration::milliseconds(300),
},
push_batch_store_state_rate: limiter::Rate {
burst: 2,
refresh: time::Duration::milliseconds(300),
refresh: time::Duration::milliseconds(200),
},
get_block_rate: limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(100),
},
get_batch_rate: limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(100),
},
get_block_timeout: Some(time::Duration::seconds(10)),
get_batch_timeout: Some(time::Duration::seconds(10)),
consensus_rate: limiter::Rate {
burst: 10,
refresh: time::Duration::ZERO,
Expand Down
2 changes: 1 addition & 1 deletion node/components/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub(crate) struct Network {
impl rpc::Handler<rpc::consensus::Rpc> for &Network {
/// Here we bound the buffering of incoming consensus messages.
fn max_req_size(&self) -> usize {
self.gossip.cfg.max_block_size.saturating_add(kB)
self.gossip.cfg.max_block_size.saturating_add(100 * kB)
}

async fn handle(
Expand Down
4 changes: 3 additions & 1 deletion node/libs/concurrency/src/ctx/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ pub struct RealClock;
impl RealClock {
/// Current time according to the monotone clock.
pub fn now(&self) -> time::Instant {
time::Instant::now()
// We use `now()` from tokio, so that `tokio::time::pause()`
// works in tests.
tokio::time::Instant::now().into_std().into()
}
/// Current time according to the system/walltime clock.
pub fn now_utc(&self) -> time::Utc {
Expand Down
27 changes: 26 additions & 1 deletion node/libs/storage/src/testonly/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct BlockStoreInner {
genesis: validator::Genesis,
persisted: sync::watch::Sender<BlockStoreState>,
blocks: Mutex<VecDeque<validator::Block>>,
capacity: Option<usize>,
pregenesis_blocks: HashMap<validator::BlockNumber, validator::PreGenesisBlock>,
}

Expand All @@ -39,6 +40,7 @@ impl BlockStore {
genesis: setup.genesis.clone(),
persisted: sync::watch::channel(BlockStoreState { first, last: None }).0,
blocks: Mutex::default(),
capacity: None,
pregenesis_blocks: setup
.blocks
.iter()
Expand All @@ -50,6 +52,22 @@ impl BlockStore {
}))
}

/// New bounded storage. Old blocks get GC'ed onse the storage capacity is full.
pub fn bounded(
genesis: validator::Genesis,
first: validator::BlockNumber,
capacity: usize,
) -> Self {
assert!(genesis.first_block <= first);
Self(Arc::new(BlockStoreInner {
genesis,
persisted: sync::watch::channel(BlockStoreState { first, last: None }).0,
blocks: Mutex::default(),
capacity: Some(capacity),
pregenesis_blocks: [].into(),
}))
}

/// Truncates the storage to blocks `>=first`.
pub fn truncate(&mut self, first: validator::BlockNumber) {
let mut blocks = self.0.blocks.lock().unwrap();
Expand Down Expand Up @@ -114,9 +132,16 @@ impl PersistentBlockStore for BlockStore {
// It may happen that a block gets fetched which is not needed any more.
return Ok(());
}
if let Some(c) = self.0.capacity {
if blocks.len() >= c {
blocks.pop_front();
}
}
if block.number() > want {
// Blocks should be stored in order though.
return Err(anyhow::anyhow!("got block {:?}, want {want:?}", block.number()).into());
return Err(
anyhow::format_err!("got block {:?}, want {want:?}", block.number()).into(),
);
}
self.0
.persisted
Expand Down
2 changes: 1 addition & 1 deletion node/tools/src/bin/localnet_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn main() -> anyhow::Result<()> {
.metrics_server_port
.map(|port| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port)),
genesis: setup.genesis.clone(),
max_payload_size: 1000000,
max_payload_size: args.payload_size,
view_timeout: time::Duration::milliseconds(2000),
node_key: node_keys[i].clone(),
validator_key: validator_keys.get(i).cloned(),
Expand Down
33 changes: 28 additions & 5 deletions node/tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, Text
use zksync_consensus_executor::{self as executor, attestation};
use zksync_consensus_network as network;
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BlockStore, BlockStoreRunner};
use zksync_consensus_storage::{
testonly::in_memory, BlockStore, BlockStoreRunner, PersistentBlockStore, ReplicaStore,
};
use zksync_protobuf::{
read_optional, read_optional_repr, read_required, required, ProtoFmt, ProtoRepr,
};
Expand Down Expand Up @@ -251,16 +253,37 @@ pub struct DebugPage {
#[derive(Debug)]
pub struct Configs {
pub app: App,
pub database: PathBuf,
pub database: Option<PathBuf>,
}

impl Configs {
pub async fn make_executor(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<(executor::Executor, BlockStoreRunner)> {
let store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())).await?;
struct Stores {
block: Box<dyn PersistentBlockStore>,
replica: Box<dyn ReplicaStore>,
}
let stores = if let Some(path) = &self.database {
let store = store::RocksDB::open(self.app.genesis.clone(), path).await?;
Stores {
block: Box::new(store.clone()),
replica: Box::new(store),
}
} else {
let block = in_memory::BlockStore::bounded(
self.app.genesis.clone(),
self.app.genesis.first_block,
200,
);
let replica = in_memory::ReplicaStore::default();
Stores {
block: Box::new(block),
replica: Box::new(replica),
}
};
let (block_store, runner) = BlockStore::new(ctx, stores.block).await?;
let attestation = Arc::new(attestation::Controller::new(self.app.attester_key.clone()));

let e = executor::Executor {
Expand Down Expand Up @@ -308,7 +331,7 @@ impl Configs {
.as_ref()
.map(|key| executor::Validator {
key: key.clone(),
replica_store: Box::new(store.clone()),
replica_store: stores.replica,
payload_manager: Box::new(bft::testonly::RandomPayload(
self.app.max_payload_size,
)),
Expand Down
5 changes: 3 additions & 2 deletions node/tools/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ struct Cli {
#[arg(long, conflicts_with = "config_path")]
config: Option<String>,
/// Path to the rocksdb database of the node.
#[arg(long, default_value = "./database")]
database: PathBuf,
/// If not provided, an in-memory database will be used instead.
#[arg(long)]
database: Option<PathBuf>,
}

impl Cli {
Expand Down
Loading