diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index ad353249..80cf46cb 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -1,7 +1,13 @@ # Contribution Guidelines Hello! Thanks for your interest in joining the mission to accelerate the mass adoption of crypto for personal -sovereignty! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes! +sovereignty! We welcome contributions from anyone on the internet. + +Note, however, that all the contributions are subject to review, and not every contribution is guaranteed to be merged. +It is highly advised to reach out to developers (for example, by creating an issue) before preparing a significant +change in the codebase, and explicitly confirm that this contribution will be considered for merge. Otherwise, it is +possible to discover that a feature you have spent some time on does not align with the core team vision or capacity to +maintain a high quality of given submission long term. ## Ways to contribute diff --git a/node/Cargo.lock b/node/Cargo.lock index 8e33dab2..8f242b32 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3060,17 +3060,6 @@ dependencies = [ "syn 2.0.51", ] -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand 0.8.5", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" @@ -3870,7 +3859,6 @@ dependencies = [ "serde_json", "tempfile", "tokio", - "tokio-retry", "tower", "tracing", "tracing-subscriber", diff --git a/node/Cargo.toml b/node/Cargo.toml index d08616dc..80f955fe 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -81,7 +81,6 @@ test-casing = "0.1.0" thiserror = "1.0.40" time = "0.3.23" tokio = { version = "1.34.0", features = ["full"] } -tokio-retry = "0.3.0" tracing = { version = "0.1.37", features = ["attributes"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } kube = { version = "0.88.1", features = ["runtime", "derive"] } @@ -157,7 +156,5 @@ wildcard_dependencies = "warn" redundant_locals = "allow" needless_pass_by_ref_mut = "allow" box_default = "allow" -# remove once fix to https://github.com/rust-lang/rust-clippy/issues/11764 is available on CI. -map_identity = "allow" # &*x is not equivalent to x, because it affects borrowing in closures. borrow_deref_ref = "allow" diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs index 459b1e15..8d44909a 100644 --- a/node/actors/bft/src/leader/state_machine.rs +++ b/node/actors/bft/src/leader/state_machine.rs @@ -167,9 +167,9 @@ impl StateMachine { // The previous block was finalized, so we can propose a new block. _ => { let fork = &cfg.genesis().fork; - let (parent, number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (fork.first_parent, fork.first_block), + let number = match high_qc { + Some(qc) => qc.header().number.next(), + None => fork.first_block, }; // Defensively assume that PayloadManager cannot propose until the previous block is stored. if let Some(prev) = number.prev() { @@ -189,7 +189,6 @@ impl StateMachine { .observe(payload.0.len()); let proposal = validator::BlockHeader { number, - parent, payload: payload.hash(), }; (proposal, Some(payload)) diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index b74e4411..9974c5c8 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -37,13 +37,6 @@ async fn replica_prepare_sanity_yield_leader_prepare() { .unwrap() .unwrap(); assert_eq!(leader_prepare.msg.view(), &replica_prepare.view); - assert_eq!( - leader_prepare.msg.proposal.parent, - replica_prepare - .high_vote - .as_ref() - .map(|v| v.proposal.hash()), - ); assert_eq!( leader_prepare.msg.justification, util.new_prepare_qc(|msg| *msg = replica_prepare) diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs index eb00cd9f..20ca6f3b 100644 --- a/node/actors/bft/src/replica/block.rs +++ b/node/actors/bft/src/replica/block.rs @@ -39,7 +39,7 @@ impl StateMachine { tracing::info!( "Finalized block {}: {:#?}", block.header().number, - block.header().hash(), + block.header().payload, ); self.config .block_store diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/leader_prepare.rs index 7e017bcb..daa57825 100644 --- a/node/actors/bft/src/replica/leader_prepare.rs +++ b/node/actors/bft/src/replica/leader_prepare.rs @@ -42,8 +42,9 @@ pub(crate) enum Error { /// Invalid message. #[error("invalid message: {0:#}")] InvalidMessage(#[source] validator::LeaderPrepareVerifyError), - /// Previous proposal was not finalized. - + /// Leader proposed a block that was already pruned from replica's storage. + #[error("leader proposed a block that was already pruned from replica's storage")] + ProposalAlreadyPruned, /// Oversized payload. #[error("block proposal with an oversized payload (payload size: {payload_size})")] ProposalOversizedPayload { @@ -110,6 +111,14 @@ impl StateMachine { }); } + // Replica MUSTN'T vote for blocks which have been already pruned for storage. + // (because it won't be able to persist and broadcast them once finalized). + // TODO(gprusak): it should never happen, we should add safety checks to prevent + // pruning blocks not known to be finalized. + if message.proposal.number < self.config.block_store.subscribe().borrow().first { + return Err(Error::ProposalAlreadyPruned); + } + // ----------- Checking the message -------------- signed_message.verify().map_err(Error::InvalidSignature)?; diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index 3866e813..c1b5be08 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -7,7 +7,7 @@ use assert_matches::assert_matches; use rand::Rng; use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::{ - self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber, + self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, }; /// Sanity check of the happy path. @@ -101,10 +101,6 @@ async fn leader_prepare_invalid_leader() { let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - let view = ViewNumber(2); - util.set_view(view); - assert_eq!(util.view_leader(view), util.keys[0].public()); - let replica_prepare = util.new_replica_prepare(); assert!(util .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) @@ -167,6 +163,35 @@ async fn leader_prepare_old_view() { .unwrap(); } +#[tokio::test] +async fn leader_prepare_pruned_block() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut leader_prepare = util.new_leader_prepare(ctx).await; + // We assume default replica state and nontrivial `genesis.fork.first_block` here. + leader_prepare.proposal.number = util + .replica + .config + .block_store + .subscribe() + .borrow() + .first + .prev() + .unwrap(); + let res = util + .process_leader_prepare(ctx, util.sign(leader_prepare)) + .await; + assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned)); + Ok(()) + }) + .await + .unwrap(); +} + /// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. #[tokio::test] async fn leader_prepare_invalid_payload() { @@ -338,33 +363,6 @@ async fn leader_prepare_proposal_when_previous_not_finalized() { .unwrap(); } -#[tokio::test] -async fn leader_prepare_bad_parent_hash() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Produce initial block."); - util.produce_block(ctx).await; - tracing::info!("Make leader propose the next block."); - let mut leader_prepare = util.new_leader_prepare(ctx).await; - tracing::info!("Modify the proposal.parent so that it doesn't match the previous block"); - leader_prepare.proposal.parent = Some(ctx.rng().gen()); - let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await; - assert_matches!(res, Err(leader_prepare::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::BadParentHash { got, want } - )) => { - assert_eq!(want, Some(leader_prepare.justification.high_qc().unwrap().message.proposal.hash())); - assert_eq!(got, leader_prepare.proposal.parent); - }); - Ok(()) - }) - .await - .unwrap(); -} - #[tokio::test] async fn leader_prepare_bad_block_number() { zksync_concurrency::testonly::abort_on_panic(); @@ -500,7 +498,7 @@ async fn leader_prepare_reproposal_invalid_block() { .unwrap(); } -/// Check that replica provides expecte high_vote and high_qc after finalizing a block. +/// Check that replica provides expected high_vote and high_qc after finalizing a block. #[tokio::test] async fn leader_commit_sanity_yield_replica_prepare() { zksync_concurrency::testonly::abort_on_panic(); diff --git a/node/actors/bft/src/testonly/fuzz.rs b/node/actors/bft/src/testonly/fuzz.rs index 077be553..fd0cdd4a 100644 --- a/node/actors/bft/src/testonly/fuzz.rs +++ b/node/actors/bft/src/testonly/fuzz.rs @@ -174,11 +174,9 @@ impl Fuzz for validator::Payload { impl Fuzz for validator::BlockHeader { fn mutate(&mut self, rng: &mut impl Rng) { - match rng.gen_range(0..3) { - 0 => self.parent = rng.gen(), - 1 => self.number = rng.gen(), - 2 => self.payload = rng.gen(), - _ => unreachable!(), + match rng.gen_range(0..2) { + 0 => self.number = rng.gen(), + _ => self.payload = rng.gen(), } } } diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index 8880788c..29590cce 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -137,19 +137,6 @@ impl UTHarness { self.replica.view = view; } - pub(crate) fn set_view(&mut self, view: ViewNumber) { - self.set_replica_view(view); - self.set_leader_view(view); - } - - pub(crate) fn set_leader_view(&mut self, view: ViewNumber) { - self.leader.view = view - } - - pub(crate) fn set_replica_view(&mut self, view: ViewNumber) { - self.replica.view = view - } - pub(crate) fn replica_view(&self) -> validator::View { validator::View { protocol_version: self.protocol_version(), diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index f95b6641..2b34e7a0 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -5,32 +5,48 @@ use zksync_consensus_bft as bft; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; use zksync_consensus_storage::{ - testonly::{in_memory, new_store}, + testonly::{in_memory, new_store, new_store_with_first}, BlockStore, }; -fn make_executor(cfg: &network::Config, block_store: Arc) -> Executor { +fn config(cfg: &network::Config) -> Config { + Config { + server_addr: *cfg.server_addr, + public_addr: cfg.public_addr, + max_payload_size: usize::MAX, + node_key: cfg.gossip.key.clone(), + gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, + gossip_static_inbound: cfg.gossip.static_inbound.clone(), + gossip_static_outbound: cfg.gossip.static_outbound.clone(), + } +} + +fn validator( + cfg: &network::Config, + block_store: Arc, + replica_store: impl ReplicaStore, +) -> Executor { Executor { - config: Config { - server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, - max_payload_size: usize::MAX, - node_key: cfg.gossip.key.clone(), - gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, - gossip_static_inbound: cfg.gossip.static_inbound.clone(), - gossip_static_outbound: cfg.gossip.static_outbound.clone(), - }, + config: config(cfg), block_store, - validator: cfg.validator_key.as_ref().map(|key| Validator { - key: key.clone(), - replica_store: Box::new(in_memory::ReplicaStore::default()), + validator: Some(Validator { + key: cfg.validator_key.clone().unwrap(), + replica_store: Box::new(replica_store), payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), } } +fn fullnode(cfg: &network::Config, block_store: Arc) -> Executor { + Executor { + config: config(cfg), + block_store, + validator: None, + } +} + #[tokio::test] -async fn executing_single_validator() { +async fn test_single_validator() { abort_on_panic(); let ctx = &ctx::root(); let rng = &mut ctx.rng(); @@ -38,9 +54,10 @@ async fn executing_single_validator() { let setup = Setup::new(rng, 1); let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store.clone()).run(ctx)); + s.spawn_bg(validator(&cfgs[0], store.clone(), replica_store).run(ctx)); store.wait_until_persisted(ctx, BlockNumber(5)).await?; Ok(()) }) @@ -49,7 +66,7 @@ async fn executing_single_validator() { } #[tokio::test] -async fn executing_validator_and_full_node() { +async fn test_fullnode_syncing_from_validator() { abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); let rng = &mut ctx.rng(); @@ -58,17 +75,73 @@ async fn executing_validator_and_full_node() { let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { // Spawn validator. + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store).run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); // Spawn full node. let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); // Wait for blocks in full node store. - store.wait_until_persisted(ctx, BlockNumber(5)).await?; + store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 5) + .await?; + Ok(()) + }) + .await + .unwrap(); +} + +/// Test in which validator is syncing missing blocks from a full node before producing blocks. +#[tokio::test] +async fn test_validator_syncing_from_fullnode() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); + let rng = &mut ctx.rng(); + + let setup = Setup::new(rng, 1); + let cfgs = new_configs(rng, &setup, 0); + scope::run!(ctx, |ctx, s| async { + // Spawn full node. + let (node_store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), node_store.clone()).run(ctx)); + + // Run validator and produce some blocks. + // Wait for the blocks to be fetched by the full node. + let replica_store = in_memory::ReplicaStore::default(); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store.clone()).run(ctx)); + node_store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 4) + .await?; + Ok(()) + }) + .await + .unwrap(); + + // Restart the validator with empty store (but preserved replica state) and non-trivial + // `store.state.first`. + // Validator should fetch the past blocks from the full node before producing next blocks. + let last_block = node_store + .subscribe() + .borrow() + .last + .as_ref() + .unwrap() + .header() + .number; + let (store, runner) = + new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); + node_store.wait_until_persisted(ctx, last_block + 3).await?; + Ok(()) }) .await diff --git a/node/actors/network/src/mux/reusable_stream.rs b/node/actors/network/src/mux/reusable_stream.rs index 882040db..381cf561 100644 --- a/node/actors/network/src/mux/reusable_stream.rs +++ b/node/actors/network/src/mux/reusable_stream.rs @@ -71,7 +71,7 @@ pub(crate) struct StreamQueue { } impl StreamQueue { - /// Constructs a new StreamQueue with the specificied number of reusable streams. + /// Constructs a new StreamQueue with the specified number of reusable streams. /// During multiplexer handshake, peers exchange information about /// how many reusable streams they support per capability. pub(crate) fn new(max_streams: u32) -> Arc { @@ -289,7 +289,7 @@ impl ReusableStream { read_receiver = new_read_receiver; let (write_lock, new_write_receiver) = sync::ExclusiveLock::new(write); write_receiver = new_write_receiver; - // Sending may fail because the requestor is not interested in the stream any more. + // Sending may fail because the requester is not interested in the stream any more. // In this case we just close the transient stream immediately. let _ = reservation.send(Stream { read: ReadStream(read_lock), diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index f57ea8bd..bc589ccb 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -63,17 +63,14 @@ impl PeerStates { /// Updates the known `BlockStore` state of the given peer. /// This information is used to decide from which peer to fetch - /// a given block from. + /// a given block. pub(crate) fn update( &self, peer: &node::PublicKey, state: BlockStoreState, ) -> anyhow::Result<()> { use std::collections::hash_map::Entry; - let Some(last) = &state.last else { - return Ok(()); - }; - last.verify(self.genesis()).context("state.last.verify()")?; + state.verify(self.genesis()).context("state.verify()")?; let mut peers = self.peers.lock().unwrap(); match peers.entry(peer.clone()) { Entry::Occupied(mut e) => e.get_mut().state = state.clone(), @@ -85,14 +82,16 @@ impl PeerStates { }); } } - self.highest_peer_block - .send_if_modified(|highest_peer_block| { - if *highest_peer_block >= last.header().number { - return false; - } - *highest_peer_block = last.header().number; - true - }); + if let Some(last) = &state.last { + self.highest_peer_block + .send_if_modified(|highest_peer_block| { + if *highest_peer_block >= last.header().number { + return false; + } + *highest_peer_block = last.header().number; + true + }); + } Ok(()) } diff --git a/node/actors/sync_blocks/src/peers/tests/mod.rs b/node/actors/sync_blocks/src/peers/tests/mod.rs index d6c9d66d..ee28e436 100644 --- a/node/actors/sync_blocks/src/peers/tests/mod.rs +++ b/node/actors/sync_blocks/src/peers/tests/mod.rs @@ -98,3 +98,62 @@ async fn test_peer_states(test: T) { .await .unwrap(); } + +#[tokio::test] +async fn test_try_acquire_peer_permit() { + let clock = ctx::ManualClock::new(); + let ctx = &ctx::test_root(&clock); + let rng = &mut ctx.rng(); + let mut setup = validator::testonly::Setup::new(rng, 1); + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + let (send, _recv) = ctx::channel::unbounded(); + let peer_states = PeerStates::new(Config::default(), store, send); + + let peer: node::PublicKey = rng.gen(); + let b = &setup.blocks; + for s in [ + // Empty entry. + BlockStoreState { + first: b[0].number(), + last: None, + }, + // Entry with some blocks. + BlockStoreState { + first: b[0].number(), + last: Some(b[3].justification.clone()), + }, + // Entry with changed first. + BlockStoreState { + first: b[1].number(), + last: Some(b[3].justification.clone()), + }, + // Empty entry again. + BlockStoreState { + first: b[1].number(), + last: None, + }, + ] { + peer_states.update(&peer, s.clone()).unwrap(); + for block in b { + let got = peer_states + .try_acquire_peer_permit(block.number()) + .map(|p| p.0); + if s.first <= block.number() + && s.last + .as_ref() + .map_or(false, |last| block.number() <= last.header().number) + { + assert_eq!(Some(peer.clone()), got); + } else { + assert_eq!(None, got); + } + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/sync_blocks/src/peers/tests/snapshots.rs b/node/actors/sync_blocks/src/peers/tests/snapshots.rs deleted file mode 100644 index bdb7116d..00000000 --- a/node/actors/sync_blocks/src/peers/tests/snapshots.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Tests related to snapshot storage. - -use super::*; -use crate::tests::{send_block, sync_state}; -use zksync_consensus_network::io::GetBlockError; - -#[tokio::test] -async fn backfilling_peer_history() { - test_peer_states(BackfillingPeerHistory).await; -} diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 018b99ee..b078a804 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -13,7 +13,7 @@ use zksync_concurrency::{ testonly::{abort_on_panic, set_timeout}, }; use zksync_consensus_network as network; -use zksync_consensus_storage::testonly::new_store; +use zksync_consensus_storage::testonly::new_store_with_first; type NetworkDispatcherPipe = pipe::DispatcherPipe; @@ -27,7 +27,16 @@ struct Node { impl Node { async fn new(ctx: &ctx::Ctx, network: network::Config, setup: &Setup) -> (Self, NodeRunner) { - let (store, store_runner) = new_store(ctx, &setup.genesis).await; + Self::new_with_first(ctx, network, setup, setup.genesis.fork.first_block).await + } + + async fn new_with_first( + ctx: &ctx::Ctx, + network: network::Config, + setup: &Setup, + first: validator::BlockNumber, + ) -> (Self, NodeRunner) { + let (store, store_runner) = new_store_with_first(ctx, &setup.genesis, first).await; let (start_send, start_recv) = channel::bounded(1); let (terminate_send, terminate_recv) = channel::bounded(1); @@ -331,3 +340,54 @@ impl GossipNetworkTest for SwitchingOnNodes { async fn switching_on_nodes(node_count: usize) { test_sync_blocks(SwitchingOnNodes { node_count }).await; } + +/// Test checking that nodes with different first block can synchronize. +#[tokio::test(flavor = "multi_thread")] +async fn test_different_first_block() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(25.)); + let rng = &mut ctx.rng(); + + let mut setup = validator::testonly::Setup::new(rng, 2); + let n = 4; + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let mut nodes = vec![]; + // Spawn `n` nodes, all connected to each other. + for (i, net) in network::testonly::new_configs(rng, &setup, n) + .into_iter() + .enumerate() + { + // Choose the first block for the node at random. + let first = setup.blocks.choose(rng).unwrap().number(); + let (node, runner) = Node::new_with_first(ctx, net, &setup, first).await; + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + node.start(); + nodes.push(node); + } + // Randomize the order of nodes. + nodes.shuffle(rng); + + for block in &setup.blocks { + // Find nodes interested in the next block. + let interested_nodes: Vec<_> = nodes + .iter() + .filter(|n| n.store.subscribe().borrow().first <= block.number()) + .collect(); + // Store this block to one of them. + if let Some(node) = interested_nodes.choose(rng) { + node.store.queue_block(ctx, block.clone()).await.unwrap(); + } + // Wait until all remaining nodes get the new block. + for node in interested_nodes { + node.store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/libs/concurrency/src/ctx/clock.rs b/node/libs/concurrency/src/ctx/clock.rs index 31396ca9..68138b34 100644 --- a/node/libs/concurrency/src/ctx/clock.rs +++ b/node/libs/concurrency/src/ctx/clock.rs @@ -27,7 +27,7 @@ use std::{ }; use tokio::sync::watch; -// Instant doesn't have a deterministic contructor. +// Instant doesn't have a deterministic constructor. // However since Instant is not convertible to an unix timestamp, // we can snapshot Instant::now() once and treat it as a constant. // All observable effects will be then deterministic. diff --git a/node/libs/concurrency/src/limiter/mod.rs b/node/libs/concurrency/src/limiter/mod.rs index 57dc4e33..9f609b26 100644 --- a/node/libs/concurrency/src/limiter/mod.rs +++ b/node/libs/concurrency/src/limiter/mod.rs @@ -1,6 +1,6 @@ //! Rate limiter which supports delayed permit consumption. use crate::{ctx, sync, time}; -use std::sync::Mutex; +use std::{fmt, sync::Mutex}; #[cfg(test)] mod tests; @@ -102,6 +102,12 @@ pub struct Limiter { acquire: sync::Mutex>, } +impl fmt::Debug for Limiter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Limiter").finish_non_exhaustive() + } +} + /// Permit reservation returned by `Limit::acquire()`. /// Represents a set of reserved permits. /// You need to drop it for the permits to get consumed @@ -151,7 +157,7 @@ impl Limiter { } /// Acquires reservation for `permits` permits from the rate limiter. - /// It blocks until enought permits are available. + /// It blocks until enough permits are available. /// It is fair in a sense that in case a later acquire() call is /// executed, but for a smaller number of permits, it has to wait /// until the previous call (for a larger number of permits) completes. diff --git a/node/libs/concurrency/src/limiter/tests.rs b/node/libs/concurrency/src/limiter/tests.rs index 978c81db..713fc107 100644 --- a/node/libs/concurrency/src/limiter/tests.rs +++ b/node/libs/concurrency/src/limiter/tests.rs @@ -45,7 +45,7 @@ async fn immediate_permit_consumption() { } #[tokio::test] -async fn inifinite_refresh_rate() { +async fn infinite_refresh_rate() { testonly::abort_on_panic(); let clock = &ctx::ManualClock::new(); let ctx = &ctx::test_root(clock); diff --git a/node/libs/concurrency/src/scope/task.rs b/node/libs/concurrency/src/scope/task.rs index 85958dba..35b45dc9 100644 --- a/node/libs/concurrency/src/scope/task.rs +++ b/node/libs/concurrency/src/scope/task.rs @@ -30,7 +30,7 @@ //! Task can be either async or blocking: //! * Async tasks are Futures executed via `Task::run`. They MUSN'T call blocking operations, //! because they are executed on a shared thread pool. -//! * Blocking tasks are `FnOnce()` functions/closures exeucted via `Task::run_blocking`. Blocking +//! * Blocking tasks are `FnOnce()` functions/closures executed via `Task::run_blocking`. Blocking //! task MUST be executed on a dedicated thread rather than a shared thread pool. //! * All functions which perform blocking calls should be documented as blocking. //! If a function has multiple versions and the async version is called ``, then the sync @@ -117,7 +117,7 @@ impl Task { } /// Runs an sync blocking task in the scope. MUST be executed on a dedicated thread. - /// See `Task::run` for behavior. See module docs for desciption of blocking tasks. + /// See `Task::run` for behavior. See module docs for description of blocking tasks. pub(super) fn run_blocking(self, f: impl FnOnce() -> Result) -> Result { let panic_reporter = PanicReporter::new(self); let res = f(); diff --git a/node/libs/crypto/src/bn254/testonly.rs b/node/libs/crypto/src/bn254/testonly.rs index 8ee10d18..711f3a14 100644 --- a/node/libs/crypto/src/bn254/testonly.rs +++ b/node/libs/crypto/src/bn254/testonly.rs @@ -1,7 +1,7 @@ -//! Random key generation, intended for use in testing +//! Random key generation, intended for use in testing. use super::{AggregateSignature, PublicKey, SecretKey, Signature}; -use pairing::bn256::{Fr, G1, G2}; +use pairing::bn256::{Fr, G1}; use rand::{distributions::Standard, prelude::Distribution, Rng, RngCore}; use rand04::Rand; @@ -17,7 +17,6 @@ impl rand04::Rng for RngWrapper { } } -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let scalar = Fr::rand(&mut RngWrapper(rng)); @@ -25,15 +24,12 @@ impl Distribution for Standard { } } -/// Generates a random PublicKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> PublicKey { - let p = G2::rand(&mut RngWrapper(rng)); - PublicKey(p) + rng.gen::().public() } } -/// Generates a random Signature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let p = G1::rand(&mut RngWrapper(rng)); @@ -41,7 +37,6 @@ impl Distribution for Standard { } } -/// Generates a random AggregateSignature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AggregateSignature { let p = G1::rand(&mut RngWrapper(rng)); diff --git a/node/libs/crypto/src/ed25519/testonly.rs b/node/libs/crypto/src/ed25519/testonly.rs index ecca0148..6b8d800f 100644 --- a/node/libs/crypto/src/ed25519/testonly.rs +++ b/node/libs/crypto/src/ed25519/testonly.rs @@ -1,6 +1,6 @@ //! Random key generation, intended for use in testing -use super::{SecretKey, Signature}; +use super::{PublicKey, SecretKey, Signature}; use crate::ByteFmt; use ed25519_dalek as ed; use rand::{ @@ -8,7 +8,6 @@ use rand::{ Rng, }; -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let raw: [u8; ed::SECRET_KEY_LENGTH] = rng.gen(); @@ -16,7 +15,12 @@ impl Distribution for Standard { } } -/// Generates a random Signature. This is meant for testing purposes. +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + rng.gen::().public() + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let key = rng.gen::(); diff --git a/node/libs/protobuf/src/proto_fmt.rs b/node/libs/protobuf/src/proto_fmt.rs index 53238a64..2031cefc 100644 --- a/node/libs/protobuf/src/proto_fmt.rs +++ b/node/libs/protobuf/src/proto_fmt.rs @@ -197,7 +197,7 @@ pub(super) fn read_fields( } /// Converts an encoded protobuf message to its canonical form, given the descriptor of the message -/// type. Retuns an error if: +/// type. Returns an error if: /// * an unknown field is detected /// * the message type doesn't support canonical encoding (implicit presence, map fields) pub fn canonical_raw( diff --git a/node/libs/roles/src/node/testonly.rs b/node/libs/roles/src/node/testonly.rs index 1d712ebb..c8efd30f 100644 --- a/node/libs/roles/src/node/testonly.rs +++ b/node/libs/roles/src/node/testonly.rs @@ -1,4 +1,4 @@ -use super::{Msg, MsgHash, SecretKey, SessionId, Signature, Signed}; +use super::{Msg, MsgHash, PublicKey, SecretKey, SessionId, Signature, Signed}; use rand::{ distributions::{Distribution, Standard}, Rng, @@ -27,6 +27,12 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + PublicKey(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { SecretKey(Arc::new(rng.gen())) diff --git a/node/libs/roles/src/proto/validator.proto b/node/libs/roles/src/proto/validator.proto index f1ec2438..2b1d6329 100644 --- a/node/libs/roles/src/proto/validator.proto +++ b/node/libs/roles/src/proto/validator.proto @@ -7,7 +7,6 @@ import "zksync/std.proto"; message Fork { optional uint64 number = 1; // required; ForkId optional uint64 first_block = 2; // required; BlockNumber - optional BlockHeaderHash first_parent = 3; // optional } message Genesis { @@ -23,17 +22,11 @@ message PayloadHash { optional bytes keccak256 = 1; // required } -message BlockHeaderHash { - optional bytes keccak256 = 1; // required -} - message BlockHeader { - // Hash of the parent Block. - optional BlockHeaderHash parent = 2; // optional // Sequential number of the block = parent.number + 1. - optional uint64 number = 3; // required + optional uint64 number = 1; // required // Hash of the block payload. - optional PayloadHash payload = 4; // required + optional PayloadHash payload = 2; // required } message FinalBlock { @@ -117,7 +110,7 @@ message NetAddress { // Currently the IP of the validator is static, but this scheme might also // be used to provide dynamic IP support (if validator discovers that its // own IP has changed - by pinging a trusted STUN server for example - it can - // broadcast a new discovery message), or (mutli)proxy support (a validator + // broadcast a new discovery message), or (multi)proxy support (a validator // may maintain a dynamic set of trusted proxy servers which forward traffic // to it - this way validator wouldn't have to have a public IP at all). optional uint64 version = 2; // required diff --git a/node/libs/roles/src/validator/conv.rs b/node/libs/roles/src/validator/conv.rs index 04886834..9e616320 100644 --- a/node/libs/roles/src/validator/conv.rs +++ b/node/libs/roles/src/validator/conv.rs @@ -1,7 +1,7 @@ use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use crate::{node::SessionId, proto::validator as proto}; @@ -17,14 +17,12 @@ impl ProtoFmt for Fork { Ok(Self { number: ForkNumber(*required(&r.number).context("number")?), first_block: BlockNumber(*required(&r.first_block).context("first_block")?), - first_parent: read_optional(&r.first_parent).context("first_parent")?, }) } fn build(&self) -> Self::Proto { Self::Proto { number: Some(self.number.0), first_block: Some(self.first_block.0), - first_parent: self.first_parent.as_ref().map(|x| x.build()), } } } @@ -64,18 +62,6 @@ impl ProtoFmt for GenesisHash { } } -impl ProtoFmt for BlockHeaderHash { - type Proto = proto::BlockHeaderHash; - fn read(r: &Self::Proto) -> anyhow::Result { - Ok(Self(ByteFmt::decode(required(&r.keccak256)?)?)) - } - fn build(&self) -> Self::Proto { - Self::Proto { - keccak256: Some(self.0.encode()), - } - } -} - impl ProtoFmt for PayloadHash { type Proto = proto::PayloadHash; fn read(r: &Self::Proto) -> anyhow::Result { @@ -92,14 +78,12 @@ impl ProtoFmt for BlockHeader { type Proto = proto::BlockHeader; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - parent: read_optional(&r.parent).context("parent")?, number: BlockNumber(*required(&r.number).context("number")?), payload: read_required(&r.payload).context("payload")?, }) } fn build(&self) -> Self::Proto { Self::Proto { - parent: self.parent.as_ref().map(ProtoFmt::build), number: Some(self.number.0), payload: Some(self.payload.build()), } diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 261035e5..43d49ac3 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -52,15 +52,13 @@ impl Payload { } /// Sequential number of the block. -/// Genesis block can have an arbitrary block number. -/// For blocks other than genesis: block.number = block.parent.number + 1. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockNumber(pub u64); impl BlockNumber { /// Returns the next block number. pub fn next(self) -> Self { - Self(self.0 + 1) + Self(self.0.checked_add(1).unwrap()) } /// Returns the previous block number. @@ -69,77 +67,28 @@ impl BlockNumber { } } -impl fmt::Display for BlockNumber { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, formatter) - } -} - -/// Hash of the block header. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct BlockHeaderHash(pub(crate) Keccak256); - -impl BlockHeaderHash { - /// Interprets the specified `bytes` as a block header hash digest (i.e., a reverse operation to [`Self::as_bytes()`]). - /// It is caller's responsibility to ensure that `bytes` are actually a block header hash digest. - pub fn from_bytes(bytes: [u8; 32]) -> Self { - Self(Keccak256::from_bytes(bytes)) - } - - /// Returns a reference to the bytes of this hash. - pub fn as_bytes(&self) -> &[u8; 32] { - self.0.as_bytes() - } -} - -impl TextFmt for BlockHeaderHash { - fn decode(text: Text) -> anyhow::Result { - text.strip("block_header_hash:keccak256:")? - .decode_hex() - .map(Self) - } - - fn encode(&self) -> String { - format!( - "block_header_hash:keccak256:{}", - hex::encode(ByteFmt::encode(&self.0)) - ) +impl std::ops::Add for BlockNumber { + type Output = BlockNumber; + fn add(self, n: u64) -> Self { + Self(self.0.checked_add(n).unwrap()) } } -impl fmt::Debug for BlockHeaderHash { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.write_str(&TextFmt::encode(self)) +impl fmt::Display for BlockNumber { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, formatter) } } /// A block header. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockHeader { - /// Hash of the parent block. - pub parent: Option, /// Number of the block. pub number: BlockNumber, /// Payload of the block. pub payload: PayloadHash, } -impl BlockHeader { - /// Returns the hash of the block. - pub fn hash(&self) -> BlockHeaderHash { - BlockHeaderHash(Keccak256::new(&zksync_protobuf::canonical(self))) - } - - /// Creates a child block for the given parent. - pub fn next(parent: &BlockHeader, payload: PayloadHash) -> Self { - Self { - parent: Some(parent.hash()), - number: parent.number.next(), - payload, - } - } -} - /// A block that has been finalized by the consensus protocol. #[derive(Clone, Debug, PartialEq, Eq)] pub struct FinalBlock { @@ -194,16 +143,6 @@ impl ByteFmt for FinalBlock { } } -impl TextFmt for FinalBlock { - fn decode(text: Text) -> anyhow::Result { - text.strip("final_block:")?.decode_hex() - } - - fn encode(&self) -> String { - format!("final_block:{}", hex::encode(ByteFmt::encode(self))) - } -} - /// Errors that can occur validating a `FinalBlock` received from a node. #[derive(Debug, thiserror::Error)] #[non_exhaustive] diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 870355b0..4261443e 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -1,7 +1,5 @@ //! Messages related to the consensus protocol. -use super::{ - BlockHeaderHash, BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare, -}; +use super::{BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare}; use crate::validator; use bit_vec::BitVec; use std::{ @@ -62,8 +60,6 @@ pub struct Fork { pub number: ForkNumber, /// First block of a fork. pub first_block: BlockNumber, - /// Parent fo the first block of a fork. - pub first_parent: Option, } impl Default for Fork { @@ -71,7 +67,6 @@ impl Default for Fork { Self { number: ForkNumber(0), first_block: BlockNumber(0), - first_parent: None, } } } diff --git a/node/libs/roles/src/validator/messages/leader_prepare.rs b/node/libs/roles/src/validator/messages/leader_prepare.rs index 15580bb3..37d07e14 100644 --- a/node/libs/roles/src/validator/messages/leader_prepare.rs +++ b/node/libs/roles/src/validator/messages/leader_prepare.rs @@ -1,5 +1,5 @@ use super::{ - BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, + BlockHeader, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, ReplicaPrepareVerifyError, Signed, Signers, View, }; use crate::validator; @@ -177,14 +177,6 @@ pub enum LeaderPrepareVerifyError { /// Received proposal number. got: BlockNumber, }, - /// Bad parent hash. - #[error("bad parent hash: got {got:?}, want {want:?}")] - BadParentHash { - /// Correct parent hash. - want: Option, - /// Received parent hash. - got: Option, - }, /// New block proposal when the previous proposal was not finalized. #[error("new block proposal when the previous proposal was not finalized")] ProposalWhenPreviousNotFinalized, @@ -231,16 +223,10 @@ impl LeaderPrepare { { return Err(Error::ProposalWhenPreviousNotFinalized); } - let (want_parent, want_number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (genesis.fork.first_parent, genesis.fork.first_block), + let want_number = match high_qc { + Some(qc) => qc.header().number.next(), + None => genesis.fork.first_block, }; - if self.proposal.parent != want_parent { - return Err(Error::BadParentHash { - got: self.proposal.parent, - want: want_parent, - }); - } if self.proposal.number != want_number { return Err(Error::BadBlockNumber { got: self.proposal.number, diff --git a/node/libs/roles/src/validator/messages/replica_commit.rs b/node/libs/roles/src/validator/messages/replica_commit.rs index 963f8f39..d08a7069 100644 --- a/node/libs/roles/src/validator/messages/replica_commit.rs +++ b/node/libs/roles/src/validator/messages/replica_commit.rs @@ -14,12 +14,6 @@ impl ReplicaCommit { pub fn verify(&self, genesis: &Genesis) -> anyhow::Result<()> { anyhow::ensure!(self.view.fork == genesis.fork.number); anyhow::ensure!(self.proposal.number >= genesis.fork.first_block); - if self.proposal.number == genesis.fork.first_block { - anyhow::ensure!( - self.proposal.parent == genesis.fork.first_parent, - "bad parent of the first block of the fork" - ); - } Ok(()) } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index 27290f29..4bc7db79 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -1,8 +1,8 @@ //! Test-only utilities. use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use bit_vec::BitVec; @@ -38,7 +38,6 @@ impl Setup { let fork = Fork { number: ForkNumber(rng.gen_range(0..100)), first_block: BlockNumber(rng.gen_range(0..100)), - first_parent: Some(rng.gen()), }; Self::new_with_fork(rng, validators, fork) } @@ -64,9 +63,11 @@ impl Setup { .unwrap_or(ViewNumber(0)), }; let proposal = match self.0.blocks.last() { - Some(b) => BlockHeader::next(b.header(), payload.hash()), + Some(b) => BlockHeader { + number: b.number().next(), + payload: payload.hash(), + }, None => BlockHeader { - parent: self.genesis.fork.first_parent, number: self.genesis.fork.first_block, payload: payload.hash(), }, @@ -181,7 +182,6 @@ impl Distribution for Standard { Fork { number: rng.gen(), first_block: rng.gen(), - first_parent: Some(rng.gen()), } } } @@ -201,16 +201,9 @@ impl Distribution for Standard { } } -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> BlockHeaderHash { - BlockHeaderHash(rng.gen()) - } -} - impl Distribution for Standard { fn sample(&self, rng: &mut R) -> BlockHeader { BlockHeader { - parent: rng.gen(), number: rng.gen(), payload: rng.gen(), } diff --git a/node/libs/roles/src/validator/tests.rs b/node/libs/roles/src/validator/tests.rs index 5f38b3d8..9affb1e0 100644 --- a/node/libs/roles/src/validator/tests.rs +++ b/node/libs/roles/src/validator/tests.rs @@ -72,17 +72,6 @@ fn test_text_encoding() { Text::new(&t).decode::().unwrap() ); - let block_header_hash: BlockHeaderHash = rng.gen(); - let t = TextFmt::encode(&block_header_hash); - assert_eq!( - block_header_hash, - Text::new(&t).decode::().unwrap() - ); - - let final_block: FinalBlock = rng.gen(); - let t = TextFmt::encode(&final_block); - assert_eq!(final_block, Text::new(&t).decode::().unwrap()); - let msg_hash: MsgHash = rng.gen(); let t = TextFmt::encode(&msg_hash); assert_eq!(msg_hash, Text::new(&t).decode::().unwrap()); @@ -98,7 +87,6 @@ fn test_schema_encoding() { let rng = &mut ctx.rng(); test_encode_random::(rng); test_encode_random::(rng); - test_encode_random::(rng); test_encode_random::(rng); test_encode_random::>(rng); test_encode_random::(rng); diff --git a/node/libs/storage/src/block_store/metrics.rs b/node/libs/storage/src/block_store/metrics.rs index 70447ad7..05b64133 100644 --- a/node/libs/storage/src/block_store/metrics.rs +++ b/node/libs/storage/src/block_store/metrics.rs @@ -7,9 +7,9 @@ pub(super) struct PersistentBlockStore { /// Latency of a successful `genesis()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) genesis_latency: vise::Histogram, - /// Latency of a successful `last()` call. + /// Latency of a successful `state()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] - pub(super) last_latency: vise::Histogram, + pub(super) state_latency: vise::Histogram, /// Latency of a successful `block()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) block_latency: vise::Histogram, diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs index 456bc2e1..e769aecf 100644 --- a/node/libs/storage/src/block_store/mod.rs +++ b/node/libs/storage/src/block_store/mod.rs @@ -10,7 +10,7 @@ mod metrics; #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockStoreState { /// Stored block with the lowest number. - /// Currently always same as `genesis.first_block`. + /// If last is `None`, this is the first block that should be fetched. pub first: validator::BlockNumber, /// Stored block with the highest number. /// None iff store is empty. @@ -32,21 +32,41 @@ impl BlockStoreState { None => self.first, } } + + /// Verifies `BlockStoreState'. + pub fn verify(&self, genesis: &validator::Genesis) -> anyhow::Result<()> { + anyhow::ensure!( + genesis.fork.first_block <= self.first, + "first block ({}) doesn't belong to the fork (which starts at block {})", + self.first, + genesis.fork.first_block + ); + if let Some(last) = &self.last { + anyhow::ensure!( + self.first <= last.header().number, + "first block {} has bigger number than the last block {}", + self.first, + last.header().number + ); + last.verify(genesis).context("last.verify()")?; + } + Ok(()) + } } /// Storage of a continuous range of L2 blocks. /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait PersistentBlockStore: fmt::Debug + Send + Sync { +pub trait PersistentBlockStore: 'static + fmt::Debug + Send + Sync { /// Genesis matching the block store content. /// Consensus code calls this method only once. async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result; - /// Last block available in storage. + /// Range of blocks available in storage. /// Consensus code calls this method only once and then tracks the /// range of available blocks internally. - async fn last(&self, ctx: &ctx::Ctx) -> ctx::Result>; + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; /// Gets a block by its number. /// Returns error if block is missing. @@ -115,11 +135,11 @@ impl BlockStoreRunner { tracing::info!( "stored block #{}: {:#?}", block.header().number, - block.header().hash() + block.header().payload ); self.0.inner.send_modify(|inner| { - debug_assert_eq!(inner.persisted_state.next(), block.header().number); + debug_assert_eq!(inner.persisted_state.next(), block.number()); inner.persisted_state.last = Some(block.justification.clone()); inner.queue.pop_front(); }); @@ -145,16 +165,10 @@ impl BlockStore { let t = metrics::PERSISTENT_BLOCK_STORE.genesis_latency.start(); let genesis = persistent.genesis(ctx).await.wrap("persistent.genesis()")?; t.observe(); - let t = metrics::PERSISTENT_BLOCK_STORE.last_latency.start(); - let last = persistent.last(ctx).await.wrap("persistent.last()")?; + let t = metrics::PERSISTENT_BLOCK_STORE.state_latency.start(); + let state = persistent.state(ctx).await.wrap("persistent.state()")?; t.observe(); - if let Some(last) = &last { - last.verify(&genesis).context("last.verify()")?; - } - let state = BlockStoreState { - first: genesis.fork.first_block, - last, - }; + state.verify(&genesis).context("state.verify()")?; let this = Arc::new(Self { inner: sync::watch::channel(Inner { queued_state: sync::watch::channel(state.clone()).0, @@ -165,12 +179,6 @@ impl BlockStore { genesis, persistent, }); - // Verify the first block. - if let Some(block) = this.block(ctx, this.genesis.fork.first_block).await? { - block - .verify(&this.genesis) - .with_context(|| format!("verify({:?})", this.genesis.fork.first_block))?; - } Ok((this.clone(), BlockStoreRunner(this))) } @@ -227,17 +235,6 @@ impl BlockStore { return Ok(()); } block.verify(&self.genesis).context("block.verify()")?; - // Verify parent hash, if previous block is available. - if let Some(last) = queued_state.last.as_ref() { - if Some(last.header().hash()) != block.header().parent { - return Err(anyhow::format_err!( - "block.parent = {:?}, want {:?}", - block.header().parent, - last.header().hash() - ) - .into()); - } - } } self.inner.send_if_modified(|inner| { let modified = inner.queued_state.send_if_modified(|queued_state| { @@ -258,6 +255,7 @@ impl BlockStore { } /// Waits until the given block is queued to be stored. + /// If `number < state.first` then it immetiately returns `Ok(())`. pub async fn wait_until_queued( &self, ctx: &ctx::Ctx, @@ -271,6 +269,7 @@ impl BlockStore { } /// Waits until the given block is stored persistently. + /// If `number < state.first` then it immetiately returns `Ok(())`. pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs deleted file mode 100644 index 634118dc..00000000 --- a/node/libs/storage/src/replica_state.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! `FallbackReplicaStateStore` type. - - diff --git a/node/libs/storage/src/replica_store.rs b/node/libs/storage/src/replica_store.rs index 0eaa96fd..465d26d6 100644 --- a/node/libs/storage/src/replica_store.rs +++ b/node/libs/storage/src/replica_store.rs @@ -10,7 +10,7 @@ use zksync_protobuf::{read_optional, read_required, required, ProtoFmt}; /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait ReplicaStore: fmt::Debug + Send + Sync { +pub trait ReplicaStore: 'static + fmt::Debug + Send + Sync { /// Gets the replica state, if it is contained in the database. async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 209d046c..1fd7398c 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -1,5 +1,5 @@ //! In-memory storage implementation. -use crate::{PersistentBlockStore, ReplicaState}; +use crate::{BlockStoreState, PersistentBlockStore, ReplicaState}; use anyhow::Context as _; use std::{ collections::VecDeque, @@ -10,6 +10,7 @@ use zksync_consensus_roles::validator; #[derive(Debug)] struct BlockStoreInner { + first: validator::BlockNumber, genesis: validator::Genesis, blocks: Mutex>, } @@ -24,8 +25,10 @@ pub struct ReplicaStore(Arc>); impl BlockStore { /// New In-memory `BlockStore`. - pub fn new(genesis: validator::Genesis) -> Self { + pub fn new(genesis: validator::Genesis, first: validator::BlockNumber) -> Self { + assert!(genesis.fork.first_block <= first); Self(Arc::new(BlockStoreInner { + first, genesis, blocks: Mutex::default(), })) @@ -38,14 +41,17 @@ impl PersistentBlockStore for BlockStore { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(self - .0 - .blocks - .lock() - .unwrap() - .back() - .map(|b| b.justification.clone())) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + first: self.0.first, + last: self + .0 + .blocks + .lock() + .unwrap() + .back() + .map(|b| b.justification.clone()), + }) } async fn block( @@ -69,11 +75,12 @@ impl PersistentBlockStore for BlockStore { ) -> ctx::Result<()> { let mut blocks = self.0.blocks.lock().unwrap(); let got = block.header().number; - if let Some(last) = blocks.back() { - let want = last.header().number.next(); - if got != want { - return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); - } + let want = match blocks.back() { + Some(last) => last.header().number.next(), + None => self.0.first, + }; + if got != want { + return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); } blocks.push_back(block.clone()); Ok(()) diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index 3a44800b..f0d66052 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -29,49 +29,63 @@ impl Distribution for Standard { } } -/// Constructs a new in-memory store with a genesis block. +/// Constructs a new in-memory store. pub async fn new_store( ctx: &ctx::Ctx, genesis: &validator::Genesis, ) -> (Arc, BlockStoreRunner) { - BlockStore::new(ctx, Box::new(in_memory::BlockStore::new(genesis.clone()))) - .await - .unwrap() + new_store_with_first(ctx, genesis, genesis.fork.first_block).await +} + +/// Constructs a new in-memory store with a custom expected first block +/// (i.e. possibly different than `genesis.fork.first_block`). +pub async fn new_store_with_first( + ctx: &ctx::Ctx, + genesis: &validator::Genesis, + first: validator::BlockNumber, +) -> (Arc, BlockStoreRunner) { + BlockStore::new( + ctx, + Box::new(in_memory::BlockStore::new(genesis.clone(), first)), + ) + .await + .unwrap() } /// Dumps all the blocks stored in `store`. pub async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { let genesis = store.genesis(ctx).await.unwrap(); - let last = store.last(ctx).await.unwrap(); + let state = store.state(ctx).await.unwrap(); + assert!(genesis.fork.first_block <= state.first); let mut blocks = vec![]; - let begin = genesis.fork.first_block; - let end = last + let after = state + .last .as_ref() .map(|qc| qc.header().number.next()) - .unwrap_or(begin); - for n in (begin.0..end.0).map(validator::BlockNumber) { + .unwrap_or(state.first); + for n in (state.first.0..after.0).map(validator::BlockNumber) { let block = store.block(ctx, n).await.unwrap(); assert_eq!(block.header().number, n); blocks.push(block); } - assert!(store.block(ctx, end).await.is_err()); + if let Some(before) = state.first.prev() { + assert!(store.block(ctx, before).await.is_err()); + } + assert!(store.block(ctx, after).await.is_err()); blocks } /// Verifies storage content. pub async fn verify(ctx: &ctx::Ctx, store: &BlockStore) -> anyhow::Result<()> { let range = store.subscribe().borrow().clone(); - let mut parent: Option = None; for n in (range.first.0..range.next().0).map(validator::BlockNumber) { async { - let block = store.block(ctx, n).await?.context("missing")?; - block.verify(store.genesis())?; - // Ignore checking the first block parent - if parent.is_some() { - anyhow::ensure!(parent == block.header().parent); - } - parent = Some(block.header().hash()); - Ok(()) + store + .block(ctx, n) + .await? + .context("missing")? + .verify(store.genesis()) + .context("verify()") } .await .context(n)?; diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 4a50c8a0..221be6f7 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -1,6 +1,6 @@ use super::*; -use crate::{testonly::new_store, ReplicaState}; -use zksync_concurrency::{ctx, scope, sync, testonly::abort_on_panic}; +use crate::{testonly::new_store_with_first, ReplicaState}; +use zksync_concurrency::{ctx, scope, testonly::abort_on_panic}; use zksync_consensus_roles::validator::testonly::Setup; #[tokio::test] @@ -10,7 +10,10 @@ async fn test_inmemory_block_store() { let mut setup = Setup::new(rng, 3); setup.push_blocks(rng, 5); - let store = &testonly::in_memory::BlockStore::new(setup.genesis.clone()); + let store = &testonly::in_memory::BlockStore::new( + setup.genesis.clone(), + setup.genesis.fork.first_block, + ); let mut want = vec![]; for block in &setup.blocks { store.store_next_block(ctx, block).await.unwrap(); @@ -32,26 +35,50 @@ async fn test_state_updates() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let mut setup = Setup::new(rng, 1); - setup.push_blocks(rng, 1); - let (store, runner) = new_store(ctx, &setup.genesis).await; + setup.push_blocks(rng, 5); + // Create store with non-trivial first block. + let first_block = &setup.blocks[2]; + let (store, runner) = new_store_with_first(ctx, &setup.genesis, first_block.number()).await; scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx)); let sub = &mut store.subscribe(); - let state = sub.borrow().clone(); - assert_eq!(state.first, setup.genesis.fork.first_block); - assert_eq!(state.last, None); + let want = BlockStoreState { + first: first_block.number(), + last: None, + }; - store - .queue_block(ctx, setup.blocks[0].clone()) - .await - .unwrap(); + // Waiting for blocks before genesis first block (or before `state.first_block`) should be ok + // and should complete immediately. + for n in [ + setup.genesis.fork.first_block.prev().unwrap(), + first_block.number().prev().unwrap(), + ] { + store.wait_until_queued(ctx, n).await.unwrap(); + store.wait_until_persisted(ctx, n).await.unwrap(); + assert_eq!(want, *sub.borrow()); + } - let state = sync::wait_for(ctx, sub, |state| { - state.last.as_ref() == Some(&setup.blocks[0].justification) - }) - .await? - .clone(); - assert_eq!(state.first, setup.blocks[0].header().number); + for block in &setup.blocks { + store.queue_block(ctx, block.clone()).await.unwrap(); + if block.number() < first_block.number() { + // Queueing block before first block should be a noop. + store.wait_until_queued(ctx, block.number()).await.unwrap(); + store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + assert_eq!(want, *sub.borrow()); + } else { + // Otherwise the state should be updated as soon as block is queued. + assert_eq!( + BlockStoreState { + first: first_block.number(), + last: Some(block.justification.clone()), + }, + *sub.borrow() + ); + } + } Ok(()) }) .await diff --git a/node/tests/src/main.rs b/node/tests/src/main.rs index 5b873846..46bebaec 100644 --- a/node/tests/src/main.rs +++ b/node/tests/src/main.rs @@ -1,9 +1,8 @@ //! This is a simple test for the RPC server. It checks if the server is running and can respond to. -use std::{fs, io::Write, net::SocketAddr, path::PathBuf, str::FromStr}; - use anyhow::{ensure, Context}; use clap::{Parser, Subcommand}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params, types::Params}; +use std::{fs, io::Write, net::SocketAddr, path::PathBuf, str::FromStr}; use zksync_consensus_tools::{ k8s, rpc::methods::{health_check::HealthCheck, RPCMethod}, diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 93e80fa4..3546e97c 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -26,7 +26,6 @@ rocksdb.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true -tokio-retry.workspace = true tracing.workspace = true tracing-subscriber.workspace = true vise-exporter.workspace = true diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index ccfa2310..bf71df85 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -1,13 +1,10 @@ //! Deployer for the kubernetes cluster. -use std::net::SocketAddr; -use std::str::FromStr; -use std::{fs, path::PathBuf}; - use anyhow::Context; use clap::{Parser, Subcommand}; +use std::{collections::HashMap, fs, path::PathBuf}; use zksync_consensus_crypto::{Text, TextFmt}; use zksync_consensus_roles::{node, validator}; -use zksync_consensus_tools::{k8s, AppConfig, NodeAddr, NODES_PORT}; +use zksync_consensus_tools::{k8s, AppConfig}; /// K8s namespace for consensus nodes. const NAMESPACE: &str = "consensus"; @@ -94,54 +91,79 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> { } /// Deploys the nodes to the kubernetes cluster. -async fn deploy(nodes: usize, seed_nodes: Option) -> anyhow::Result<()> { +async fn deploy(nodes_amount: usize, seed_nodes_amount: Option) -> anyhow::Result<()> { let client = k8s::get_client().await?; k8s::create_or_reuse_namespace(&client, NAMESPACE).await?; - - let seed_nodes = seed_nodes.unwrap_or(1); - - // deploy seed peer(s) - for i in 0..seed_nodes { - k8s::deploy_node( - &client, - i, - true, - vec![], // Seed peers don't have other peer information - NAMESPACE, - ) - .await?; + let seed_nodes_amount = seed_nodes_amount.unwrap_or(1); + + let seed_nodes = &mut HashMap::new(); + let mut non_seed_nodes = HashMap::new(); + + // Split the nodes in different hash maps as they will be deployed at different stages + let mut consensus_nodes = from_configs(nodes_amount)?; + for (index, node) in consensus_nodes.iter_mut().enumerate() { + if index < seed_nodes_amount { + node.is_seed = true; + seed_nodes.insert(node.id.to_owned(), node); + } else { + non_seed_nodes.insert(node.id.to_owned(), node); + } } - // obtain seed peer(s) IP(s) - let peer_ips = k8s::get_seed_node_addrs(&client, seed_nodes, NAMESPACE).await?; - - let mut peers = vec![]; + // Deploy seed peer(s) + for node in seed_nodes.values_mut() { + node.deploy(&client, NAMESPACE).await?; + } - for i in 0..seed_nodes { - let node_id = &format!("consensus-node-{i:0>2}"); - let node_key = read_node_key_from_config(node_id)?; - let address = peer_ips.get(node_id).context("IP address not found")?; - peers.push(NodeAddr { - key: node_key.public(), - addr: SocketAddr::from_str(&format!("{address}:{NODES_PORT}"))?, - }); + // Fetch and complete node addrs into seed nodes + for node in seed_nodes.values_mut() { + node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?; } - // deploy the rest of nodes - for i in seed_nodes..nodes { - k8s::deploy_node(&client, i, false, peers.clone(), NAMESPACE).await?; + // Build a vector of seed peers NodeAddrs to provide as gossip_static_outbound to the rest of the nodes + let peers: Vec<_> = seed_nodes + .values() + .map(|n| { + n.node_addr + .as_ref() + .expect("Seed node address not defined") + .clone() + }) + .collect(); + + // Deploy the rest of the nodes + for node in non_seed_nodes.values_mut() { + node.gossip_static_outbound = peers.clone(); + node.deploy(&client, NAMESPACE).await?; } Ok(()) } -/// Obtain node key from config file. -fn read_node_key_from_config(node_id: &String) -> anyhow::Result { +/// Build ConsensusNodes representation list from configurations +// TODO once we can provide config via cli args, this will be replaced +// using in-memory config structs +fn from_configs(nodes: usize) -> anyhow::Result> { let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?; let root = PathBuf::from(manifest_path).join("k8s_configs"); - let node_key_path = root.join(node_id).join("node_key"); - let key = fs::read_to_string(node_key_path).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") + let mut consensus_nodes = vec![]; + + for i in 0..nodes { + let node_id = format!("consensus-node-{i:0>2}"); + let node_key_path = root.join(&node_id).join("node_key"); + let key_string = fs::read_to_string(node_key_path).context("failed reading file")?; + let key = Text::new(&key_string) + .decode() + .context("failed decoding key")?; + consensus_nodes.push(k8s::ConsensusNode { + id: node_id, + key, + node_addr: None, + is_seed: false, + gossip_static_outbound: vec![], + }); + } + Ok(consensus_nodes) } #[tokio::main] diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 43494057..53f608e2 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -2,13 +2,12 @@ use crate::{proto, store}; use anyhow::Context as _; use serde_json::{ser::Formatter, Serializer}; -use std::net::Ipv4Addr; -use std::str::FromStr; use std::{ collections::{HashMap, HashSet}, fs, - net::SocketAddr, + net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, + str::FromStr, }; use zksync_concurrency::ctx; use zksync_consensus_bft as bft; diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs index 89735d03..f53bfa12 100644 --- a/node/tools/src/k8s.rs +++ b/node/tools/src/k8s.rs @@ -1,5 +1,5 @@ use crate::{config, NodeAddr}; -use anyhow::{anyhow, ensure, Context}; +use anyhow::{ensure, Context}; use k8s_openapi::{ api::{ apps::v1::{Deployment, DeploymentSpec}, @@ -12,16 +12,13 @@ use k8s_openapi::{ }; use kube::{ api::{ListParams, PostParams}, - core::{ObjectList, ObjectMeta}, - Api, Client, ResourceExt, + core::ObjectMeta, + Api, Client, }; -use std::{ - collections::{BTreeMap, HashMap}, - net::SocketAddr, -}; -use tokio_retry::strategy::FixedInterval; -use tokio_retry::Retry; +use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; +use tokio::time; use tracing::log::info; +use zksync_consensus_roles::node; use zksync_protobuf::serde::Serde; /// Docker image name for consensus nodes. @@ -30,6 +27,156 @@ const DOCKER_IMAGE_NAME: &str = "consensus-node"; /// K8s namespace for consensus nodes. pub const DEFAULT_NAMESPACE: &str = "consensus"; +/// Consensus Node Representation +#[derive(Debug)] +pub struct ConsensusNode { + /// Node identifier + pub id: String, + /// Node key + pub key: node::SecretKey, + /// Full NodeAddr + pub node_addr: Option, + /// Is seed node (meaning it has no gossipStaticOutbound configuration) + pub is_seed: bool, + /// known gossipStaticOutbound peers + pub gossip_static_outbound: Vec, +} + +impl ConsensusNode { + /// Wait for a deployed consensus node to be ready and have an IP address + pub async fn await_running_pod( + &mut self, + client: &Client, + namespace: &str, + ) -> anyhow::Result { + let pods: Api = Api::namespaced(client.clone(), namespace); + // Wait until the pod is running, otherwise we get an error. + retry(15, Duration::from_millis(1000), || async { + get_running_pod(&pods, &self.id).await + }) + .await + } + + /// Fetchs the pod's IP address and assignts to self.node_addr + pub async fn fetch_and_assign_pod_ip( + &mut self, + client: &Client, + namespace: &str, + ) -> anyhow::Result<()> { + let ip = self + .await_running_pod(client, namespace) + .await? + .status + .context("Status not present")? + .pod_ip + .context("Pod IP address not present")?; + self.node_addr = Some(NodeAddr { + key: self.key.public(), + addr: SocketAddr::new(ip.parse()?, config::NODES_PORT), + }); + Ok(()) + } + + /// Creates a deployment + pub async fn deploy(&self, client: &Client, namespace: &str) -> anyhow::Result<()> { + let cli_args = get_cli_args(&self.gossip_static_outbound); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some(self.id.to_owned()), + namespace: Some(namespace.to_owned()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + selector: LabelSelector { + match_labels: Some(BTreeMap::from([("app".to_owned(), self.id.to_owned())])), + ..Default::default() + }, + replicas: Some(1), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(BTreeMap::from([ + ("app".to_owned(), self.id.to_owned()), + ("id".to_owned(), self.id.to_owned()), + ("seed".to_owned(), self.is_seed.to_string()), + ])), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![Container { + name: self.id.to_owned(), + image: Some(DOCKER_IMAGE_NAME.to_owned()), + env: Some(vec![ + EnvVar { + name: "NODE_ID".to_owned(), + value: Some(self.id.to_owned()), + ..Default::default() + }, + EnvVar { + name: "PUBLIC_ADDR".to_owned(), + value_from: Some(EnvVarSource { + field_ref: Some(ObjectFieldSelector { + field_path: "status.podIP".to_owned(), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }, + ]), + command: Some(vec!["./k8s_entrypoint.sh".to_owned()]), + args: Some(cli_args), + image_pull_policy: Some("Never".to_owned()), + ports: Some(vec![ + ContainerPort { + container_port: i32::from(config::NODES_PORT), + ..Default::default() + }, + ContainerPort { + container_port: 3154, + ..Default::default() + }, + ]), + liveness_probe: Some(Probe { + http_get: Some(HTTPGetAction { + path: Some("/health".to_owned()), + port: Int(3154), + ..Default::default() + }), + ..Default::default() + }), + readiness_probe: Some(Probe { + http_get: Some(HTTPGetAction { + path: Some("/health".to_owned()), + port: Int(3154), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + + let deployments: Api = Api::namespaced(client.clone(), namespace); + let post_params = PostParams::default(); + let result = deployments.create(&post_params, &deployment).await?; + + info!( + "Deployment: {} , created", + result + .metadata + .name + .context("Name not defined in metadata")? + ); + Ok(()) + } +} + /// Get a kube client pub async fn get_client() -> anyhow::Result { Ok(Client::try_default().await?) @@ -167,146 +314,18 @@ pub async fn create_tests_deployment(client: &Client) -> anyhow::Result<()> { Ok(()) } -/// Creates a deployment -pub async fn deploy_node( - client: &Client, - node_index: usize, - is_seed: bool, - peers: Vec, - namespace: &str, -) -> anyhow::Result<()> { - let cli_args = get_cli_args(peers); - let node_name = format!("consensus-node-{node_index:0>2}"); - let deployment = Deployment { - metadata: ObjectMeta { - name: Some(node_name.to_owned()), - namespace: Some(namespace.to_owned()), - ..Default::default() - }, - spec: Some(DeploymentSpec { - selector: LabelSelector { - match_labels: Some(BTreeMap::from([("app".to_owned(), node_name.to_owned())])), - ..Default::default() - }, - replicas: Some(1), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: Some(BTreeMap::from([ - ("app".to_owned(), node_name.to_owned()), - ("id".to_owned(), node_name.to_owned()), - ("seed".to_owned(), is_seed.to_string()), - ])), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![Container { - name: node_name.to_owned(), - image: Some("consensus-node".to_owned()), - env: Some(vec![ - EnvVar { - name: "NODE_ID".to_owned(), - value: Some(node_name.to_owned()), - ..Default::default() - }, - EnvVar { - name: "PUBLIC_ADDR".to_owned(), - value_from: Some(EnvVarSource { - field_ref: Some(ObjectFieldSelector { - field_path: "status.podIP".to_owned(), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }, - ]), - command: Some(vec!["./k8s_entrypoint.sh".to_owned()]), - args: Some(cli_args), - image_pull_policy: Some("Never".to_owned()), - ports: Some(vec![ - ContainerPort { - container_port: i32::from(config::NODES_PORT), - ..Default::default() - }, - ContainerPort { - container_port: 3154, - ..Default::default() - }, - ]), - liveness_probe: Some(Probe { - http_get: Some(HTTPGetAction { - path: Some("/health".to_owned()), - port: Int(3154), - ..Default::default() - }), - ..Default::default() - }), - readiness_probe: Some(Probe { - http_get: Some(HTTPGetAction { - path: Some("/health".to_owned()), - port: Int(3154), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }], - ..Default::default() - }), - }, - ..Default::default() - }), - ..Default::default() - }; - - let deployments: Api = Api::namespaced(client.clone(), namespace); - let post_params = PostParams::default(); - let result = deployments.create(&post_params, &deployment).await?; - - info!( - "Deployment: {} , created", - result - .metadata - .name - .context("Name not defined in metadata")? - ); - Ok(()) -} - -/// Returns a HashMap with mapping: node_id -> IP address -pub async fn get_seed_node_addrs( - client: &Client, - amount: usize, - namespace: &str, -) -> anyhow::Result> { - let mut seed_nodes = HashMap::new(); - let pods: Api = Api::namespaced(client.clone(), namespace); - - // Will retry 15 times during 15 seconds to allow pods to start and obtain an IP - let retry_strategy = FixedInterval::from_millis(1000).take(15); - let pod_list = Retry::spawn(retry_strategy, || get_seed_pods(&pods, amount)).await?; - - for p in pod_list { - let node_id = p.labels()["id"].to_owned(); - seed_nodes.insert( - node_id, - p.status - .context("Status not present")? - .pod_ip - .context("Pod IP address not present")?, - ); - } - Ok(seed_nodes) -} - -async fn get_seed_pods(pods: &Api, amount: usize) -> anyhow::Result> { - let lp = ListParams::default().labels("seed=true"); - let p = pods.list(&lp).await?; - if p.items.len() == amount && p.iter().all(is_pod_running) { - Ok(p) - } else { - Err(anyhow!("Pods are not ready")) +async fn get_running_pod(pods: &Api, label: &str) -> anyhow::Result { + let lp = ListParams::default().labels(&format!("app={label}")); + let pod = pods + .list(&lp) + .await? + .items + .pop() + .with_context(|| format!("Pod not found: {label}"))?; + if !is_pod_running(&pod) { + anyhow::bail!("Pod is not running"); } + Ok(pod) } fn is_pod_running(pod: &Pod) -> bool { @@ -318,7 +337,7 @@ fn is_pod_running(pod: &Pod) -> bool { false } -fn get_cli_args(peers: Vec) -> Vec { +fn get_cli_args(peers: &[NodeAddr]) -> Vec { if peers.is_empty() { [].to_vec() } else { @@ -335,3 +354,19 @@ fn get_cli_args(peers: Vec) -> Vec { .to_vec() } } + +async fn retry(retries: usize, delay: Duration, mut f: F) -> anyhow::Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let mut interval = time::interval(delay); + for count in 0.. { + interval.tick().await; + let result = f().await; + if result.is_ok() || count > retries { + return result; + } + } + unreachable!("Loop sould always return") +} diff --git a/node/tools/src/rpc/methods/config.rs b/node/tools/src/rpc/methods/config.rs index f03e9353..3c94e382 100644 --- a/node/tools/src/rpc/methods/config.rs +++ b/node/tools/src/rpc/methods/config.rs @@ -1,7 +1,6 @@ //! Peers method for RPC server. -use crate::{config::encode_json, decode_json, AppConfig}; - use super::RPCMethod; +use crate::{config::encode_json, decode_json, AppConfig}; use jsonrpsee::types::{error::ErrorCode, Params}; use std::fs::{self}; use zksync_protobuf::serde::Serde; diff --git a/node/tools/src/rpc/server.rs b/node/tools/src/rpc/server.rs index 3b66dfa0..c23801be 100644 --- a/node/tools/src/rpc/server.rs +++ b/node/tools/src/rpc/server.rs @@ -1,6 +1,5 @@ -use crate::AppConfig; - use super::methods::{config::ConfigInfo, health_check::HealthCheck, peers::PeersInfo, RPCMethod}; +use crate::AppConfig; use jsonrpsee::server::{middleware::http::ProxyGetRequestLayer, RpcModule, Server}; use std::net::SocketAddr; use zksync_concurrency::{ctx, scope}; diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index f2832ae3..af581000 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -8,7 +8,7 @@ use std::{ }; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{PersistentBlockStore, ReplicaState, ReplicaStore}; +use zksync_consensus_storage::{BlockStoreState, PersistentBlockStore, ReplicaState, ReplicaStore}; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -107,8 +107,12 @@ impl PersistentBlockStore for RocksDB { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(scope::wait_blocking(|| self.last_blocking()).await?) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + // `RocksDB` is assumed to store all blocks starting from genesis. + first: self.0.genesis.fork.first_block, + last: scope::wait_blocking(|| self.last_blocking()).await?, + }) } async fn block(