Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Implement gossip fetcher #371

Merged
merged 43 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
bbd3331
Move `BufferedStorage` from consensus module
slowli Oct 24, 2023
371d935
Sketch Postgres-based `BlockStore`
slowli Oct 25, 2023
259585b
Sketch gossip-based block fetcher
slowli Oct 26, 2023
cac0127
Update fetched block conversions
slowli Oct 30, 2023
e146c5d
Test Postgres-based block store implementation
slowli Oct 30, 2023
8d59f5c
Test gossip-based fetcher
slowli Oct 31, 2023
6e3127f
Refactor and add more tests for gossip fetcher
slowli Nov 1, 2023
2ba0f40
Use git revision of consensus deps
slowli Nov 1, 2023
b033e94
Brush up `start_gossip_fetcher()` entrypoint
slowli Nov 1, 2023
dfe730a
Update from upstream
slowli Nov 1, 2023
5dff11c
Simplify `ActionQueue` management in tests
slowli Nov 1, 2023
0e8c651
Use published version of `test-casing`
slowli Nov 2, 2023
0033abb
Add metrics for gossip fetcher
slowli Nov 2, 2023
3e0bc81
Update revision for consensus deps
slowli Nov 2, 2023
7d89e00
Add consensus-specific fields for miniblocks
slowli Nov 2, 2023
2ee6142
Use new fields in block conversions
slowli Nov 2, 2023
e46f1e4
Sketch consensus fields storage in seal logic
slowli Nov 3, 2023
9371f97
Update gossip fetcher tests
slowli Nov 3, 2023
2574d35
Test consensus fields persistence
slowli Nov 3, 2023
4481106
Update revision of consensus deps
slowli Nov 3, 2023
97e4a9a
Resolve FIXMEs in fetcher logic
slowli Nov 3, 2023
ab79a15
Fix newline ending in new SQL query
slowli Nov 3, 2023
368df8b
Update from upstream
slowli Nov 3, 2023
691171b
Sketch more advanced "last block in batch" logic
slowli Nov 6, 2023
88c26f8
Fix background tasks for `Buffered` storage
slowli Nov 6, 2023
5995eec
Do not propagate cancellations in background tasks
slowli Nov 6, 2023
b9fb194
Update cancellation logic in line with consensus repo
slowli Nov 7, 2023
530bcdf
Fix issues in `storage` module
slowli Nov 7, 2023
b618aaf
Create `CommitQCBytes` wrapper
slowli Nov 7, 2023
a7de013
Rework `FetchedBlock::from_gossip_block()`
slowli Nov 7, 2023
8e61d1a
Update from upstream
slowli Nov 13, 2023
c12ff08
Update revision of consensus crates
slowli Nov 13, 2023
0678d56
Use `Ctx::wait()` instead of `tokio::select!`
slowli Nov 13, 2023
c651cc4
Update from upstream
slowli Nov 17, 2023
f03e79a
Update `sync_layer` logic
slowli Nov 17, 2023
986ba28
Remove unused `sqlx` query
slowli Nov 17, 2023
4b0c920
Remove consensus fields from sealing logic
slowli Nov 17, 2023
195d921
Update prover workspace lockfile
slowli Nov 17, 2023
783556b
Update consensus deps revision
slowli Nov 17, 2023
ee180fc
Propagate context awareness further in `PostgresBlockStorage`
slowli Nov 20, 2023
c2c8d18
Remove `ctx.wait()` in `schedule_next_block()`
slowli Nov 20, 2023
215e021
Update from upstream
slowli Nov 20, 2023
78d4944
Document data flow
slowli Nov 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 15 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use zksync_core::{
setup_sigint_handler,
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
fetcher::MainNodeFetcherCursor, genesis::perform_genesis_if_needed, ActionQueue,
MainNodeClient, SyncState,
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState,
},
};
use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
Expand Down Expand Up @@ -129,7 +128,7 @@ async fn init_tasks(
.await
.context("failed to build a connection pool for `MainNodeFetcher`")?;
let mut storage = pool.access_storage_tagged("sync_layer").await?;
MainNodeFetcherCursor::new(&mut storage)
FetcherCursor::new(&mut storage)
.await
.context("failed to load `MainNodeFetcher` cursor from Postgres")?
};
Expand Down
6 changes: 3 additions & 3 deletions core/lib/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ codegen = { git = "https://github.com/matter-labs/solidity_plonk_verifier.git",
zkevm_test_harness = { git = "https://github.com/matter-labs/era-zkevm_test_harness.git", branch = "v1.3.3" }
zk_evm_1_4_0 = { git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0", package = "zk_evm" }
zk_evm = { git = "https://github.com/matter-labs/era-zk_evm.git", tag = "v1.3.3-rc2" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }

anyhow = "1.0.75"
chrono = { version = "0.4", features = ["serde"] }
Expand Down Expand Up @@ -55,4 +55,4 @@ tokio = { version = "1", features = ["rt", "macros"] }
serde_with = { version = "1", features = ["hex"] }

[build-dependencies]
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
12 changes: 6 additions & 6 deletions core/lib/zksync_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ vlog = { path = "../vlog" }

multivm = { path = "../multivm" }
# Consensus dependenices
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }

prost = "0.12.1"
serde = { version = "1.0", features = ["derive"] }
Expand Down Expand Up @@ -98,4 +98,4 @@ tempfile = "3.0.2"
test-casing = "0.1.2"

[build-dependencies]
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
8 changes: 2 additions & 6 deletions core/lib/zksync_core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_roles::validator;
use zksync_types::block::ConsensusBlockFields;
use zksync_types::{Address, MiniblockNumber};
//! Consensus-related functionality.

mod payload;
mod proto;

pub(crate) use payload::Payload;
pub(crate) use self::payload::Payload;
39 changes: 23 additions & 16 deletions core/lib/zksync_core/src/consensus/payload.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use anyhow::Context as _;

use zksync_consensus_roles::validator;
use zksync_protobuf::{required, ProtoFmt};
use zksync_types::api::en::SyncBlock;
use zksync_types::{Address, L1BatchNumber, Transaction, H256};

/// L2 block (= miniblock) payload.
#[derive(Debug)]
pub(crate) struct Payload {
pub hash: H256,
pub l1_batch_number: L1BatchNumber,
Expand All @@ -17,28 +20,31 @@ pub(crate) struct Payload {

impl ProtoFmt for Payload {
type Proto = super::proto::Payload;
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
let mut transactions = vec![];
for (i, t) in r.transactions.iter().enumerate() {

fn read(message: &Self::Proto) -> anyhow::Result<Self> {
let mut transactions = Vec::with_capacity(message.transactions.len());
for (i, tx) in message.transactions.iter().enumerate() {
transactions.push(
required(&t.json)
.and_then(|s| Ok(serde_json::from_str(&*s)?))
required(&tx.json)
.and_then(|json_str| Ok(serde_json::from_str(json_str)?))
.with_context(|| format!("transaction[{i}]"))?,
);
}

Ok(Self {
hash: required(&r.hash)
.and_then(|h| Ok(<[u8; 32]>::try_from(h.as_slice())?.into()))
hash: required(&message.hash)
.and_then(|bytes| Ok(<[u8; 32]>::try_from(bytes.as_slice())?.into()))
.context("hash")?,
l1_batch_number: L1BatchNumber(
*required(&r.l1_batch_number).context("l1_batch_number")?,
*required(&message.l1_batch_number).context("l1_batch_number")?,
),
timestamp: *required(&r.timestamp).context("timestamp")?,
l1_gas_price: *required(&r.l1_gas_price).context("l1_gas_price")?,
l2_fair_gas_price: *required(&r.l2_fair_gas_price).context("l2_fair_gas_price")?,
virtual_blocks: *required(&r.virtual_blocks).context("virtual_blocks")?,
operator_address: required(&r.operator_address)
.and_then(|a| Ok(<[u8; 20]>::try_from(a.as_slice())?.into()))
timestamp: *required(&message.timestamp).context("timestamp")?,
l1_gas_price: *required(&message.l1_gas_price).context("l1_gas_price")?,
l2_fair_gas_price: *required(&message.l2_fair_gas_price)
.context("l2_fair_gas_price")?,
virtual_blocks: *required(&message.virtual_blocks).context("virtual_blocks")?,
operator_address: required(&message.operator_address)
.and_then(|bytes| Ok(<[u8; 20]>::try_from(bytes.as_slice())?.into()))
.context("operator_address")?,
transactions,
})
Expand Down Expand Up @@ -67,6 +73,7 @@ impl ProtoFmt for Payload {

impl TryFrom<SyncBlock> for Payload {
type Error = anyhow::Error;

fn try_from(block: SyncBlock) -> anyhow::Result<Self> {
Ok(Self {
hash: block.hash.unwrap_or_default(),
Expand All @@ -82,8 +89,8 @@ impl TryFrom<SyncBlock> for Payload {
}

impl Payload {
pub fn decode(p: &validator::Payload) -> anyhow::Result<Self> {
zksync_protobuf::decode(&p.0)
pub fn decode(payload: &validator::Payload) -> anyhow::Result<Self> {
zksync_protobuf::decode(&payload.0)
}

pub fn encode(&self) -> validator::Payload {
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use zksync_verification_key_server::get_cached_commitments;
pub mod api_server;
pub mod basic_witness_input_producer;
pub mod block_reverter;
mod consensus;
pub mod consistency_checker;
pub mod data_fetchers;
pub mod eth_sender;
Expand Down
47 changes: 33 additions & 14 deletions core/lib/zksync_core/src/sync_layer/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,10 @@ impl IoSealCriteria for ExternalIO {
}

fn should_seal_miniblock(&mut self, _manager: &UpdatesManager) -> bool {
matches!(self.actions.peek_action(), Some(SyncAction::SealMiniblock))
matches!(
self.actions.peek_action(),
Some(SyncAction::SealMiniblock(_))
)
}
}

Expand Down Expand Up @@ -426,7 +429,7 @@ impl StateKeeperIO for ExternalIO {
virtual_blocks,
});
}
Some(SyncAction::SealBatch { virtual_blocks }) => {
Some(SyncAction::SealBatch { virtual_blocks, .. }) => {
// We've reached the next batch, so this situation would be handled by the batch sealer.
// No need to pop the action from the queue.
// It also doesn't matter which timestamp we return, since there will be no more miniblocks in this
Expand Down Expand Up @@ -492,12 +495,9 @@ impl StateKeeperIO for ExternalIO {
}

async fn seal_miniblock(&mut self, updates_manager: &UpdatesManager) {
match self.actions.pop_action() {
Some(SyncAction::SealMiniblock) => {}
other => panic!(
"State keeper requested to seal miniblock, but the next action is {:?}",
other
),
let action = self.actions.pop_action();
let Some(SyncAction::SealMiniblock(consensus)) = action else {
panic!("State keeper requested to seal miniblock, but the next action is {action:?}");
};

let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();
Expand Down Expand Up @@ -539,6 +539,16 @@ impl StateKeeperIO for ExternalIO {
self.l2_erc20_bridge_addr,
);
command.seal(&mut transaction).await;

// We want to add miniblock consensus fields atomically with the miniblock data so that we
// don't need to deal with corner cases (e.g., a miniblock w/o consensus fields).
if let Some(consensus) = &consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.current_miniblock_number, consensus)
.await
.unwrap();
}
transaction.commit().await.unwrap();

self.sync_state
Expand All @@ -555,23 +565,32 @@ impl StateKeeperIO for ExternalIO {
l1_batch_env: &L1BatchEnv,
finished_batch: FinishedL1Batch,
) -> anyhow::Result<()> {
match self.actions.pop_action() {
Some(SyncAction::SealBatch { .. }) => {}
other => anyhow::bail!(
"State keeper requested to seal the batch, but the next action is {other:?}"
),
let action = self.actions.pop_action();
let Some(SyncAction::SealBatch { consensus, .. }) = action else {
anyhow::bail!(
"State keeper requested to seal the batch, but the next action is {action:?}"
);
};

let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();
let mut transaction = storage.start_transaction().await.unwrap();
updates_manager
.seal_l1_batch(
&mut storage,
&mut transaction,
self.current_miniblock_number,
l1_batch_env,
finished_batch,
self.l2_erc20_bridge_addr,
)
.await;
if let Some(consensus) = &consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.current_miniblock_number, consensus)
.await
.unwrap();
}
transaction.commit().await.unwrap();

tracing::info!("Batch {} is sealed", self.current_l1_batch_number);

Expand Down
Loading