diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs index dce74edc..97559ba1 100644 --- a/node/actors/bft/src/replica/block.rs +++ b/node/actors/bft/src/replica/block.rs @@ -7,7 +7,7 @@ impl StateMachine { /// Tries to build a finalized block from the given CommitQC. We simply search our /// block proposal cache for the matching block, and if we find it we build the block. /// If this method succeeds, it sends the finalized block to the executor. - #[instrument(level = "trace", ret)] + #[instrument(level = "debug", skip(self), ret)] pub(crate) async fn save_block( &mut self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/block_store.rs b/node/libs/storage/src/block_store.rs index 81a42665..f1e7be4f 100644 --- a/node/libs/storage/src/block_store.rs +++ b/node/libs/storage/src/block_store.rs @@ -1,4 +1,5 @@ //! Defines storage layer for finalized blocks. +use anyhow::Context as _; use std::collections::BTreeMap; use std::fmt; use zksync_concurrency::{ctx, sync}; @@ -25,17 +26,17 @@ impl BlockStoreState { /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] pub trait PersistentBlockStore: fmt::Debug + Send + Sync { - /// Range of blocks avaliable in storage. + /// Range of blocks avaliable in storage. None iff storage is empty. /// Consensus code calls this method only once and then tracks the /// range of avaliable blocks internally. - async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result>; - /// Gets a block by its number. Should return an error if block is missing. + /// Gets a block by its number. Returns None if block is missing. async fn block( &self, ctx: &ctx::Ctx, number: validator::BlockNumber, - ) -> ctx::Result; + ) -> ctx::Result>; /// Persistently store a block. /// Implementations are only required to accept a block directly after the current last block, @@ -73,9 +74,9 @@ impl BlockStore { if cache_capacity < 1 { return Err(anyhow::anyhow!("cache_capacity has to be >=1").into()); } - let state = persistent.state(ctx).await?; - if state.first > state.last { - return Err(anyhow::anyhow!("at least 1 block has to be available in storage").into()); + let state = persistent.state(ctx).await?.context("storage empty, expected at least 1 block")?; + if state.first.header().number > state.last.header().number { + return Err(anyhow::anyhow!("invalid state").into()); } Ok(Self { persistent, @@ -103,7 +104,7 @@ impl BlockStore { return Ok(Some(block.clone())); } } - Ok(Some(self.persistent.block(ctx, number).await?)) + Ok(Some(self.persistent.block(ctx, number).await?.context("block disappeared from storage")?)) } pub async fn last_block(&self, ctx: &ctx::Ctx) -> ctx::Result { @@ -139,6 +140,7 @@ impl BlockStore { Ok(()) } + #[tracing::instrument(level = "debug", skip(self))] pub async fn store_block( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index 18a37399..54e9bb24 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -7,7 +7,7 @@ use anyhow::Context as _; use rocksdb::{Direction, IteratorMode, ReadOptions}; use std::sync::Arc; use std::{fmt, path::Path, sync::RwLock}; -use zksync_concurrency::{ctx, scope}; +use zksync_concurrency::{ctx, scope, error::Wrap as _}; use zksync_consensus_roles::validator; /// Enum used to represent a key in the database. It also acts as a separator between different stores. @@ -69,32 +69,33 @@ impl Store { ))) } - fn state_blocking(&self) -> anyhow::Result { + fn state_blocking(&self) -> anyhow::Result> { let db = self.0.read().unwrap(); + let mut options = ReadOptions::default(); + options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); + let Some(res) = db + .iterator_opt(IteratorMode::Start, options) + .next() + else { return Ok(None) }; + let (_,first) = res.context("RocksDB error reading first stored block")?; + let first: validator::FinalBlock = + zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?; + let mut options = ReadOptions::default(); options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); let (_, last) = db .iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options) .next() - .context("Head block not found")? + .context("last block not found")? .context("RocksDB error reading head block")?; let last: validator::FinalBlock = zksync_protobuf::decode(&last).context("Failed decoding head block bytes")?; - let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let (_, first) = db - .iterator_opt(IteratorMode::Start, options) - .next() - .context("First stored block not found")? - .context("RocksDB error reading first stored block")?; - let first: validator::FinalBlock = - zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?; - Ok(BlockStoreState { + Ok(Some(BlockStoreState { first: first.justification, last: last.justification, - }) + })) } } @@ -106,7 +107,7 @@ impl fmt::Debug for Store { #[async_trait::async_trait] impl PersistentBlockStore for Arc { - async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result> { Ok(scope::wait_blocking(|| self.state_blocking()).await?) } @@ -114,19 +115,18 @@ impl PersistentBlockStore for Arc { &self, _ctx: &ctx::Ctx, number: validator::BlockNumber, - ) -> ctx::Result { - Ok(scope::wait_blocking(|| { + ) -> ctx::Result> { + scope::wait_blocking(|| { let db = self.0.read().unwrap(); - let block = db + let Some(block) = db .get(DatabaseKey::Block(number).encode_key()) .context("RocksDB error")? - .context("not found")?; - zksync_protobuf::decode(&block).context("failed decoding block") - }) - .await - .context(number)?) + else { return Ok(None) }; + Ok(Some(zksync_protobuf::decode(&block).context("failed decoding block")?)) + }).await.wrap(number) } + #[tracing::instrument(level = "debug", skip(self))] async fn store_next_block( &self, _ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index aed232ce..5f453397 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -1,6 +1,5 @@ //! In-memory storage implementation. use crate::{BlockStoreState, PersistentBlockStore, ReplicaState}; -use anyhow::Context as _; use std::collections::BTreeMap; use std::sync::Mutex; use zksync_concurrency::ctx; @@ -23,26 +22,25 @@ impl BlockStore { #[async_trait::async_trait] impl PersistentBlockStore for BlockStore { - async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result> { let blocks = self.0.lock().unwrap(); - Ok(BlockStoreState { + Ok(Some(BlockStoreState { first: blocks.first_key_value().unwrap().1.justification.clone(), last: blocks.last_key_value().unwrap().1.justification.clone(), - }) + })) } async fn block( &self, _ctx: &ctx::Ctx, number: validator::BlockNumber, - ) -> ctx::Result { + ) -> ctx::Result> { Ok(self .0 .lock() .unwrap() .get(&number) - .context("not found")? - .clone()) + .cloned()) } async fn store_next_block( diff --git a/node/libs/storage/src/tests/mod.rs b/node/libs/storage/src/tests/mod.rs index 50fdd1db..4c4d872f 100644 --- a/node/libs/storage/src/tests/mod.rs +++ b/node/libs/storage/src/tests/mod.rs @@ -40,15 +40,15 @@ fn make_block(rng: &mut impl Rng, parent: &validator::BlockHeader) -> validator: } async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { + let Some(range) = store.state(ctx).await.unwrap() else { return vec![] }; let mut blocks = vec![]; - let range = store.state(ctx).await.unwrap(); for n in range.first.header().number.0..range.next().0 { let n = validator::BlockNumber(n); - let block = store.block(ctx, n).await.unwrap(); + let block = store.block(ctx, n).await.unwrap().unwrap(); assert_eq!(block.header().number, n); blocks.push(block); } - assert!(store.block(ctx, range.next()).await.is_err()); + assert!(store.block(ctx, range.next()).await.unwrap().is_none()); blocks } diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 41b03e6c..839e727d 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -81,11 +81,9 @@ fn main() -> anyhow::Result<()> { public_addr: addrs[i], metrics_server_addr, - validator_key: Some(validator_keys[i].public()), validators: validator_set.clone(), genesis_block: genesis.clone(), - node_key: node_keys[i].public(), gossip_dynamic_inbound_limit: 0, gossip_static_inbound: [].into(), gossip_static_outbound: [].into(), diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index c02c3c07..fa8fb68f 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -31,11 +31,9 @@ pub struct AppConfig { pub public_addr: std::net::SocketAddr, pub metrics_server_addr: Option, - pub validator_key: Option, pub validators: validator::ValidatorSet, pub genesis_block: validator::FinalBlock, - pub node_key: node::PublicKey, pub gossip_dynamic_inbound_limit: u64, pub gossip_static_inbound: HashSet, pub gossip_static_outbound: HashMap, @@ -76,11 +74,9 @@ impl ProtoFmt for AppConfig { metrics_server_addr: read_optional_text(&r.metrics_server_addr) .context("metrics_server_addr")?, - validator_key: read_optional_text(&r.validator_key).context("validator_key")?, validators, genesis_block: read_required_text(&r.genesis_block).context("genesis_block")?, - node_key: read_required_text(&r.node_key).context("node_key")?, gossip_dynamic_inbound_limit: *required(&r.gossip_dynamic_inbound_limit) .context("gossip_dynamic_inbound_limit")?, gossip_static_inbound, @@ -94,11 +90,9 @@ impl ProtoFmt for AppConfig { public_addr: Some(self.public_addr.encode()), metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), - validator_key: self.validator_key.as_ref().map(TextFmt::encode), validators: self.validators.iter().map(TextFmt::encode).collect(), genesis_block: Some(self.genesis_block.encode()), - node_key: Some(self.node_key.encode()), gossip_dynamic_inbound_limit: Some(self.gossip_dynamic_inbound_limit), gossip_static_inbound: self .gossip_static_inbound @@ -171,35 +165,29 @@ impl<'a> ConfigPaths<'a> { } impl Configs { - pub async fn into_executor(self, ctx: &ctx::Ctx) -> anyhow::Result { - anyhow::ensure!( - self.app.node_key == self.node_key.public(), - "node secret key has to match the node public key in the app config", - ); - anyhow::ensure!( - self.app.validator_key == self.validator_key.as_ref().map(|k| k.public()), - "validator secret key has to match the validator public key in the app config", - ); - let storage = Arc::new(rocksdb::Store::new(&self.database).await?); - let block_store = Arc::new(BlockStore::new(ctx,Box::new(storage.clone()),1000).await?); - // TODO: figure out how to insert iff empty. - storage.store_next_block(ctx,&self.app.genesis_block,).await.context("store_next_block")?; + pub async fn make_executor(&self, ctx: &ctx::Ctx) -> anyhow::Result { + let store = Arc::new(rocksdb::Store::new(&self.database).await?); + // Store genesis if db is empty. + if store.state(ctx).await?.is_none() { + store.store_next_block(ctx,&self.app.genesis_block).await.context("store_next_block()")?; + } + let block_store = Arc::new(BlockStore::new(ctx,Box::new(store.clone()),1000).await?); Ok(executor::Executor { config: executor::Config { - server_addr: self.app.server_addr, - validators: self.app.validators, - node_key: self.node_key, + server_addr: self.app.server_addr.clone(), + validators: self.app.validators.clone(), + node_key: self.node_key.clone(), gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit, - gossip_static_inbound: self.app.gossip_static_inbound, - gossip_static_outbound: self.app.gossip_static_outbound, + gossip_static_inbound: self.app.gossip_static_inbound.clone(), + gossip_static_outbound: self.app.gossip_static_outbound.clone(), }, block_store: block_store, - validator: self.validator_key.map(|key| executor::Validator { + validator: self.validator_key.as_ref().map(|key| executor::Validator { config: executor::ValidatorConfig { - key, - public_addr: self.app.public_addr, + key: key.clone(), + public_addr: self.app.public_addr.clone(), }, - replica_store: Box::new(storage), + replica_store: Box::new(store), payload_manager: Box::new(bft::testonly::RandomPayload), }), }) diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index 41124afa..6e6c9c49 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> { .with_ansi(std::env::var("NO_COLOR").is_err() && std::io::stdout().is_terminal()) .with_file(false) .with_line_number(false) - .with_filter(LevelFilter::INFO); + .with_filter(tracing_subscriber::EnvFilter::from_default_env()); // Create the logger for the log file. This will produce machine-readable logs for // all events of level DEBUG or higher. @@ -84,6 +84,12 @@ async fn main() -> anyhow::Result<()> { .load() .context("config_paths().load()")?; + let executor = configs + .make_executor(ctx) + .await + .context("configs.into_executor()")?; + let block_store = executor.block_store.clone(); + // Initialize the storage. scope::run!(ctx, |ctx, s| async { if let Some(addr) = configs.app.metrics_server_addr { @@ -97,10 +103,7 @@ async fn main() -> anyhow::Result<()> { Ok(()) }); } - let executor = configs - .into_executor(ctx) - .await - .context("configs.into_executor()")?; + s.spawn_bg(block_store.run_background_tasks(ctx)); s.spawn(executor.run(ctx)); Ok(()) }) diff --git a/node/tools/src/proto/mod.proto b/node/tools/src/proto/mod.proto index b02501e6..e857c8eb 100644 --- a/node/tools/src/proto/mod.proto +++ b/node/tools/src/proto/mod.proto @@ -61,11 +61,6 @@ message AppConfig { optional string metrics_server_addr = 3; // [optional] IpAddr // Consensus - - // Public key of this validator. - // Should be set iff this node is a validator. - // It should match the secret key provided in the `validator_key` file. - optional string validator_key = 4; // [optional] ValidatorPublicKey // Public keys of all validators. repeated string validators = 5; // [required] ValidatorPublicKey @@ -76,9 +71,6 @@ message AppConfig { // Gossip network - // Public key of this node. It uniquely identifies the node. - // It should match the secret key provided in the `node_key` file. - optional string node_key = 7; // [required] NodePublicKey // Limit on the number of gossip network inbound connections outside // of the `gossip_static_inbound` set. optional uint64 gossip_dynamic_inbound_limit = 8; // [required] diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index cdead70a..71a97deb 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -4,7 +4,7 @@ use rand::{ Rng, }; use zksync_concurrency::ctx; -use zksync_consensus_roles::{node, validator}; +use zksync_consensus_roles::{node}; use zksync_protobuf::testonly::test_encode_random; fn make_addr(rng: &mut R) -> std::net::SocketAddr { @@ -18,11 +18,9 @@ impl Distribution for Standard { public_addr: make_addr(rng), metrics_server_addr: Some(make_addr(rng)), - validator_key: Some(rng.gen::().public()), validators: rng.gen(), genesis_block: rng.gen(), - node_key: rng.gen::().public(), gossip_dynamic_inbound_limit: rng.gen(), gossip_static_inbound: (0..5) .map(|_| rng.gen::().public())