Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Implement gossip fetcher #371

Merged
merged 43 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
bbd3331
Move `BufferedStorage` from consensus module
slowli Oct 24, 2023
371d935
Sketch Postgres-based `BlockStore`
slowli Oct 25, 2023
259585b
Sketch gossip-based block fetcher
slowli Oct 26, 2023
cac0127
Update fetched block conversions
slowli Oct 30, 2023
e146c5d
Test Postgres-based block store implementation
slowli Oct 30, 2023
8d59f5c
Test gossip-based fetcher
slowli Oct 31, 2023
6e3127f
Refactor and add more tests for gossip fetcher
slowli Nov 1, 2023
2ba0f40
Use git revision of consensus deps
slowli Nov 1, 2023
b033e94
Brush up `start_gossip_fetcher()` entrypoint
slowli Nov 1, 2023
dfe730a
Update from upstream
slowli Nov 1, 2023
5dff11c
Simplify `ActionQueue` management in tests
slowli Nov 1, 2023
0e8c651
Use published version of `test-casing`
slowli Nov 2, 2023
0033abb
Add metrics for gossip fetcher
slowli Nov 2, 2023
3e0bc81
Update revision for consensus deps
slowli Nov 2, 2023
7d89e00
Add consensus-specific fields for miniblocks
slowli Nov 2, 2023
2ee6142
Use new fields in block conversions
slowli Nov 2, 2023
e46f1e4
Sketch consensus fields storage in seal logic
slowli Nov 3, 2023
9371f97
Update gossip fetcher tests
slowli Nov 3, 2023
2574d35
Test consensus fields persistence
slowli Nov 3, 2023
4481106
Update revision of consensus deps
slowli Nov 3, 2023
97e4a9a
Resolve FIXMEs in fetcher logic
slowli Nov 3, 2023
ab79a15
Fix newline ending in new SQL query
slowli Nov 3, 2023
368df8b
Update from upstream
slowli Nov 3, 2023
691171b
Sketch more advanced "last block in batch" logic
slowli Nov 6, 2023
88c26f8
Fix background tasks for `Buffered` storage
slowli Nov 6, 2023
5995eec
Do not propagate cancellations in background tasks
slowli Nov 6, 2023
b9fb194
Update cancellation logic in line with consensus repo
slowli Nov 7, 2023
530bcdf
Fix issues in `storage` module
slowli Nov 7, 2023
b618aaf
Create `CommitQCBytes` wrapper
slowli Nov 7, 2023
a7de013
Rework `FetchedBlock::from_gossip_block()`
slowli Nov 7, 2023
8e61d1a
Update from upstream
slowli Nov 13, 2023
c12ff08
Update revision of consensus crates
slowli Nov 13, 2023
0678d56
Use `Ctx::wait()` instead of `tokio::select!`
slowli Nov 13, 2023
c651cc4
Update from upstream
slowli Nov 17, 2023
f03e79a
Update `sync_layer` logic
slowli Nov 17, 2023
986ba28
Remove unused `sqlx` query
slowli Nov 17, 2023
4b0c920
Remove consensus fields from sealing logic
slowli Nov 17, 2023
195d921
Update prover workspace lockfile
slowli Nov 17, 2023
783556b
Update consensus deps revision
slowli Nov 17, 2023
ee180fc
Propagate context awareness further in `PostgresBlockStorage`
slowli Nov 20, 2023
c2c8d18
Remove `ctx.wait()` in `schedule_next_block()`
slowli Nov 20, 2023
215e021
Update from upstream
slowli Nov 20, 2023
78d4944
Document data flow
slowli Nov 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
874 changes: 819 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use zksync_core::{
setup_sigint_handler,
state_keeper::{L1BatchExecutorBuilder, MainBatchExecutorBuilder, ZkSyncStateKeeper},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO,
fetcher::MainNodeFetcherCursor, genesis::perform_genesis_if_needed, ActionQueue,
MainNodeClient, SyncState,
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
genesis::perform_genesis_if_needed, ActionQueue, MainNodeClient, SyncState,
},
};
use zksync_dal::{connection::DbVariant, healthcheck::ConnectionPoolHealthCheck, ConnectionPool};
Expand Down Expand Up @@ -129,7 +128,7 @@ async fn init_tasks(
.await
.context("failed to build a connection pool for `MainNodeFetcher`")?;
let mut storage = pool.access_storage_tagged("sync_layer").await?;
MainNodeFetcherCursor::new(&mut storage)
FetcherCursor::new(&mut storage)
.await
.context("failed to load `MainNodeFetcher` cursor from Postgres")?
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE miniblocks
DROP COLUMN IF EXISTS commit_qc;
ALTER TABLE miniblocks
DROP COLUMN IF EXISTS prev_consensus_block_hash;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE miniblocks
ADD COLUMN commit_qc BYTEA NULL;
ALTER TABLE miniblocks
ADD COLUMN prev_consensus_block_hash BYTEA NULL;
210 changes: 118 additions & 92 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -1344,6 +1344,110 @@
},
"query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET status='queued'\n WHERE (l1_batch_number, circuit_id, depth) IN\n (SELECT prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id, prover_jobs_fri.depth\n FROM prover_jobs_fri\n JOIN node_aggregation_witness_jobs_fri nawj ON\n prover_jobs_fri.l1_batch_number = nawj.l1_batch_number\n AND prover_jobs_fri.circuit_id = nawj.circuit_id\n AND prover_jobs_fri.depth = nawj.depth\n WHERE nawj.status = 'waiting_for_proofs'\n AND prover_jobs_fri.status = 'successful'\n AND prover_jobs_fri.aggregation_round = 1\n AND prover_jobs_fri.depth = 0\n GROUP BY prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id, prover_jobs_fri.depth, nawj.number_of_dependent_jobs\n HAVING COUNT(*) = nawj.number_of_dependent_jobs)\n RETURNING l1_batch_number, circuit_id, depth;\n "
},
"1dc3019f127fd7aa760489457b6eba8dbbde21e03927e4ca71ebb6ab859ac8d1": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "l1_batch_number!",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "last_batch_miniblock?",
"ordinal": 2,
"type_info": "Int8"
},
{
"name": "timestamp",
"ordinal": 3,
"type_info": "Int8"
},
{
"name": "root_hash?",
"ordinal": 4,
"type_info": "Bytea"
},
{
"name": "l1_gas_price",
"ordinal": 5,
"type_info": "Int8"
},
{
"name": "l2_fair_gas_price",
"ordinal": 6,
"type_info": "Int8"
},
{
"name": "bootloader_code_hash",
"ordinal": 7,
"type_info": "Bytea"
},
{
"name": "default_aa_code_hash",
"ordinal": 8,
"type_info": "Bytea"
},
{
"name": "virtual_blocks",
"ordinal": 9,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 10,
"type_info": "Bytea"
},
{
"name": "commit_qc",
"ordinal": 11,
"type_info": "Bytea"
},
{
"name": "prev_consensus_block_hash",
"ordinal": 12,
"type_info": "Bytea"
},
{
"name": "protocol_version!",
"ordinal": 13,
"type_info": "Int4"
},
{
"name": "fee_account_address?",
"ordinal": 14,
"type_info": "Bytea"
}
],
"nullable": [
false,
null,
null,
false,
false,
false,
false,
true,
true,
false,
false,
true,
true,
true,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "SELECT miniblocks.number, COALESCE(miniblocks.l1_batch_number, (SELECT (max(number) + 1) FROM l1_batches)) as \"l1_batch_number!\", (SELECT max(m2.number) FROM miniblocks m2 WHERE miniblocks.l1_batch_number = m2.l1_batch_number) as \"last_batch_miniblock?\", miniblocks.timestamp, miniblocks.hash as \"root_hash?\", miniblocks.l1_gas_price, miniblocks.l2_fair_gas_price, miniblocks.bootloader_code_hash, miniblocks.default_aa_code_hash, miniblocks.virtual_blocks, miniblocks.hash, miniblocks.commit_qc, miniblocks.prev_consensus_block_hash, miniblocks.protocol_version as \"protocol_version!\", l1_batches.fee_account_address as \"fee_account_address?\" FROM miniblocks LEFT JOIN l1_batches ON miniblocks.l1_batch_number = l1_batches.number WHERE miniblocks.number = $1"
},
"1ed353a16e8d0abaf426e5c235b20a79c727c08bc23fb1708a833a6930131691": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -3887,98 +3991,6 @@
},
"query": "UPDATE eth_txs SET has_failed = TRUE WHERE id = $1"
},
"5190fad25f0c476380af4013761d42ae97dbd55f87e38ceec33f8e148c5cbb14": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "l1_batch_number!",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "last_batch_miniblock?",
"ordinal": 2,
"type_info": "Int8"
},
{
"name": "timestamp",
"ordinal": 3,
"type_info": "Int8"
},
{
"name": "root_hash?",
"ordinal": 4,
"type_info": "Bytea"
},
{
"name": "l1_gas_price",
"ordinal": 5,
"type_info": "Int8"
},
{
"name": "l2_fair_gas_price",
"ordinal": 6,
"type_info": "Int8"
},
{
"name": "bootloader_code_hash",
"ordinal": 7,
"type_info": "Bytea"
},
{
"name": "default_aa_code_hash",
"ordinal": 8,
"type_info": "Bytea"
},
{
"name": "virtual_blocks",
"ordinal": 9,
"type_info": "Int8"
},
{
"name": "hash",
"ordinal": 10,
"type_info": "Bytea"
},
{
"name": "protocol_version!",
"ordinal": 11,
"type_info": "Int4"
},
{
"name": "fee_account_address?",
"ordinal": 12,
"type_info": "Bytea"
}
],
"nullable": [
false,
null,
null,
false,
false,
false,
false,
true,
true,
false,
false,
true,
false
],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "\n SELECT miniblocks.number,\n COALESCE(miniblocks.l1_batch_number, (SELECT (max(number) + 1) FROM l1_batches)) as \"l1_batch_number!\",\n (SELECT max(m2.number) FROM miniblocks m2 WHERE miniblocks.l1_batch_number = m2.l1_batch_number) as \"last_batch_miniblock?\",\n miniblocks.timestamp,\n miniblocks.hash as \"root_hash?\",\n miniblocks.l1_gas_price,\n miniblocks.l2_fair_gas_price,\n miniblocks.bootloader_code_hash,\n miniblocks.default_aa_code_hash,\n miniblocks.virtual_blocks,\n miniblocks.hash,\n miniblocks.protocol_version as \"protocol_version!\",\n l1_batches.fee_account_address as \"fee_account_address?\"\n FROM miniblocks\n LEFT JOIN l1_batches ON miniblocks.l1_batch_number = l1_batches.number\n WHERE miniblocks.number = $1\n "
},
"51cb712685991ffd600dce59f5ed8b5a1bfce8feed46ebd02471c43802e6e65a": {
"describe": {
"columns": [
Expand Down Expand Up @@ -8733,6 +8745,20 @@
},
"query": "SELECT * FROM eth_txs_history WHERE eth_tx_id = $1 ORDER BY created_at DESC LIMIT 1"
},
"ad48c26546edaa5f872e5698eb0b0d3ced291db178dd6c065fe6a39def4ea751": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8",
"Bytea",
"Bytea"
]
}
},
"query": "UPDATE miniblocks SET prev_consensus_block_hash = $2, commit_qc = $3 WHERE number = $1"
},
"ad4f74aa6f131df0243f4fa500ade1b98aa335bd71ed417b02361e2c697e60f8": {
"describe": {
"columns": [],
Expand Down
26 changes: 25 additions & 1 deletion core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sqlx::Row;

use zksync_types::{
aggregated_operations::AggregatedActionType,
block::{BlockGasCount, L1BatchHeader, MiniblockHeader},
block::{BlockGasCount, ConsensusBlockFields, L1BatchHeader, MiniblockHeader},
commitment::{L1BatchMetadata, L1BatchWithMetadata},
Address, L1BatchNumber, LogQuery, MiniblockNumber, ProtocolVersionId, H256,
MAX_GAS_PER_PUBDATA_BYTE, U256,
Expand Down Expand Up @@ -466,6 +466,30 @@ impl BlocksDal<'_, '_> {
Ok(())
}

/// Sets consensus-related fields for the specified miniblock.
pub async fn set_miniblock_consensus_fields(
&mut self,
miniblock_number: MiniblockNumber,
consensus: &ConsensusBlockFields,
) -> anyhow::Result<()> {
let result = sqlx::query!(
"UPDATE miniblocks \
SET prev_consensus_block_hash = $2, commit_qc = $3 \
WHERE number = $1",
miniblock_number.0 as i64,
consensus.prev_block_hash.as_bytes(),
&consensus.commit_qc_bytes.0
)
.execute(self.storage.conn())
.await?;

anyhow::ensure!(
result.rows_affected() == 1,
"Miniblock #{miniblock_number} is not present in Postgres"
);
Ok(())
}

pub async fn update_hashes(
&mut self,
number_and_hashes: &[(MiniblockNumber, H256)],
Expand Down
18 changes: 15 additions & 3 deletions core/lib/dal/src/models/storage_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::convert::TryInto;

use zksync_contracts::BaseSystemContractsHashes;
use zksync_types::api::en::SyncBlock;
use zksync_types::Transaction;
use zksync_types::{Address, L1BatchNumber, MiniblockNumber, H256};
use zksync_types::{
block::ConsensusBlockFields, Address, Bytes, L1BatchNumber, MiniblockNumber, Transaction, H256,
};

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StorageSyncBlock {
Expand All @@ -22,6 +23,8 @@ pub struct StorageSyncBlock {
pub protocol_version: i32,
pub virtual_blocks: i64,
pub hash: Vec<u8>,
pub commit_qc: Option<Vec<u8>>,
pub prev_consensus_block_hash: Option<Vec<u8>>,
}

impl StorageSyncBlock {
Expand All @@ -30,12 +33,15 @@ impl StorageSyncBlock {
current_operator_address: Address,
transactions: Option<Vec<Transaction>>,
) -> SyncBlock {
let number = self.number;
let commit_qc = self.commit_qc;

SyncBlock {
number: MiniblockNumber(self.number as u32),
l1_batch_number: L1BatchNumber(self.l1_batch_number as u32),
last_in_batch: self
.last_batch_miniblock
.map(|n| n == self.number)
.map(|n| n == number)
.unwrap_or(false),
timestamp: self.timestamp as u64,
root_hash: self.root_hash.as_deref().map(H256::from_slice),
Expand All @@ -60,6 +66,12 @@ impl StorageSyncBlock {
virtual_blocks: Some(self.virtual_blocks as u32),
hash: Some(H256::from_slice(&self.hash)),
protocol_version: (self.protocol_version as u16).try_into().unwrap(),
consensus: self.prev_consensus_block_hash.and_then(|hash| {
Some(ConsensusBlockFields {
prev_block_hash: H256::from_slice(&hash),
commit_qc_bytes: Bytes(commit_qc?),
})
}),
}
}
}
36 changes: 18 additions & 18 deletions core/lib/dal/src/sync_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ impl SyncDal<'_, '_> {
let latency = MethodLatency::new("sync_dal_sync_block");
let storage_block_details = sqlx::query_as!(
StorageSyncBlock,
r#"
SELECT miniblocks.number,
COALESCE(miniblocks.l1_batch_number, (SELECT (max(number) + 1) FROM l1_batches)) as "l1_batch_number!",
(SELECT max(m2.number) FROM miniblocks m2 WHERE miniblocks.l1_batch_number = m2.l1_batch_number) as "last_batch_miniblock?",
miniblocks.timestamp,
miniblocks.hash as "root_hash?",
miniblocks.l1_gas_price,
miniblocks.l2_fair_gas_price,
miniblocks.bootloader_code_hash,
miniblocks.default_aa_code_hash,
miniblocks.virtual_blocks,
miniblocks.hash,
miniblocks.protocol_version as "protocol_version!",
l1_batches.fee_account_address as "fee_account_address?"
FROM miniblocks
LEFT JOIN l1_batches ON miniblocks.l1_batch_number = l1_batches.number
WHERE miniblocks.number = $1
"#,
"SELECT miniblocks.number, \
COALESCE(miniblocks.l1_batch_number, (SELECT (max(number) + 1) FROM l1_batches)) as \"l1_batch_number!\", \
(SELECT max(m2.number) FROM miniblocks m2 WHERE miniblocks.l1_batch_number = m2.l1_batch_number) as \"last_batch_miniblock?\", \
miniblocks.timestamp, \
miniblocks.hash as \"root_hash?\", \
miniblocks.l1_gas_price, \
miniblocks.l2_fair_gas_price, \
miniblocks.bootloader_code_hash, \
miniblocks.default_aa_code_hash, \
miniblocks.virtual_blocks, \
miniblocks.hash, \
miniblocks.commit_qc, \
miniblocks.prev_consensus_block_hash, \
miniblocks.protocol_version as \"protocol_version!\", \
l1_batches.fee_account_address as \"fee_account_address?\" \
FROM miniblocks \
LEFT JOIN l1_batches ON miniblocks.l1_batch_number = l1_batches.number \
WHERE miniblocks.number = $1",
block_number.0 as i64
)
.instrument("sync_dal_sync_block.block")
Expand Down
Loading
Loading