Skip to content

Commit

Permalink
feat(en): Implement gossip fetcher (#371)
Browse files Browse the repository at this point in the history
# What ❔

...i.e., a fetcher component that would use gossip network instead of
JSON-RPC API.
Fixes BFT-326 and BFT-368.

## Why ❔

This can be used by external nodes to sync with the main node.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Nov 23, 2023
1 parent e59a7c6 commit a49b61d
Show file tree
Hide file tree
Showing 22 changed files with 2,364 additions and 271 deletions.
33 changes: 15 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use zksync_core::{
setup_sigint_handler,
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
fetcher::MainNodeFetcherCursor, genesis::perform_genesis_if_needed, ActionQueue,
MainNodeClient, SyncState,
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState,
},
};
use zksync_dal::{healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
Expand Down Expand Up @@ -128,7 +127,7 @@ async fn init_tasks(
.await
.context("failed to build a connection pool for `MainNodeFetcher`")?;
let mut storage = pool.access_storage_tagged("sync_layer").await?;
MainNodeFetcherCursor::new(&mut storage)
FetcherCursor::new(&mut storage)
.await
.context("failed to load `MainNodeFetcher` cursor from Postgres")?
};
Expand Down
6 changes: 3 additions & 3 deletions core/lib/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ codegen = { git = "https://github.com/matter-labs/solidity_plonk_verifier.git",
zkevm_test_harness = { git = "https://github.com/matter-labs/era-zkevm_test_harness.git", branch = "v1.3.3" }
zk_evm_1_4_0 = { git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0", package = "zk_evm" }
zk_evm = { git = "https://github.com/matter-labs/era-zk_evm.git", tag = "v1.3.3-rc2" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }

anyhow = "1.0.75"
chrono = { version = "0.4", features = ["serde"] }
Expand Down Expand Up @@ -55,4 +55,4 @@ tokio = { version = "1", features = ["rt", "macros"] }
serde_with = { version = "1", features = ["hex"] }

[build-dependencies]
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
12 changes: 6 additions & 6 deletions core/lib/zksync_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ vlog = { path = "../vlog" }

multivm = { path = "../multivm" }
# Consensus dependenices
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }

prost = "0.12.1"
serde = { version = "1.0", features = ["derive"] }
Expand Down Expand Up @@ -98,4 +98,4 @@ tempfile = "3.0.2"
test-casing = "0.1.2"

[build-dependencies]
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "bdf9ed0af965cc7fa32d6c46a35ea065779ede8b" }
zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ed71b2e817c980a2daffef6a01885219e1dc6fa0" }
8 changes: 2 additions & 6 deletions core/lib/zksync_core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_roles::validator;
use zksync_types::block::ConsensusBlockFields;
use zksync_types::{Address, MiniblockNumber};
//! Consensus-related functionality.
mod payload;
mod proto;

pub(crate) use payload::Payload;
pub(crate) use self::payload::Payload;
39 changes: 23 additions & 16 deletions core/lib/zksync_core/src/consensus/payload.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use anyhow::Context as _;

use zksync_consensus_roles::validator;
use zksync_protobuf::{required, ProtoFmt};
use zksync_types::api::en::SyncBlock;
use zksync_types::{Address, L1BatchNumber, Transaction, H256};

/// L2 block (= miniblock) payload.
#[derive(Debug)]
pub(crate) struct Payload {
pub hash: H256,
pub l1_batch_number: L1BatchNumber,
Expand All @@ -17,28 +20,31 @@ pub(crate) struct Payload {

impl ProtoFmt for Payload {
type Proto = super::proto::Payload;
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
let mut transactions = vec![];
for (i, t) in r.transactions.iter().enumerate() {

fn read(message: &Self::Proto) -> anyhow::Result<Self> {
let mut transactions = Vec::with_capacity(message.transactions.len());
for (i, tx) in message.transactions.iter().enumerate() {
transactions.push(
required(&t.json)
.and_then(|s| Ok(serde_json::from_str(&*s)?))
required(&tx.json)
.and_then(|json_str| Ok(serde_json::from_str(json_str)?))
.with_context(|| format!("transaction[{i}]"))?,
);
}

Ok(Self {
hash: required(&r.hash)
.and_then(|h| Ok(<[u8; 32]>::try_from(h.as_slice())?.into()))
hash: required(&message.hash)
.and_then(|bytes| Ok(<[u8; 32]>::try_from(bytes.as_slice())?.into()))
.context("hash")?,
l1_batch_number: L1BatchNumber(
*required(&r.l1_batch_number).context("l1_batch_number")?,
*required(&message.l1_batch_number).context("l1_batch_number")?,
),
timestamp: *required(&r.timestamp).context("timestamp")?,
l1_gas_price: *required(&r.l1_gas_price).context("l1_gas_price")?,
l2_fair_gas_price: *required(&r.l2_fair_gas_price).context("l2_fair_gas_price")?,
virtual_blocks: *required(&r.virtual_blocks).context("virtual_blocks")?,
operator_address: required(&r.operator_address)
.and_then(|a| Ok(<[u8; 20]>::try_from(a.as_slice())?.into()))
timestamp: *required(&message.timestamp).context("timestamp")?,
l1_gas_price: *required(&message.l1_gas_price).context("l1_gas_price")?,
l2_fair_gas_price: *required(&message.l2_fair_gas_price)
.context("l2_fair_gas_price")?,
virtual_blocks: *required(&message.virtual_blocks).context("virtual_blocks")?,
operator_address: required(&message.operator_address)
.and_then(|bytes| Ok(<[u8; 20]>::try_from(bytes.as_slice())?.into()))
.context("operator_address")?,
transactions,
})
Expand Down Expand Up @@ -67,6 +73,7 @@ impl ProtoFmt for Payload {

impl TryFrom<SyncBlock> for Payload {
type Error = anyhow::Error;

fn try_from(block: SyncBlock) -> anyhow::Result<Self> {
Ok(Self {
hash: block.hash.unwrap_or_default(),
Expand All @@ -82,8 +89,8 @@ impl TryFrom<SyncBlock> for Payload {
}

impl Payload {
pub fn decode(p: &validator::Payload) -> anyhow::Result<Self> {
zksync_protobuf::decode(&p.0)
pub fn decode(payload: &validator::Payload) -> anyhow::Result<Self> {
zksync_protobuf::decode(&payload.0)
}

pub fn encode(&self) -> validator::Payload {
Expand Down
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use zksync_verification_key_server::get_cached_commitments;
pub mod api_server;
pub mod basic_witness_input_producer;
pub mod block_reverter;
mod consensus;
pub mod consistency_checker;
pub mod data_fetchers;
pub mod eth_sender;
Expand Down
47 changes: 33 additions & 14 deletions core/lib/zksync_core/src/sync_layer/external_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ impl IoSealCriteria for ExternalIO {
}

fn should_seal_miniblock(&mut self, _manager: &UpdatesManager) -> bool {
matches!(self.actions.peek_action(), Some(SyncAction::SealMiniblock))
matches!(
self.actions.peek_action(),
Some(SyncAction::SealMiniblock(_))
)
}
}

Expand Down Expand Up @@ -368,7 +371,7 @@ impl StateKeeperIO for ExternalIO {
virtual_blocks,
});
}
Some(SyncAction::SealBatch { virtual_blocks }) => {
Some(SyncAction::SealBatch { virtual_blocks, .. }) => {
// We've reached the next batch, so this situation would be handled by the batch sealer.
// No need to pop the action from the queue.
// It also doesn't matter which timestamp we return, since there will be no more miniblocks in this
Expand Down Expand Up @@ -434,12 +437,9 @@ impl StateKeeperIO for ExternalIO {
}

async fn seal_miniblock(&mut self, updates_manager: &UpdatesManager) {
match self.actions.pop_action() {
Some(SyncAction::SealMiniblock) => {}
other => panic!(
"State keeper requested to seal miniblock, but the next action is {:?}",
other
),
let action = self.actions.pop_action();
let Some(SyncAction::SealMiniblock(consensus)) = action else {
panic!("State keeper requested to seal miniblock, but the next action is {action:?}");
};

let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();
Expand Down Expand Up @@ -481,6 +481,16 @@ impl StateKeeperIO for ExternalIO {
self.l2_erc20_bridge_addr,
);
command.seal(&mut transaction).await;

// We want to add miniblock consensus fields atomically with the miniblock data so that we
// don't need to deal with corner cases (e.g., a miniblock w/o consensus fields).
if let Some(consensus) = &consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.current_miniblock_number, consensus)
.await
.unwrap();
}
transaction.commit().await.unwrap();

self.sync_state
Expand All @@ -497,23 +507,32 @@ impl StateKeeperIO for ExternalIO {
l1_batch_env: &L1BatchEnv,
finished_batch: FinishedL1Batch,
) -> anyhow::Result<()> {
match self.actions.pop_action() {
Some(SyncAction::SealBatch { .. }) => {}
other => anyhow::bail!(
"State keeper requested to seal the batch, but the next action is {other:?}"
),
let action = self.actions.pop_action();
let Some(SyncAction::SealBatch { consensus, .. }) = action else {
anyhow::bail!(
"State keeper requested to seal the batch, but the next action is {action:?}"
);
};

let mut storage = self.pool.access_storage_tagged("sync_layer").await.unwrap();
let mut transaction = storage.start_transaction().await.unwrap();
updates_manager
.seal_l1_batch(
&mut storage,
&mut transaction,
self.current_miniblock_number,
l1_batch_env,
finished_batch,
self.l2_erc20_bridge_addr,
)
.await;
if let Some(consensus) = &consensus {
transaction
.blocks_dal()
.set_miniblock_consensus_fields(self.current_miniblock_number, consensus)
.await
.unwrap();
}
transaction.commit().await.unwrap();

tracing::info!("Batch {} is sealed", self.current_l1_batch_number);

Expand Down
Loading

0 comments on commit a49b61d

Please sign in to comment.