diff --git a/node/actors/bft/src/testonly/twins/scenario.rs b/node/actors/bft/src/testonly/twins/scenario.rs
index 7c9913d8..c1522af1 100644
--- a/node/actors/bft/src/testonly/twins/scenario.rs
+++ b/node/actors/bft/src/testonly/twins/scenario.rs
@@ -1,8 +1,6 @@
-use std::collections::BTreeSet;
-
-use rand::{seq::SliceRandom, Rng};
-
 use super::{splits, HasKey, Split, Twin};
+use rand::{seq::SliceRandom, Rng};
+use std::collections::BTreeSet;
 
 /// A cluster holds all the nodes in the simulation, some of which are twins.
 pub struct Cluster {
diff --git a/node/actors/bft/src/testonly/twins/tests.rs b/node/actors/bft/src/testonly/twins/tests.rs
index 0a438380..e499a811 100644
--- a/node/actors/bft/src/testonly/twins/tests.rs
+++ b/node/actors/bft/src/testonly/twins/tests.rs
@@ -1,15 +1,12 @@
+use super::{splits, Cluster, HasKey, ScenarioGenerator, Split, Twin};
+use crate::testonly::twins::unique_key_count;
+use rand::Rng;
 use std::{
     collections::{BTreeSet, HashSet},
     fmt::Debug,
 };
-
-use rand::Rng;
 use zksync_concurrency::ctx;
 
-use crate::testonly::twins::unique_key_count;
-
-use super::{splits, Cluster, HasKey, ScenarioGenerator, Split, Twin};
-
 #[test]
 fn test_splits() {
     let got = splits(&["foo", "bar", "baz"], 2);
diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs
index 7ee4088a..2579b5ad 100644
--- a/node/actors/bft/src/tests.rs
+++ b/node/actors/bft/src/tests.rs
@@ -274,7 +274,7 @@ async fn run_twins(
     // A single scenario with 11 replicas took 3-5 seconds.
     // Panic on timeout; works with `cargo nextest` and the `abort_on_panic` above.
-    let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
+    let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(60));
     let ctx = &ctx::test_root(&ctx::RealClock);
 
     #[derive(PartialEq, Debug)]
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index 68d2562f..6156a257 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -239,10 +239,7 @@ impl Network {
             loop {
                 let req = rpc::push_block_store_state::Req(state.clone());
                 push_block_store_state_client.call(ctx, &req, kB).await?;
-                state = self
-                    .block_store
-                    .wait_until_queued(ctx, state.next())
-                    .await?;
+                state = self.block_store.wait_for_queued_change(ctx, &state).await?;
             }
         });
diff --git a/node/actors/network/src/gossip/tests/fetch_blocks.rs b/node/actors/network/src/gossip/tests/fetch_blocks.rs
index 154d2984..c42e50a5 100644
--- a/node/actors/network/src/gossip/tests/fetch_blocks.rs
+++ b/node/actors/network/src/gossip/tests/fetch_blocks.rs
@@ -5,7 +5,10 @@ use rand::Rng as _;
 use tracing::Instrument as _;
 use zksync_concurrency::{ctx, limiter, scope, testonly::abort_on_panic};
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStoreState};
+use zksync_consensus_storage::{
+    testonly::{in_memory, TestMemoryStorage},
+    BlockStore, BlockStoreState, PersistentBlockStore as _,
+};
 
 #[tokio::test]
 async fn test_simple() {
@@ -329,3 +332,74 @@ async fn test_retry() {
     .await
     .unwrap();
 }
+
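Note on the `runner.rs` change above: `wait_until_queued(ctx, state.next())` only resolved once the queue grew past `state.next()`, so a truncation that merely advanced `state.first` was never announced to peers. Waiting for *any* change of the queued range fixes that. A minimal standalone sketch of the same wake-on-any-change pattern over a plain tokio `watch` channel (illustrative names, not the crate's API):

```rust
use tokio::sync::watch;

/// Illustrative stand-in for `BlockStoreState`: a half-open block range.
#[derive(Clone, PartialEq, Debug)]
pub struct Range {
    pub first: u64,
    pub next: u64,
}

/// Resolves on *any* change of the queued range, including truncations
/// that only advance `first` (a grow-only wait would sleep through those).
pub async fn wait_for_queued_change(
    recv: &mut watch::Receiver<Range>,
    old: &Range,
) -> Result<Range, watch::error::RecvError> {
    loop {
        let cur = recv.borrow_and_update().clone();
        if &cur != old {
            return Ok(cur);
        }
        recv.changed().await?;
    }
}
```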
+/// Test checking that when storage gets truncated,
+/// the node announces the new block range to peers.
+#[tokio::test]
+async fn test_announce_truncated_block_range() {
+    abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+    let mut setup = validator::testonly::Setup::new(rng, 1);
+    setup.push_blocks(rng, 10);
+    let mut cfg = crate::testonly::new_configs(rng, &setup, 0)[0].clone();
+    cfg.rpc.push_block_store_state_rate = limiter::Rate::INF;
+    cfg.rpc.get_block_rate = limiter::Rate::INF;
+    cfg.rpc.get_block_timeout = None;
+    cfg.validator_key = None;
+
+    scope::run!(ctx, |ctx, s| async {
+        // Build a custom persistent store, so that we can tweak it later.
+        let mut persistent =
+            in_memory::BlockStore::new(setup.genesis.clone(), setup.genesis.first_block);
+        let (block_store, runner) = BlockStore::new(ctx, Box::new(persistent.clone())).await?;
+        s.spawn_bg(runner.run(ctx));
+        // Use the standard batch store, since it is irrelevant for this test.
+        let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
+        let (_node, runner) =
+            crate::testonly::Instance::new(cfg.clone(), block_store, store.batches);
+        s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
+        // Fill in all the blocks.
+        for b in &setup.blocks {
+            persistent.queue_next_block(ctx, b.clone()).await?;
+        }
+
+        // Connect to the node.
+        let (conn, runner) = gossip::testonly::connect(ctx, &cfg, setup.genesis.hash())
+            .await
+            .unwrap();
+        s.spawn_bg(async {
+            assert_matches!(runner.run(ctx).await, Err(mux::RunError::Canceled(_)));
+            Ok(())
+        });
+
+        let mut first = setup.genesis.first_block;
+        loop {
+            tracing::info!("Truncate up to {first}");
+            persistent.truncate(first);
+            first = first + 3;
+
+            // Listen to `PushBlockStoreState` messages
+            // until the announced state is consistent with storage.
+            loop {
+                let mut stream = conn
+                    .open_server::<rpc::push_block_store_state::Rpc>(ctx)
+                    .await?;
+                let state = stream.recv(ctx).await.unwrap();
+                stream.send(ctx, &()).await.unwrap();
+                if state.0 == *persistent.persisted().borrow() {
+                    break;
+                }
+            }
+
+            // If there are no blocks left, we are done.
+            let left = persistent.persisted().borrow().clone();
+            if left.next() <= left.first {
+                break;
+            }
+        }
+        Ok(())
+    })
+    .await
+    .unwrap();
+}
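The loop above exits once the persisted range is empty; `left.next() <= left.first` expresses "no blocks left" for the half-open range tracked by `BlockStoreState`. A simplified model of that arithmetic (hypothetical types, for illustration only):

```rust
/// Simplified model of the half-open range `first..next()` tracked by
/// `BlockStoreState` (illustration only, not the crate's definition).
#[derive(Clone, PartialEq, Debug)]
struct RangeState {
    first: u64,
    /// Number of the last stored block, if any.
    last: Option<u64>,
}

impl RangeState {
    /// One past the last stored block; equals `first` when nothing is stored.
    fn next(&self) -> u64 {
        match self.last {
            Some(last) => last + 1,
            None => self.first,
        }
    }
    /// The `left.next() <= left.first` check from the test: the range is empty.
    fn is_empty(&self) -> bool {
        self.next() <= self.first
    }
}

fn main() {
    assert!(RangeState { first: 9, last: None }.is_empty());
    assert!(!RangeState { first: 6, last: Some(9) }.is_empty());
}
```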
diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs
index 5109083c..8e3e1526 100644
--- a/node/actors/network/src/gossip/tests/mod.rs
+++ b/node/actors/network/src/gossip/tests/mod.rs
@@ -1,3 +1,4 @@
+use super::ValidatorAddrs;
 use crate::{
     gossip::{batch_votes::BatchVotesWatch, handshake, validator_addrs::ValidatorAddrsWatch},
     metrics, preface, rpc, testonly,
@@ -19,8 +20,6 @@ use zksync_concurrency::{
 use zksync_consensus_roles::{attester, validator};
 use zksync_consensus_storage::testonly::TestMemoryStorage;
 
-use super::ValidatorAddrs;
-
 mod fetch_batches;
 mod fetch_blocks;
 mod syncing;
diff --git a/node/actors/network/src/gossip/tests/syncing.rs b/node/actors/network/src/gossip/tests/syncing.rs
index f092bbf7..b456350f 100644
--- a/node/actors/network/src/gossip/tests/syncing.rs
+++ b/node/actors/network/src/gossip/tests/syncing.rs
@@ -10,7 +10,10 @@ use zksync_concurrency::{
     time,
 };
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::testonly::TestMemoryStorage;
+use zksync_consensus_storage::{
+    testonly::{dump, in_memory, TestMemoryStorage},
+    BlockStore,
+};
 
 const EXCHANGED_STATE_COUNT: usize = 5;
 const NETWORK_CONNECTIVITY_CASES: [(usize, usize); 5] = [(2, 1), (3, 2), (5, 3), (10, 4), (10, 7)];
@@ -414,3 +417,92 @@ async fn uncoordinated_batch_syncing(
     .await
     .unwrap();
 }
+
+/// Test checking that syncing behaves correctly when blocks get persisted
+/// without being queued first (i.e. they arrive via a side-channel).
+#[tokio::test(flavor = "multi_thread")]
+async fn test_sidechannel_sync() {
+    abort_on_panic();
+    let _guard = set_timeout(time::Duration::seconds(20));
+
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+    let mut setup = validator::testonly::Setup::new(rng, 2);
+    setup.push_blocks(rng, 10);
+    let cfgs = testonly::new_configs(rng, &setup, 1);
+    scope::run!(ctx, |ctx, s| async {
+        let mut stores = vec![];
+        let mut nodes = vec![];
+        for (i, mut cfg) in cfgs.into_iter().enumerate() {
+            cfg.rpc.push_block_store_state_rate = limiter::Rate::INF;
+            cfg.rpc.get_block_rate = limiter::Rate::INF;
+            cfg.rpc.get_block_timeout = None;
+            cfg.validator_key = None;
+
+            // Build a custom persistent store, so that we can tweak it later.
+            let persistent =
+                in_memory::BlockStore::new(setup.genesis.clone(), setup.genesis.first_block);
+            stores.push(persistent.clone());
+            let (block_store, runner) = BlockStore::new(ctx, Box::new(persistent)).await?;
+            s.spawn_bg(runner.run(ctx));
+            // Use the standard batch store, since it is irrelevant for this test.
+            let store = TestMemoryStorage::new(ctx, &setup.genesis).await;
+            let (node, runner) = testonly::Instance::new(cfg, block_store, store.batches);
+            s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
+            nodes.push(node);
+        }
+
+        {
+            // Truncate at the start.
+            stores[1].truncate(setup.blocks[3].number());
+
+            // Sync a block prefix.
+            let prefix = &setup.blocks[0..5];
+            for b in prefix {
+                nodes[0]
+                    .net
+                    .gossip
+                    .block_store
+                    .queue_block(ctx, b.clone())
+                    .await?;
+            }
+            nodes[1]
+                .net
+                .gossip
+                .block_store
+                .wait_until_persisted(ctx, prefix.last().unwrap().number())
+                .await?;
+
+            // Check that the expected block range is actually stored.
+            assert_eq!(setup.blocks[3..5], dump(ctx, &stores[1]).await);
+        }
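The `[3..5]` assertion above works because the fetched blocks `0..3` sit below the truncated `first` and get silently dropped. That behavior comes from the `in_memory::BlockStore::queue_next_block` change at the end of this diff; a simplified standalone sketch of the accept logic (not the actual trait impl):

```rust
/// Simplified sketch of the store's decision for an incoming block `n`,
/// given the next block it wants (`want = persisted.next()`).
fn accept(n: u64, want: u64) -> anyhow::Result<bool> {
    if n < want {
        // Already persisted or truncated away; ignore without an error.
        return Ok(false);
    }
    if n > want {
        // Storage must stay contiguous, so out-of-order blocks are rejected.
        anyhow::bail!("got block {n}, want {want}");
    }
    Ok(true) // n == want: append it.
}

fn main() -> anyhow::Result<()> {
    // After truncating to 3, blocks 0..3 are stale and get skipped...
    assert!(!accept(0, 3)?);
    // ...while block 3 is exactly the next wanted block.
    assert!(accept(3, 3)?);
    Ok(())
}
```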
+
+        {
+            // Truncate beyond the synced prefix.
+            stores[1].truncate(setup.blocks[8].number());
+
+            // Sync a block suffix.
+            let suffix = &setup.blocks[5..10];
+            for b in suffix {
+                nodes[0]
+                    .net
+                    .gossip
+                    .block_store
+                    .queue_block(ctx, b.clone())
+                    .await?;
+            }
+            nodes[1]
+                .net
+                .gossip
+                .block_store
+                .wait_until_persisted(ctx, suffix.last().unwrap().number())
+                .await?;
+
+            // Check that the expected block range is actually stored.
+            assert_eq!(setup.blocks[8..10], dump(ctx, &stores[1]).await);
+        }
+        Ok(())
+    })
+    .await
+    .unwrap();
+}
diff --git a/node/actors/network/src/http/mod.rs b/node/actors/network/src/http/mod.rs
index 87eb9922..bb796389 100644
--- a/node/actors/network/src/http/mod.rs
+++ b/node/actors/network/src/http/mod.rs
@@ -1,4 +1,5 @@
 //! Http Server to export debug information
+use crate::{consensus, MeteredStreamStats, Network};
 use anyhow::Context as _;
 use base64::Engine;
 use build_html::{Html, HtmlContainer, HtmlPage, Table, TableCell, TableCellType, TableRow};
@@ -29,8 +30,6 @@ use tokio_rustls::{
 use zksync_concurrency::{ctx, scope};
 use zksync_consensus_utils::debug_page;
 
-use crate::{consensus, MeteredStreamStats, Network};
-
 const STYLE: &str = include_str!("style.css");
 
 /// Http debug page configuration.
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index f152d72a..31e465a6 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -27,9 +27,8 @@ pub mod testonly;
 #[cfg(test)]
 mod tests;
 mod watch;
-pub use metrics::MeteredStreamStats;
-
 pub use config::*;
+pub use metrics::MeteredStreamStats;
 
 /// State of the network actor observable outside of the actor.
 pub struct Network {
diff --git a/node/libs/concurrency/src/sync/mod.rs b/node/libs/concurrency/src/sync/mod.rs
index 43cd5c0b..46d78235 100644
--- a/node/libs/concurrency/src/sync/mod.rs
+++ b/node/libs/concurrency/src/sync/mod.rs
@@ -107,6 +107,23 @@ pub async fn notified(ctx: &ctx::Ctx, notify: &Notify) -> ctx::OrCanceled<()> {
     ctx.wait(notify.notified()).await
 }
 
+/// Sends the modified value iff `f()` returns `Ok`.
+/// Forwards the result of `f()` to the caller.
+pub fn try_send_modify<T, R, E>(
+    send: &watch::Sender<T>,
+    f: impl FnOnce(&mut T) -> Result<R, E>,
+) -> Result<R, E> {
+    let mut res = None;
+    send.send_if_modified(|v| {
+        let x = f(v);
+        let s = x.is_ok();
+        res = Some(x);
+        s
+    });
+    // unwrap() is safe, since `res` is always set by `send_if_modified`.
+    res.unwrap()
+}
+
 /// Waits for a watch change notification.
 /// Immediately borrows a reference to the new value.
 pub async fn changed<'a, T>(
@@ -138,6 +155,20 @@ pub async fn wait_for<'a, T>(
     Err(ctx::Canceled)
 }
 
+/// Waits until the predicate returns a value different than `None`.
+pub async fn wait_for_some<T, R>(
+    ctx: &ctx::Ctx,
+    recv: &mut watch::Receiver<T>,
+    pred: impl Fn(&T) -> Option<R>,
+) -> ctx::OrCanceled<R> {
+    recv.mark_changed();
+    loop {
+        if let Some(v) = pred(&*changed(ctx, recv).await?) {
+            return Ok(v);
+        }
+    }
+}
+
 struct ExclusiveLockInner<T> {
     value: T,
     drop_sender: oneshot::Sender<T>,
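The two new `sync` helpers above are what the storage changes below build on: `BlockStoreRunner` uses `try_send_modify` to propagate `update_persisted` errors instead of swallowing them, and `wait_for_queued_change` is implemented via `wait_for_some`. A hedged usage sketch (assuming, as the signatures above suggest, that `zksync_concurrency::sync` re-exports tokio's `watch`):

```rust
use zksync_concurrency::{ctx, sync};

// `try_send_modify`: receivers are notified only when the closure returns
// `Ok`, and the result is forwarded to the caller instead of being dropped.
fn bump(send: &sync::watch::Sender<u64>) -> anyhow::Result<u64> {
    sync::try_send_modify(send, |v| {
        if *v == u64::MAX {
            anyhow::bail!("counter overflow");
        }
        *v += 1;
        Ok(*v)
    })
}

// `wait_for_some`: maps the watched value to `Option<R>` and resolves at the
// first `Some`, respecting context cancellation like the other sync helpers.
async fn first_multiple_of(
    ctx: &ctx::Ctx,
    recv: &mut sync::watch::Receiver<u64>,
    k: u64,
) -> ctx::OrCanceled<u64> {
    sync::wait_for_some(ctx, recv, |v| (v % k == 0).then_some(*v)).await
}
```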
diff --git a/node/libs/crypto/src/secp256k1/mod.rs b/node/libs/crypto/src/secp256k1/mod.rs
index b68c775a..0645beac 100644
--- a/node/libs/crypto/src/secp256k1/mod.rs
+++ b/node/libs/crypto/src/secp256k1/mod.rs
@@ -1,12 +1,10 @@
 //! ECDSA signatures over the Secp256k1 curve, chosen to work with EVM precompiles.
-use std::hash::Hash;
-
+use crate::{keccak256::Keccak256, ByteFmt};
 use anyhow::bail;
+use std::hash::Hash;
 use zeroize::ZeroizeOnDrop;
 
-use crate::{keccak256::Keccak256, ByteFmt};
-
 mod testonly;
 
 #[cfg(test)]
diff --git a/node/libs/crypto/src/secp256k1/tests.rs b/node/libs/crypto/src/secp256k1/tests.rs
index 7223e098..0d0a8300 100644
--- a/node/libs/crypto/src/secp256k1/tests.rs
+++ b/node/libs/crypto/src/secp256k1/tests.rs
@@ -1,12 +1,10 @@
-use std::fmt::Debug;
-
+use crate::{secp256k1::*, ByteFmt};
 use rand::{
     distributions::{Distribution, Standard},
     rngs::StdRng,
     Rng, SeedableRng,
 };
-
-use crate::{secp256k1::*, ByteFmt};
+use std::fmt::Debug;
 
 fn make_rng() -> StdRng {
     StdRng::seed_from_u64(29483920)
diff --git a/node/libs/roles/src/attester/keys/aggregate_signature.rs b/node/libs/roles/src/attester/keys/aggregate_signature.rs
index d7496e01..09f5105d 100644
--- a/node/libs/roles/src/attester/keys/aggregate_signature.rs
+++ b/node/libs/roles/src/attester/keys/aggregate_signature.rs
@@ -1,10 +1,8 @@
+use crate::attester::{Batch, MsgHash, Signers};
 use std::fmt;
-
 use zksync_consensus_crypto::{bls12_381, ByteFmt, Text, TextFmt};
 use zksync_consensus_utils::enum_util::Variant;
 
-use crate::attester::{Batch, MsgHash, Signers};
-
 // TODO: Once EIP-2537 is merged, `attester::Signature` could be changed to point at the BLS signature.
 // For now these are just a placeholders so we can keep an `AggregateSignature` around.
 type BlsSignature = bls12_381::Signature;
diff --git a/node/libs/roles/src/attester/testonly.rs b/node/libs/roles/src/attester/testonly.rs
index 950bc347..940f34f1 100644
--- a/node/libs/roles/src/attester/testonly.rs
+++ b/node/libs/roles/src/attester/testonly.rs
@@ -1,9 +1,8 @@
-use crate::validator::Payload;
-
 use super::{
     AggregateMultiSig, AggregateSignature, Batch, BatchNumber, BatchQC, Committee, Msg, MsgHash,
     MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, WeightedAttester,
 };
+use crate::validator::Payload;
 use bit_vec::BitVec;
 use rand::{
     distributions::{Distribution, Standard},
diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs
index b1577056..d463b66a 100644
--- a/node/libs/storage/src/batch_store.rs
+++ b/node/libs/storage/src/batch_store.rs
@@ -1,10 +1,8 @@
 //! Defines storage layer for batches of blocks.
 use anyhow::Context as _;
-use std::collections::VecDeque;
-use std::{fmt, sync::Arc};
+use std::{collections::VecDeque, fmt, sync::Arc};
 use zksync_concurrency::{ctx, scope, sync};
-use zksync_consensus_roles::attester;
-use zksync_consensus_roles::validator;
+use zksync_consensus_roles::{attester, validator};
 
 /// State of the `BatchStore`: continuous range of batches.
 #[derive(Debug, Clone, PartialEq, Eq)]
diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs
index e7ea19a1..b29a1036 100644
--- a/node/libs/storage/src/block_store/mod.rs
+++ b/node/libs/storage/src/block_store/mod.rs
@@ -116,9 +116,29 @@ impl Inner {
         true
     }
 
+    /// Updates `persisted` field.
+    fn update_persisted(&mut self, persisted: BlockStoreState) -> anyhow::Result<()> {
+        if persisted.next() < self.persisted.next() {
+            anyhow::bail!("head block has been removed from storage, this is not supported");
+        }
+        self.persisted = persisted;
+        if self.queued.first < self.persisted.first {
+            self.queued.first = self.persisted.first;
+        }
+        // If persisted blocks overtook the queue (blocks were fetched via some side-channel),
+        // it means we need to reset the cache - otherwise we would have a gap.
+        if self.queued.next() < self.persisted.next() {
+            self.queued = self.persisted.clone();
+            self.cache.clear();
+        }
+        self.truncate_cache();
+        Ok(())
+    }
+
+    /// If cache size has been exceeded, remove entries which were already persisted.
     fn truncate_cache(&mut self) {
         while self.cache.len() > Self::CACHE_CAPACITY
-            && self.persisted.contains(self.cache[0].number())
+            && self.persisted.next() > self.cache[0].number()
         {
             self.cache.pop_front();
         }
@@ -155,28 +175,24 @@ impl BlockStoreRunner {
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
         let res = scope::run!(ctx, |ctx, s| async {
-            let persisted = self.0.persistent.persisted();
-            let mut queue_next = persisted.borrow().next();
-            // Task truncating cache whenever a block gets persisted.
             s.spawn::<()>(async {
-                let mut persisted = persisted;
+                // Task watching the persisted state.
+                let mut persisted = self.0.persistent.persisted();
+                persisted.mark_changed();
                 loop {
-                    let persisted = sync::changed(ctx, &mut persisted).await?.clone();
-                    self.0.inner.send_modify(|inner| {
-                        inner.persisted = persisted;
-                        inner.truncate_cache();
-                    });
+                    let new = sync::changed(ctx, &mut persisted).await?.clone();
+                    sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?;
                 }
             });
             // Task queueing blocks to be persisted.
             let inner = &mut self.0.inner.subscribe();
+            let mut queue_next = validator::BlockNumber(0);
             loop {
-                let block = sync::wait_for(ctx, inner, |inner| inner.queued.contains(queue_next))
-                    .await?
-                    .block(queue_next)
-                    .unwrap();
-                queue_next = queue_next.next();
-
+                let block = sync::wait_for_some(ctx, inner, |inner| {
+                    inner.block(queue_next.max(inner.persisted.next()))
+                })
+                .await?;
+                queue_next = block.number().next();
                 // TODO: monitor errors as well.
                 let t = metrics::PERSISTENT_BLOCK_STORE
                     .queue_next_block_latency
@@ -275,6 +291,21 @@ impl BlockStore {
         Ok(())
     }
 
+    /// Waits until the queued blocks range is different than `old`.
+    pub async fn wait_for_queued_change(
+        &self,
+        ctx: &ctx::Ctx,
+        old: &BlockStoreState,
+    ) -> ctx::OrCanceled<BlockStoreState> {
+        sync::wait_for_some(ctx, &mut self.inner.subscribe(), |inner| {
+            if &inner.queued == old {
+                return None;
+            }
+            Some(inner.queued.clone())
+        })
+        .await
+    }
+
     /// Waits until the given block is queued (in memory, or persisted).
     /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned.
     pub async fn wait_until_queued(
diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index 45cb543d..cde40a51 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -48,6 +48,24 @@ impl BlockStore {
             blocks: Mutex::default(),
         }))
     }
+
+    /// Truncates the storage to blocks `>=first`.
+    pub fn truncate(&mut self, first: validator::BlockNumber) {
+        let mut blocks = self.0.blocks.lock().unwrap();
+        while blocks.front().map_or(false, |b| b.number() < first) {
+            blocks.pop_front();
+        }
+        self.0.persisted.send_if_modified(|s| {
+            if s.first >= first {
+                return false;
+            }
+            if s.next() <= first {
+                s.last = None;
+            }
+            s.first = first;
+            true
+        });
+    }
 }
 
 impl BatchStore {
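With `update_persisted` handling side-channel writes, the queueing task in `BlockStoreRunner` above no longer assumes the queue drives persistence: it persists the block at `queue_next.max(inner.persisted.next())`, skipping anything that already landed in storage behind its back. A minimal model of that skip-ahead rule (illustration only, not the actual code):

```rust
/// The runner persists the block at `max(queue_next, persisted_next)`,
/// so blocks that reached persistent storage via a side-channel are
/// never re-queued.
fn next_to_persist(queue_next: u64, persisted_next: u64) -> u64 {
    queue_next.max(persisted_next)
}

fn main() {
    // Normal path: persistence is driven by the queue.
    assert_eq!(next_to_persist(4, 4), 4);
    // Side-channel path: blocks 4..9 were persisted out-of-band,
    // so the runner jumps straight to block 9.
    assert_eq!(next_to_persist(4, 9), 9);
}
```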
@@ -92,7 +110,12 @@ impl PersistentBlockStore for BlockStore {
     ) -> ctx::Result<()> {
         let mut blocks = self.0.blocks.lock().unwrap();
         let want = self.0.persisted.borrow().next();
-        if block.number() != want {
+        if block.number() < want {
+            // It may happen that a block gets fetched which is no longer needed.
+            return Ok(());
+        }
+        if block.number() > want {
+            // Blocks should be stored in order though.
+            return Err(anyhow::anyhow!("got block {:?}, want {want:?}", block.number()).into());
+        }
         self.0