diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index 1da0f59e..840f905a 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -81,7 +81,7 @@ impl Consensus { /// Starts the Consensus actor. It will start running, processing incoming messages and /// sending output messages. This is a blocking method. - #[instrument(level = "trace", skip(self) ret)] + #[instrument(level = "trace", skip(self), err)] pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { info!( "Starting consensus actor {:?}", diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 33c46a5f..a65857d6 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -1,7 +1,7 @@ //! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate. use crate::io::Dispatcher; use anyhow::Context as _; -use std::{any, sync::Arc}; +use std::{any, fmt, sync::Arc}; use zksync_concurrency::{ctx, net, scope}; use zksync_consensus_bft::{misc::consensus_threshold, Consensus, PayloadSource}; use zksync_consensus_network as network; @@ -30,6 +30,14 @@ struct ValidatorExecutor { payload_source: Arc, } +impl fmt::Debug for ValidatorExecutor { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ValidatorExecutor") + .field("config", &self.config) + .finish() + } +} + impl ValidatorExecutor { /// Returns consensus network configuration. fn consensus_config(&self) -> network::consensus::Config { @@ -42,6 +50,7 @@ impl ValidatorExecutor { } /// Executor allowing to spin up all actors necessary for a consensus node. +#[derive(Debug)] pub struct Executor { /// General-purpose executor configuration. executor_config: ExecutorConfig, diff --git a/node/libs/storage/src/in_memory.rs b/node/libs/storage/src/in_memory.rs index 425dd3bf..7bbb1394 100644 --- a/node/libs/storage/src/in_memory.rs +++ b/node/libs/storage/src/in_memory.rs @@ -10,30 +10,33 @@ use zksync_concurrency::{ ctx, sync::{watch, Mutex}, }; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_roles::validator; #[derive(Debug)] struct BlocksInMemoryStore { - blocks: BTreeMap, - last_contiguous_block_number: BlockNumber, + blocks: BTreeMap, + last_contiguous_block_number: validator::BlockNumber, } impl BlocksInMemoryStore { - fn head_block(&self) -> &FinalBlock { + fn head_block(&self) -> &validator::FinalBlock { self.blocks.values().next_back().unwrap() // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block } - fn first_block(&self) -> &FinalBlock { + fn first_block(&self) -> &validator::FinalBlock { self.blocks.values().next().unwrap() // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block } - fn block(&self, number: BlockNumber) -> Option<&FinalBlock> { + fn block(&self, number: validator::BlockNumber) -> Option<&validator::FinalBlock> { self.blocks.get(&number) } - fn missing_block_numbers(&self, range: ops::Range) -> Vec { + fn missing_block_numbers( + &self, + range: ops::Range, + ) -> Vec { let existing_numbers = self .blocks .range(range.clone()) @@ -43,7 +46,7 @@ impl BlocksInMemoryStore { .collect() } - fn put_block(&mut self, block: FinalBlock) { + fn put_block(&mut self, block: validator::FinalBlock) { let block_number = block.header.number; tracing::debug!("Inserting block #{block_number} into database"); if let Some(prev_block) = self.blocks.insert(block_number, block) { @@ -69,12 +72,12 @@ impl BlocksInMemoryStore { pub struct InMemoryStorage { blocks: Mutex, replica_state: Mutex>, - blocks_sender: watch::Sender, + blocks_sender: watch::Sender, } impl InMemoryStorage { /// Creates a new store containing only the specified `genesis_block`. - pub fn new(genesis_block: FinalBlock) -> Self { + pub fn new(genesis_block: validator::FinalBlock) -> Self { let genesis_block_number = genesis_block.header.number; Self { blocks: Mutex::new(BlocksInMemoryStore { @@ -89,38 +92,62 @@ impl InMemoryStorage { #[async_trait] impl BlockStore for InMemoryStorage { - async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(self.blocks.lock().await.head_block().clone()) } - async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(self.blocks.lock().await.first_block().clone()) } - async fn last_contiguous_block_number(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn last_contiguous_block_number( + &self, + _ctx: &ctx::Ctx, + ) -> ctx::Result { Ok(self.blocks.lock().await.last_contiguous_block_number) } - async fn block(&self, _ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result> { + async fn block( + &self, + _ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { Ok(self.blocks.lock().await.block(number).cloned()) } async fn missing_block_numbers( &self, _ctx: &ctx::Ctx, - range: ops::Range, - ) -> ctx::Result> { + range: ops::Range, + ) -> ctx::Result> { Ok(self.blocks.lock().await.missing_block_numbers(range)) } - fn subscribe_to_block_writes(&self) -> watch::Receiver { + fn subscribe_to_block_writes(&self) -> watch::Receiver { self.blocks_sender.subscribe() } } #[async_trait] impl WriteBlockStore for InMemoryStorage { - async fn put_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { + /// Just verifies that the payload is for the successor of the current head. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + let head_number = self.head_block(ctx).await?.header.number; + if head_number >= block_number { + return Err(anyhow::anyhow!( + "received proposal for block {block_number:?}, while head is at {head_number:?}" + ) + .into()); + } + Ok(()) + } + + async fn put_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> { self.blocks.lock().await.put_block(block.clone()); self.blocks_sender.send_replace(block.header.number); Ok(()) diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs index aa46e15c..5daf2a82 100644 --- a/node/libs/storage/src/replica_state.rs +++ b/node/libs/storage/src/replica_state.rs @@ -20,7 +20,7 @@ impl From for ReplicaState { } } -/// ReplicaStorage. +/// Storage combining [`ReplicaStateStore`] and [`WriteBlockStore`]. #[derive(Debug, Clone)] pub struct ReplicaStore { state: Arc, diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index a5f7b121..6366e313 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -18,7 +18,7 @@ use std::{ }, }; use zksync_concurrency::{ctx, scope, sync::watch}; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_roles::validator; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -27,8 +27,8 @@ enum DatabaseKey { /// ReplicaState -> ReplicaState ReplicaState, /// Key used to store the finalized blocks. - /// Block(BlockNumber) -> FinalBlock - Block(BlockNumber), + /// Block(validator::BlockNumber) -> validator::FinalBlock + Block(validator::BlockNumber), } impl DatabaseKey { @@ -57,11 +57,11 @@ impl DatabaseKey { } /// Parses the specified bytes as a `Self::Block(_)` key. - pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { + pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { let raw_key = raw_key .try_into() .context("Invalid encoding for block key")?; - Ok(BlockNumber(u64::from_be_bytes(raw_key))) + Ok(validator::BlockNumber(u64::from_be_bytes(raw_key))) } } @@ -79,7 +79,7 @@ pub struct RocksdbStorage { /// that blocks are never removed from the DB. cached_last_contiguous_block_number: AtomicU64, /// Sender of numbers of written blocks. - block_writes_sender: watch::Sender, + block_writes_sender: watch::Sender, } impl RocksdbStorage { @@ -87,7 +87,11 @@ impl RocksdbStorage { /// a new one. We need the genesis block of the chain as input. // TODO(bruno): we want to eventually start pruning old blocks, so having the genesis // block might be unnecessary. - pub async fn new(ctx: &ctx::Ctx, genesis_block: &FinalBlock, path: &Path) -> ctx::Result { + pub async fn new( + ctx: &ctx::Ctx, + genesis_block: &validator::FinalBlock, + path: &Path, + ) -> ctx::Result { let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); @@ -127,7 +131,7 @@ impl RocksdbStorage { self.inner.write().expect("DB lock is poisoned") } - fn head_block_blocking(&self) -> anyhow::Result { + fn head_block_blocking(&self) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -141,7 +145,7 @@ impl RocksdbStorage { } /// Returns a block with the least number stored in this database. - fn first_block_blocking(&self) -> anyhow::Result { + fn first_block_blocking(&self) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -154,11 +158,11 @@ impl RocksdbStorage { zksync_protobuf::decode(&first_block).context("Failed decoding first stored block bytes") } - fn last_contiguous_block_number_blocking(&self) -> anyhow::Result { + fn last_contiguous_block_number_blocking(&self) -> anyhow::Result { let last_contiguous_block_number = self .cached_last_contiguous_block_number .load(Ordering::Relaxed); - let last_contiguous_block_number = BlockNumber(last_contiguous_block_number); + let last_contiguous_block_number = validator::BlockNumber(last_contiguous_block_number); let last_contiguous_block_number = self.last_contiguous_block_number_impl(last_contiguous_block_number)?; @@ -175,8 +179,8 @@ impl RocksdbStorage { // is for the `cached_last_contiguous_block_number` to be present in the database. fn last_contiguous_block_number_impl( &self, - cached_last_contiguous_block_number: BlockNumber, - ) -> anyhow::Result { + cached_last_contiguous_block_number: validator::BlockNumber, + ) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -202,7 +206,10 @@ impl RocksdbStorage { } /// Gets a block by its number. - fn block_blocking(&self, number: BlockNumber) -> anyhow::Result> { + fn block_blocking( + &self, + number: validator::BlockNumber, + ) -> anyhow::Result> { let db = self.read(); let Some(raw_block) = db @@ -219,8 +226,8 @@ impl RocksdbStorage { /// Iterates over block numbers in the specified `range` that the DB *does not* have. fn missing_block_numbers_blocking( &self, - range: ops::Range, - ) -> anyhow::Result> { + range: ops::Range, + ) -> anyhow::Result> { let db = self.read(); let mut options = ReadOptions::default(); @@ -242,7 +249,7 @@ impl RocksdbStorage { // ---------------- Write methods ---------------- /// Insert a new block into the database. - fn put_block_blocking(&self, finalized_block: &FinalBlock) -> anyhow::Result<()> { + fn put_block_blocking(&self, finalized_block: &validator::FinalBlock) -> anyhow::Result<()> { let db = self.write(); let block_number = finalized_block.header.number; tracing::debug!("Inserting new block #{block_number} into the database."); @@ -292,38 +299,62 @@ impl fmt::Debug for RocksdbStorage { #[async_trait] impl BlockStore for RocksdbStorage { - async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(scope::wait_blocking(|| self.head_block_blocking()).await?) } - async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(scope::wait_blocking(|| self.first_block_blocking()).await?) } - async fn last_contiguous_block_number(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn last_contiguous_block_number( + &self, + _ctx: &ctx::Ctx, + ) -> ctx::Result { Ok(scope::wait_blocking(|| self.last_contiguous_block_number_blocking()).await?) } - async fn block(&self, _ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result> { + async fn block( + &self, + _ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { Ok(scope::wait_blocking(|| self.block_blocking(number)).await?) } async fn missing_block_numbers( &self, _ctx: &ctx::Ctx, - range: ops::Range, - ) -> ctx::Result> { + range: ops::Range, + ) -> ctx::Result> { Ok(scope::wait_blocking(|| self.missing_block_numbers_blocking(range)).await?) } - fn subscribe_to_block_writes(&self) -> watch::Receiver { + fn subscribe_to_block_writes(&self) -> watch::Receiver { self.block_writes_sender.subscribe() } } #[async_trait] impl WriteBlockStore for RocksdbStorage { - async fn put_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { + /// Just verifies that the payload is for the successor of the current head. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + let head_number = self.head_block(ctx).await?.header.number; + if head_number >= block_number { + return Err(anyhow::anyhow!( + "received proposal for block {block_number:?}, while head is at {head_number:?}" + ) + .into()); + } + Ok(()) + } + + async fn put_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> { Ok(scope::wait_blocking(|| self.put_block_blocking(block)).await?) } } diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs index 50865485..83e069f6 100644 --- a/node/libs/storage/src/traits.rs +++ b/node/libs/storage/src/traits.rs @@ -56,16 +56,8 @@ pub trait WriteBlockStore: BlockStore { ctx: &ctx::Ctx, block_number: BlockNumber, _payload: &Payload, - ) -> ctx::Result<()> { - let head_number = self.head_block(ctx).await?.header.number; - if head_number >= block_number { - return Err(anyhow::anyhow!( - "received proposal for block {block_number:?}, while head is at {head_number:?}" - ) - .into()); - } - Ok(()) - } + ) -> ctx::Result<()>; + /// Puts a block into this storage. async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()>; }