Skip to content

Commit

Permalink
feat(consensus): enabled syncing pregenesis blocks over p2p (#3192)
Browse files Browse the repository at this point in the history
It was already implemented, just not enabled, because the blockMetadata
RPC has to be whitelisted first.
  • Loading branch information
pompon0 authored Oct 29, 2024
1 parent 5161eed commit 6adb224
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 79 deletions.
22 changes: 1 addition & 21 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@ pub(super) struct EN {
impl EN {
/// Task running a consensus node for the external node.
/// It may be a validator, but it cannot be a leader (cannot propose blocks).
///
/// If `enable_pregenesis` is false,
/// before starting the consensus node it fetches all the blocks
/// older than consensus genesis from the main node using json RPC.
/// NOTE: currently `enable_pregenesis` is hardcoded to `false` in `era.rs`.
/// True is used only in tests. Once the `block_metadata` RPC is enabled everywhere
/// this flag should be removed and fetching pregenesis blocks will always be done
/// over the gossip network.
pub async fn run(
self,
ctx: &ctx::Ctx,
actions: ActionQueueSender,
cfg: ConsensusConfig,
secrets: ConsensusSecrets,
build_version: Option<semver::Version>,
enable_pregenesis: bool,
) -> anyhow::Result<()> {
let attester = config::attester_key(&secrets).context("attester_key")?;

Expand All @@ -74,24 +65,13 @@ impl EN {
.await
.wrap("try_update_global_config()")?;

let mut payload_queue = conn
let payload_queue = conn
.new_payload_queue(ctx, actions, self.sync_state.clone())
.await
.wrap("new_payload_queue()")?;

drop(conn);

// Fetch blocks before the genesis.
if !enable_pregenesis {
self.fetch_blocks(
ctx,
&mut payload_queue,
Some(global_config.genesis.first_block),
)
.await
.wrap("fetch_blocks()")?;
}

// Monitor the genesis of the main node.
// If it changes, it means that a hard fork occurred and we need to reset the consensus state.
s.spawn_bg::<()>({
Expand Down
14 changes: 2 additions & 12 deletions core/node/consensus/src/era.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,8 @@ pub async fn run_external_node(
is_validator = secrets.validator_key.is_some(),
"running external node"
);
// We will enable it once the main node on all envs supports
// `block_metadata()` JSON RPC method.
let enable_pregenesis = false;
en.run(
ctx,
actions,
cfg,
secrets,
Some(build_version),
enable_pregenesis,
)
.await
en.run(ctx, actions, cfg, secrets, Some(build_version))
.await
}
None => {
tracing::info!("running fetcher");
Expand Down
11 changes: 1 addition & 10 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub(super) struct ConfigSet {
net: network::Config,
pub(super) config: config::ConsensusConfig,
pub(super) secrets: config::ConsensusSecrets,
pub(super) enable_pregenesis: bool,
}

impl ConfigSet {
Expand All @@ -83,17 +82,11 @@ impl ConfigSet {
config: make_config(&net, None),
secrets: make_secrets(&net, None),
net,
enable_pregenesis: self.enable_pregenesis,
}
}
}

pub(super) fn new_configs(
rng: &mut impl Rng,
setup: &Setup,
seed_peers: usize,
pregenesis: bool,
) -> Vec<ConfigSet> {
pub(super) fn new_configs(rng: &mut impl Rng, setup: &Setup, seed_peers: usize) -> Vec<ConfigSet> {
let net_cfgs = network::testonly::new_configs(rng, setup, 0);
let genesis_spec = config::GenesisSpec {
chain_id: setup.genesis.chain_id.0.try_into().unwrap(),
Expand Down Expand Up @@ -133,7 +126,6 @@ pub(super) fn new_configs(
config: make_config(&net, Some(genesis_spec.clone())),
secrets: make_secrets(&net, setup.attester_keys.get(i).cloned()),
net,
enable_pregenesis: pregenesis,
})
.collect()
}
Expand Down Expand Up @@ -473,7 +465,6 @@ impl StateKeeper {
cfgs.config,
cfgs.secrets,
cfgs.net.build_version,
cfgs.enable_pregenesis,
)
.await
}
Expand Down
10 changes: 5 additions & 5 deletions core/node/consensus/src/tests/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Context as _;
use rand::Rng as _;
use test_casing::{test_casing, Product};
use test_casing::test_casing;
use tracing::Instrument as _;
use zksync_concurrency::{ctx, error::Wrap, scope};
use zksync_consensus_roles::{
Expand All @@ -12,7 +12,7 @@ use zksync_test_account::Account;
use zksync_types::ProtocolVersionId;
use zksync_web3_decl::namespaces::EnNamespaceClient as _;

use super::{POLL_INTERVAL, PREGENESIS, VERSIONS};
use super::{POLL_INTERVAL, VERSIONS};
use crate::{
mn::run_main_node,
registry::{testonly, Registry},
Expand Down Expand Up @@ -126,9 +126,9 @@ async fn test_attestation_status_api(version: ProtocolVersionId) {
// Test running a couple of attesters (which are also validators).
// Main node is expected to collect all certificates.
// External nodes are expected to just vote for the batch.
#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_multiple_attesters(version: ProtocolVersionId, pregenesis: bool) {
async fn test_multiple_attesters(version: ProtocolVersionId) {
const NODES: usize = 4;

zksync_concurrency::testonly::abort_on_panic();
Expand All @@ -137,7 +137,7 @@ async fn test_multiple_attesters(version: ProtocolVersionId, pregenesis: bool) {
let account = &mut Account::random();
let to_fund = &[account.address];
let setup = Setup::new(rng, 4);
let mut cfgs = new_configs(rng, &setup, NODES, pregenesis);
let mut cfgs = new_configs(rng, &setup, NODES);
scope::run!(ctx, |ctx, s| async {
let validator_pool = ConnectionPool::test(false, version).await;
let (mut validator, runner) = StateKeeper::new(ctx, validator_pool.clone()).await?;
Expand Down
56 changes: 25 additions & 31 deletions core/node/consensus/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ mod attestation;

const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()];
const FROM_SNAPSHOT: [bool; 2] = [true, false];
const PREGENESIS: [bool; 2] = [true, false];
const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(500);

#[test_casing(2, VERSIONS)]
Expand Down Expand Up @@ -190,14 +189,14 @@ async fn test_validator_block_store(version: ProtocolVersionId) {
// In the current implementation, consensus certificates are created asynchronously
// for the L2 blocks constructed by the StateKeeper. This means that consensus actor
// is effectively just back filling the consensus certificates for the L2 blocks in storage.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_validator(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -254,14 +253,14 @@ async fn test_validator(from_snapshot: bool, version: ProtocolVersionId, pregene
}

// Test running a validator node and 2 full nodes recovered from different snapshots.
#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_nodes_from_various_snapshots(version: ProtocolVersionId, pregenesis: bool) {
async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

scope::run!(ctx, |ctx, s| async {
Expand Down Expand Up @@ -335,14 +334,14 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId, pregenesi
.unwrap();
}

#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let mut validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let mut validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -412,16 +411,16 @@ async fn test_config_change(from_snapshot: bool, version: ProtocolVersionId, pre
// Test running a validator node and a couple of full nodes.
// Validator is producing signed blocks and fetchers are expected to fetch
// them directly or indirectly.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) {
const NODES: usize = 2;

zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let account = &mut Account::random();

// topology:
Expand Down Expand Up @@ -500,16 +499,16 @@ async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId, pregen
}

// Test running external node (non-leader) validators.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) {
const NODES: usize = 3;

zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, NODES);
let cfgs = testonly::new_configs(rng, &setup, 1, pregenesis);
let cfgs = testonly::new_configs(rng, &setup, 1);
let account = &mut Account::random();

// Run all nodes in parallel.
Expand Down Expand Up @@ -583,18 +582,14 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId, pre
}

// Test fetcher back filling missing certs.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_p2p_fetcher_backfill_certs(
from_snapshot: bool,
version: ProtocolVersionId,
pregenesis: bool,
) {
async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -668,16 +663,16 @@ async fn test_p2p_fetcher_backfill_certs(
}

// Test temporary fetcher fetching blocks if a lot of certs are missing.
#[test_casing(8, Product((FROM_SNAPSHOT,VERSIONS,PREGENESIS)))]
#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
#[tokio::test]
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId, pregenesis: bool) {
async fn test_temporary_fetcher(from_snapshot: bool, version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
// We force certs to be missing on EN by having 1 of the validators permanently offline.
// This way no blocks will be finalized at all, so no one will have certs.
let setup = Setup::new(rng, 2);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -749,8 +744,7 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.));
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let pregenesis = true;
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down Expand Up @@ -797,14 +791,14 @@ async fn test_temporary_fetcher_termination(from_snapshot: bool, version: Protoc
.unwrap();
}

#[test_casing(4, Product((VERSIONS,PREGENESIS)))]
#[test_casing(2, VERSIONS)]
#[tokio::test]
async fn test_with_pruning(version: ProtocolVersionId, pregenesis: bool) {
async fn test_with_pruning(version: ProtocolVersionId) {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
let setup = Setup::new(rng, 1);
let validator_cfg = testonly::new_configs(rng, &setup, 0, pregenesis)[0].clone();
let validator_cfg = testonly::new_configs(rng, &setup, 0)[0].clone();
let node_cfg = validator_cfg.new_fullnode(rng);
let account = &mut Account::random();

Expand Down

0 comments on commit 6adb224

Please sign in to comment.