Skip to content

Commit

Permalink
added pregenesis sync to tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Sep 30, 2024
1 parent a398cf7 commit ae5b6b4
Show file tree
Hide file tree
Showing 19 changed files with 131 additions and 95 deletions.
7 changes: 7 additions & 0 deletions node/actors/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ impl Config {
anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT);
genesis.verify().context("genesis().verify()")?;

if let Some(prev) = genesis.first_block.prev() {
tracing::info!("Waiting for the pre-genesis blocks to be persisted");
if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await {
return Ok(());
}
}

let cfg = Arc::new(self);
let (leader, leader_send) = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let (replica, replica_send) =
Expand Down
15 changes: 6 additions & 9 deletions node/actors/bft/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use zksync_concurrency::{
oneshot, scope,
};
use zksync_consensus_network::{self as network};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::{validator, validator::testonly::Setup};
use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore};
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -114,26 +114,23 @@ impl Test {
/// Run a test with the given parameters and a random network setup.
pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> Result<(), TestError> {
let rng = &mut ctx.rng();
let setup = validator::testonly::Setup::new_with_weights(
rng,
self.nodes.iter().map(|(_, w)| *w).collect(),
);
let setup = Setup::new_with_weights(rng, self.nodes.iter().map(|(_, w)| *w).collect());
let nets: Vec<_> = network::testonly::new_configs(rng, &setup, 1);
self.run_with_config(ctx, nets, &setup.genesis).await
self.run_with_config(ctx, nets, &setup).await
}

/// Run a test with the given parameters and network configuration.
pub(crate) async fn run_with_config(
&self,
ctx: &ctx::Ctx,
nets: Vec<Config>,
genesis: &validator::Genesis,
setup: &Setup,
) -> Result<(), TestError> {
let mut nodes = vec![];
let mut honest = vec![];
scope::run!(ctx, |ctx, s| async {
for (i, net) in nets.into_iter().enumerate() {
let store = TestMemoryStorage::new(ctx, genesis).await;
let store = TestMemoryStorage::new(ctx, setup).await;
s.spawn_bg(async { Ok(store.runner.run(ctx).await?) });

if self.nodes[i].0 == Behavior::Honest {
Expand All @@ -151,7 +148,7 @@ impl Test {

// Run the nodes until all honest nodes store enough finalized blocks.
assert!(self.blocks_to_finalize > 0);
let first = genesis.first_block;
let first = setup.genesis.first_block;
let last = first + (self.blocks_to_finalize as u64 - 1);
for store in &honest {
store.wait_until_queued(ctx, last).await?;
Expand Down
2 changes: 1 addition & 1 deletion node/actors/bft/src/testonly/ut_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl UTHarness {
) -> (UTHarness, BlockStoreRunner) {
let rng = &mut ctx.rng();
let setup = validator::testonly::Setup::new(rng, num_validators);
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
let (send, recv) = ctx::channel::unbounded();

let cfg = Arc::new(Config {
Expand Down
8 changes: 4 additions & 4 deletions node/actors/bft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ async fn run_twins(
LeaderSelectionMode::Rota(scenario.rounds.iter().map(|rc| rc.leader.clone()).collect());

// Generate a new setup with this leadership schedule.
let setup = Setup::from(spec.clone());
let setup = Setup::from_spec(rng, spec.clone());

// Create a network with the partition schedule of the scenario.
let splits: PortSplitSchedule = scenario
Expand Down Expand Up @@ -424,7 +424,7 @@ async fn run_twins(
nodes: nodes.clone(),
blocks_to_finalize,
}
.run_with_config(ctx, nets.clone(), &setup.genesis)
.run_with_config(ctx, nets.clone(), &setup)
.await?
}

Expand Down Expand Up @@ -581,7 +581,7 @@ async fn run_with_custom_router(
.collect(),
);

let setup: Setup = spec.into();
let setup = Setup::from_spec(rng, spec);

let port_to_id = nets
.iter()
Expand All @@ -605,6 +605,6 @@ async fn run_with_custom_router(
nodes,
blocks_to_finalize,
}
.run_with_config(ctx, nets, &setup.genesis)
.run_with_config(ctx, nets, &setup)
.await
}
16 changes: 8 additions & 8 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn test_single_validator() {
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
let replica_store = in_memory::ReplicaStore::default();
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store.blocks.clone(), replica_store).run(ctx));
store
Expand All @@ -93,7 +93,7 @@ async fn test_many_validators() {
scope::run!(ctx, |ctx, s| async {
for cfg in cfgs {
let replica_store = in_memory::ReplicaStore::default();
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
s.spawn_bg(validator(&cfg, store.blocks.clone(), replica_store).run(ctx));

Expand Down Expand Up @@ -121,13 +121,13 @@ async fn test_inactive_validator() {
scope::run!(ctx, |ctx, s| async {
// Spawn validator.
let replica_store = in_memory::ReplicaStore::default();
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store.blocks.clone(), replica_store).run(ctx));

// Spawn a validator node, which doesn't belong to the consensus.
// Therefore it should behave just like a fullnode.
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let mut cfg = new_fullnode(rng, &cfgs[0]);
cfg.validator_key = Some(rng.gen());
Expand Down Expand Up @@ -156,12 +156,12 @@ async fn test_fullnode_syncing_from_validator() {
scope::run!(ctx, |ctx, s| async {
// Spawn validator.
let replica_store = in_memory::ReplicaStore::default();
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store.blocks.clone(), replica_store).run(ctx));

// Spawn full node.
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.blocks.clone()).run(ctx));

Expand All @@ -188,7 +188,7 @@ async fn test_validator_syncing_from_fullnode() {
scope::run!(ctx, |ctx, s| async {
// Run validator and produce some blocks.
let replica_store = in_memory::ReplicaStore::default();
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(
Expand Down Expand Up @@ -221,7 +221,7 @@ async fn test_validator_syncing_from_fullnode() {
let last_block = store.blocks.queued().last.as_ref().unwrap().number();
let store2 = TestMemoryStorage::new_store_with_first_block(
ctx,
&setup.genesis,
&setup,
setup.genesis.first_block + 2,
)
.await;
Expand Down
10 changes: 5 additions & 5 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async fn test_one_connection_per_validator() {
let nodes = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx,s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let nodes : Vec<_> = nodes.into_iter().enumerate().map(|(i,node)| {
let (node,runner) = testonly::Instance::new(node, store.blocks.clone());
Expand Down Expand Up @@ -156,7 +156,7 @@ async fn test_genesis_mismatch() {
let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?;

tracing::info!("Start one node, we will simulate the other one.");
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let (node, runner) = testonly::Instance::new(cfgs[0].clone(), store.blocks.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -222,7 +222,7 @@ async fn test_address_change() {
let setup = validator::testonly::Setup::new(rng, 5);
let mut cfgs = testonly::new_configs(rng, &setup, 1);
scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let mut nodes: Vec<_> = cfgs
.iter()
Expand Down Expand Up @@ -277,7 +277,7 @@ async fn test_transmission() {
let cfgs = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let mut nodes: Vec<_> = cfgs
.iter()
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn test_retransmission() {
let cfgs = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));

// Spawn the first node.
Expand Down
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/loadtest/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn test_loadtest() {

scope::run!(ctx, |ctx, s| async {
// Spawn the node.
let stores = TestMemoryStorage::new(ctx, &setup.genesis).await;
let stores = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(stores.runner.run(ctx));
let (node, runner) = testonly::Instance::new(cfg.clone(), stores.blocks.clone());
s.spawn_bg(runner.run(ctx));
Expand Down
11 changes: 5 additions & 6 deletions node/actors/network/src/gossip/tests/fetch_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn test_simple() {
cfg.validator_key = None;

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));

let (_node, runner) = crate::testonly::Instance::new(cfg.clone(), store.blocks.clone());
Expand Down Expand Up @@ -133,7 +133,7 @@ async fn test_concurrent_requests() {
cfg.max_block_queue_size = setup.blocks.len();

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let (_node, runner) = crate::testonly::Instance::new(cfg.clone(), store.blocks.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn test_bad_responses() {
cfg.validator_key = None;

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let (_node, runner) = crate::testonly::Instance::new(cfg.clone(), store.blocks.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -284,7 +284,7 @@ async fn test_retry() {
cfg.validator_key = None;

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let (_node, runner) = crate::testonly::Instance::new(cfg.clone(), store.blocks.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -349,8 +349,7 @@ async fn test_announce_truncated_block_range() {

scope::run!(ctx, |ctx, s| async {
// Build a custom persistent store, so that we can tweak it later.
let mut persistent =
in_memory::BlockStore::new(setup.genesis.clone(), setup.genesis.first_block);
let mut persistent = in_memory::BlockStore::new(&setup, setup.genesis.first_block);
let (block_store, runner) = BlockStore::new(ctx, Box::new(persistent.clone())).await?;
s.spawn_bg(runner.run(ctx));
let (_node, runner) = crate::testonly::Instance::new(cfg.clone(), block_store);
Expand Down
12 changes: 6 additions & 6 deletions node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn test_one_connection_per_node() {
let cfgs = testonly::new_configs(rng, &setup, 2);

scope::run!(ctx, |ctx,s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let mut nodes : Vec<_> = cfgs.iter().enumerate().map(|(i,cfg)| {
let (node,runner) = testonly::Instance::new(cfg.clone(), store.blocks.clone());
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn test_validator_addrs_propagation() {
let cfgs = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let nodes: Vec<_> = cfgs
.iter()
Expand Down Expand Up @@ -284,7 +284,7 @@ async fn test_genesis_mismatch() {
let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?;

tracing::info!("Start one node, we will simulate the other one.");
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
let (_node, runner) = testonly::Instance::new(cfgs[0].clone(), store.blocks);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -343,7 +343,7 @@ async fn validator_node_restart() {
for cfg in &mut cfgs {
cfg.rpc.push_validator_addrs_rate.refresh = time::Duration::ZERO;
}
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
let (node1, node1_runner) = testonly::Instance::new(cfgs[1].clone(), store.blocks.clone());
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(store.runner.run(ctx));
Expand Down Expand Up @@ -426,7 +426,7 @@ async fn rate_limiting() {
}
let mut nodes = vec![];
scope::run!(ctx, |ctx, s| async {
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));
// Spawn the satellite nodes and wait until they register
// their own address.
Expand Down Expand Up @@ -511,7 +511,7 @@ async fn test_batch_votes_propagation() {

scope::run!(ctx, |ctx, s| async {
// All nodes share the same store - store is not used in this test anyway.
let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
let store = TestMemoryStorage::new(ctx, &setup).await;
s.spawn_bg(store.runner.run(ctx));

// Start all nodes.
Expand Down
Loading

0 comments on commit ae5b6b4

Please sign in to comment.