From 5329a809cfc06d4939fb5ece26c9ad1e1741c50a Mon Sep 17 00:00:00 2001 From: pompon0 Date: Mon, 18 Mar 2024 14:06:47 +0100 Subject: [PATCH] support for running a node from a state snapshot (BFT-418) (#75) Support for a situation in which a node is started with a blockchain state at block X > 0. Such a node doesn't want to fetch and store blocks { let fork = &cfg.genesis().fork; - let (parent, number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (fork.first_parent, fork.first_block), + let number = match high_qc { + Some(qc) => qc.header().number.next(), + None => fork.first_block, }; // Defensively assume that PayloadManager cannot propose until the previous block is stored. if let Some(prev) = number.prev() { @@ -189,7 +189,6 @@ impl StateMachine { .observe(payload.0.len()); let proposal = validator::BlockHeader { number, - parent, payload: payload.hash(), }; (proposal, Some(payload)) diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index b74e4411..9974c5c8 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -37,13 +37,6 @@ async fn replica_prepare_sanity_yield_leader_prepare() { .unwrap() .unwrap(); assert_eq!(leader_prepare.msg.view(), &replica_prepare.view); - assert_eq!( - leader_prepare.msg.proposal.parent, - replica_prepare - .high_vote - .as_ref() - .map(|v| v.proposal.hash()), - ); assert_eq!( leader_prepare.msg.justification, util.new_prepare_qc(|msg| *msg = replica_prepare) diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs index eb00cd9f..20ca6f3b 100644 --- a/node/actors/bft/src/replica/block.rs +++ b/node/actors/bft/src/replica/block.rs @@ -39,7 +39,7 @@ impl StateMachine { tracing::info!( "Finalized block {}: {:#?}", block.header().number, - block.header().hash(), + block.header().payload, ); self.config .block_store diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/leader_prepare.rs index 7e017bcb..daa57825 100644 --- a/node/actors/bft/src/replica/leader_prepare.rs +++ b/node/actors/bft/src/replica/leader_prepare.rs @@ -42,8 +42,9 @@ pub(crate) enum Error { /// Invalid message. #[error("invalid message: {0:#}")] InvalidMessage(#[source] validator::LeaderPrepareVerifyError), - /// Previous proposal was not finalized. - + /// Leader proposed a block that was already pruned from replica's storage. + #[error("leader proposed a block that was already pruned from replica's storage")] + ProposalAlreadyPruned, /// Oversized payload. #[error("block proposal with an oversized payload (payload size: {payload_size})")] ProposalOversizedPayload { @@ -110,6 +111,14 @@ impl StateMachine { }); } + // Replica MUSTN'T vote for blocks which have been already pruned for storage. + // (because it won't be able to persist and broadcast them once finalized). + // TODO(gprusak): it should never happen, we should add safety checks to prevent + // pruning blocks not known to be finalized. + if message.proposal.number < self.config.block_store.subscribe().borrow().first { + return Err(Error::ProposalAlreadyPruned); + } + // ----------- Checking the message -------------- signed_message.verify().map_err(Error::InvalidSignature)?; diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index 6f70568d..c1b5be08 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -7,7 +7,7 @@ use assert_matches::assert_matches; use rand::Rng; use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::{ - self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber, + self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, }; /// Sanity check of the happy path. @@ -101,10 +101,6 @@ async fn leader_prepare_invalid_leader() { let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - let view = ViewNumber(2); - util.set_view(view); - assert_eq!(util.view_leader(view), util.keys[0].public()); - let replica_prepare = util.new_replica_prepare(); assert!(util .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) @@ -167,6 +163,35 @@ async fn leader_prepare_old_view() { .unwrap(); } +#[tokio::test] +async fn leader_prepare_pruned_block() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut leader_prepare = util.new_leader_prepare(ctx).await; + // We assume default replica state and nontrivial `genesis.fork.first_block` here. + leader_prepare.proposal.number = util + .replica + .config + .block_store + .subscribe() + .borrow() + .first + .prev() + .unwrap(); + let res = util + .process_leader_prepare(ctx, util.sign(leader_prepare)) + .await; + assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned)); + Ok(()) + }) + .await + .unwrap(); +} + /// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. #[tokio::test] async fn leader_prepare_invalid_payload() { @@ -338,33 +363,6 @@ async fn leader_prepare_proposal_when_previous_not_finalized() { .unwrap(); } -#[tokio::test] -async fn leader_prepare_bad_parent_hash() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Produce initial block."); - util.produce_block(ctx).await; - tracing::info!("Make leader propose the next block."); - let mut leader_prepare = util.new_leader_prepare(ctx).await; - tracing::info!("Modify the proposal.parent so that it doesn't match the previous block"); - leader_prepare.proposal.parent = Some(ctx.rng().gen()); - let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await; - assert_matches!(res, Err(leader_prepare::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::BadParentHash { got, want } - )) => { - assert_eq!(want, Some(leader_prepare.justification.high_qc().unwrap().message.proposal.hash())); - assert_eq!(got, leader_prepare.proposal.parent); - }); - Ok(()) - }) - .await - .unwrap(); -} - #[tokio::test] async fn leader_prepare_bad_block_number() { zksync_concurrency::testonly::abort_on_panic(); diff --git a/node/actors/bft/src/testonly/fuzz.rs b/node/actors/bft/src/testonly/fuzz.rs index 077be553..fd0cdd4a 100644 --- a/node/actors/bft/src/testonly/fuzz.rs +++ b/node/actors/bft/src/testonly/fuzz.rs @@ -174,11 +174,9 @@ impl Fuzz for validator::Payload { impl Fuzz for validator::BlockHeader { fn mutate(&mut self, rng: &mut impl Rng) { - match rng.gen_range(0..3) { - 0 => self.parent = rng.gen(), - 1 => self.number = rng.gen(), - 2 => self.payload = rng.gen(), - _ => unreachable!(), + match rng.gen_range(0..2) { + 0 => self.number = rng.gen(), + _ => self.payload = rng.gen(), } } } diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index 8880788c..29590cce 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -137,19 +137,6 @@ impl UTHarness { self.replica.view = view; } - pub(crate) fn set_view(&mut self, view: ViewNumber) { - self.set_replica_view(view); - self.set_leader_view(view); - } - - pub(crate) fn set_leader_view(&mut self, view: ViewNumber) { - self.leader.view = view - } - - pub(crate) fn set_replica_view(&mut self, view: ViewNumber) { - self.replica.view = view - } - pub(crate) fn replica_view(&self) -> validator::View { validator::View { protocol_version: self.protocol_version(), diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index f95b6641..2b34e7a0 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -5,32 +5,48 @@ use zksync_consensus_bft as bft; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; use zksync_consensus_storage::{ - testonly::{in_memory, new_store}, + testonly::{in_memory, new_store, new_store_with_first}, BlockStore, }; -fn make_executor(cfg: &network::Config, block_store: Arc) -> Executor { +fn config(cfg: &network::Config) -> Config { + Config { + server_addr: *cfg.server_addr, + public_addr: cfg.public_addr, + max_payload_size: usize::MAX, + node_key: cfg.gossip.key.clone(), + gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, + gossip_static_inbound: cfg.gossip.static_inbound.clone(), + gossip_static_outbound: cfg.gossip.static_outbound.clone(), + } +} + +fn validator( + cfg: &network::Config, + block_store: Arc, + replica_store: impl ReplicaStore, +) -> Executor { Executor { - config: Config { - server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, - max_payload_size: usize::MAX, - node_key: cfg.gossip.key.clone(), - gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, - gossip_static_inbound: cfg.gossip.static_inbound.clone(), - gossip_static_outbound: cfg.gossip.static_outbound.clone(), - }, + config: config(cfg), block_store, - validator: cfg.validator_key.as_ref().map(|key| Validator { - key: key.clone(), - replica_store: Box::new(in_memory::ReplicaStore::default()), + validator: Some(Validator { + key: cfg.validator_key.clone().unwrap(), + replica_store: Box::new(replica_store), payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), } } +fn fullnode(cfg: &network::Config, block_store: Arc) -> Executor { + Executor { + config: config(cfg), + block_store, + validator: None, + } +} + #[tokio::test] -async fn executing_single_validator() { +async fn test_single_validator() { abort_on_panic(); let ctx = &ctx::root(); let rng = &mut ctx.rng(); @@ -38,9 +54,10 @@ async fn executing_single_validator() { let setup = Setup::new(rng, 1); let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store.clone()).run(ctx)); + s.spawn_bg(validator(&cfgs[0], store.clone(), replica_store).run(ctx)); store.wait_until_persisted(ctx, BlockNumber(5)).await?; Ok(()) }) @@ -49,7 +66,7 @@ async fn executing_single_validator() { } #[tokio::test] -async fn executing_validator_and_full_node() { +async fn test_fullnode_syncing_from_validator() { abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); let rng = &mut ctx.rng(); @@ -58,17 +75,73 @@ async fn executing_validator_and_full_node() { let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { // Spawn validator. + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store).run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); // Spawn full node. let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); // Wait for blocks in full node store. - store.wait_until_persisted(ctx, BlockNumber(5)).await?; + store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 5) + .await?; + Ok(()) + }) + .await + .unwrap(); +} + +/// Test in which validator is syncing missing blocks from a full node before producing blocks. +#[tokio::test] +async fn test_validator_syncing_from_fullnode() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); + let rng = &mut ctx.rng(); + + let setup = Setup::new(rng, 1); + let cfgs = new_configs(rng, &setup, 0); + scope::run!(ctx, |ctx, s| async { + // Spawn full node. + let (node_store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), node_store.clone()).run(ctx)); + + // Run validator and produce some blocks. + // Wait for the blocks to be fetched by the full node. + let replica_store = in_memory::ReplicaStore::default(); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store.clone()).run(ctx)); + node_store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 4) + .await?; + Ok(()) + }) + .await + .unwrap(); + + // Restart the validator with empty store (but preserved replica state) and non-trivial + // `store.state.first`. + // Validator should fetch the past blocks from the full node before producing next blocks. + let last_block = node_store + .subscribe() + .borrow() + .last + .as_ref() + .unwrap() + .header() + .number; + let (store, runner) = + new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); + node_store.wait_until_persisted(ctx, last_block + 3).await?; + Ok(()) }) .await diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index f57ea8bd..bc589ccb 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -63,17 +63,14 @@ impl PeerStates { /// Updates the known `BlockStore` state of the given peer. /// This information is used to decide from which peer to fetch - /// a given block from. + /// a given block. pub(crate) fn update( &self, peer: &node::PublicKey, state: BlockStoreState, ) -> anyhow::Result<()> { use std::collections::hash_map::Entry; - let Some(last) = &state.last else { - return Ok(()); - }; - last.verify(self.genesis()).context("state.last.verify()")?; + state.verify(self.genesis()).context("state.verify()")?; let mut peers = self.peers.lock().unwrap(); match peers.entry(peer.clone()) { Entry::Occupied(mut e) => e.get_mut().state = state.clone(), @@ -85,14 +82,16 @@ impl PeerStates { }); } } - self.highest_peer_block - .send_if_modified(|highest_peer_block| { - if *highest_peer_block >= last.header().number { - return false; - } - *highest_peer_block = last.header().number; - true - }); + if let Some(last) = &state.last { + self.highest_peer_block + .send_if_modified(|highest_peer_block| { + if *highest_peer_block >= last.header().number { + return false; + } + *highest_peer_block = last.header().number; + true + }); + } Ok(()) } diff --git a/node/actors/sync_blocks/src/peers/tests/mod.rs b/node/actors/sync_blocks/src/peers/tests/mod.rs index d6c9d66d..ee28e436 100644 --- a/node/actors/sync_blocks/src/peers/tests/mod.rs +++ b/node/actors/sync_blocks/src/peers/tests/mod.rs @@ -98,3 +98,62 @@ async fn test_peer_states(test: T) { .await .unwrap(); } + +#[tokio::test] +async fn test_try_acquire_peer_permit() { + let clock = ctx::ManualClock::new(); + let ctx = &ctx::test_root(&clock); + let rng = &mut ctx.rng(); + let mut setup = validator::testonly::Setup::new(rng, 1); + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + let (send, _recv) = ctx::channel::unbounded(); + let peer_states = PeerStates::new(Config::default(), store, send); + + let peer: node::PublicKey = rng.gen(); + let b = &setup.blocks; + for s in [ + // Empty entry. + BlockStoreState { + first: b[0].number(), + last: None, + }, + // Entry with some blocks. + BlockStoreState { + first: b[0].number(), + last: Some(b[3].justification.clone()), + }, + // Entry with changed first. + BlockStoreState { + first: b[1].number(), + last: Some(b[3].justification.clone()), + }, + // Empty entry again. + BlockStoreState { + first: b[1].number(), + last: None, + }, + ] { + peer_states.update(&peer, s.clone()).unwrap(); + for block in b { + let got = peer_states + .try_acquire_peer_permit(block.number()) + .map(|p| p.0); + if s.first <= block.number() + && s.last + .as_ref() + .map_or(false, |last| block.number() <= last.header().number) + { + assert_eq!(Some(peer.clone()), got); + } else { + assert_eq!(None, got); + } + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/sync_blocks/src/peers/tests/snapshots.rs b/node/actors/sync_blocks/src/peers/tests/snapshots.rs deleted file mode 100644 index bdb7116d..00000000 --- a/node/actors/sync_blocks/src/peers/tests/snapshots.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Tests related to snapshot storage. - -use super::*; -use crate::tests::{send_block, sync_state}; -use zksync_consensus_network::io::GetBlockError; - -#[tokio::test] -async fn backfilling_peer_history() { - test_peer_states(BackfillingPeerHistory).await; -} diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 018b99ee..b078a804 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -13,7 +13,7 @@ use zksync_concurrency::{ testonly::{abort_on_panic, set_timeout}, }; use zksync_consensus_network as network; -use zksync_consensus_storage::testonly::new_store; +use zksync_consensus_storage::testonly::new_store_with_first; type NetworkDispatcherPipe = pipe::DispatcherPipe; @@ -27,7 +27,16 @@ struct Node { impl Node { async fn new(ctx: &ctx::Ctx, network: network::Config, setup: &Setup) -> (Self, NodeRunner) { - let (store, store_runner) = new_store(ctx, &setup.genesis).await; + Self::new_with_first(ctx, network, setup, setup.genesis.fork.first_block).await + } + + async fn new_with_first( + ctx: &ctx::Ctx, + network: network::Config, + setup: &Setup, + first: validator::BlockNumber, + ) -> (Self, NodeRunner) { + let (store, store_runner) = new_store_with_first(ctx, &setup.genesis, first).await; let (start_send, start_recv) = channel::bounded(1); let (terminate_send, terminate_recv) = channel::bounded(1); @@ -331,3 +340,54 @@ impl GossipNetworkTest for SwitchingOnNodes { async fn switching_on_nodes(node_count: usize) { test_sync_blocks(SwitchingOnNodes { node_count }).await; } + +/// Test checking that nodes with different first block can synchronize. +#[tokio::test(flavor = "multi_thread")] +async fn test_different_first_block() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(25.)); + let rng = &mut ctx.rng(); + + let mut setup = validator::testonly::Setup::new(rng, 2); + let n = 4; + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let mut nodes = vec![]; + // Spawn `n` nodes, all connected to each other. + for (i, net) in network::testonly::new_configs(rng, &setup, n) + .into_iter() + .enumerate() + { + // Choose the first block for the node at random. + let first = setup.blocks.choose(rng).unwrap().number(); + let (node, runner) = Node::new_with_first(ctx, net, &setup, first).await; + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + node.start(); + nodes.push(node); + } + // Randomize the order of nodes. + nodes.shuffle(rng); + + for block in &setup.blocks { + // Find nodes interested in the next block. + let interested_nodes: Vec<_> = nodes + .iter() + .filter(|n| n.store.subscribe().borrow().first <= block.number()) + .collect(); + // Store this block to one of them. + if let Some(node) = interested_nodes.choose(rng) { + node.store.queue_block(ctx, block.clone()).await.unwrap(); + } + // Wait until all remaining nodes get the new block. + for node in interested_nodes { + node.store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/libs/concurrency/src/limiter/mod.rs b/node/libs/concurrency/src/limiter/mod.rs index 7e54bd0c..9f609b26 100644 --- a/node/libs/concurrency/src/limiter/mod.rs +++ b/node/libs/concurrency/src/limiter/mod.rs @@ -1,6 +1,6 @@ //! Rate limiter which supports delayed permit consumption. use crate::{ctx, sync, time}; -use std::sync::Mutex; +use std::{fmt, sync::Mutex}; #[cfg(test)] mod tests; @@ -102,6 +102,12 @@ pub struct Limiter { acquire: sync::Mutex>, } +impl fmt::Debug for Limiter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Limiter").finish_non_exhaustive() + } +} + /// Permit reservation returned by `Limit::acquire()`. /// Represents a set of reserved permits. /// You need to drop it for the permits to get consumed diff --git a/node/libs/crypto/src/bn254/testonly.rs b/node/libs/crypto/src/bn254/testonly.rs index 8ee10d18..711f3a14 100644 --- a/node/libs/crypto/src/bn254/testonly.rs +++ b/node/libs/crypto/src/bn254/testonly.rs @@ -1,7 +1,7 @@ -//! Random key generation, intended for use in testing +//! Random key generation, intended for use in testing. use super::{AggregateSignature, PublicKey, SecretKey, Signature}; -use pairing::bn256::{Fr, G1, G2}; +use pairing::bn256::{Fr, G1}; use rand::{distributions::Standard, prelude::Distribution, Rng, RngCore}; use rand04::Rand; @@ -17,7 +17,6 @@ impl rand04::Rng for RngWrapper { } } -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let scalar = Fr::rand(&mut RngWrapper(rng)); @@ -25,15 +24,12 @@ impl Distribution for Standard { } } -/// Generates a random PublicKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> PublicKey { - let p = G2::rand(&mut RngWrapper(rng)); - PublicKey(p) + rng.gen::().public() } } -/// Generates a random Signature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let p = G1::rand(&mut RngWrapper(rng)); @@ -41,7 +37,6 @@ impl Distribution for Standard { } } -/// Generates a random AggregateSignature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AggregateSignature { let p = G1::rand(&mut RngWrapper(rng)); diff --git a/node/libs/crypto/src/ed25519/testonly.rs b/node/libs/crypto/src/ed25519/testonly.rs index ecca0148..6b8d800f 100644 --- a/node/libs/crypto/src/ed25519/testonly.rs +++ b/node/libs/crypto/src/ed25519/testonly.rs @@ -1,6 +1,6 @@ //! Random key generation, intended for use in testing -use super::{SecretKey, Signature}; +use super::{PublicKey, SecretKey, Signature}; use crate::ByteFmt; use ed25519_dalek as ed; use rand::{ @@ -8,7 +8,6 @@ use rand::{ Rng, }; -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let raw: [u8; ed::SECRET_KEY_LENGTH] = rng.gen(); @@ -16,7 +15,12 @@ impl Distribution for Standard { } } -/// Generates a random Signature. This is meant for testing purposes. +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + rng.gen::().public() + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let key = rng.gen::(); diff --git a/node/libs/roles/src/node/testonly.rs b/node/libs/roles/src/node/testonly.rs index 1d712ebb..c8efd30f 100644 --- a/node/libs/roles/src/node/testonly.rs +++ b/node/libs/roles/src/node/testonly.rs @@ -1,4 +1,4 @@ -use super::{Msg, MsgHash, SecretKey, SessionId, Signature, Signed}; +use super::{Msg, MsgHash, PublicKey, SecretKey, SessionId, Signature, Signed}; use rand::{ distributions::{Distribution, Standard}, Rng, @@ -27,6 +27,12 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + PublicKey(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { SecretKey(Arc::new(rng.gen())) diff --git a/node/libs/roles/src/proto/validator.proto b/node/libs/roles/src/proto/validator.proto index 025e7beb..2b1d6329 100644 --- a/node/libs/roles/src/proto/validator.proto +++ b/node/libs/roles/src/proto/validator.proto @@ -7,7 +7,6 @@ import "zksync/std.proto"; message Fork { optional uint64 number = 1; // required; ForkId optional uint64 first_block = 2; // required; BlockNumber - optional BlockHeaderHash first_parent = 3; // optional } message Genesis { @@ -23,17 +22,11 @@ message PayloadHash { optional bytes keccak256 = 1; // required } -message BlockHeaderHash { - optional bytes keccak256 = 1; // required -} - message BlockHeader { - // Hash of the parent Block. - optional BlockHeaderHash parent = 2; // optional // Sequential number of the block = parent.number + 1. - optional uint64 number = 3; // required + optional uint64 number = 1; // required // Hash of the block payload. - optional PayloadHash payload = 4; // required + optional PayloadHash payload = 2; // required } message FinalBlock { diff --git a/node/libs/roles/src/validator/conv.rs b/node/libs/roles/src/validator/conv.rs index 04886834..9e616320 100644 --- a/node/libs/roles/src/validator/conv.rs +++ b/node/libs/roles/src/validator/conv.rs @@ -1,7 +1,7 @@ use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use crate::{node::SessionId, proto::validator as proto}; @@ -17,14 +17,12 @@ impl ProtoFmt for Fork { Ok(Self { number: ForkNumber(*required(&r.number).context("number")?), first_block: BlockNumber(*required(&r.first_block).context("first_block")?), - first_parent: read_optional(&r.first_parent).context("first_parent")?, }) } fn build(&self) -> Self::Proto { Self::Proto { number: Some(self.number.0), first_block: Some(self.first_block.0), - first_parent: self.first_parent.as_ref().map(|x| x.build()), } } } @@ -64,18 +62,6 @@ impl ProtoFmt for GenesisHash { } } -impl ProtoFmt for BlockHeaderHash { - type Proto = proto::BlockHeaderHash; - fn read(r: &Self::Proto) -> anyhow::Result { - Ok(Self(ByteFmt::decode(required(&r.keccak256)?)?)) - } - fn build(&self) -> Self::Proto { - Self::Proto { - keccak256: Some(self.0.encode()), - } - } -} - impl ProtoFmt for PayloadHash { type Proto = proto::PayloadHash; fn read(r: &Self::Proto) -> anyhow::Result { @@ -92,14 +78,12 @@ impl ProtoFmt for BlockHeader { type Proto = proto::BlockHeader; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - parent: read_optional(&r.parent).context("parent")?, number: BlockNumber(*required(&r.number).context("number")?), payload: read_required(&r.payload).context("payload")?, }) } fn build(&self) -> Self::Proto { Self::Proto { - parent: self.parent.as_ref().map(ProtoFmt::build), number: Some(self.number.0), payload: Some(self.payload.build()), } diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 261035e5..43d49ac3 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -52,15 +52,13 @@ impl Payload { } /// Sequential number of the block. -/// Genesis block can have an arbitrary block number. -/// For blocks other than genesis: block.number = block.parent.number + 1. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockNumber(pub u64); impl BlockNumber { /// Returns the next block number. pub fn next(self) -> Self { - Self(self.0 + 1) + Self(self.0.checked_add(1).unwrap()) } /// Returns the previous block number. @@ -69,77 +67,28 @@ impl BlockNumber { } } -impl fmt::Display for BlockNumber { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, formatter) - } -} - -/// Hash of the block header. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct BlockHeaderHash(pub(crate) Keccak256); - -impl BlockHeaderHash { - /// Interprets the specified `bytes` as a block header hash digest (i.e., a reverse operation to [`Self::as_bytes()`]). - /// It is caller's responsibility to ensure that `bytes` are actually a block header hash digest. - pub fn from_bytes(bytes: [u8; 32]) -> Self { - Self(Keccak256::from_bytes(bytes)) - } - - /// Returns a reference to the bytes of this hash. - pub fn as_bytes(&self) -> &[u8; 32] { - self.0.as_bytes() - } -} - -impl TextFmt for BlockHeaderHash { - fn decode(text: Text) -> anyhow::Result { - text.strip("block_header_hash:keccak256:")? - .decode_hex() - .map(Self) - } - - fn encode(&self) -> String { - format!( - "block_header_hash:keccak256:{}", - hex::encode(ByteFmt::encode(&self.0)) - ) +impl std::ops::Add for BlockNumber { + type Output = BlockNumber; + fn add(self, n: u64) -> Self { + Self(self.0.checked_add(n).unwrap()) } } -impl fmt::Debug for BlockHeaderHash { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.write_str(&TextFmt::encode(self)) +impl fmt::Display for BlockNumber { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, formatter) } } /// A block header. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockHeader { - /// Hash of the parent block. - pub parent: Option, /// Number of the block. pub number: BlockNumber, /// Payload of the block. pub payload: PayloadHash, } -impl BlockHeader { - /// Returns the hash of the block. - pub fn hash(&self) -> BlockHeaderHash { - BlockHeaderHash(Keccak256::new(&zksync_protobuf::canonical(self))) - } - - /// Creates a child block for the given parent. - pub fn next(parent: &BlockHeader, payload: PayloadHash) -> Self { - Self { - parent: Some(parent.hash()), - number: parent.number.next(), - payload, - } - } -} - /// A block that has been finalized by the consensus protocol. #[derive(Clone, Debug, PartialEq, Eq)] pub struct FinalBlock { @@ -194,16 +143,6 @@ impl ByteFmt for FinalBlock { } } -impl TextFmt for FinalBlock { - fn decode(text: Text) -> anyhow::Result { - text.strip("final_block:")?.decode_hex() - } - - fn encode(&self) -> String { - format!("final_block:{}", hex::encode(ByteFmt::encode(self))) - } -} - /// Errors that can occur validating a `FinalBlock` received from a node. #[derive(Debug, thiserror::Error)] #[non_exhaustive] diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 870355b0..4261443e 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -1,7 +1,5 @@ //! Messages related to the consensus protocol. -use super::{ - BlockHeaderHash, BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare, -}; +use super::{BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare}; use crate::validator; use bit_vec::BitVec; use std::{ @@ -62,8 +60,6 @@ pub struct Fork { pub number: ForkNumber, /// First block of a fork. pub first_block: BlockNumber, - /// Parent fo the first block of a fork. - pub first_parent: Option, } impl Default for Fork { @@ -71,7 +67,6 @@ impl Default for Fork { Self { number: ForkNumber(0), first_block: BlockNumber(0), - first_parent: None, } } } diff --git a/node/libs/roles/src/validator/messages/leader_prepare.rs b/node/libs/roles/src/validator/messages/leader_prepare.rs index 15580bb3..37d07e14 100644 --- a/node/libs/roles/src/validator/messages/leader_prepare.rs +++ b/node/libs/roles/src/validator/messages/leader_prepare.rs @@ -1,5 +1,5 @@ use super::{ - BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, + BlockHeader, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, ReplicaPrepareVerifyError, Signed, Signers, View, }; use crate::validator; @@ -177,14 +177,6 @@ pub enum LeaderPrepareVerifyError { /// Received proposal number. got: BlockNumber, }, - /// Bad parent hash. - #[error("bad parent hash: got {got:?}, want {want:?}")] - BadParentHash { - /// Correct parent hash. - want: Option, - /// Received parent hash. - got: Option, - }, /// New block proposal when the previous proposal was not finalized. #[error("new block proposal when the previous proposal was not finalized")] ProposalWhenPreviousNotFinalized, @@ -231,16 +223,10 @@ impl LeaderPrepare { { return Err(Error::ProposalWhenPreviousNotFinalized); } - let (want_parent, want_number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (genesis.fork.first_parent, genesis.fork.first_block), + let want_number = match high_qc { + Some(qc) => qc.header().number.next(), + None => genesis.fork.first_block, }; - if self.proposal.parent != want_parent { - return Err(Error::BadParentHash { - got: self.proposal.parent, - want: want_parent, - }); - } if self.proposal.number != want_number { return Err(Error::BadBlockNumber { got: self.proposal.number, diff --git a/node/libs/roles/src/validator/messages/replica_commit.rs b/node/libs/roles/src/validator/messages/replica_commit.rs index 963f8f39..d08a7069 100644 --- a/node/libs/roles/src/validator/messages/replica_commit.rs +++ b/node/libs/roles/src/validator/messages/replica_commit.rs @@ -14,12 +14,6 @@ impl ReplicaCommit { pub fn verify(&self, genesis: &Genesis) -> anyhow::Result<()> { anyhow::ensure!(self.view.fork == genesis.fork.number); anyhow::ensure!(self.proposal.number >= genesis.fork.first_block); - if self.proposal.number == genesis.fork.first_block { - anyhow::ensure!( - self.proposal.parent == genesis.fork.first_parent, - "bad parent of the first block of the fork" - ); - } Ok(()) } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index 27290f29..4bc7db79 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -1,8 +1,8 @@ //! Test-only utilities. use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use bit_vec::BitVec; @@ -38,7 +38,6 @@ impl Setup { let fork = Fork { number: ForkNumber(rng.gen_range(0..100)), first_block: BlockNumber(rng.gen_range(0..100)), - first_parent: Some(rng.gen()), }; Self::new_with_fork(rng, validators, fork) } @@ -64,9 +63,11 @@ impl Setup { .unwrap_or(ViewNumber(0)), }; let proposal = match self.0.blocks.last() { - Some(b) => BlockHeader::next(b.header(), payload.hash()), + Some(b) => BlockHeader { + number: b.number().next(), + payload: payload.hash(), + }, None => BlockHeader { - parent: self.genesis.fork.first_parent, number: self.genesis.fork.first_block, payload: payload.hash(), }, @@ -181,7 +182,6 @@ impl Distribution for Standard { Fork { number: rng.gen(), first_block: rng.gen(), - first_parent: Some(rng.gen()), } } } @@ -201,16 +201,9 @@ impl Distribution for Standard { } } -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> BlockHeaderHash { - BlockHeaderHash(rng.gen()) - } -} - impl Distribution for Standard { fn sample(&self, rng: &mut R) -> BlockHeader { BlockHeader { - parent: rng.gen(), number: rng.gen(), payload: rng.gen(), } diff --git a/node/libs/roles/src/validator/tests.rs b/node/libs/roles/src/validator/tests.rs index 5f38b3d8..9affb1e0 100644 --- a/node/libs/roles/src/validator/tests.rs +++ b/node/libs/roles/src/validator/tests.rs @@ -72,17 +72,6 @@ fn test_text_encoding() { Text::new(&t).decode::().unwrap() ); - let block_header_hash: BlockHeaderHash = rng.gen(); - let t = TextFmt::encode(&block_header_hash); - assert_eq!( - block_header_hash, - Text::new(&t).decode::().unwrap() - ); - - let final_block: FinalBlock = rng.gen(); - let t = TextFmt::encode(&final_block); - assert_eq!(final_block, Text::new(&t).decode::().unwrap()); - let msg_hash: MsgHash = rng.gen(); let t = TextFmt::encode(&msg_hash); assert_eq!(msg_hash, Text::new(&t).decode::().unwrap()); @@ -98,7 +87,6 @@ fn test_schema_encoding() { let rng = &mut ctx.rng(); test_encode_random::(rng); test_encode_random::(rng); - test_encode_random::(rng); test_encode_random::(rng); test_encode_random::>(rng); test_encode_random::(rng); diff --git a/node/libs/storage/src/block_store/metrics.rs b/node/libs/storage/src/block_store/metrics.rs index 70447ad7..05b64133 100644 --- a/node/libs/storage/src/block_store/metrics.rs +++ b/node/libs/storage/src/block_store/metrics.rs @@ -7,9 +7,9 @@ pub(super) struct PersistentBlockStore { /// Latency of a successful `genesis()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) genesis_latency: vise::Histogram, - /// Latency of a successful `last()` call. + /// Latency of a successful `state()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] - pub(super) last_latency: vise::Histogram, + pub(super) state_latency: vise::Histogram, /// Latency of a successful `block()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) block_latency: vise::Histogram, diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs index 456bc2e1..e769aecf 100644 --- a/node/libs/storage/src/block_store/mod.rs +++ b/node/libs/storage/src/block_store/mod.rs @@ -10,7 +10,7 @@ mod metrics; #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockStoreState { /// Stored block with the lowest number. - /// Currently always same as `genesis.first_block`. + /// If last is `None`, this is the first block that should be fetched. pub first: validator::BlockNumber, /// Stored block with the highest number. /// None iff store is empty. @@ -32,21 +32,41 @@ impl BlockStoreState { None => self.first, } } + + /// Verifies `BlockStoreState'. + pub fn verify(&self, genesis: &validator::Genesis) -> anyhow::Result<()> { + anyhow::ensure!( + genesis.fork.first_block <= self.first, + "first block ({}) doesn't belong to the fork (which starts at block {})", + self.first, + genesis.fork.first_block + ); + if let Some(last) = &self.last { + anyhow::ensure!( + self.first <= last.header().number, + "first block {} has bigger number than the last block {}", + self.first, + last.header().number + ); + last.verify(genesis).context("last.verify()")?; + } + Ok(()) + } } /// Storage of a continuous range of L2 blocks. /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait PersistentBlockStore: fmt::Debug + Send + Sync { +pub trait PersistentBlockStore: 'static + fmt::Debug + Send + Sync { /// Genesis matching the block store content. /// Consensus code calls this method only once. async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result; - /// Last block available in storage. + /// Range of blocks available in storage. /// Consensus code calls this method only once and then tracks the /// range of available blocks internally. - async fn last(&self, ctx: &ctx::Ctx) -> ctx::Result>; + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; /// Gets a block by its number. /// Returns error if block is missing. @@ -115,11 +135,11 @@ impl BlockStoreRunner { tracing::info!( "stored block #{}: {:#?}", block.header().number, - block.header().hash() + block.header().payload ); self.0.inner.send_modify(|inner| { - debug_assert_eq!(inner.persisted_state.next(), block.header().number); + debug_assert_eq!(inner.persisted_state.next(), block.number()); inner.persisted_state.last = Some(block.justification.clone()); inner.queue.pop_front(); }); @@ -145,16 +165,10 @@ impl BlockStore { let t = metrics::PERSISTENT_BLOCK_STORE.genesis_latency.start(); let genesis = persistent.genesis(ctx).await.wrap("persistent.genesis()")?; t.observe(); - let t = metrics::PERSISTENT_BLOCK_STORE.last_latency.start(); - let last = persistent.last(ctx).await.wrap("persistent.last()")?; + let t = metrics::PERSISTENT_BLOCK_STORE.state_latency.start(); + let state = persistent.state(ctx).await.wrap("persistent.state()")?; t.observe(); - if let Some(last) = &last { - last.verify(&genesis).context("last.verify()")?; - } - let state = BlockStoreState { - first: genesis.fork.first_block, - last, - }; + state.verify(&genesis).context("state.verify()")?; let this = Arc::new(Self { inner: sync::watch::channel(Inner { queued_state: sync::watch::channel(state.clone()).0, @@ -165,12 +179,6 @@ impl BlockStore { genesis, persistent, }); - // Verify the first block. - if let Some(block) = this.block(ctx, this.genesis.fork.first_block).await? { - block - .verify(&this.genesis) - .with_context(|| format!("verify({:?})", this.genesis.fork.first_block))?; - } Ok((this.clone(), BlockStoreRunner(this))) } @@ -227,17 +235,6 @@ impl BlockStore { return Ok(()); } block.verify(&self.genesis).context("block.verify()")?; - // Verify parent hash, if previous block is available. - if let Some(last) = queued_state.last.as_ref() { - if Some(last.header().hash()) != block.header().parent { - return Err(anyhow::format_err!( - "block.parent = {:?}, want {:?}", - block.header().parent, - last.header().hash() - ) - .into()); - } - } } self.inner.send_if_modified(|inner| { let modified = inner.queued_state.send_if_modified(|queued_state| { @@ -258,6 +255,7 @@ impl BlockStore { } /// Waits until the given block is queued to be stored. + /// If `number < state.first` then it immetiately returns `Ok(())`. pub async fn wait_until_queued( &self, ctx: &ctx::Ctx, @@ -271,6 +269,7 @@ impl BlockStore { } /// Waits until the given block is stored persistently. + /// If `number < state.first` then it immetiately returns `Ok(())`. pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs deleted file mode 100644 index 634118dc..00000000 --- a/node/libs/storage/src/replica_state.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! `FallbackReplicaStateStore` type. - - diff --git a/node/libs/storage/src/replica_store.rs b/node/libs/storage/src/replica_store.rs index 0eaa96fd..465d26d6 100644 --- a/node/libs/storage/src/replica_store.rs +++ b/node/libs/storage/src/replica_store.rs @@ -10,7 +10,7 @@ use zksync_protobuf::{read_optional, read_required, required, ProtoFmt}; /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait ReplicaStore: fmt::Debug + Send + Sync { +pub trait ReplicaStore: 'static + fmt::Debug + Send + Sync { /// Gets the replica state, if it is contained in the database. async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 209d046c..1fd7398c 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -1,5 +1,5 @@ //! In-memory storage implementation. -use crate::{PersistentBlockStore, ReplicaState}; +use crate::{BlockStoreState, PersistentBlockStore, ReplicaState}; use anyhow::Context as _; use std::{ collections::VecDeque, @@ -10,6 +10,7 @@ use zksync_consensus_roles::validator; #[derive(Debug)] struct BlockStoreInner { + first: validator::BlockNumber, genesis: validator::Genesis, blocks: Mutex>, } @@ -24,8 +25,10 @@ pub struct ReplicaStore(Arc>); impl BlockStore { /// New In-memory `BlockStore`. - pub fn new(genesis: validator::Genesis) -> Self { + pub fn new(genesis: validator::Genesis, first: validator::BlockNumber) -> Self { + assert!(genesis.fork.first_block <= first); Self(Arc::new(BlockStoreInner { + first, genesis, blocks: Mutex::default(), })) @@ -38,14 +41,17 @@ impl PersistentBlockStore for BlockStore { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(self - .0 - .blocks - .lock() - .unwrap() - .back() - .map(|b| b.justification.clone())) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + first: self.0.first, + last: self + .0 + .blocks + .lock() + .unwrap() + .back() + .map(|b| b.justification.clone()), + }) } async fn block( @@ -69,11 +75,12 @@ impl PersistentBlockStore for BlockStore { ) -> ctx::Result<()> { let mut blocks = self.0.blocks.lock().unwrap(); let got = block.header().number; - if let Some(last) = blocks.back() { - let want = last.header().number.next(); - if got != want { - return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); - } + let want = match blocks.back() { + Some(last) => last.header().number.next(), + None => self.0.first, + }; + if got != want { + return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); } blocks.push_back(block.clone()); Ok(()) diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index 3a44800b..f0d66052 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -29,49 +29,63 @@ impl Distribution for Standard { } } -/// Constructs a new in-memory store with a genesis block. +/// Constructs a new in-memory store. pub async fn new_store( ctx: &ctx::Ctx, genesis: &validator::Genesis, ) -> (Arc, BlockStoreRunner) { - BlockStore::new(ctx, Box::new(in_memory::BlockStore::new(genesis.clone()))) - .await - .unwrap() + new_store_with_first(ctx, genesis, genesis.fork.first_block).await +} + +/// Constructs a new in-memory store with a custom expected first block +/// (i.e. possibly different than `genesis.fork.first_block`). +pub async fn new_store_with_first( + ctx: &ctx::Ctx, + genesis: &validator::Genesis, + first: validator::BlockNumber, +) -> (Arc, BlockStoreRunner) { + BlockStore::new( + ctx, + Box::new(in_memory::BlockStore::new(genesis.clone(), first)), + ) + .await + .unwrap() } /// Dumps all the blocks stored in `store`. pub async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { let genesis = store.genesis(ctx).await.unwrap(); - let last = store.last(ctx).await.unwrap(); + let state = store.state(ctx).await.unwrap(); + assert!(genesis.fork.first_block <= state.first); let mut blocks = vec![]; - let begin = genesis.fork.first_block; - let end = last + let after = state + .last .as_ref() .map(|qc| qc.header().number.next()) - .unwrap_or(begin); - for n in (begin.0..end.0).map(validator::BlockNumber) { + .unwrap_or(state.first); + for n in (state.first.0..after.0).map(validator::BlockNumber) { let block = store.block(ctx, n).await.unwrap(); assert_eq!(block.header().number, n); blocks.push(block); } - assert!(store.block(ctx, end).await.is_err()); + if let Some(before) = state.first.prev() { + assert!(store.block(ctx, before).await.is_err()); + } + assert!(store.block(ctx, after).await.is_err()); blocks } /// Verifies storage content. pub async fn verify(ctx: &ctx::Ctx, store: &BlockStore) -> anyhow::Result<()> { let range = store.subscribe().borrow().clone(); - let mut parent: Option = None; for n in (range.first.0..range.next().0).map(validator::BlockNumber) { async { - let block = store.block(ctx, n).await?.context("missing")?; - block.verify(store.genesis())?; - // Ignore checking the first block parent - if parent.is_some() { - anyhow::ensure!(parent == block.header().parent); - } - parent = Some(block.header().hash()); - Ok(()) + store + .block(ctx, n) + .await? + .context("missing")? + .verify(store.genesis()) + .context("verify()") } .await .context(n)?; diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 4a50c8a0..221be6f7 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -1,6 +1,6 @@ use super::*; -use crate::{testonly::new_store, ReplicaState}; -use zksync_concurrency::{ctx, scope, sync, testonly::abort_on_panic}; +use crate::{testonly::new_store_with_first, ReplicaState}; +use zksync_concurrency::{ctx, scope, testonly::abort_on_panic}; use zksync_consensus_roles::validator::testonly::Setup; #[tokio::test] @@ -10,7 +10,10 @@ async fn test_inmemory_block_store() { let mut setup = Setup::new(rng, 3); setup.push_blocks(rng, 5); - let store = &testonly::in_memory::BlockStore::new(setup.genesis.clone()); + let store = &testonly::in_memory::BlockStore::new( + setup.genesis.clone(), + setup.genesis.fork.first_block, + ); let mut want = vec![]; for block in &setup.blocks { store.store_next_block(ctx, block).await.unwrap(); @@ -32,26 +35,50 @@ async fn test_state_updates() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let mut setup = Setup::new(rng, 1); - setup.push_blocks(rng, 1); - let (store, runner) = new_store(ctx, &setup.genesis).await; + setup.push_blocks(rng, 5); + // Create store with non-trivial first block. + let first_block = &setup.blocks[2]; + let (store, runner) = new_store_with_first(ctx, &setup.genesis, first_block.number()).await; scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx)); let sub = &mut store.subscribe(); - let state = sub.borrow().clone(); - assert_eq!(state.first, setup.genesis.fork.first_block); - assert_eq!(state.last, None); + let want = BlockStoreState { + first: first_block.number(), + last: None, + }; - store - .queue_block(ctx, setup.blocks[0].clone()) - .await - .unwrap(); + // Waiting for blocks before genesis first block (or before `state.first_block`) should be ok + // and should complete immediately. + for n in [ + setup.genesis.fork.first_block.prev().unwrap(), + first_block.number().prev().unwrap(), + ] { + store.wait_until_queued(ctx, n).await.unwrap(); + store.wait_until_persisted(ctx, n).await.unwrap(); + assert_eq!(want, *sub.borrow()); + } - let state = sync::wait_for(ctx, sub, |state| { - state.last.as_ref() == Some(&setup.blocks[0].justification) - }) - .await? - .clone(); - assert_eq!(state.first, setup.blocks[0].header().number); + for block in &setup.blocks { + store.queue_block(ctx, block.clone()).await.unwrap(); + if block.number() < first_block.number() { + // Queueing block before first block should be a noop. + store.wait_until_queued(ctx, block.number()).await.unwrap(); + store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + assert_eq!(want, *sub.borrow()); + } else { + // Otherwise the state should be updated as soon as block is queued. + assert_eq!( + BlockStoreState { + first: first_block.number(), + last: Some(block.justification.clone()), + }, + *sub.borrow() + ); + } + } Ok(()) }) .await diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index f2832ae3..af581000 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -8,7 +8,7 @@ use std::{ }; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{PersistentBlockStore, ReplicaState, ReplicaStore}; +use zksync_consensus_storage::{BlockStoreState, PersistentBlockStore, ReplicaState, ReplicaStore}; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -107,8 +107,12 @@ impl PersistentBlockStore for RocksDB { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(scope::wait_blocking(|| self.last_blocking()).await?) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + // `RocksDB` is assumed to store all blocks starting from genesis. + first: self.0.genesis.fork.first_block, + last: scope::wait_blocking(|| self.last_blocking()).await?, + }) } async fn block(