From a49b61d7769f9dd7b4cbc4905f8f8a23abfb541c Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Thu, 23 Nov 2023 11:21:01 +0200 Subject: [PATCH] feat(en): Implement gossip fetcher (#371) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What ❔ ...i.e., a fetcher component that would use gossip network instead of JSON-RPC API. Fixes BFT-326 and BFT-368. ## Why ❔ This can be used by external nodes to sync with the main node. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- Cargo.lock | 33 +- core/bin/external_node/src/main.rs | 7 +- core/lib/types/Cargo.toml | 6 +- core/lib/zksync_core/Cargo.toml | 12 +- core/lib/zksync_core/src/consensus/mod.rs | 8 +- core/lib/zksync_core/src/consensus/payload.rs | 39 +- core/lib/zksync_core/src/lib.rs | 1 + .../zksync_core/src/sync_layer/external_io.rs | 47 +- .../lib/zksync_core/src/sync_layer/fetcher.rs | 214 ++++---- .../src/sync_layer/gossip/buffered/mod.rs | 340 +++++++++++++ .../src/sync_layer/gossip/buffered/tests.rs | 287 +++++++++++ .../src/sync_layer/gossip/conversions.rs | 57 +++ .../src/sync_layer/gossip/metrics.rs | 29 ++ .../zksync_core/src/sync_layer/gossip/mod.rs | 93 ++++ .../src/sync_layer/gossip/storage/mod.rs | 219 +++++++++ .../src/sync_layer/gossip/storage/tests.rs | 127 +++++ .../src/sync_layer/gossip/tests.rs | 339 +++++++++++++ .../src/sync_layer/gossip/utils.rs | 48 ++ core/lib/zksync_core/src/sync_layer/mod.rs | 5 +- .../zksync_core/src/sync_layer/sync_action.rs | 35 +- core/lib/zksync_core/src/sync_layer/tests.rs | 225 ++++----- prover/Cargo.lock | 464 +++++++++++++++++- 22 files changed, 2364 insertions(+), 271 deletions(-) create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/conversions.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/metrics.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/mod.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/tests.rs create mode 100644 core/lib/zksync_core/src/sync_layer/gossip/utils.rs diff --git a/Cargo.lock b/Cargo.lock index ff4520eeed59..2527bb550223 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3320,9 +3320,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.2", @@ -4971,7 +4971,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.0.2", + "indexmap 2.1.0", ] [[package]] @@ -7363,7 +7363,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.0.2", + "indexmap 2.1.0", "toml_datetime", "winnow", ] @@ -8514,7 +8514,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "once_cell", @@ -8541,7 +8541,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "once_cell", @@ -8561,7 +8561,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "blst", @@ -8579,7 +8579,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "prost", @@ -8601,7 +8601,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "async-trait", @@ -8625,7 +8625,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "bit-vec", @@ -8644,7 +8644,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "async-trait", @@ -8661,7 +8661,7 @@ dependencies = [ [[package]] name = "zksync_consensus_sync_blocks" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "thiserror", @@ -8676,7 +8676,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "thiserror", "zksync_concurrency", @@ -9001,7 +9001,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "bit-vec", @@ -9012,9 +9012,6 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "tokio", - "tracing", - "tracing-subscriber", "zksync_concurrency", "zksync_protobuf_build", ] @@ -9022,7 +9019,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=bdf9ed0af965cc7fa32d6c46a35ea065779ede8b#bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" dependencies = [ "anyhow", "heck 0.4.1", diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs index 4e12315d930c..52f3353dc072 100644 --- a/core/bin/external_node/src/main.rs +++ b/core/bin/external_node/src/main.rs @@ -24,9 +24,8 @@ use zksync_core::{ setup_sigint_handler, state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper}, sync_layer::{ - batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, - fetcher::MainNodeFetcherCursor, genesis::perform_genesis_if_needed, ActionQueue, - MainNodeClient, SyncState, + batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor, + genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState, }, }; use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool}; @@ -128,7 +127,7 @@ async fn init_tasks( .await .context("failed to build a connection pool for `MainNodeFetcher`")?; let mut storage = pool.access_storage_tagged("sync_layer").await?; - MainNodeFetcherCursor::new(&mut storage) + FetcherCursor::new(&mut storage) .await .context("failed to load `MainNodeFetcher` cursor from Postgres")? }; diff --git a/core/lib/types/Cargo.toml b/core/lib/types/Cargo.toml index 117cbdcec8dc..6bf130bc70c0 100644 --- a/core/lib/types/Cargo.toml +++ b/core/lib/types/Cargo.toml @@ -23,8 +23,8 @@ codegen = { git = "https://github.com/matter-labs/solidity_plonk_verifier.git", zkevm_test_harness = { git = "https://github.com/matter-labs/era-zkevm_test_harness.git", branch = "v1.3.3" } zk_evm_1_4_0 = { git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0", package = "zk_evm" } zk_evm = { git = "https://github.com/matter-labs/era-zk_evm.git", tag = "v1.3.3-rc2" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } anyhow = "1.0.75" chrono = { version = "0.4", features = ["serde"] } @@ -55,4 +55,4 @@ tokio = { version = "1", features = ["rt", "macros"] } serde_with = { version = "1", features = ["hex"] } [build-dependencies] -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } diff --git a/core/lib/zksync_core/Cargo.toml b/core/lib/zksync_core/Cargo.toml index 6c8e43763fd1..2bccff98ae9e 100644 --- a/core/lib/zksync_core/Cargo.toml +++ b/core/lib/zksync_core/Cargo.toml @@ -40,11 +40,11 @@ vlog = { path = "../vlog" } multivm = { path = "../multivm" } # Consensus dependenices -zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } -zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } -zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } +zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } +zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } +zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } prost = "0.12.1" serde = { version = "1.0", features = ["derive"] } @@ -98,4 +98,4 @@ tempfile = "3.0.2" test-casing = "0.1.2" [build-dependencies] -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" } diff --git a/core/lib/zksync_core/src/consensus/mod.rs b/core/lib/zksync_core/src/consensus/mod.rs index 08a02e1dd2a4..a229666e76c8 100644 --- a/core/lib/zksync_core/src/consensus/mod.rs +++ b/core/lib/zksync_core/src/consensus/mod.rs @@ -1,10 +1,6 @@ -use anyhow::Context as _; -use zksync_concurrency::{ctx, time}; -use zksync_consensus_roles::validator; -use zksync_types::block::ConsensusBlockFields; -use zksync_types::{Address, MiniblockNumber}; +//! Consensus-related functionality. mod payload; mod proto; -pub(crate) use payload::Payload; +pub(crate) use self::payload::Payload; diff --git a/core/lib/zksync_core/src/consensus/payload.rs b/core/lib/zksync_core/src/consensus/payload.rs index 818d63d74146..8d53fdf21f31 100644 --- a/core/lib/zksync_core/src/consensus/payload.rs +++ b/core/lib/zksync_core/src/consensus/payload.rs @@ -1,9 +1,12 @@ use anyhow::Context as _; + use zksync_consensus_roles::validator; use zksync_protobuf::{required, ProtoFmt}; use zksync_types::api::en::SyncBlock; use zksync_types::{Address, L1BatchNumber, Transaction, H256}; +/// L2 block (= miniblock) payload. +#[derive(Debug)] pub(crate) struct Payload { pub hash: H256, pub l1_batch_number: L1BatchNumber, @@ -17,28 +20,31 @@ pub(crate) struct Payload { impl ProtoFmt for Payload { type Proto = super::proto::Payload; - fn read(r: &Self::Proto) -> anyhow::Result { - let mut transactions = vec![]; - for (i, t) in r.transactions.iter().enumerate() { + + fn read(message: &Self::Proto) -> anyhow::Result { + let mut transactions = Vec::with_capacity(message.transactions.len()); + for (i, tx) in message.transactions.iter().enumerate() { transactions.push( - required(&t.json) - .and_then(|s| Ok(serde_json::from_str(&*s)?)) + required(&tx.json) + .and_then(|json_str| Ok(serde_json::from_str(json_str)?)) .with_context(|| format!("transaction[{i}]"))?, ); } + Ok(Self { - hash: required(&r.hash) - .and_then(|h| Ok(<[u8; 32]>::try_from(h.as_slice())?.into())) + hash: required(&message.hash) + .and_then(|bytes| Ok(<[u8; 32]>::try_from(bytes.as_slice())?.into())) .context("hash")?, l1_batch_number: L1BatchNumber( - *required(&r.l1_batch_number).context("l1_batch_number")?, + *required(&message.l1_batch_number).context("l1_batch_number")?, ), - timestamp: *required(&r.timestamp).context("timestamp")?, - l1_gas_price: *required(&r.l1_gas_price).context("l1_gas_price")?, - l2_fair_gas_price: *required(&r.l2_fair_gas_price).context("l2_fair_gas_price")?, - virtual_blocks: *required(&r.virtual_blocks).context("virtual_blocks")?, - operator_address: required(&r.operator_address) - .and_then(|a| Ok(<[u8; 20]>::try_from(a.as_slice())?.into())) + timestamp: *required(&message.timestamp).context("timestamp")?, + l1_gas_price: *required(&message.l1_gas_price).context("l1_gas_price")?, + l2_fair_gas_price: *required(&message.l2_fair_gas_price) + .context("l2_fair_gas_price")?, + virtual_blocks: *required(&message.virtual_blocks).context("virtual_blocks")?, + operator_address: required(&message.operator_address) + .and_then(|bytes| Ok(<[u8; 20]>::try_from(bytes.as_slice())?.into())) .context("operator_address")?, transactions, }) @@ -67,6 +73,7 @@ impl ProtoFmt for Payload { impl TryFrom for Payload { type Error = anyhow::Error; + fn try_from(block: SyncBlock) -> anyhow::Result { Ok(Self { hash: block.hash.unwrap_or_default(), @@ -82,8 +89,8 @@ impl TryFrom for Payload { } impl Payload { - pub fn decode(p: &validator::Payload) -> anyhow::Result { - zksync_protobuf::decode(&p.0) + pub fn decode(payload: &validator::Payload) -> anyhow::Result { + zksync_protobuf::decode(&payload.0) } pub fn encode(&self) -> validator::Payload { diff --git a/core/lib/zksync_core/src/lib.rs b/core/lib/zksync_core/src/lib.rs index 94d050f36ad9..d52fd76661f2 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -45,6 +45,7 @@ use zksync_verification_key_server::get_cached_commitments; pub mod api_server; pub mod basic_witness_input_producer; pub mod block_reverter; +mod consensus; pub mod consistency_checker; pub mod data_fetchers; pub mod eth_sender; diff --git a/core/lib/zksync_core/src/sync_layer/external_io.rs b/core/lib/zksync_core/src/sync_layer/external_io.rs index 3742c92fca16..dcc38334a998 100644 --- a/core/lib/zksync_core/src/sync_layer/external_io.rs +++ b/core/lib/zksync_core/src/sync_layer/external_io.rs @@ -215,7 +215,10 @@ impl IoSealCriteria for ExternalIO { } fn should_seal_miniblock(&mut self, _manager: &UpdatesManager) -> bool { - matches!(self.actions.peek_action(), Some(SyncAction::SealMiniblock)) + matches!( + self.actions.peek_action(), + Some(SyncAction::SealMiniblock(_)) + ) } } @@ -368,7 +371,7 @@ impl StateKeeperIO for ExternalIO { virtual_blocks, }); } - Some(SyncAction::SealBatch { virtual_blocks }) => { + Some(SyncAction::SealBatch { virtual_blocks, .. }) => { // We've reached the next batch, so this situation would be handled by the batch sealer. // No need to pop the action from the queue. // It also doesn't matter which timestamp we return, since there will be no more miniblocks in this @@ -434,12 +437,9 @@ impl StateKeeperIO for ExternalIO { } async fn seal_miniblock(&mut self, updates_manager: &UpdatesManager) { - match self.actions.pop_action() { - Some(SyncAction::SealMiniblock) => {} - other => panic!( - "State keeper requested to seal miniblock, but the next action is {:?}", - other - ), + let action = self.actions.pop_action(); + let Some(SyncAction::SealMiniblock(consensus)) = action else { + panic!("State keeper requested to seal miniblock, but the next action is {action:?}"); }; let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap(); @@ -481,6 +481,16 @@ impl StateKeeperIO for ExternalIO { self.l2_erc20_bridge_addr, ); command.seal(&mut transaction).await; + + // We want to add miniblock consensus fields atomically with the miniblock data so that we + // don't need to deal with corner cases (e.g., a miniblock w/o consensus fields). + if let Some(consensus) = &consensus { + transaction + .blocks_dal() + .set_miniblock_consensus_fields(self.current_miniblock_number, consensus) + .await + .unwrap(); + } transaction.commit().await.unwrap(); self.sync_state @@ -497,23 +507,32 @@ impl StateKeeperIO for ExternalIO { l1_batch_env: &L1BatchEnv, finished_batch: FinishedL1Batch, ) -> anyhow::Result<()> { - match self.actions.pop_action() { - Some(SyncAction::SealBatch { .. }) => {} - other => anyhow::bail!( - "State keeper requested to seal the batch, but the next action is {other:?}" - ), + let action = self.actions.pop_action(); + let Some(SyncAction::SealBatch { consensus, .. }) = action else { + anyhow::bail!( + "State keeper requested to seal the batch, but the next action is {action:?}" + ); }; let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap(); + let mut transaction = storage.start_transaction().await.unwrap(); updates_manager .seal_l1_batch( - &mut storage, + &mut transaction, self.current_miniblock_number, l1_batch_env, finished_batch, self.l2_erc20_bridge_addr, ) .await; + if let Some(consensus) = &consensus { + transaction + .blocks_dal() + .set_miniblock_consensus_fields(self.current_miniblock_number, consensus) + .await + .unwrap(); + } + transaction.commit().await.unwrap(); tracing::info!("Batch {} is sealed", self.current_l1_batch_number); diff --git a/core/lib/zksync_core/src/sync_layer/fetcher.rs b/core/lib/zksync_core/src/sync_layer/fetcher.rs index 02d8d3b11372..4aabd163f21e 100644 --- a/core/lib/zksync_core/src/sync_layer/fetcher.rs +++ b/core/lib/zksync_core/src/sync_layer/fetcher.rs @@ -4,7 +4,10 @@ use tokio::sync::watch; use std::time::Duration; use zksync_dal::StorageProcessor; -use zksync_types::{L1BatchNumber, MiniblockNumber, H256}; +use zksync_types::{ + api::en::SyncBlock, block::ConsensusBlockFields, Address, L1BatchNumber, MiniblockNumber, + ProtocolVersionId, H256, +}; use zksync_web3_decl::jsonrpsee::core::Error as RpcError; use super::{ @@ -18,27 +21,67 @@ use crate::metrics::{TxStage, APP_METRICS}; const DELAY_INTERVAL: Duration = Duration::from_millis(500); const RETRY_DELAY_INTERVAL: Duration = Duration::from_secs(5); +/// Common denominator for blocks fetched by an external node. +#[derive(Debug)] +pub(super) struct FetchedBlock { + pub number: MiniblockNumber, + pub l1_batch_number: L1BatchNumber, + pub last_in_batch: bool, + pub protocol_version: ProtocolVersionId, + pub timestamp: u64, + pub hash: H256, + pub l1_gas_price: u64, + pub l2_fair_gas_price: u64, + pub virtual_blocks: u32, + pub operator_address: Address, + pub transactions: Vec, + pub consensus: Option, +} + +impl FetchedBlock { + fn from_sync_block(block: SyncBlock) -> Self { + Self { + number: block.number, + l1_batch_number: block.l1_batch_number, + last_in_batch: block.last_in_batch, + protocol_version: block.protocol_version, + timestamp: block.timestamp, + hash: block.hash.unwrap_or_default(), + l1_gas_price: block.l1_gas_price, + l2_fair_gas_price: block.l2_fair_gas_price, + virtual_blocks: block.virtual_blocks.unwrap_or(0), + operator_address: block.operator_address, + transactions: block + .transactions + .expect("Transactions are always requested"), + consensus: block.consensus, + } + } +} + /// Cursor of [`MainNodeFetcher`]. #[derive(Debug)] -pub struct MainNodeFetcherCursor { +pub struct FetcherCursor { // Fields are public for testing purposes. - pub(super) miniblock: MiniblockNumber, + pub(super) next_miniblock: MiniblockNumber, + pub(super) prev_miniblock_hash: H256, pub(super) l1_batch: L1BatchNumber, } -impl MainNodeFetcherCursor { - /// Loads the cursor +impl FetcherCursor { + /// Loads the cursor from Postgres. pub async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result { let last_sealed_l1_batch_header = storage .blocks_dal() .get_newest_l1_batch_header() .await .context("Failed getting newest L1 batch header")?; - let last_miniblock_number = storage + let last_miniblock_header = storage .blocks_dal() - .get_sealed_miniblock_number() + .get_last_sealed_miniblock_header() .await - .context("Failed getting sealed miniblock number")?; + .context("Failed getting sealed miniblock header")? + .context("No miniblocks sealed")?; // It's important to know whether we have opened a new batch already or just sealed the previous one. // Depending on it, we must either insert `OpenBatch` item into the queue, or not. @@ -49,7 +92,8 @@ impl MainNodeFetcherCursor { .context("Failed checking whether pending L1 batch exists")?; // Miniblocks are always fully processed. - let miniblock = last_miniblock_number + 1; + let next_miniblock = last_miniblock_header.number + 1; + let prev_miniblock_hash = last_miniblock_header.hash; // Decide whether the next batch should be explicitly opened or not. let l1_batch = if was_new_batch_open { // No `OpenBatch` action needed. @@ -60,11 +104,75 @@ impl MainNodeFetcherCursor { }; Ok(Self { - miniblock, + next_miniblock, l1_batch, + prev_miniblock_hash, }) } + pub(super) fn advance(&mut self, block: FetchedBlock) -> Vec { + assert_eq!(block.number, self.next_miniblock); + + let mut new_actions = Vec::new(); + if block.l1_batch_number != self.l1_batch { + assert_eq!( + block.l1_batch_number, + self.l1_batch.next(), + "Unexpected batch number in the next received miniblock" + ); + + tracing::info!( + "New L1 batch: {}. Timestamp: {}", + block.l1_batch_number, + block.timestamp + ); + + new_actions.push(SyncAction::OpenBatch { + number: block.l1_batch_number, + timestamp: block.timestamp, + l1_gas_price: block.l1_gas_price, + l2_fair_gas_price: block.l2_fair_gas_price, + operator_address: block.operator_address, + protocol_version: block.protocol_version, + // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. + first_miniblock_info: (block.number, block.virtual_blocks), + prev_miniblock_hash: self.prev_miniblock_hash, + }); + FETCHER_METRICS.l1_batch[&L1BatchStage::Open].set(block.l1_batch_number.0.into()); + self.l1_batch += 1; + } else { + // New batch implicitly means a new miniblock, so we only need to push the miniblock action + // if it's not a new batch. + new_actions.push(SyncAction::Miniblock { + number: block.number, + timestamp: block.timestamp, + // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. + virtual_blocks: block.virtual_blocks, + }); + FETCHER_METRICS.miniblock.set(block.number.0.into()); + } + + APP_METRICS.processed_txs[&TxStage::added_to_mempool()] + .inc_by(block.transactions.len() as u64); + new_actions.extend(block.transactions.into_iter().map(SyncAction::from)); + + // Last miniblock of the batch is a "fictive" miniblock and would be replicated locally. + // We don't need to seal it explicitly, so we only put the seal miniblock command if it's not the last miniblock. + if block.last_in_batch { + new_actions.push(SyncAction::SealBatch { + // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. + virtual_blocks: block.virtual_blocks, + consensus: block.consensus, + }); + } else { + new_actions.push(SyncAction::SealMiniblock(block.consensus)); + } + self.next_miniblock += 1; + self.prev_miniblock_hash = block.hash; + + new_actions + } + /// Builds a fetcher from this cursor. pub fn into_fetcher( self, @@ -87,7 +195,7 @@ impl MainNodeFetcherCursor { #[derive(Debug)] pub struct MainNodeFetcher { client: CachingMainNodeClient, - cursor: MainNodeFetcherCursor, + cursor: FetcherCursor, actions: ActionQueueSender, sync_state: SyncState, stop_receiver: watch::Receiver, @@ -97,7 +205,7 @@ impl MainNodeFetcher { pub async fn run(mut self) -> anyhow::Result<()> { tracing::info!( "Starting the fetcher routine. Initial miniblock: {}, initial l1 batch: {}", - self.cursor.miniblock, + self.cursor.next_miniblock, self.cursor.l1_batch ); // Run the main routine and reconnect upon the network errors. @@ -137,7 +245,7 @@ impl MainNodeFetcher { self.sync_state.set_main_node_block(last_main_node_block); self.client - .populate_miniblocks_cache(self.cursor.miniblock, last_main_node_block) + .populate_miniblocks_cache(self.cursor.next_miniblock, last_main_node_block) .await; let has_action_capacity = self.actions.has_action_capacity(); if has_action_capacity { @@ -162,84 +270,26 @@ impl MainNodeFetcher { async fn fetch_next_miniblock(&mut self) -> anyhow::Result { let total_latency = FETCHER_METRICS.fetch_next_miniblock.start(); let request_latency = FETCHER_METRICS.requests[&FetchStage::SyncL2Block].start(); - let Some(block) = self.client.fetch_l2_block(self.cursor.miniblock).await? else { - return Ok(false); - }; - - // This will be fetched from cache. - let prev_block = self + let Some(block) = self .client - .fetch_l2_block(self.cursor.miniblock - 1) + .fetch_l2_block(self.cursor.next_miniblock) .await? - .expect("Previous block must exist"); + else { + return Ok(false); + }; request_latency.observe(); - let mut new_actions = Vec::new(); - if block.l1_batch_number != self.cursor.l1_batch { - assert_eq!( - block.l1_batch_number, - self.cursor.l1_batch.next(), - "Unexpected batch number in the next received miniblock" - ); - - tracing::info!( - "New batch: {}. Timestamp: {}", - block.l1_batch_number, - block.timestamp - ); - - new_actions.push(SyncAction::OpenBatch { - number: block.l1_batch_number, - timestamp: block.timestamp, - l1_gas_price: block.l1_gas_price, - l2_fair_gas_price: block.l2_fair_gas_price, - operator_address: block.operator_address, - protocol_version: block.protocol_version, - // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. - first_miniblock_info: (block.number, block.virtual_blocks.unwrap_or(0)), - // Same for `prev_block.hash` as above. - prev_miniblock_hash: prev_block.hash.unwrap_or_else(H256::zero), - }); - FETCHER_METRICS.l1_batch[&L1BatchStage::Open].set(block.l1_batch_number.0.into()); - self.cursor.l1_batch += 1; - } else { - // New batch implicitly means a new miniblock, so we only need to push the miniblock action - // if it's not a new batch. - new_actions.push(SyncAction::Miniblock { - number: block.number, - timestamp: block.timestamp, - // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. - virtual_blocks: block.virtual_blocks.unwrap_or(0), - }); - FETCHER_METRICS.miniblock.set(block.number.0.into()); - } - - let txs: Vec = block - .transactions - .expect("Transactions are always requested"); - APP_METRICS.processed_txs[&TxStage::added_to_mempool()].inc_by(txs.len() as u64); - new_actions.extend(txs.into_iter().map(SyncAction::from)); - - // Last miniblock of the batch is a "fictive" miniblock and would be replicated locally. - // We don't need to seal it explicitly, so we only put the seal miniblock command if it's not the last miniblock. - if block.last_in_batch { - new_actions.push(SyncAction::SealBatch { - // `block.virtual_blocks` can be `None` only for old VM versions where it's not used, so it's fine to provide any number. - virtual_blocks: block.virtual_blocks.unwrap_or(0), - }); - } else { - new_actions.push(SyncAction::SealMiniblock); - } + let block_number = block.number; + let fetched_block = FetchedBlock::from_sync_block(block); + let new_actions = self.cursor.advance(fetched_block); tracing::info!( - "New miniblock: {} / {}", - block.number, - self.sync_state.get_main_node_block().max(block.number) + "New miniblock: {block_number} / {}", + self.sync_state.get_main_node_block().max(block_number) ); // Forgetting only the previous one because we still need the current one in cache for the next iteration. - let prev_miniblock_number = MiniblockNumber(self.cursor.miniblock.0.saturating_sub(1)); + let prev_miniblock_number = MiniblockNumber(block_number.0.saturating_sub(1)); self.client.forget_miniblock(prev_miniblock_number); - self.cursor.miniblock += 1; self.actions.push_actions(new_actions).await; total_latency.observe(); diff --git a/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs new file mode 100644 index 000000000000..41ca50e1cf2f --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/buffered/mod.rs @@ -0,0 +1,340 @@ +//! Buffered [`BlockStore`] implementation. + +use async_trait::async_trait; + +use std::{collections::BTreeMap, ops, time::Instant}; + +#[cfg(test)] +use zksync_concurrency::ctx::channel; +use zksync_concurrency::{ + ctx, scope, + sync::{self, watch, Mutex}, +}; +use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_storage::{BlockStore, StorageError, StorageResult, WriteBlockStore}; + +#[cfg(test)] +mod tests; + +use super::{ + metrics::{BlockResponseKind, METRICS}, + utils::MissingBlockNumbers, +}; + +/// [`BlockStore`] variation that upholds additional invariants as to how blocks are processed. +/// +/// The invariants are as follows: +/// +/// - Stored blocks always have contiguous numbers; there are no gaps. +/// - Blocks can be scheduled to be added using [`Self::schedule_next_block()`] only. New blocks do not +/// appear in the store otherwise. +#[async_trait] +pub(super) trait ContiguousBlockStore: BlockStore { + /// Schedules a block to be added to the store. Unlike [`WriteBlockStore::put_block()`], + /// there is no expectation that the block is added to the store *immediately*. It's + /// expected that it will be added to the store eventually, which will be signaled via + /// a subscriber returned from [`BlockStore::subscribe_to_block_writes()`]. + /// + /// [`Buffered`] guarantees that this method will only ever be called: + /// + /// - with the next block (i.e., one immediately after [`BlockStore::head_block()`]) + /// - sequentially (i.e., multiple blocks cannot be scheduled at once) + async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()>; +} + +/// In-memory buffer or [`FinalBlock`]s received from peers, but not executed and persisted locally yet. +/// +/// Unlike with executed / persisted blocks, there may be gaps between blocks in the buffer. +/// These blocks are shared with peers using the gossip network, but are not persisted and lost +/// on the node restart. +#[derive(Debug)] +struct BlockBuffer { + store_block_number: BlockNumber, + blocks: BTreeMap, +} + +impl BlockBuffer { + fn new(store_block_number: BlockNumber) -> Self { + Self { + store_block_number, + blocks: BTreeMap::new(), + } + } + + fn head_block(&self) -> Option { + self.blocks.values().next_back().cloned() + } + + #[tracing::instrument(level = "trace", skip(self))] + fn set_store_block(&mut self, store_block_number: BlockNumber) { + assert!( + store_block_number > self.store_block_number, + "`ContiguousBlockStore` invariant broken: unexpected new head block number" + ); + + self.store_block_number = store_block_number; + let old_len = self.blocks.len(); + self.blocks = self.blocks.split_off(&store_block_number.next()); + // ^ Removes all entries up to and including `store_block_number` + tracing::debug!("Removed {} blocks from buffer", old_len - self.blocks.len()); + METRICS.buffer_size.set(self.blocks.len()); + } + + fn last_contiguous_block_number(&self) -> BlockNumber { + // By design, blocks in the underlying store are always contiguous. + let mut last_number = self.store_block_number; + for &number in self.blocks.keys() { + if number > last_number.next() { + return last_number; + } + last_number = number; + } + last_number + } + + fn missing_block_numbers(&self, mut range: ops::Range) -> Vec { + // Clamp the range start so we don't produce extra missing blocks. + range.start = range.start.max(self.store_block_number.next()); + if range.is_empty() { + return vec![]; // Return early to not trigger panic in `BTreeMap::range()` + } + + let keys = self.blocks.range(range.clone()).map(|(&num, _)| num); + MissingBlockNumbers::new(range, keys).collect() + } + + fn put_block(&mut self, block: FinalBlock) { + let block_number = block.header.number; + assert!(block_number > self.store_block_number); + // ^ Must be checked previously + self.blocks.insert(block_number, block); + tracing::debug!(%block_number, "Inserted block in buffer"); + METRICS.buffer_size.set(self.blocks.len()); + } +} + +/// Events emitted by [`Buffered`] storage. +#[cfg(test)] +#[derive(Debug)] +pub(super) enum BufferedStorageEvent { + /// Update was received from the underlying storage. + UpdateReceived(BlockNumber), +} + +/// [`BlockStore`] with an in-memory buffer for pending blocks. +/// +/// # Data flow +/// +/// The store is plugged into the `SyncBlocks` actor, so that it can receive new blocks +/// from peers over the gossip network and to share blocks with peers. Received blocks are stored +/// in a [`BlockBuffer`]. The `SyncBlocks` actor doesn't guarantee that blocks are received in order, +/// so we have a background task that waits for successive blocks and feeds them to +/// the underlying storage ([`ContiguousBlockStore`]). The underlying storage executes and persists +/// blocks using the state keeper; see [`PostgresBlockStorage`](super::PostgresBlockStorage) for more details. +/// This logic is largely shared with the old syncing logic using JSON-RPC; the only differing part +/// is producing block data. +/// +/// Once a block is processed and persisted by the state keeper, it can be removed from the [`BlockBuffer`]; +/// we do this in another background task. Removing blocks from the buffer ensures that it doesn't +/// grow infinitely; it also allows to track syncing progress via metrics. +#[derive(Debug)] +pub(super) struct Buffered { + inner: T, + inner_subscriber: watch::Receiver, + block_writes_sender: watch::Sender, + buffer: Mutex, + #[cfg(test)] + events_sender: channel::UnboundedSender, +} + +impl Buffered { + /// Creates a new buffered storage. The buffer is initially empty. + pub fn new(store: T) -> Self { + let inner_subscriber = store.subscribe_to_block_writes(); + let store_block_number = *inner_subscriber.borrow(); + tracing::debug!( + store_block_number = store_block_number.0, + "Initialized buffer storage" + ); + Self { + inner: store, + inner_subscriber, + block_writes_sender: watch::channel(store_block_number).0, + buffer: Mutex::new(BlockBuffer::new(store_block_number)), + #[cfg(test)] + events_sender: channel::unbounded().0, + } + } + + #[cfg(test)] + fn set_events_sender(&mut self, sender: channel::UnboundedSender) { + self.events_sender = sender; + } + + pub(super) fn inner(&self) -> &T { + &self.inner + } + + #[cfg(test)] + async fn buffer_len(&self) -> usize { + self.buffer.lock().await.blocks.len() + } + + /// Listens to the updates in the underlying storage. + #[tracing::instrument(level = "trace", skip_all)] + async fn listen_to_updates(&self, ctx: &ctx::Ctx) { + let mut subscriber = self.inner_subscriber.clone(); + loop { + let store_block_number = { + let Ok(number) = sync::changed(ctx, &mut subscriber).await else { + return; // Do not propagate cancellation errors + }; + *number + }; + tracing::debug!( + store_block_number = store_block_number.0, + "Underlying block number updated" + ); + + let Ok(mut buffer) = sync::lock(ctx, &self.buffer).await else { + return; // Do not propagate cancellation errors + }; + buffer.set_store_block(store_block_number); + #[cfg(test)] + self.events_sender + .send(BufferedStorageEvent::UpdateReceived(store_block_number)); + } + } + + /// Schedules blocks in the underlying store as they are pushed to this store. + #[tracing::instrument(level = "trace", skip_all, err)] + async fn schedule_blocks(&self, ctx: &ctx::Ctx) -> StorageResult<()> { + let mut blocks_subscriber = self.block_writes_sender.subscribe(); + + let mut next_scheduled_block_number = { + let Ok(buffer) = sync::lock(ctx, &self.buffer).await else { + return Ok(()); // Do not propagate cancellation errors + }; + buffer.store_block_number.next() + }; + loop { + loop { + let block = match self.buffered_block(ctx, next_scheduled_block_number).await { + Err(ctx::Canceled) => return Ok(()), // Do not propagate cancellation errors + Ok(None) => break, + Ok(Some(block)) => block, + }; + self.inner.schedule_next_block(ctx, &block).await?; + next_scheduled_block_number = next_scheduled_block_number.next(); + } + // Wait until some more blocks are pushed into the buffer. + let Ok(number) = sync::changed(ctx, &mut blocks_subscriber).await else { + return Ok(()); // Do not propagate cancellation errors + }; + tracing::debug!(block_number = number.0, "Received new block"); + } + } + + async fn buffered_block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> ctx::OrCanceled> { + Ok(sync::lock(ctx, &self.buffer) + .await? + .blocks + .get(&number) + .cloned()) + } + + /// Runs background tasks for this store. This method **must** be spawned as a background task + /// which should be running as long at the [`Buffered`] is in use; otherwise, it will function incorrectly. + pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> StorageResult<()> { + scope::run!(ctx, |ctx, s| { + s.spawn(async { + self.listen_to_updates(ctx).await; + Ok(()) + }); + self.schedule_blocks(ctx) + }) + .await + } +} + +#[async_trait] +impl BlockStore for Buffered { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + let buffered_head_block = sync::lock(ctx, &self.buffer).await?.head_block(); + if let Some(block) = buffered_head_block { + return Ok(block); + } + self.inner.head_block(ctx).await + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + // First block is always situated in the underlying store + self.inner.first_block(ctx).await + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + Ok(sync::lock(ctx, &self.buffer) + .await? + .last_contiguous_block_number()) + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + let started_at = Instant::now(); + { + let buffer = sync::lock(ctx, &self.buffer).await?; + if number > buffer.store_block_number { + let block = buffer.blocks.get(&number).cloned(); + METRICS.get_block_latency[&BlockResponseKind::InMemory] + .observe(started_at.elapsed()); + return Ok(block); + } + } + let block = self.inner.block(ctx, number).await?; + METRICS.get_block_latency[&BlockResponseKind::Persisted].observe(started_at.elapsed()); + Ok(block) + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult> { + // By design, the underlying store has no missing blocks. + Ok(sync::lock(ctx, &self.buffer) + .await? + .missing_block_numbers(range)) + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + self.block_writes_sender.subscribe() + } +} + +#[async_trait] +impl WriteBlockStore for Buffered { + async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + let buffer_block_latency = METRICS.buffer_block_latency.start(); + { + let mut buffer = sync::lock(ctx, &self.buffer).await?; + let block_number = block.header.number; + if block_number <= buffer.store_block_number { + let err = anyhow::anyhow!( + "Cannot replace a block #{block_number} since it is already present in the underlying storage", + ); + return Err(StorageError::Database(err)); + } + buffer.put_block(block.clone()); + } + self.block_writes_sender.send_replace(block.header.number); + buffer_block_latency.observe(); + Ok(()) + } +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs new file mode 100644 index 000000000000..de5ef8a88cb0 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/buffered/tests.rs @@ -0,0 +1,287 @@ +//! Tests for buffered storage. + +use assert_matches::assert_matches; +use async_trait::async_trait; +use rand::{rngs::StdRng, seq::SliceRandom, Rng}; +use test_casing::test_casing; + +use std::{iter, ops}; + +use zksync_concurrency::{ + ctx::{self, channel}, + scope, + sync::{self, watch}, + time, +}; +use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, FinalBlock, Payload}; +use zksync_consensus_storage::{BlockStore, InMemoryStorage, StorageResult, WriteBlockStore}; + +use super::*; + +fn init_store(rng: &mut impl Rng) -> (FinalBlock, InMemoryStorage) { + let payload = Payload(vec![]); + let genesis_block = FinalBlock { + header: BlockHeader::genesis(payload.hash()), + payload, + justification: rng.gen(), + }; + let block_store = InMemoryStorage::new(genesis_block.clone()); + (genesis_block, block_store) +} + +fn gen_blocks(rng: &mut impl Rng, genesis_block: FinalBlock, count: usize) -> Vec { + let blocks = iter::successors(Some(genesis_block), |parent| { + let payload = Payload(vec![]); + let header = BlockHeader { + parent: parent.header.hash(), + number: parent.header.number.next(), + payload: payload.hash(), + }; + Some(FinalBlock { + header, + payload, + justification: rng.gen(), + }) + }); + blocks.skip(1).take(count).collect() +} + +#[derive(Debug)] +struct MockContiguousStore { + inner: InMemoryStorage, + block_sender: channel::UnboundedSender, +} + +impl MockContiguousStore { + fn new(inner: InMemoryStorage) -> (Self, channel::UnboundedReceiver) { + let (block_sender, block_receiver) = channel::unbounded(); + let this = Self { + inner, + block_sender, + }; + (this, block_receiver) + } + + async fn run_updates( + &self, + ctx: &ctx::Ctx, + mut block_receiver: channel::UnboundedReceiver, + ) -> StorageResult<()> { + let rng = &mut ctx.rng(); + while let Ok(block) = block_receiver.recv(ctx).await { + let head_block_number = self.head_block(ctx).await?.header.number; + assert_eq!(block.header.number, head_block_number.next()); + + let sleep_duration = time::Duration::milliseconds(rng.gen_range(0..5)); + ctx.sleep(sleep_duration).await?; + self.inner.put_block(ctx, &block).await?; + } + Ok(()) + } +} + +#[async_trait] +impl BlockStore for MockContiguousStore { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + self.inner.head_block(ctx).await + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + self.inner.first_block(ctx).await + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + self.inner.last_contiguous_block_number(ctx).await + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + self.inner.block(ctx, number).await + } + + async fn missing_block_numbers( + &self, + ctx: &ctx::Ctx, + range: ops::Range, + ) -> StorageResult> { + self.inner.missing_block_numbers(ctx, range).await + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + self.inner.subscribe_to_block_writes() + } +} + +#[async_trait] +impl ContiguousBlockStore for MockContiguousStore { + async fn schedule_next_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + tracing::trace!(block_number = block.header.number.0, "Scheduled next block"); + self.block_sender.send(block.clone()); + Ok(()) + } +} + +#[tracing::instrument(level = "trace", skip(shuffle_blocks))] +async fn test_buffered_storage( + initial_block_count: usize, + block_count: usize, + block_interval: time::Duration, + shuffle_blocks: impl FnOnce(&mut StdRng, &mut [FinalBlock]), +) { + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + let (genesis_block, block_store) = init_store(rng); + let mut initial_blocks = gen_blocks(rng, genesis_block.clone(), initial_block_count); + for block in &initial_blocks { + block_store.put_block(ctx, block).await.unwrap(); + } + initial_blocks.insert(0, genesis_block.clone()); + + let (block_store, block_receiver) = MockContiguousStore::new(block_store); + let mut buffered_store = Buffered::new(block_store); + let (events_sender, mut events_receiver) = channel::unbounded(); + buffered_store.set_events_sender(events_sender); + + // Check initial values returned by the store. + let last_initial_block = initial_blocks.last().unwrap().clone(); + assert_eq!( + buffered_store.head_block(ctx).await.unwrap(), + last_initial_block + ); + for block in &initial_blocks { + let block_result = buffered_store.block(ctx, block.header.number).await; + assert_eq!(block_result.unwrap().as_ref(), Some(block)); + } + let mut subscriber = buffered_store.subscribe_to_block_writes(); + assert_eq!( + *subscriber.borrow(), + BlockNumber(initial_block_count as u64) + ); + + let mut blocks = gen_blocks(rng, last_initial_block, block_count); + shuffle_blocks(rng, &mut blocks); + let last_block_number = BlockNumber((block_count + initial_block_count) as u64); + + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(buffered_store.inner().run_updates(ctx, block_receiver)); + s.spawn_bg(buffered_store.run_background_tasks(ctx)); + + for (idx, block) in blocks.iter().enumerate() { + buffered_store.put_block(ctx, block).await?; + let new_block_number = *sync::changed(ctx, &mut subscriber).await?; + assert_eq!(new_block_number, block.header.number); + + // Check that all written blocks are immediately accessible. + for existing_block in initial_blocks.iter().chain(&blocks[0..=idx]) { + let number = existing_block.header.number; + assert_eq!( + buffered_store.block(ctx, number).await?.as_ref(), + Some(existing_block) + ); + } + assert_eq!(buffered_store.first_block(ctx).await?, genesis_block); + + let expected_head_block = blocks[0..=idx] + .iter() + .max_by_key(|block| block.header.number) + .unwrap(); + assert_eq!(buffered_store.head_block(ctx).await?, *expected_head_block); + + let expected_last_contiguous_block = blocks[(idx + 1)..] + .iter() + .map(|block| block.header.number) + .min() + .map_or(last_block_number, BlockNumber::prev); + assert_eq!( + buffered_store.last_contiguous_block_number(ctx).await?, + expected_last_contiguous_block + ); + + ctx.sleep(block_interval).await?; + } + + let mut inner_subscriber = buffered_store.inner().subscribe_to_block_writes(); + while buffered_store + .inner() + .last_contiguous_block_number(ctx) + .await? + < last_block_number + { + sync::changed(ctx, &mut inner_subscriber).await?; + } + + // Check events emitted by the buffered storage. This also ensures that all underlying storage + // updates are processed before proceeding to the following checks. + let expected_numbers = (initial_block_count as u64 + 1)..=last_block_number.0; + for expected_number in expected_numbers.map(BlockNumber) { + assert_matches!( + events_receiver.recv(ctx).await?, + BufferedStorageEvent::UpdateReceived(number) if number == expected_number + ); + } + + assert_eq!(buffered_store.buffer_len().await, 0); + Ok(()) + }) + .await + .unwrap(); +} + +// Choose intervals so that they are both smaller and larger than the sleep duration in +// `MockContiguousStore::run_updates()`. +const BLOCK_INTERVALS: [time::Duration; 4] = [ + time::Duration::ZERO, + time::Duration::milliseconds(3), + time::Duration::milliseconds(5), + time::Duration::milliseconds(10), +]; + +#[test_casing(4, BLOCK_INTERVALS)] +#[tokio::test] +async fn buffered_storage_with_sequential_blocks(block_interval: time::Duration) { + test_buffered_storage(0, 30, block_interval, |_, _| { + // Do not perform shuffling + }) + .await; +} + +#[test_casing(4, BLOCK_INTERVALS)] +#[tokio::test] +async fn buffered_storage_with_random_blocks(block_interval: time::Duration) { + test_buffered_storage(0, 30, block_interval, |rng, blocks| blocks.shuffle(rng)).await; +} + +#[test_casing(4, BLOCK_INTERVALS)] +#[tokio::test] +async fn buffered_storage_with_slightly_shuffled_blocks(block_interval: time::Duration) { + test_buffered_storage(0, 30, block_interval, |rng, blocks| { + for chunk in blocks.chunks_mut(4) { + chunk.shuffle(rng); + } + }) + .await; +} + +#[test_casing(4, BLOCK_INTERVALS)] +#[tokio::test] +async fn buffered_storage_with_initial_blocks(block_interval: time::Duration) { + test_buffered_storage(10, 20, block_interval, |_, _| { + // Do not perform shuffling + }) + .await; +} + +#[test_casing(4, BLOCK_INTERVALS)] +#[tokio::test] +async fn buffered_storage_with_initial_blocks_and_slight_shuffling(block_interval: time::Duration) { + test_buffered_storage(10, 20, block_interval, |rng, blocks| { + for chunk in blocks.chunks_mut(5) { + chunk.shuffle(rng); + } + }) + .await; +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs b/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs new file mode 100644 index 000000000000..8face4e69426 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/conversions.rs @@ -0,0 +1,57 @@ +//! Conversion logic between server and consensus types. + +use anyhow::Context as _; + +use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, FinalBlock}; +use zksync_types::{ + api::en::SyncBlock, block::ConsensusBlockFields, MiniblockNumber, ProtocolVersionId, +}; + +use crate::{consensus, sync_layer::fetcher::FetchedBlock}; + +pub(super) fn sync_block_to_consensus_block(mut block: SyncBlock) -> anyhow::Result { + let number = BlockNumber(block.number.0.into()); + let consensus = block.consensus.take().context("Missing consensus fields")?; + let payload: consensus::Payload = block.try_into().context("Missing `SyncBlock` data")?; + let payload = payload.encode(); + let header = BlockHeader { + parent: consensus.parent, + number, + payload: payload.hash(), + }; + Ok(FinalBlock { + header, + payload, + justification: consensus.justification, + }) +} + +impl FetchedBlock { + pub(super) fn from_gossip_block( + block: &FinalBlock, + last_in_batch: bool, + ) -> anyhow::Result { + let number = u32::try_from(block.header.number.0) + .context("Integer overflow converting block number")?; + let payload = consensus::Payload::decode(&block.payload) + .context("Failed deserializing block payload")?; + + Ok(Self { + number: MiniblockNumber(number), + l1_batch_number: payload.l1_batch_number, + last_in_batch, + protocol_version: ProtocolVersionId::latest(), // FIXME + timestamp: payload.timestamp, + hash: payload.hash, + l1_gas_price: payload.l1_gas_price, + l2_fair_gas_price: payload.l2_fair_gas_price, + virtual_blocks: payload.virtual_blocks, + operator_address: payload.operator_address, + transactions: payload.transactions, + consensus: Some(ConsensusBlockFields { + parent: block.header.parent, + justification: block.justification.clone(), + }), + }) + } +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/metrics.rs b/core/lib/zksync_core/src/sync_layer/gossip/metrics.rs new file mode 100644 index 000000000000..f67c150b99c0 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/metrics.rs @@ -0,0 +1,29 @@ +//! Metrics for gossip-powered syncing. + +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; + +use std::time::Duration; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "kind", rename_all = "snake_case")] +pub(super) enum BlockResponseKind { + Persisted, + InMemory, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "external_node_gossip_fetcher")] +pub(super) struct GossipFetcherMetrics { + /// Number of currently buffered unexecuted blocks. + pub buffer_size: Gauge, + /// Latency of a `get_block` call. + #[metrics(unit = Unit::Seconds, buckets = Buckets::LATENCIES)] + pub get_block_latency: Family>, + /// Latency of putting a block into the buffered storage. This may include the time to queue + /// block actions, but does not include block execution. + #[metrics(unit = Unit::Seconds, buckets = Buckets::LATENCIES)] + pub buffer_block_latency: Histogram, +} + +#[vise::register] +pub(super) static METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/zksync_core/src/sync_layer/gossip/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/mod.rs new file mode 100644 index 000000000000..630ded953453 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/mod.rs @@ -0,0 +1,93 @@ +//! Consensus adapter for EN synchronization logic. + +use anyhow::Context as _; +use tokio::sync::watch; + +use std::sync::Arc; + +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_executor::{Executor, ExecutorConfig}; +use zksync_consensus_roles::node; +use zksync_dal::ConnectionPool; + +mod buffered; +mod conversions; +mod metrics; +mod storage; +#[cfg(test)] +mod tests; +mod utils; + +use self::{buffered::Buffered, storage::PostgresBlockStorage}; +use super::{fetcher::FetcherCursor, sync_action::ActionQueueSender}; + +/// Starts fetching L2 blocks using peer-to-peer gossip network. +pub async fn run_gossip_fetcher( + pool: ConnectionPool, + actions: ActionQueueSender, + executor_config: ExecutorConfig, + node_key: node::SecretKey, + mut stop_receiver: watch::Receiver, +) -> anyhow::Result<()> { + scope::run!(&ctx::root(), |ctx, s| async { + s.spawn_bg(run_gossip_fetcher_inner( + ctx, + pool, + actions, + executor_config, + node_key, + )); + if stop_receiver.changed().await.is_err() { + tracing::warn!( + "Stop signal sender for gossip fetcher was dropped without sending a signal" + ); + } + tracing::info!("Stop signal received, gossip fetcher is shutting down"); + Ok(()) + }) + .await +} + +async fn run_gossip_fetcher_inner( + ctx: &ctx::Ctx, + pool: ConnectionPool, + actions: ActionQueueSender, + executor_config: ExecutorConfig, + node_key: node::SecretKey, +) -> anyhow::Result<()> { + tracing::info!( + "Starting gossip fetcher with {executor_config:?} and node key {:?}", + node_key.public() + ); + + let mut storage = pool + .access_storage_tagged("sync_layer") + .await + .context("Failed acquiring Postgres connection for cursor")?; + let cursor = FetcherCursor::new(&mut storage).await?; + drop(storage); + + let store = PostgresBlockStorage::new(pool, actions, cursor); + let buffered = Arc::new(Buffered::new(store)); + let store = buffered.inner(); + let executor = Executor::new(executor_config, node_key, buffered.clone()) + .context("Node executor misconfiguration")?; + + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(async { + store + .run_background_tasks(ctx) + .await + .context("`PostgresBlockStorage` background tasks failed") + }); + s.spawn_bg(async { + buffered + .run_background_tasks(ctx) + .await + .context("`Buffered` storage background tasks failed") + }); + + executor.run(ctx).await.context("Node executor terminated") + }) + .await +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs b/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs new file mode 100644 index 000000000000..d4e95c9e2d46 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/storage/mod.rs @@ -0,0 +1,219 @@ +//! Storage implementation based on DAL. + +use anyhow::Context as _; +use async_trait::async_trait; + +use std::ops; + +use zksync_concurrency::{ + ctx, + sync::{self, watch, Mutex}, + time, +}; +use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_storage::{BlockStore, StorageError, StorageResult}; +use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_types::{Address, MiniblockNumber}; + +#[cfg(test)] +mod tests; + +use super::{buffered::ContiguousBlockStore, conversions::sync_block_to_consensus_block}; +use crate::sync_layer::{ + fetcher::{FetchedBlock, FetcherCursor}, + sync_action::{ActionQueueSender, SyncAction}, +}; + +#[derive(Debug)] +struct CursorWithCachedBlock { + inner: FetcherCursor, + maybe_last_block_in_batch: Option, +} + +impl From for CursorWithCachedBlock { + fn from(inner: FetcherCursor) -> Self { + Self { + inner, + maybe_last_block_in_batch: None, + } + } +} + +impl CursorWithCachedBlock { + fn advance(&mut self, block: FetchedBlock) -> Vec> { + let mut actions = Vec::with_capacity(2); + if let Some(mut prev_block) = self.maybe_last_block_in_batch.take() { + prev_block.last_in_batch = prev_block.l1_batch_number != block.l1_batch_number; + actions.push(self.inner.advance(prev_block)); + } + + // We take advantage of the fact that the last block in a batch is a *fictive* block that + // does not contain transactions. Thus, any block with transactions cannot be last in an L1 batch. + let can_be_last_in_batch = block.transactions.is_empty(); + if can_be_last_in_batch { + self.maybe_last_block_in_batch = Some(block); + // We cannot convert the block into actions yet, since we don't know whether it seals an L1 batch. + } else { + actions.push(self.inner.advance(block)); + } + actions + } +} + +/// Postgres-based [`BlockStore`] implementation. New blocks are scheduled to be written via +/// [`ContiguousBlockStore`] trait, which internally uses an [`ActionQueueSender`] to queue +/// block data (miniblock and L1 batch parameters, transactions) for the state keeper. Block data processing +/// is shared with JSON-RPC-based syncing. +#[derive(Debug)] +pub(super) struct PostgresBlockStorage { + pool: ConnectionPool, + actions: ActionQueueSender, + block_sender: watch::Sender, + cursor: Mutex, +} + +impl PostgresBlockStorage { + /// Creates a new storage handle. `pool` should have multiple connections to work efficiently. + pub fn new(pool: ConnectionPool, actions: ActionQueueSender, cursor: FetcherCursor) -> Self { + let current_block_number = cursor.next_miniblock.0.saturating_sub(1).into(); + Self { + pool, + actions, + block_sender: watch::channel(BlockNumber(current_block_number)).0, + cursor: Mutex::new(cursor.into()), + } + } + + /// Runs background tasks for this store. This method **must** be spawned as a background task + /// which should be running as long at the [`PostgresBlockStorage`] is in use; otherwise, + /// it will function incorrectly. + pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> StorageResult<()> { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + let sealed_miniblock_number = match self.sealed_miniblock_number(ctx).await { + Ok(number) => number, + Err(err @ StorageError::Database(_)) => return Err(err), + Err(StorageError::Canceled(_)) => return Ok(()), // Do not propagate cancellation errors + }; + self.block_sender.send_if_modified(|number| { + if *number != sealed_miniblock_number { + *number = sealed_miniblock_number; + true + } else { + false + } + }); + if let Err(ctx::Canceled) = ctx.sleep(POLL_INTERVAL).await { + return Ok(()); // Do not propagate cancellation errors + } + } + } + + async fn storage(&self, ctx: &ctx::Ctx) -> StorageResult> { + ctx.wait(self.pool.access_storage_tagged("sync_layer")) + .await? + .context("Failed to connect to Postgres") + .map_err(StorageError::Database) + } + + async fn block( + ctx: &ctx::Ctx, + storage: &mut StorageProcessor<'_>, + number: MiniblockNumber, + ) -> StorageResult> { + let operator_address = Address::default(); // FIXME: where to get this address from? + let Some(block) = ctx + .wait( + storage + .sync_dal() + .sync_block(number, operator_address, true), + ) + .await? + .with_context(|| format!("Failed getting miniblock #{number} from Postgres")) + .map_err(StorageError::Database)? + else { + return Ok(None); + }; + let block = sync_block_to_consensus_block(block).map_err(StorageError::Database)?; + Ok(Some(block)) + } + + async fn sealed_miniblock_number(&self, ctx: &ctx::Ctx) -> StorageResult { + let mut storage = self.storage(ctx).await?; + let number = ctx + .wait(storage.blocks_dal().get_sealed_miniblock_number()) + .await? + .context("Failed getting sealed miniblock number") + .map_err(StorageError::Database)?; + Ok(BlockNumber(number.0.into())) + } +} + +#[async_trait] +impl BlockStore for PostgresBlockStorage { + async fn head_block(&self, ctx: &ctx::Ctx) -> StorageResult { + let mut storage = self.storage(ctx).await?; + let miniblock_number = ctx + .wait(storage.blocks_dal().get_sealed_miniblock_number()) + .await? + .context("Failed getting sealed miniblock number") + .map_err(StorageError::Database)?; + // ^ The number can get stale, but it's OK for our purposes + Ok(Self::block(ctx, &mut storage, miniblock_number) + .await? + .with_context(|| format!("Miniblock #{miniblock_number} disappeared from Postgres")) + .map_err(StorageError::Database)?) + } + + async fn first_block(&self, ctx: &ctx::Ctx) -> StorageResult { + let mut storage = self.storage(ctx).await?; + Self::block(ctx, &mut storage, MiniblockNumber(0)) + .await? + .context("Genesis miniblock not present in Postgres") + .map_err(StorageError::Database) + } + + async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult { + self.sealed_miniblock_number(ctx).await + } + + async fn block( + &self, + ctx: &ctx::Ctx, + number: BlockNumber, + ) -> StorageResult> { + let number = u32::try_from(number.0) + .context("block number is too large") + .map_err(StorageError::Database)?; + let mut storage = self.storage(ctx).await?; + Self::block(ctx, &mut storage, MiniblockNumber(number)).await + } + + async fn missing_block_numbers( + &self, + _ctx: &ctx::Ctx, + _range: ops::Range, + ) -> StorageResult> { + Ok(vec![]) // The storage never has missing blocks by construction + } + + fn subscribe_to_block_writes(&self) -> watch::Receiver { + self.block_sender.subscribe() + } +} + +#[async_trait] +impl ContiguousBlockStore for PostgresBlockStorage { + async fn schedule_next_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> StorageResult<()> { + // last_in_batch` is always set to `false` by this call; it is properly set by `CursorWithCachedBlock`. + let fetched_block = + FetchedBlock::from_gossip_block(block, false).map_err(StorageError::Database)?; + let actions = sync::lock(ctx, &self.cursor).await?.advance(fetched_block); + for actions_chunk in actions { + // We don't wrap this in `ctx.wait()` because `PostgresBlockStorage` will get broken + // if it gets reused after context cancellation. + self.actions.push_actions(actions_chunk).await; + } + Ok(()) + } +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs new file mode 100644 index 000000000000..437c51883308 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/storage/tests.rs @@ -0,0 +1,127 @@ +//! Tests for Postgres storage implementation. + +use rand::{thread_rng, Rng}; + +use zksync_concurrency::scope; +use zksync_types::L2ChainId; + +use super::*; +use crate::{ + genesis::{ensure_genesis_state, GenesisParams}, + sync_layer::{ + gossip::tests::{ + add_consensus_fields, assert_first_block_actions, assert_second_block_actions, + load_final_block, + }, + tests::run_state_keeper_with_multiple_miniblocks, + ActionQueue, + }, +}; + +const TEST_TIMEOUT: time::Duration = time::Duration::seconds(10); + +#[tokio::test] +async fn block_store_basics_for_postgres() { + let pool = ConnectionPool::test_pool().await; + run_state_keeper_with_multiple_miniblocks(pool.clone()).await; + + let mut storage = pool.access_storage().await.unwrap(); + add_consensus_fields(&mut storage, &thread_rng().gen(), 3).await; + let cursor = FetcherCursor::new(&mut storage).await.unwrap(); + drop(storage); + let (actions_sender, _) = ActionQueue::new(); + let storage = PostgresBlockStorage::new(pool.clone(), actions_sender, cursor); + + let ctx = &ctx::test_root(&ctx::RealClock); + let genesis_block = BlockStore::first_block(&storage, ctx).await.unwrap(); + assert_eq!(genesis_block.header.number, BlockNumber(0)); + let head_block = BlockStore::head_block(&storage, ctx).await.unwrap(); + assert_eq!(head_block.header.number, BlockNumber(2)); + let last_contiguous_block_number = storage.last_contiguous_block_number(ctx).await.unwrap(); + assert_eq!(last_contiguous_block_number, BlockNumber(2)); + + let block = storage + .block(ctx, BlockNumber(1)) + .await + .unwrap() + .expect("no block #1"); + assert_eq!(block.header.number, BlockNumber(1)); + let missing_block = storage.block(ctx, BlockNumber(3)).await.unwrap(); + assert!(missing_block.is_none(), "{missing_block:?}"); +} + +#[tokio::test] +async fn subscribing_to_block_updates_for_postgres() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + if storage.blocks_dal().is_genesis_needed().await.unwrap() { + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + } + let cursor = FetcherCursor::new(&mut storage).await.unwrap(); + // ^ This is logically incorrect (the storage should not be updated other than using + // `ContiguousBlockStore`), but for testing subscriptions this is fine. + drop(storage); + let (actions_sender, _) = ActionQueue::new(); + let storage = PostgresBlockStorage::new(pool.clone(), actions_sender, cursor); + let mut subscriber = storage.subscribe_to_block_writes(); + + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(&ctx.with_timeout(TEST_TIMEOUT), |ctx, s| async { + s.spawn_bg(storage.run_background_tasks(ctx)); + s.spawn(async { + run_state_keeper_with_multiple_miniblocks(pool.clone()).await; + Ok(()) + }); + + loop { + let block = *sync::changed(ctx, &mut subscriber).await?; + if block == BlockNumber(2) { + // We should receive at least the last update. + break; + } + } + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn processing_new_blocks() { + let pool = ConnectionPool::test_pool().await; + run_state_keeper_with_multiple_miniblocks(pool.clone()).await; + + let mut storage = pool.access_storage().await.unwrap(); + add_consensus_fields(&mut storage, &thread_rng().gen(), 3).await; + let first_block = load_final_block(&mut storage, 1).await; + let second_block = load_final_block(&mut storage, 2).await; + storage + .transactions_dal() + .reset_transactions_state(MiniblockNumber(0)) + .await; + storage + .blocks_dal() + .delete_miniblocks(MiniblockNumber(0)) + .await + .unwrap(); + let cursor = FetcherCursor::new(&mut storage).await.unwrap(); + drop(storage); + + let (actions_sender, mut actions) = ActionQueue::new(); + let storage = PostgresBlockStorage::new(pool.clone(), actions_sender, cursor); + let ctx = &ctx::test_root(&ctx::RealClock); + let ctx = &ctx.with_timeout(TEST_TIMEOUT); + storage + .schedule_next_block(ctx, &first_block) + .await + .unwrap(); + assert_first_block_actions(&mut actions).await; + + storage + .schedule_next_block(ctx, &second_block) + .await + .unwrap(); + assert_second_block_actions(&mut actions).await; +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/tests.rs b/core/lib/zksync_core/src/sync_layer/gossip/tests.rs new file mode 100644 index 000000000000..ca3ce29f4d37 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/tests.rs @@ -0,0 +1,339 @@ +//! Tests for consensus adapters for EN synchronization logic. + +use assert_matches::assert_matches; +use test_casing::{test_casing, Product}; + +use zksync_concurrency::{ctx, scope, time}; +use zksync_consensus_executor::testonly::FullValidatorConfig; +use zksync_consensus_roles::validator::{self, FinalBlock}; +use zksync_consensus_storage::{InMemoryStorage, WriteBlockStore}; +use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_types::{block::ConsensusBlockFields, Address, L1BatchNumber, MiniblockNumber}; + +use super::*; +use crate::{ + consensus, + sync_layer::{ + sync_action::SyncAction, + tests::{ + mock_l1_batch_hash_computation, run_state_keeper_with_multiple_l1_batches, + run_state_keeper_with_multiple_miniblocks, StateKeeperHandles, + }, + ActionQueue, + }, +}; + +const CLOCK_SPEEDUP: i64 = 20; +const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50 * CLOCK_SPEEDUP); + +/// Loads a block from the storage and converts it to a `FinalBlock`. +pub(super) async fn load_final_block( + storage: &mut StorageProcessor<'_>, + number: u32, +) -> FinalBlock { + let sync_block = storage + .sync_dal() + .sync_block(MiniblockNumber(number), Address::repeat_byte(1), true) + .await + .unwrap() + .unwrap_or_else(|| panic!("no sync block #{number}")); + conversions::sync_block_to_consensus_block(sync_block).unwrap() +} + +pub async fn block_payload(storage: &mut StorageProcessor<'_>, number: u32) -> validator::Payload { + let sync_block = storage + .sync_dal() + .sync_block(MiniblockNumber(number), Address::repeat_byte(1), true) + .await + .unwrap() + .unwrap_or_else(|| panic!("no sync block #{number}")); + consensus::Payload::try_from(sync_block).unwrap().encode() +} + +/// Adds consensus information for the specified `count` of miniblocks, starting from the genesis. +pub(super) async fn add_consensus_fields( + storage: &mut StorageProcessor<'_>, + validator_key: &validator::SecretKey, + count: u32, +) { + let mut prev_block_hash = validator::BlockHeaderHash::from_bytes([0; 32]); + let validator_set = validator::ValidatorSet::new([validator_key.public()]).unwrap(); + for number in 0..count { + let payload = block_payload(storage, number).await; + let block_header = validator::BlockHeader { + parent: prev_block_hash, + number: validator::BlockNumber(number.into()), + payload: payload.hash(), + }; + let replica_commit = validator::ReplicaCommit { + protocol_version: validator::CURRENT_VERSION, + view: validator::ViewNumber(number.into()), + proposal: block_header, + }; + let replica_commit = validator_key.sign_msg(replica_commit); + let justification = validator::CommitQC::from(&[replica_commit], &validator_set) + .expect("Failed creating QC"); + + let consensus = ConsensusBlockFields { + parent: prev_block_hash, + justification, + }; + storage + .blocks_dal() + .set_miniblock_consensus_fields(MiniblockNumber(number), &consensus) + .await + .unwrap(); + prev_block_hash = block_header.hash(); + } +} + +pub(super) async fn assert_first_block_actions(actions: &mut ActionQueue) -> Vec { + let mut received_actions = vec![]; + while !matches!(received_actions.last(), Some(SyncAction::SealMiniblock(_))) { + received_actions.push(actions.recv_action().await); + } + assert_matches!( + received_actions.as_slice(), + [ + SyncAction::OpenBatch { + number: L1BatchNumber(1), + timestamp: 1, + first_miniblock_info: (MiniblockNumber(1), 1), + .. + }, + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::SealMiniblock(_), + ] + ); + received_actions +} + +pub(super) async fn assert_second_block_actions(actions: &mut ActionQueue) -> Vec { + let mut received_actions = vec![]; + while !matches!(received_actions.last(), Some(SyncAction::SealMiniblock(_))) { + received_actions.push(actions.recv_action().await); + } + assert_matches!( + received_actions.as_slice(), + [ + SyncAction::Miniblock { + number: MiniblockNumber(2), + timestamp: 2, + virtual_blocks: 1, + }, + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::Tx(_), + SyncAction::SealMiniblock(_), + ] + ); + received_actions +} + +#[test_casing(4, Product(([false, true], [false, true])))] +#[tokio::test] +async fn syncing_via_gossip_fetcher(delay_first_block: bool, delay_second_block: bool) { + zksync_concurrency::testonly::abort_on_panic(); + let pool = ConnectionPool::test_pool().await; + let tx_hashes = run_state_keeper_with_multiple_miniblocks(pool.clone()).await; + + let mut storage = pool.access_storage().await.unwrap(); + let genesis_block_payload = block_payload(&mut storage, 0).await; + let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64)); + let rng = &mut ctx.rng(); + let mut validator = FullValidatorConfig::for_single_validator(rng, genesis_block_payload); + let validator_set = validator.node_config.validators.clone(); + let external_node = validator.connect_full_node(rng); + + let (genesis_block, blocks) = + get_blocks_and_reset_storage(storage, &validator.validator_key).await; + let [first_block, second_block] = blocks.as_slice() else { + unreachable!("Unexpected blocks in storage: {blocks:?}"); + }; + tracing::trace!("Node storage reset"); + + let validator_storage = Arc::new(InMemoryStorage::new(genesis_block)); + if !delay_first_block { + validator_storage.put_block(ctx, first_block).await.unwrap(); + if !delay_second_block { + validator_storage + .put_block(ctx, second_block) + .await + .unwrap(); + } + } + let validator = Executor::new( + validator.node_config, + validator.node_key, + validator_storage.clone(), + ) + .unwrap(); + // ^ We intentionally do not run consensus on the validator node, since it'll produce blocks + // with payloads that cannot be parsed by the external node. + + let (actions_sender, mut actions) = ActionQueue::new(); + let (keeper_actions_sender, keeper_actions) = ActionQueue::new(); + let state_keeper = StateKeeperHandles::new(pool.clone(), keeper_actions, &[&tx_hashes]).await; + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(validator.run(ctx)); + s.spawn_bg(run_gossip_fetcher_inner( + ctx, + pool.clone(), + actions_sender, + external_node.node_config, + external_node.node_key, + )); + + if delay_first_block { + ctx.sleep(POLL_INTERVAL).await?; + validator_storage.put_block(ctx, first_block).await.unwrap(); + if !delay_second_block { + validator_storage + .put_block(ctx, second_block) + .await + .unwrap(); + } + } + + let received_actions = assert_first_block_actions(&mut actions).await; + // Manually replicate actions to the state keeper. + keeper_actions_sender.push_actions(received_actions).await; + + if delay_second_block { + validator_storage + .put_block(ctx, second_block) + .await + .unwrap(); + } + + let received_actions = assert_second_block_actions(&mut actions).await; + keeper_actions_sender.push_actions(received_actions).await; + state_keeper + .wait(|state| state.get_local_block() == MiniblockNumber(2)) + .await; + Ok(()) + }) + .await + .unwrap(); + + // Check that received blocks have consensus fields persisted. + let mut storage = pool.access_storage().await.unwrap(); + for number in [1, 2] { + let block = load_final_block(&mut storage, number).await; + block.justification.verify(&validator_set, 1).unwrap(); + } +} + +async fn get_blocks_and_reset_storage( + mut storage: StorageProcessor<'_>, + validator_key: &validator::SecretKey, +) -> (FinalBlock, Vec) { + let sealed_miniblock_number = storage + .blocks_dal() + .get_sealed_miniblock_number() + .await + .unwrap(); + add_consensus_fields(&mut storage, validator_key, sealed_miniblock_number.0 + 1).await; + let genesis_block = load_final_block(&mut storage, 0).await; + + let mut blocks = Vec::with_capacity(sealed_miniblock_number.0 as usize); + for number in 1..=sealed_miniblock_number.0 { + blocks.push(load_final_block(&mut storage, number).await); + } + + storage + .transactions_dal() + .reset_transactions_state(MiniblockNumber(0)) + .await; + storage + .blocks_dal() + .delete_miniblocks(MiniblockNumber(0)) + .await + .unwrap(); + storage + .blocks_dal() + .delete_l1_batches(L1BatchNumber(0)) + .await + .unwrap(); + (genesis_block, blocks) +} + +#[test_casing(4, [3, 2, 1, 0])] +#[tokio::test] +async fn syncing_via_gossip_fetcher_with_multiple_l1_batches(initial_block_count: usize) { + assert!(initial_block_count <= 3); + zksync_concurrency::testonly::abort_on_panic(); + + let pool = ConnectionPool::test_pool().await; + let tx_hashes = run_state_keeper_with_multiple_l1_batches(pool.clone()).await; + let tx_hashes: Vec<_> = tx_hashes.iter().map(Vec::as_slice).collect(); + + let mut storage = pool.access_storage().await.unwrap(); + let genesis_block_payload = block_payload(&mut storage, 0).await; + let ctx = &ctx::test_root(&ctx::AffineClock::new(CLOCK_SPEEDUP as f64)); + let rng = &mut ctx.rng(); + let mut validator = FullValidatorConfig::for_single_validator(rng, genesis_block_payload); + let validator_set = validator.node_config.validators.clone(); + let external_node = validator.connect_full_node(rng); + + let (genesis_block, blocks) = + get_blocks_and_reset_storage(storage, &validator.validator_key).await; + assert_eq!(blocks.len(), 3); // 2 real + 1 fictive blocks + tracing::trace!("Node storage reset"); + let (initial_blocks, delayed_blocks) = blocks.split_at(initial_block_count); + + let validator_storage = Arc::new(InMemoryStorage::new(genesis_block)); + for block in initial_blocks { + validator_storage.put_block(ctx, block).await.unwrap(); + } + let validator = Executor::new( + validator.node_config, + validator.node_key, + validator_storage.clone(), + ) + .unwrap(); + + let (actions_sender, actions) = ActionQueue::new(); + let state_keeper = StateKeeperHandles::new(pool.clone(), actions, &tx_hashes).await; + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(validator.run(ctx)); + s.spawn_bg(async { + for block in delayed_blocks { + ctx.sleep(POLL_INTERVAL).await?; + validator_storage.put_block(ctx, block).await?; + } + Ok(()) + }); + + let cloned_pool = pool.clone(); + s.spawn_bg(async { + mock_l1_batch_hash_computation(cloned_pool, 1).await; + Ok(()) + }); + s.spawn_bg(run_gossip_fetcher_inner( + ctx, + pool.clone(), + actions_sender, + external_node.node_config, + external_node.node_key, + )); + + state_keeper + .wait(|state| state.get_local_block() == MiniblockNumber(3)) + .await; + Ok(()) + }) + .await + .unwrap(); + + // Check that received blocks have consensus fields persisted. + let mut storage = pool.access_storage().await.unwrap(); + for number in [1, 2, 3] { + let block = load_final_block(&mut storage, number).await; + block.justification.verify(&validator_set, 1).unwrap(); + } +} diff --git a/core/lib/zksync_core/src/sync_layer/gossip/utils.rs b/core/lib/zksync_core/src/sync_layer/gossip/utils.rs new file mode 100644 index 000000000000..8407821a2ec6 --- /dev/null +++ b/core/lib/zksync_core/src/sync_layer/gossip/utils.rs @@ -0,0 +1,48 @@ +use std::{iter, ops}; + +use zksync_consensus_roles::validator::BlockNumber; + +/// Iterator over missing block numbers. +pub(crate) struct MissingBlockNumbers { + range: ops::Range, + existing_numbers: iter::Peekable, +} + +impl MissingBlockNumbers +where + I: Iterator, +{ + /// Creates a new iterator based on the provided params. + pub(crate) fn new(range: ops::Range, existing_numbers: I) -> Self { + Self { + range, + existing_numbers: existing_numbers.peekable(), + } + } +} + +impl Iterator for MissingBlockNumbers +where + I: Iterator, +{ + type Item = BlockNumber; + + fn next(&mut self) -> Option { + // Loop while existing numbers match the starting numbers from the range. The check + // that the range is non-empty is redundant given how `existing_numbers` are constructed + // (they are guaranteed to be lesser than the upper range bound); we add it just to be safe. + while !self.range.is_empty() + && matches!(self.existing_numbers.peek(), Some(&num) if num == self.range.start) + { + self.range.start = self.range.start.next(); + self.existing_numbers.next(); // Advance to the next number + } + + if self.range.is_empty() { + return None; + } + let next_number = self.range.start; + self.range.start = self.range.start.next(); + Some(next_number) + } +} diff --git a/core/lib/zksync_core/src/sync_layer/mod.rs b/core/lib/zksync_core/src/sync_layer/mod.rs index e216ef4f8c55..df059947e3e3 100644 --- a/core/lib/zksync_core/src/sync_layer/mod.rs +++ b/core/lib/zksync_core/src/sync_layer/mod.rs @@ -3,6 +3,7 @@ mod client; pub mod external_io; pub mod fetcher; pub mod genesis; +mod gossip; mod metrics; pub(crate) mod sync_action; mod sync_state; @@ -10,6 +11,6 @@ mod sync_state; mod tests; pub use self::{ - client::MainNodeClient, external_io::ExternalIO, sync_action::ActionQueue, - sync_state::SyncState, + client::MainNodeClient, external_io::ExternalIO, gossip::run_gossip_fetcher, + sync_action::ActionQueue, sync_state::SyncState, }; diff --git a/core/lib/zksync_core/src/sync_layer/sync_action.rs b/core/lib/zksync_core/src/sync_layer/sync_action.rs index 977d03dd5329..b4f56999d4fd 100644 --- a/core/lib/zksync_core/src/sync_layer/sync_action.rs +++ b/core/lib/zksync_core/src/sync_layer/sync_action.rs @@ -1,6 +1,9 @@ use tokio::sync::mpsc; -use zksync_types::{Address, L1BatchNumber, MiniblockNumber, ProtocolVersionId, Transaction, H256}; +use zksync_types::{ + block::ConsensusBlockFields, Address, L1BatchNumber, MiniblockNumber, ProtocolVersionId, + Transaction, H256, +}; use super::metrics::QUEUE_METRICS; @@ -52,7 +55,7 @@ impl ActionQueueSender { return Err(format!("Unexpected Tx: {:?}", actions)); } } - SyncAction::SealMiniblock | SyncAction::SealBatch { .. } => { + SyncAction::SealMiniblock(_) | SyncAction::SealBatch { .. } => { if !opened || miniblock_sealed { return Err(format!("Unexpected SealMiniblock/SealBatch: {:?}", actions)); } @@ -89,7 +92,7 @@ impl ActionQueue { } /// Removes the first action from the queue. - pub(crate) fn pop_action(&mut self) -> Option { + pub(super) fn pop_action(&mut self) -> Option { if let Some(peeked) = self.peeked.take() { QUEUE_METRICS.action_queue_size.dec_by(1); return Some(peeked); @@ -101,8 +104,19 @@ impl ActionQueue { action } + #[cfg(test)] + pub(super) async fn recv_action(&mut self) -> SyncAction { + if let Some(peeked) = self.peeked.take() { + return peeked; + } + self.receiver + .recv() + .await + .expect("actions sender was dropped prematurely") + } + /// Returns the first action from the queue without removing it. - pub(crate) fn peek_action(&mut self) -> Option { + pub(super) fn peek_action(&mut self) -> Option { if let Some(action) = &self.peeked { return Some(action.clone()); } @@ -135,11 +149,13 @@ pub(crate) enum SyncAction { /// that they are sealed, but at the same time the next miniblock may not exist yet. /// By having a dedicated action for that we prevent a situation where the miniblock is kept open on the EN until /// the next one is sealed on the main node. - SealMiniblock, + SealMiniblock(Option), /// Similarly to `SealMiniblock` we must be able to seal the batch even if there is no next miniblock yet. SealBatch { - // Virtual blocks count for the fictive miniblock. + /// Virtual blocks count for the fictive miniblock. virtual_blocks: u32, + /// Consensus-related fields for the fictive miniblock. + consensus: Option, }, } @@ -193,11 +209,14 @@ mod tests { } fn seal_miniblock() -> SyncAction { - SyncAction::SealMiniblock + SyncAction::SealMiniblock(None) } fn seal_batch() -> SyncAction { - SyncAction::SealBatch { virtual_blocks: 1 } + SyncAction::SealBatch { + virtual_blocks: 1, + consensus: None, + } } #[test] diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index 576cb56dd7dd..4a337bbf5dc3 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -16,11 +16,7 @@ use zksync_types::{ api, Address, L1BatchNumber, L2ChainId, MiniblockNumber, ProtocolVersionId, Transaction, H256, }; -use super::{ - fetcher::MainNodeFetcherCursor, - sync_action::{ActionQueueSender, SyncAction}, - *, -}; +use super::{fetcher::FetcherCursor, sync_action::SyncAction, *}; use crate::{ api_server::web3::tests::spawn_http_server, genesis::{ensure_genesis_state, GenesisParams}, @@ -146,15 +142,52 @@ fn open_l1_batch(number: u32, timestamp: u64, first_miniblock_number: u32) -> Sy } #[derive(Debug)] -struct StateKeeperHandles { - actions_sender: ActionQueueSender, - stop_sender: watch::Sender, - sync_state: SyncState, - task: JoinHandle>, +pub(super) struct StateKeeperHandles { + pub stop_sender: watch::Sender, + pub sync_state: SyncState, + pub task: JoinHandle>, } impl StateKeeperHandles { - async fn wait(self, mut condition: impl FnMut(&SyncState) -> bool) { + /// `tx_hashes` are grouped by the L1 batch. + pub async fn new(pool: ConnectionPool, actions: ActionQueue, tx_hashes: &[&[H256]]) -> Self { + assert!(!tx_hashes.is_empty()); + assert!(tx_hashes.iter().all(|tx_hashes| !tx_hashes.is_empty())); + + ensure_genesis(&mut pool.access_storage().await.unwrap()).await; + + let sync_state = SyncState::new(); + let io = ExternalIO::new( + pool, + actions, + sync_state.clone(), + Box::::default(), + Address::repeat_byte(1), + u32::MAX, + L2ChainId::default(), + ) + .await; + + let (stop_sender, stop_receiver) = watch::channel(false); + let mut batch_executor_base = TestBatchExecutorBuilder::default(); + for &tx_hashes_in_l1_batch in tx_hashes { + batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch); + } + + let state_keeper = ZkSyncStateKeeper::without_sealer( + stop_receiver, + Box::new(io), + Box::new(batch_executor_base), + ); + Self { + stop_sender, + sync_state, + task: tokio::spawn(state_keeper.run()), + } + } + + /// Waits for the given condition. + pub async fn wait(self, mut condition: impl FnMut(&SyncState) -> bool) { let started_at = Instant::now(); loop { assert!( @@ -187,45 +220,6 @@ async fn ensure_genesis(storage: &mut StorageProcessor<'_>) { } } -/// `tx_hashes` are grouped by the L1 batch. -async fn run_state_keeper(pool: ConnectionPool, tx_hashes: &[&[H256]]) -> StateKeeperHandles { - assert!(!tx_hashes.is_empty()); - assert!(tx_hashes.iter().all(|tx_hashes| !tx_hashes.is_empty())); - - ensure_genesis(&mut pool.access_storage().await.unwrap()).await; - - let (actions_sender, actions) = ActionQueue::new(); - let sync_state = SyncState::new(); - let io = ExternalIO::new( - pool, - actions, - sync_state.clone(), - Box::::default(), - Address::repeat_byte(1), - u32::MAX, - L2ChainId::default(), - ) - .await; - - let (stop_sender, stop_receiver) = watch::channel(false); - let mut batch_executor_base = TestBatchExecutorBuilder::default(); - for &tx_hashes_in_l1_batch in tx_hashes { - batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch); - } - - let state_keeper = ZkSyncStateKeeper::without_sealer( - stop_receiver, - Box::new(io), - Box::new(batch_executor_base), - ); - StateKeeperHandles { - actions_sender, - stop_sender, - sync_state, - task: tokio::spawn(state_keeper.run()), - } -} - fn extract_tx_hashes<'a>(actions: impl IntoIterator) -> Vec { actions .into_iter() @@ -246,10 +240,12 @@ async fn external_io_basics() { let tx = create_l2_transaction(10, 100); let tx_hash = tx.hash(); let tx = SyncAction::Tx(Box::new(tx.into())); - let actions = vec![open_l1_batch, tx, SyncAction::SealMiniblock]; + let actions = vec![open_l1_batch, tx, SyncAction::SealMiniblock(None)]; - let state_keeper = run_state_keeper(pool.clone(), &[&extract_tx_hashes(&actions)]).await; - state_keeper.actions_sender.push_actions(actions).await; + let (actions_sender, action_queue) = ActionQueue::new(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), action_queue, &[&extract_tx_hashes(&actions)]).await; + actions_sender.push_actions(actions).await; // Wait until the miniblock is sealed. state_keeper .wait(|state| state.get_local_block() == MiniblockNumber(1)) @@ -279,7 +275,7 @@ async fn external_io_basics() { assert_eq!(tx_receipt.transaction_index, 0.into()); } -async fn run_state_keeper_with_multiple_miniblocks(pool: ConnectionPool) -> Vec { +pub(super) async fn run_state_keeper_with_multiple_miniblocks(pool: ConnectionPool) -> Vec { let open_l1_batch = open_l1_batch(1, 1, 1); let txs = (0..5).map(|_| { let tx = create_l2_transaction(10, 100); @@ -287,7 +283,7 @@ async fn run_state_keeper_with_multiple_miniblocks(pool: ConnectionPool) -> Vec< }); let first_miniblock_actions: Vec<_> = iter::once(open_l1_batch) .chain(txs) - .chain([SyncAction::SealMiniblock]) + .chain([SyncAction::SealMiniblock(None)]) .collect(); let open_miniblock = SyncAction::Miniblock { @@ -301,7 +297,7 @@ async fn run_state_keeper_with_multiple_miniblocks(pool: ConnectionPool) -> Vec< }); let second_miniblock_actions: Vec<_> = iter::once(open_miniblock) .chain(more_txs) - .chain([SyncAction::SealMiniblock]) + .chain([SyncAction::SealMiniblock(None)]) .collect(); let tx_hashes = extract_tx_hashes( @@ -309,15 +305,10 @@ async fn run_state_keeper_with_multiple_miniblocks(pool: ConnectionPool) -> Vec< .iter() .chain(&second_miniblock_actions), ); - let state_keeper = run_state_keeper(pool, &[&tx_hashes]).await; - state_keeper - .actions_sender - .push_actions(first_miniblock_actions) - .await; - state_keeper - .actions_sender - .push_actions(second_miniblock_actions) - .await; + let (actions_sender, action_queue) = ActionQueue::new(); + let state_keeper = StateKeeperHandles::new(pool, action_queue, &[&tx_hashes]).await; + actions_sender.push_actions(first_miniblock_actions).await; + actions_sender.push_actions(second_miniblock_actions).await; // Wait until both miniblocks are sealed. state_keeper .wait(|state| state.get_local_block() == MiniblockNumber(2)) @@ -366,7 +357,8 @@ async fn test_external_io_recovery(pool: ConnectionPool, mut tx_hashes: Vec Vec> { let l1_batch = open_l1_batch(1, 1, 1); let first_tx = create_l2_transaction(10, 100); let first_tx_hash = first_tx.hash(); let first_tx = SyncAction::Tx(Box::new(first_tx.into())); - let first_l1_batch_actions = vec![l1_batch, first_tx, SyncAction::SealMiniblock]; + let first_l1_batch_actions = vec![l1_batch, first_tx, SyncAction::SealMiniblock(None)]; let fictive_miniblock = SyncAction::Miniblock { number: MiniblockNumber(2), timestamp: 2, virtual_blocks: 0, }; - let seal_l1_batch = SyncAction::SealBatch { virtual_blocks: 0 }; + let seal_l1_batch = SyncAction::SealBatch { + virtual_blocks: 0, + consensus: None, + }; let fictive_miniblock_actions = vec![fictive_miniblock, seal_l1_batch]; let l1_batch = open_l1_batch(2, 3, 3); let second_tx = create_l2_transaction(10, 100); let second_tx_hash = second_tx.hash(); let second_tx = SyncAction::Tx(Box::new(second_tx.into())); - let second_l1_batch_actions = vec![l1_batch, second_tx, SyncAction::SealMiniblock]; + let second_l1_batch_actions = vec![l1_batch, second_tx, SyncAction::SealMiniblock(None)]; - let state_keeper = run_state_keeper(pool.clone(), &[&[first_tx_hash], &[second_tx_hash]]).await; - state_keeper - .actions_sender - .push_actions(first_l1_batch_actions) - .await; - state_keeper - .actions_sender - .push_actions(fictive_miniblock_actions) - .await; - state_keeper - .actions_sender - .push_actions(second_l1_batch_actions) - .await; + let (actions_sender, action_queue) = ActionQueue::new(); + let state_keeper = StateKeeperHandles::new( + pool.clone(), + action_queue, + &[&[first_tx_hash], &[second_tx_hash]], + ) + .await; + actions_sender.push_actions(first_l1_batch_actions).await; + actions_sender.push_actions(fictive_miniblock_actions).await; + actions_sender.push_actions(second_l1_batch_actions).await; let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1)); // Wait until the miniblocks are sealed. @@ -463,6 +456,14 @@ async fn external_io_with_multiple_l1_batches() { .await; hash_task.await.unwrap(); + vec![vec![first_tx_hash], vec![second_tx_hash]] +} + +#[tokio::test] +async fn external_io_with_multiple_l1_batches() { + let pool = ConnectionPool::test_pool().await; + run_state_keeper_with_multiple_l1_batches(pool.clone()).await; + let mut storage = pool.access_storage().await.unwrap(); let l1_batch_header = storage .blocks_dal() @@ -497,9 +498,9 @@ async fn fetcher_basics() { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); ensure_genesis(&mut storage).await; - let fetcher_cursor = MainNodeFetcherCursor::new(&mut storage).await.unwrap(); + let fetcher_cursor = FetcherCursor::new(&mut storage).await.unwrap(); assert_eq!(fetcher_cursor.l1_batch, L1BatchNumber(0)); - assert_eq!(fetcher_cursor.miniblock, MiniblockNumber(1)); + assert_eq!(fetcher_cursor.next_miniblock, MiniblockNumber(1)); drop(storage); let mut mock_client = MockMainNodeClient::default(); @@ -529,15 +530,11 @@ async fn fetcher_basics() { let mut current_miniblock_number = MiniblockNumber(0); let mut tx_count_in_miniblock = 0; let started_at = Instant::now(); + let deadline = started_at + TEST_TIMEOUT; loop { - assert!( - started_at.elapsed() <= TEST_TIMEOUT, - "Timed out waiting for fetcher" - ); - let Some(action) = actions.pop_action() else { - tokio::time::sleep(POLL_INTERVAL).await; - continue; - }; + let action = tokio::time::timeout_at(deadline.into(), actions.recv_action()) + .await + .unwrap(); match action { SyncAction::OpenBatch { number, .. } => { current_l1_batch_number += 1; @@ -550,7 +547,7 @@ async fn fetcher_basics() { tx_count_in_miniblock = 0; assert_eq!(number, current_miniblock_number); } - SyncAction::SealBatch { virtual_blocks } => { + SyncAction::SealBatch { virtual_blocks, .. } => { assert_eq!(virtual_blocks, 0); assert_eq!(tx_count_in_miniblock, 0); if current_miniblock_number == MiniblockNumber(5) { @@ -561,7 +558,7 @@ async fn fetcher_basics() { assert_eq!(tx.hash(), tx_hashes.pop_front().unwrap()); tx_count_in_miniblock += 1; } - SyncAction::SealMiniblock => { + SyncAction::SealMiniblock(_) => { assert_eq!(tx_count_in_miniblock, 1); } } @@ -577,6 +574,15 @@ async fn fetcher_with_real_server() { // Fill in transactions grouped in multiple miniblocks in the storage. let tx_hashes = run_state_keeper_with_multiple_miniblocks(pool.clone()).await; let mut tx_hashes = VecDeque::from(tx_hashes); + let mut connection = pool.access_storage().await.unwrap(); + let genesis_miniblock_hash = connection + .blocks_dal() + .get_miniblock_header(MiniblockNumber(0)) + .await + .unwrap() + .expect("No genesis miniblock") + .hash; + drop(connection); // Start the API server. let network_config = NetworkConfig::for_tests(); @@ -590,8 +596,9 @@ async fn fetcher_with_real_server() { let sync_state = SyncState::default(); let (actions_sender, mut actions) = ActionQueue::new(); let client = ::json_rpc(&format!("http://{server_addr}/")).unwrap(); - let fetcher_cursor = MainNodeFetcherCursor { - miniblock: MiniblockNumber(1), + let fetcher_cursor = FetcherCursor { + next_miniblock: MiniblockNumber(1), + prev_miniblock_hash: genesis_miniblock_hash, l1_batch: L1BatchNumber(0), }; let fetcher = fetcher_cursor.into_fetcher( @@ -607,15 +614,11 @@ async fn fetcher_with_real_server() { let mut tx_count_in_miniblock = 0; let miniblock_number_to_tx_count = HashMap::from([(1, 5), (2, 3)]); let started_at = Instant::now(); + let deadline = started_at + TEST_TIMEOUT; loop { - assert!( - started_at.elapsed() <= TEST_TIMEOUT, - "Timed out waiting for fetcher actions" - ); - let Some(action) = actions.pop_action() else { - tokio::time::sleep(POLL_INTERVAL).await; - continue; - }; + let action = tokio::time::timeout_at(deadline.into(), actions.recv_action()) + .await + .unwrap(); match action { SyncAction::OpenBatch { number, @@ -637,7 +640,7 @@ async fn fetcher_with_real_server() { assert_eq!(tx.hash(), tx_hashes.pop_front().unwrap()); tx_count_in_miniblock += 1; } - SyncAction::SealMiniblock => { + SyncAction::SealMiniblock(_) => { assert_eq!( tx_count_in_miniblock, miniblock_number_to_tx_count[¤t_miniblock_number] diff --git a/prover/Cargo.lock b/prover/Cargo.lock index d100677d7469..d27b787084f1 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -324,6 +324,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "beef" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" + [[package]] name = "bellman_ce" version = "0.3.2" @@ -617,6 +623,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "blst" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c94087b935a822949d3291a9989ad2b2051ea141eda0fd4e478a75f6aa3e604b" +dependencies = [ + "cc", + "glob", + "threadpool", + "zeroize", +] + [[package]] name = "boojum" version = "0.1.0" @@ -1350,6 +1368,34 @@ dependencies = [ "serde_json", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "curve25519-dalek-derive", + "digest 0.10.7", + "fiat-crypto", + "platforms", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 2.0.39", +] + [[package]] name = "darling" version = "0.13.4" @@ -1429,6 +1475,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "der" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +dependencies = [ + "const-oid 0.9.5", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.9" @@ -1524,7 +1580,32 @@ dependencies = [ "der 0.6.1", "elliptic-curve", "rfc6979", - "signature", + "signature 1.6.4", +] + +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8 0.10.2", + "signature 2.2.0", +] + +[[package]] +name = "ed25519-dalek" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" +dependencies = [ + "curve25519-dalek", + "ed25519", + "rand_core 0.6.4", + "serde", + "sha2 0.10.8", + "subtle", + "zeroize", ] [[package]] @@ -1764,6 +1845,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "fiat-crypto" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" + [[package]] name = "findshlibs" version = "0.10.2" @@ -1812,6 +1899,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "fnv" version = "1.0.7" @@ -2840,6 +2933,38 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "logos" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c000ca4d908ff18ac99b93a062cb8958d331c3220719c52e77cb19cc6ac5d2c1" +dependencies = [ + "logos-derive", +] + +[[package]] +name = "logos-codegen" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc487311295e0002e452025d6b580b77bb17286de87b57138f3b5db711cded68" +dependencies = [ + "beef", + "fnv", + "proc-macro2 1.0.69", + "quote 1.0.33", + "regex-syntax 0.6.29", + "syn 2.0.39", +] + +[[package]] +name = "logos-derive" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbfc0d229f1f42d790440136d941afd806bc9e949e2bcb8faa813b0f00d1267e" +dependencies = [ + "logos-codegen", +] + [[package]] name = "mach2" version = "0.4.1" @@ -2959,6 +3084,29 @@ dependencies = [ "sketches-ddsketch", ] +[[package]] +name = "miette" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e" +dependencies = [ + "miette-derive", + "once_cell", + "thiserror", + "unicode-width", +] + +[[package]] +name = "miette-derive" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 2.0.39", +] + [[package]] name = "mime" version = "0.3.17" @@ -3016,6 +3164,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "multivm" version = "0.1.0" @@ -3395,6 +3549,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "os_info" version = "3.7.0" @@ -3435,6 +3598,18 @@ dependencies = [ "serde", ] +[[package]] +name = "pairing_ce" +version = "0.28.5" +source = "git+https://github.com/matter-labs/pairing.git?rev=f55393f#f55393fd366596eac792d78525d26e9c4d6ed1ca" +dependencies = [ + "byteorder", + "cfg-if 1.0.0", + "ff_ce", + "rand 0.4.6", + "serde", +] + [[package]] name = "pairing_ce" version = "0.28.5" @@ -3684,6 +3859,16 @@ dependencies = [ "sha2 0.10.8", ] +[[package]] +name = "petgraph" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +dependencies = [ + "fixedbitset", + "indexmap 2.1.0", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -3748,12 +3933,28 @@ dependencies = [ "spki 0.6.0", ] +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der 0.7.8", + "spki 0.7.2", +] + [[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" + [[package]] name = "plotters" version = "0.3.5" @@ -3949,6 +4150,103 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5a410fc7882af66deb8d01d01737353cf3ad6204c408177ba494291a626312" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa3d084c8704911bfefb2771be2f9b6c5c0da7343a71e0021ee3c665cada738" +dependencies = [ + "bytes", + "heck 0.4.1", + "itertools 0.11.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.39", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065717a5dfaca4a83d2fe57db3487b311365200000551d7a364e715dbf4346bc" +dependencies = [ + "anyhow", + "itertools 0.11.0", + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 2.0.39", +] + +[[package]] +name = "prost-reflect" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "057237efdb71cf4b3f9396302a3d6599a92fa94063ba537b66130980ea9909f3" +dependencies = [ + "base64 0.21.5", + "logos", + "miette", + "once_cell", + "prost", + "prost-types", + "serde", + "serde-value", +] + +[[package]] +name = "prost-types" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8339f32236f590281e2f6368276441394fcd1b2133b549cc895d0ae80f2f9a52" +dependencies = [ + "prost", +] + +[[package]] +name = "protox" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00bb76c5f6221de491fe2c8f39b106330bbd9762c6511119c07940e10eb9ff11" +dependencies = [ + "bytes", + "miette", + "prost", + "prost-reflect", + "prost-types", + "protox-parse", + "thiserror", +] + +[[package]] +name = "protox-parse" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4581f441c58863525a3e6bec7b8de98188cf75239a56c725a3e7288450a33f" +dependencies = [ + "logos", + "miette", + "prost-types", + "thiserror", +] + [[package]] name = "prover-service" version = "0.1.0" @@ -4004,6 +4302,15 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-protobuf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f" +dependencies = [ + "byteorder", +] + [[package]] name = "quote" version = "0.6.13" @@ -4881,6 +5188,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.192" @@ -5096,6 +5413,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "simple_asn1" version = "0.6.2" @@ -5209,6 +5535,16 @@ dependencies = [ "der 0.6.1", ] +[[package]] +name = "spki" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" +dependencies = [ + "base64ct", + "der 0.7.8", +] + [[package]] name = "splitmut" version = "0.2.1" @@ -5571,6 +5907,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "time" version = "0.3.30" @@ -6448,6 +6793,20 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 2.0.39", +] [[package]] name = "zk_evm" @@ -6697,6 +7056,24 @@ dependencies = [ "zksync_verification_key_generator_and_server", ] +[[package]] +name = "zksync_concurrency" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "anyhow", + "once_cell", + "pin-project", + "rand 0.8.5", + "sha3 0.10.8", + "thiserror", + "time", + "tokio", + "tracing", + "tracing-subscriber", + "vise", +] + [[package]] name = "zksync_config" version = "0.1.0" @@ -6706,6 +7083,52 @@ dependencies = [ "zksync_basic_types", ] +[[package]] +name = "zksync_consensus_crypto" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "anyhow", + "blst", + "ed25519-dalek", + "ff_ce", + "hex", + "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=f55393f)", + "rand 0.4.6", + "rand 0.8.5", + "sha3 0.10.8", + "thiserror", + "tracing", +] + +[[package]] +name = "zksync_consensus_roles" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "anyhow", + "bit-vec", + "hex", + "prost", + "rand 0.8.5", + "serde", + "tracing", + "zksync_concurrency", + "zksync_consensus_crypto", + "zksync_consensus_utils", + "zksync_protobuf", + "zksync_protobuf_build", +] + +[[package]] +name = "zksync_consensus_utils" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "thiserror", + "zksync_concurrency", +] + [[package]] name = "zksync_contracts" version = "0.1.0" @@ -6875,6 +7298,40 @@ dependencies = [ "zksync_utils", ] +[[package]] +name = "zksync_protobuf" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "anyhow", + "bit-vec", + "once_cell", + "prost", + "prost-reflect", + "quick-protobuf", + "rand 0.8.5", + "serde", + "serde_json", + "zksync_concurrency", + "zksync_protobuf_build", +] + +[[package]] +name = "zksync_protobuf_build" +version = "0.1.0" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ed71b2e817c980a2daffef6a01885219e1dc6fa0#ed71b2e817c980a2daffef6a01885219e1dc6fa0" +dependencies = [ + "anyhow", + "heck 0.4.1", + "prettyplease", + "proc-macro2 1.0.69", + "prost-build", + "prost-reflect", + "protox", + "quote 1.0.33", + "syn 2.0.39", +] + [[package]] name = "zksync_prover" version = "0.1.0" @@ -7071,6 +7528,7 @@ dependencies = [ name = "zksync_types" version = "0.1.0" dependencies = [ + "anyhow", "blake2 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)", "chrono", "codegen 0.1.0", @@ -7080,6 +7538,7 @@ dependencies = [ "num_enum", "once_cell", "parity-crypto", + "prost", "rlp", "serde", "serde_json", @@ -7090,8 +7549,11 @@ dependencies = [ "zk_evm 1.4.0", "zkevm_test_harness 1.3.3", "zksync_basic_types", + "zksync_consensus_roles", "zksync_contracts", "zksync_mini_merkle_tree", + "zksync_protobuf", + "zksync_protobuf_build", "zksync_system_constants", "zksync_utils", ]