diff --git a/node/Cargo.lock b/node/Cargo.lock
index 07cf3024..7ed5fe20 100644
--- a/node/Cargo.lock
+++ b/node/Cargo.lock
@@ -2621,6 +2621,7 @@ dependencies = [
  "anyhow",
  "prost",
  "rand 0.8.5",
+ "test-casing",
  "tokio",
  "tracing",
  "vise",
diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml
index cdf437f1..496b3290 100644
--- a/node/actors/executor/Cargo.toml
+++ b/node/actors/executor/Cargo.toml
@@ -24,6 +24,7 @@ tracing.workspace = true
 vise.workspace = true
 
 [dev-dependencies]
+test-casing.workspace = true
 tokio.workspace = true
 
 [build-dependencies]
diff --git a/node/actors/executor/src/testonly.rs b/node/actors/executor/src/testonly.rs
index 36fcdcf9..8b099cc8 100644
--- a/node/actors/executor/src/testonly.rs
+++ b/node/actors/executor/src/testonly.rs
@@ -52,33 +52,33 @@ impl FullValidatorConfig {
         }
     }
 
-    /// Creates a new external node and configures this validator to accept incoming connections from it.
-    pub fn connect_external_node(&mut self, rng: &mut impl Rng) -> ExternalNodeConfig {
-        let external_node_config = ExternalNodeConfig::new(rng, self);
+    /// Creates a new full node and configures this validator to accept incoming connections from it.
+    pub fn connect_full_node(&mut self, rng: &mut impl Rng) -> FullNodeConfig {
+        let full_node_config = FullNodeConfig::new(rng, self);
         self.node_config
             .gossip
             .static_inbound
-            .insert(external_node_config.node_key.public());
-        external_node_config
+            .insert(full_node_config.node_key.public());
+        full_node_config
     }
 }
 
-/// Configuration for an external node (i.e., non-validator node).
+/// Configuration for a full non-validator node.
 #[derive(Debug)]
 #[non_exhaustive]
-pub struct ExternalNodeConfig {
+pub struct FullNodeConfig {
     /// Executor configuration.
     pub node_config: ExecutorConfig,
     /// Secret key of the node used for identification in the gossip network.
     pub node_key: node::SecretKey,
 }
 
-impl ExternalNodeConfig {
+impl FullNodeConfig {
     fn new(rng: &mut impl Rng, validator: &FullValidatorConfig) -> Self {
         let node_key: node::SecretKey = rng.gen();
-        let external_node_addr = net::tcp::testonly::reserve_listener();
+        let full_node_addr = net::tcp::testonly::reserve_listener();
        let node_config = ExecutorConfig {
-            server_addr: *external_node_addr,
+            server_addr: *full_node_addr,
             gossip: GossipConfig {
                 key: node_key.public(),
                 static_outbound: HashMap::from([(
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index cd8f6d8e..23c63e07 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -1,11 +1,36 @@
 //! High-level tests for `Executor`.
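+//! (The tests below run real `Executor` instances wired together via the gossip configs from `testonly`.)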
+
 use super::*;
 use crate::testonly::FullValidatorConfig;
-use zksync_concurrency::{sync, testonly::abort_on_panic};
-use zksync_consensus_roles::validator::{BlockNumber, Payload};
+use rand::Rng;
+use std::iter;
+use test_casing::test_casing;
+use zksync_concurrency::{sync, testonly::abort_on_panic, time};
+use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload};
 use zksync_consensus_storage::{BlockStore, InMemoryStorage};
 
 impl FullValidatorConfig {
+    fn gen_blocks(&self, rng: &mut impl Rng, count: usize) -> Vec<FinalBlock> {
+        let genesis_block = self.node_config.genesis_block.clone();
+        let validators = &self.node_config.validators;
+        let blocks = iter::successors(Some(genesis_block), |parent| {
+            let payload: Payload = rng.gen();
+            let header = validator::BlockHeader {
+                parent: parent.header.hash(),
+                number: parent.header.number.next(),
+                payload: payload.hash(),
+            };
+            let commit = self.validator_key.sign_msg(validator::ReplicaCommit {
+                protocol_version: validator::CURRENT_VERSION,
+                view: validator::ViewNumber(header.number.0),
+                proposal: header,
+            });
+            let justification = validator::CommitQC::from(&[commit], validators).unwrap();
+            Some(FinalBlock::new(header, payload, justification))
+        });
+        blocks.skip(1).take(count).collect()
+    }
+
     fn into_executor(self, storage: Arc<InMemoryStorage>) -> Executor<InMemoryStorage> {
         let mut executor = Executor::new(self.node_config, self.node_key, storage.clone()).unwrap();
         executor
@@ -41,35 +66,117 @@ async fn executing_single_validator() {
 }
 
 #[tokio::test]
-async fn executing_validator_and_external_node() {
+async fn executing_validator_and_full_node() {
     abort_on_panic();
     let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
     let rng = &mut ctx.rng();
 
     let mut validator = FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
-    let external_node = validator.connect_external_node(rng);
+    let full_node = validator.connect_full_node(rng);
 
     let genesis_block = &validator.node_config.genesis_block;
     let validator_storage = InMemoryStorage::new(genesis_block.clone());
     let validator_storage = Arc::new(validator_storage);
-    let external_node_storage = InMemoryStorage::new(genesis_block.clone());
-    let external_node_storage = Arc::new(external_node_storage);
-    let mut en_subscriber = external_node_storage.subscribe_to_block_writes();
+    let full_node_storage = InMemoryStorage::new(genesis_block.clone());
+    let full_node_storage = Arc::new(full_node_storage);
+    let mut full_node_subscriber = full_node_storage.subscribe_to_block_writes();
 
     let validator = validator.into_executor(validator_storage.clone());
-    let external_node = Executor::new(
-        external_node.node_config,
-        external_node.node_key,
-        external_node_storage.clone(),
+    let full_node = Executor::new(
+        full_node.node_config,
+        full_node.node_key,
+        full_node_storage.clone(),
     )
     .unwrap();
 
     scope::run!(ctx, |ctx, s| async {
         s.spawn_bg(validator.run(ctx));
-        s.spawn_bg(external_node.run(ctx));
+        s.spawn_bg(full_node.run(ctx));
         for _ in 0..5 {
-            let number = *sync::changed(ctx, &mut en_subscriber).await?;
-            tracing::trace!(%number, "External node received block");
+            let number = *sync::changed(ctx, &mut full_node_subscriber).await?;
+            tracing::trace!(%number, "Full node received block");
+        }
+        anyhow::Ok(())
+    })
+    .await
+    .unwrap();
+}
+
+#[test_casing(2, [false, true])]
+#[tokio::test]
+async fn syncing_full_node_from_snapshot(delay_block_storage: bool) {
+    abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
+    let rng = &mut ctx.rng();
+
+    let mut validator =
+        FullValidatorConfig::for_single_validator(rng, Payload(vec![]));
+    let full_node = validator.connect_full_node(rng);
+
+    let genesis_block = &validator.node_config.genesis_block;
+    let blocks = validator.gen_blocks(rng, 10);
+    let validator_storage = InMemoryStorage::new(genesis_block.clone());
+    let validator_storage = Arc::new(validator_storage);
+    if !delay_block_storage {
+        // Instead of running consensus on the validator, add the generated blocks manually.
+        for block in &blocks {
+            validator_storage.put_block(ctx, block).await.unwrap();
+        }
+    }
+    let validator = Executor::new(
+        validator.node_config,
+        validator.node_key,
+        validator_storage.clone(),
+    )
+    .unwrap();
+
+    // Start a full node from a snapshot.
+    let full_node_storage = InMemoryStorage::new(blocks[3].clone());
+    let full_node_storage = Arc::new(full_node_storage);
+    let mut full_node_subscriber = full_node_storage.subscribe_to_block_writes();
+
+    let full_node = Executor::new(
+        full_node.node_config,
+        full_node.node_key,
+        full_node_storage.clone(),
+    )
+    .unwrap();
+
+    scope::run!(ctx, |ctx, s| async {
+        s.spawn_bg(validator.run(ctx));
+        s.spawn_bg(full_node.run(ctx));
+
+        if delay_block_storage {
+            // Emulate the validator gradually adding new blocks to the storage.
+            s.spawn_bg(async {
+                for block in &blocks {
+                    ctx.sleep(time::Duration::milliseconds(500)).await?;
+                    validator_storage.put_block(ctx, block).await?;
+                }
+                Ok(())
+            });
+        }
+
+        loop {
+            let last_contiguous_full_node_block =
+                full_node_storage.last_contiguous_block_number(ctx).await?;
+            tracing::trace!(
+                %last_contiguous_full_node_block,
+                "Full node updated last contiguous block"
+            );
+            if last_contiguous_full_node_block == BlockNumber(10) {
+                break; // The full node has received all blocks!
+            }
+            // Wait until the node storage is updated.
+            let number = *sync::changed(ctx, &mut full_node_subscriber).await?;
+            tracing::trace!(%number, "Full node received block");
+        }
+
+        // Check that the node didn't receive any blocks with a number less than the initial snapshot block.
+        for lesser_block_number in 0..3 {
+            let block = full_node_storage
+                .block(ctx, BlockNumber(lesser_block_number))
+                .await?;
+            assert!(block.is_none());
+        }
         anyhow::Ok(())
     })
diff --git a/node/actors/sync_blocks/src/lib.rs b/node/actors/sync_blocks/src/lib.rs
index b25211a6..0cf1fada 100644
--- a/node/actors/sync_blocks/src/lib.rs
+++ b/node/actors/sync_blocks/src/lib.rs
@@ -26,8 +26,6 @@ mod tests;
 pub use crate::config::Config;
 use crate::peers::PeerStates;
 
-// FIXME(slowli): when run on validator node, the actor creates unnecessary `GetBlocks` requests
-
 /// Block syncing actor responsible for synchronizing L2 blocks with other nodes.
 #[derive(Debug)]
 pub struct SyncBlocks {
diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs
index 7639db94..f7c8b6c7 100644
--- a/node/actors/sync_blocks/src/peers/mod.rs
+++ b/node/actors/sync_blocks/src/peers/mod.rs
@@ -3,7 +3,7 @@
 use self::events::PeerStateEvent;
 use crate::{io, Config};
 use anyhow::Context as _;
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::HashMap, ops, sync::Arc};
 use tracing::instrument;
 use zksync_concurrency::{
     ctx::{self, channel},
@@ -25,10 +25,18 @@ type PeerStateUpdate = (node::PublicKey, SyncState);
 
 #[derive(Debug)]
 struct PeerState {
+    first_stored_block: BlockNumber,
     last_contiguous_stored_block: BlockNumber,
     get_block_semaphore: Arc<Semaphore>,
 }
 
+impl PeerState {
+    fn has_block(&self, number: BlockNumber) -> bool {
+        let range = self.first_stored_block..=self.last_contiguous_stored_block;
+        range.contains(&number)
+    }
+}
+
 /// Handle for [`PeerStates`] allowing to send updates to it.
 #[derive(Debug, Clone)]
 pub(crate) struct PeerStatesHandle {
@@ -196,8 +204,8 @@ impl PeerStates {
         peer_key: node::PublicKey,
         state: SyncState,
     ) -> ctx::OrCanceled<BlockNumber> {
-        let last_contiguous_stored_block = match self.validate_sync_state(state) {
-            Ok(block_number) => block_number,
+        let numbers = match self.validate_sync_state(state) {
+            Ok(numbers) => numbers,
             Err(err) => {
                 tracing::warn!(%err, "Invalid `SyncState` received from peer");
                 if let Some(events_sender) = &self.events_sender {
@@ -207,10 +215,13 @@ impl PeerStates {
                 // TODO: ban peer etc.
             }
         };
+        let first_stored_block = *numbers.start();
+        let last_contiguous_stored_block = *numbers.end();
 
         let mut peers = sync::lock(ctx, &self.peers).await?;
         let permits = self.config.max_concurrent_blocks_per_peer;
         let peer_state = peers.entry(peer_key.clone()).or_insert_with(|| PeerState {
+            first_stored_block,
             last_contiguous_stored_block,
             get_block_semaphore: Arc::new(Semaphore::new(permits)),
         });
@@ -223,6 +234,16 @@ impl PeerStates {
                 ({last_contiguous_stored_block}) is lesser than the old value ({prev_contiguous_stored_block})"
             );
         }
+
+        peer_state.first_stored_block = first_stored_block;
+        // If `first_stored_block` increases, we could cancel getting pruned blocks from the peer here.
+        // However, the peer will respond to such requests with a "missing block" error anyway,
+        // and new requests won't be routed to it because of the updated `PeerState`,
+        // so having no special handling is fine.
+        // Likewise, no specialized handling is required for a decreasing `first_stored_block`;
+        // if this leads to an ability to fetch some of the pending blocks, it'll be discovered
+        // after `sleep_interval_for_get_block` (i.e., soon enough).
+
         tracing::trace!(
             %prev_contiguous_stored_block,
             %last_contiguous_stored_block,
@@ -237,7 +258,10 @@ impl PeerStates {
         Ok(last_contiguous_stored_block)
     }
 
-    fn validate_sync_state(&self, state: SyncState) -> anyhow::Result<BlockNumber> {
+    fn validate_sync_state(
+        &self,
+        state: SyncState,
+    ) -> anyhow::Result<ops::RangeInclusive<BlockNumber>> {
         let numbers = state.numbers();
         anyhow::ensure!(
             numbers.first_stored_block <= numbers.last_contiguous_stored_block,
@@ -252,10 +276,11 @@ impl PeerStates {
             .last_contiguous_stored_block
             .verify(&self.config.validator_set, self.config.consensus_threshold)
             .context("Failed verifying `last_contiguous_stored_block`")?;
-        // We don't verify QCs for first and last stored blocks since they are not used
-        // in the following logic. To reflect this, the method consumes `SyncState` and returns
-        // the validated block number.
-        Ok(numbers.last_contiguous_stored_block)
+        // We don't verify the QC for the last stored block since it is not used
+        // in the following logic. The first stored block is not verified either, since it doesn't
+        // extend the set of blocks a peer should have. To reflect this, the method consumes
+        // `SyncState` and returns the validated block numbers.
+        Ok(numbers.first_stored_block..=numbers.last_contiguous_stored_block)
     }
 
     async fn get_and_save_block(
@@ -347,7 +372,7 @@ impl PeerStates {
     ) -> Option<(node::PublicKey, sync::OwnedSemaphorePermit)> {
         let mut peers_with_no_permits = vec![];
         let eligible_peers_info = peers.iter().filter(|(peer_key, state)| {
-            if state.last_contiguous_stored_block < block_number {
+            if !state.has_block(block_number) {
                 return false;
             }
             let available_permits = state.get_block_semaphore.available_permits();
diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs
deleted file mode 100644
index f4fe1e15..00000000
--- a/node/actors/sync_blocks/src/peers/tests.rs
+++ /dev/null
@@ -1,1133 +0,0 @@
-use super::*;
-use crate::tests::TestValidators;
-use assert_matches::assert_matches;
-use async_trait::async_trait;
-use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
-use std::{collections::HashSet, fmt};
-use test_casing::{test_casing, Product};
-use zksync_concurrency::{testonly::abort_on_panic, time};
-use zksync_consensus_roles::validator;
-use zksync_consensus_storage::{BlockStore, InMemoryStorage, StorageError};
-
-const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5);
-const BLOCK_SLEEP_INTERVAL: time::Duration = time::Duration::milliseconds(5);
-
-#[derive(Debug)]
-struct TestHandles {
-    clock: ctx::ManualClock,
-    rng: StdRng,
-    test_validators: TestValidators,
-    peer_states_handle: PeerStatesHandle,
-    storage: Arc<InMemoryStorage>,
-    message_receiver: channel::UnboundedReceiver<io::OutputMessage>,
-    events_receiver: channel::UnboundedReceiver<PeerStateEvent>,
-}
-
-#[async_trait]
-trait Test: fmt::Debug + Send + Sync {
-    const BLOCK_COUNT: usize;
-
-    fn tweak_config(&self, _config: &mut Config) {
-        // Does nothing by default
-    }
-
-    async fn initialize_storage(
-        &self,
-        _ctx: &ctx::Ctx,
-        _storage: &dyn WriteBlockStore,
-        _test_validators: &TestValidators,
-    ) {
-        // Does nothing by default
-    }
-
-    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()>;
-}
-
-#[instrument(level = "trace", skip(ctx, storage), err)]
-async fn wait_for_stored_block(
-    ctx: &ctx::Ctx,
-    storage: &dyn WriteBlockStore,
-    expected_block_number: BlockNumber,
-) -> ctx::OrCanceled<()> {
-    tracing::trace!("Started waiting for stored block");
-    let mut subscriber = storage.subscribe_to_block_writes();
-    let mut got_block = storage.last_contiguous_block_number(ctx).await.unwrap();
-
-    while got_block < expected_block_number {
-        sync::changed(ctx, &mut subscriber).await?;
-        got_block = storage.last_contiguous_block_number(ctx).await.unwrap();
-    }
-    Ok(())
-}
-
-#[instrument(level = "trace", skip(ctx, events_receiver))]
-async fn wait_for_peer_update(
-    ctx: &ctx::Ctx,
-    events_receiver: &mut channel::UnboundedReceiver<PeerStateEvent>,
-    expected_peer: &node::PublicKey,
-) -> ctx::OrCanceled<()> {
-    loop {
-        let peer_event = events_receiver.recv(ctx).await?;
-        tracing::trace!(?peer_event, "received peer event");
-        match peer_event {
-            PeerStateEvent::PeerUpdated(key) => {
-                assert_eq!(key, *expected_peer);
-                return Ok(());
-            }
-            PeerStateEvent::PeerDisconnected(_) | PeerStateEvent::GotBlock(_) => {
-                // Skip update
-            }
-            _ => panic!("Received unexpected peer event:
{peer_event:?}"), - } - } -} - -#[instrument(level = "trace")] -async fn test_peer_states(test: T) { - abort_on_panic(); - - let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT); - let clock = ctx::ManualClock::new(); - let ctx = &ctx::test_with_clock(ctx, &clock); - let mut rng = ctx.rng(); - let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng); - let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); - let storage = Arc::new(storage); - test.initialize_storage(ctx, storage.as_ref(), &test_validators) - .await; - - let (message_sender, message_receiver) = channel::unbounded(); - let (events_sender, events_receiver) = channel::unbounded(); - let mut config = test_validators.test_config(); - test.tweak_config(&mut config); - let (mut peer_states, peer_states_handle) = - PeerStates::new(message_sender, storage.clone(), config); - peer_states.events_sender = Some(events_sender); - let test_handles = TestHandles { - clock, - rng, - test_validators, - peer_states_handle, - storage, - message_receiver, - events_receiver, - }; - - scope::run!(ctx, |ctx, s| async { - s.spawn_bg(async { - peer_states.run(ctx).await.or_else(|err| match err { - StorageError::Canceled(_) => Ok(()), // Swallow cancellation errors after the test is finished - StorageError::Database(err) => Err(err), - }) - }); - test.test(ctx, test_handles).await - }) - .await - .unwrap(); -} - -#[derive(Debug)] -struct UpdatingPeerStateWithSingleBlock; - -#[async_trait] -impl Test for UpdatingPeerStateWithSingleBlock { - const BLOCK_COUNT: usize = 2; - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - .. - } = handles; - let mut storage_subscriber = storage.subscribe_to_block_writes(); - - let peer_key = rng.gen::().public(); - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - // Check that the actor has sent a `get_block` request to the peer - let message = message_receiver.recv(ctx).await?; - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message; - assert_eq!(recipient, peer_key); - assert_eq!(number, BlockNumber(1)); - - // Emulate the peer sending a correct response. - test_validators.send_block(BlockNumber(1), response); - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(1))); - - // Check that the block has been saved locally. - let saved_block = *sync::changed(ctx, &mut storage_subscriber).await?; - assert_eq!(saved_block, BlockNumber(1)); - Ok(()) - } -} - -#[tokio::test] -async fn updating_peer_state_with_single_block() { - test_peer_states(UpdatingPeerStateWithSingleBlock).await; -} - -#[derive(Debug)] -struct CancelingBlockRetrieval; - -#[async_trait] -impl Test for CancelingBlockRetrieval { - const BLOCK_COUNT: usize = 5; - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - .. 
- } = handles; - - let peer_key = rng.gen::().public(); - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - // Check that the actor has sent a `get_block` request to the peer - let message = message_receiver.recv(ctx).await?; - assert_matches!( - message, - io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. }) - ); - - // Emulate receiving block using external means. - storage - .put_block(ctx, &test_validators.final_blocks[1]) - .await?; - // Retrieval of the block must be canceled. - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::CanceledBlock(BlockNumber(1))); - Ok(()) - } -} - -#[tokio::test] -async fn canceling_block_retrieval() { - test_peer_states(CancelingBlockRetrieval).await; -} - -#[derive(Debug)] -struct FilteringBlockRetrieval; - -#[async_trait] -impl Test for FilteringBlockRetrieval { - const BLOCK_COUNT: usize = 5; - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - .. - } = handles; - - // Emulate receiving block using external means. - storage - .put_block(ctx, &test_validators.final_blocks[1]) - .await?; - - let peer_key = rng.gen::().public(); - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - // Check that the actor has sent `get_block` request to the peer, but only for block #2. - let message = message_receiver.recv(ctx).await?; - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, number, .. 
- }) = message; - assert_eq!(recipient, peer_key); - assert_eq!(number, BlockNumber(2)); - - assert!(message_receiver.try_recv().is_none()); - Ok(()) - } -} - -#[tokio::test] -async fn filtering_block_retrieval() { - test_peer_states(FilteringBlockRetrieval).await; -} - -#[derive(Debug)] -struct UpdatingPeerStateWithMultipleBlocks; - -impl UpdatingPeerStateWithMultipleBlocks { - const MAX_CONCURRENT_BLOCKS: usize = 3; -} - -#[async_trait] -impl Test for UpdatingPeerStateWithMultipleBlocks { - const BLOCK_COUNT: usize = 10; - - fn tweak_config(&self, config: &mut Config) { - config.max_concurrent_blocks_per_peer = Self::MAX_CONCURRENT_BLOCKS; - // ^ We want to test rate limiting for peers - config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let peer_key = rng.gen::().public(); - peer_states_handle.update( - peer_key.clone(), - test_validators.sync_state(Self::BLOCK_COUNT - 1), - ); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - let mut requested_blocks = HashMap::with_capacity(Self::MAX_CONCURRENT_BLOCKS); - for _ in 1..Self::BLOCK_COUNT { - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message_receiver.recv(ctx).await?; - - tracing::trace!("Received request for block #{number}"); - assert_eq!(recipient, peer_key); - assert!( - requested_blocks.insert(number, response).is_none(), - "Block #{number} requested twice" - ); - - if requested_blocks.len() == Self::MAX_CONCURRENT_BLOCKS || rng.gen() { - // Answer a random request. - let number = *requested_blocks.keys().choose(&mut rng).unwrap(); - let response = requested_blocks.remove(&number).unwrap(); - test_validators.send_block(number, response); - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(got) if got == number); - } - clock.advance(BLOCK_SLEEP_INTERVAL); - } - - // Answer all remaining requests. 
- for (number, response) in requested_blocks { - test_validators.send_block(number, response); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(got) if got == number); - } - - let expected_block_number = BlockNumber(Self::BLOCK_COUNT as u64 - 1); - wait_for_stored_block(ctx, storage.as_ref(), expected_block_number).await?; - Ok(()) - } -} - -#[tokio::test] -async fn updating_peer_state_with_multiple_blocks() { - test_peer_states(UpdatingPeerStateWithMultipleBlocks).await; -} - -#[derive(Debug)] -struct DownloadingBlocksInGaps { - local_block_numbers: Vec, - increase_peer_block_number_during_test: bool, -} - -impl DownloadingBlocksInGaps { - fn new(local_block_numbers: &[usize]) -> Self { - Self { - local_block_numbers: local_block_numbers - .iter() - .copied() - .inspect(|&number| assert!(number > 0 && number < Self::BLOCK_COUNT)) - .collect(), - increase_peer_block_number_during_test: false, - } - } -} - -#[async_trait] -impl Test for DownloadingBlocksInGaps { - const BLOCK_COUNT: usize = 10; - - fn tweak_config(&self, config: &mut Config) { - config.max_concurrent_blocks = 1; - // ^ Forces the node to download blocks in a deterministic order - config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; - } - - async fn initialize_storage( - &self, - ctx: &ctx::Ctx, - storage: &dyn WriteBlockStore, - test_validators: &TestValidators, - ) { - for &block_number in &self.local_block_numbers { - storage - .put_block(ctx, &test_validators.final_blocks[block_number]) - .await - .unwrap(); - } - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let peer_key = rng.gen::().public(); - let mut last_peer_block_number = if self.increase_peer_block_number_during_test { - rng.gen_range(1..Self::BLOCK_COUNT) - } else { - Self::BLOCK_COUNT - 1 - }; - peer_states_handle.update( - peer_key.clone(), - test_validators.sync_state(last_peer_block_number), - ); - wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; - clock.advance(BLOCK_SLEEP_INTERVAL); - - let expected_block_numbers = - (1..Self::BLOCK_COUNT).filter(|number| !self.local_block_numbers.contains(number)); - - // Check that all missing blocks are requested. - for expected_number in expected_block_numbers { - if expected_number > last_peer_block_number { - last_peer_block_number = rng.gen_range(expected_number..Self::BLOCK_COUNT); - peer_states_handle.update( - peer_key.clone(), - test_validators.sync_state(last_peer_block_number), - ); - // Wait until the update is processed. 
- wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; - - clock.advance(BLOCK_SLEEP_INTERVAL); - } - - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message_receiver.recv(ctx).await?; - - assert_eq!(recipient, peer_key); - assert_eq!(number.0 as usize, expected_number); - test_validators.send_block(number, response); - wait_for_stored_block(ctx, storage.as_ref(), number).await?; - clock.advance(BLOCK_SLEEP_INTERVAL); - } - Ok(()) - } -} - -const LOCAL_BLOCK_NUMBERS: [&[usize]; 3] = [&[1, 9], &[3, 5, 6, 8], &[4]]; - -#[test_casing(6, Product((LOCAL_BLOCK_NUMBERS, [false, true])))] -#[tokio::test] -async fn downloading_blocks_in_gaps( - local_block_numbers: &[usize], - increase_peer_block_number_during_test: bool, -) { - let mut test = DownloadingBlocksInGaps::new(local_block_numbers); - test.increase_peer_block_number_during_test = increase_peer_block_number_during_test; - test_peer_states(test).await; -} - -#[derive(Debug)] -struct LimitingGetBlockConcurrency; - -#[async_trait] -impl Test for LimitingGetBlockConcurrency { - const BLOCK_COUNT: usize = 5; - - fn tweak_config(&self, config: &mut Config) { - config.max_concurrent_blocks = 3; - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - .. - } = handles; - let mut storage_subscriber = storage.subscribe_to_block_writes(); - - let peer_key = rng.gen::().public(); - peer_states_handle.update( - peer_key.clone(), - test_validators.sync_state(Self::BLOCK_COUNT - 1), - ); - wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?; - - // The actor should request 3 new blocks it's now aware of from the only peer it's currently - // aware of. Note that blocks may be queried in any order. - let mut message_responses = HashMap::with_capacity(3); - for _ in 0..3 { - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, peer_key); - assert!(message_responses.insert(number.0, response).is_none()); - } - assert_matches!(message_receiver.try_recv(), None); - assert_eq!( - message_responses.keys().copied().collect::>(), - HashSet::from([1, 2, 3]) - ); - - // Send a correct response out of order. - let response = message_responses.remove(&3).unwrap(); - test_validators.send_block(BlockNumber(3), response); - - let saved_block = *sync::changed(ctx, &mut storage_subscriber).await?; - assert_eq!(saved_block, BlockNumber(3)); - - // The actor should now request another block. - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, number, .. 
- }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, peer_key); - assert_eq!(number, BlockNumber(4)); - - Ok(()) - } -} - -#[tokio::test] -async fn limiting_get_block_concurrency() { - test_peer_states(LimitingGetBlockConcurrency).await; -} - -#[derive(Debug)] -struct RequestingBlocksFromTwoPeers; - -#[async_trait] -impl Test for RequestingBlocksFromTwoPeers { - const BLOCK_COUNT: usize = 5; - - fn tweak_config(&self, config: &mut Config) { - config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; - config.max_concurrent_blocks = 2; - config.max_concurrent_blocks_per_peer = 1; - // ^ Necessary for blocks numbers in tests to be deterministic - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let first_peer = rng.gen::().public(); - peer_states_handle.update(first_peer.clone(), test_validators.sync_state(2)); - wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?; - - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number: first_peer_block_number, - response: first_peer_response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, first_peer); - assert!( - first_peer_block_number == BlockNumber(1) || first_peer_block_number == BlockNumber(2) - ); - - let second_peer = rng.gen::().public(); - peer_states_handle.update(second_peer.clone(), test_validators.sync_state(4)); - wait_for_peer_update(ctx, &mut events_receiver, &second_peer).await?; - clock.advance(BLOCK_SLEEP_INTERVAL); - - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number: second_peer_block_number, - response: second_peer_response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, second_peer); - assert!( - second_peer_block_number == BlockNumber(1) - || second_peer_block_number == BlockNumber(2) - ); - - test_validators.send_block(first_peer_block_number, first_peer_response); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); - // The node shouldn't send more requests to the first peer since it would be beyond - // its known latest block number (2). - clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(message_receiver.try_recv(), None); - - peer_states_handle.update(first_peer.clone(), test_validators.sync_state(4)); - wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?; - clock.advance(BLOCK_SLEEP_INTERVAL); - // Now the actor can get block #3 from the peer. 
- - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number: first_peer_block_number, - response: first_peer_response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, first_peer); - assert!( - first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4) - ); - - test_validators.send_block(first_peer_block_number, first_peer_response); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); - clock.advance(BLOCK_SLEEP_INTERVAL); - - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number: first_peer_block_number, - response: first_peer_response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, first_peer); - assert!( - first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4) - ); - - test_validators.send_block(second_peer_block_number, second_peer_response); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == second_peer_block_number); - test_validators.send_block(first_peer_block_number, first_peer_response); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number); - // No more blocks should be requested from peers. - clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(message_receiver.try_recv(), None); - - wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(4)).await?; - Ok(()) - } -} - -#[tokio::test] -async fn requesting_blocks_from_two_peers() { - test_peer_states(RequestingBlocksFromTwoPeers).await; -} - -#[derive(Debug, Clone, Copy)] -struct PeerBehavior { - /// The peer will go offline after this block. - last_block: BlockNumber, - /// The peer will stop responding after this block, but will still announce `SyncState` updates. - /// Logically, should be `<= last_block`. 
- last_block_to_return: BlockNumber, -} - -impl Default for PeerBehavior { - fn default() -> Self { - Self { - last_block: BlockNumber(u64::MAX), - last_block_to_return: BlockNumber(u64::MAX), - } - } -} - -#[derive(Debug, Clone)] -struct RequestingBlocksFromMultiplePeers { - peer_behavior: Vec, - max_concurrent_blocks_per_peer: usize, - respond_probability: f64, -} - -impl RequestingBlocksFromMultiplePeers { - fn new(peer_count: usize, max_concurrent_blocks_per_peer: usize) -> Self { - Self { - peer_behavior: vec![PeerBehavior::default(); peer_count], - max_concurrent_blocks_per_peer, - respond_probability: 0.0, - } - } - - fn create_peers(&self, rng: &mut impl Rng) -> HashMap { - let last_block_number = BlockNumber(Self::BLOCK_COUNT as u64 - 1); - let peers = self.peer_behavior.iter().copied().map(|behavior| { - let behavior = PeerBehavior { - last_block: behavior.last_block.min(last_block_number), - last_block_to_return: behavior.last_block_to_return.min(last_block_number), - }; - let peer_key = rng.gen::().public(); - (peer_key, behavior) - }); - peers.collect() - } -} - -#[async_trait] -impl Test for RequestingBlocksFromMultiplePeers { - const BLOCK_COUNT: usize = 20; - - fn tweak_config(&self, config: &mut Config) { - config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; - config.max_concurrent_blocks_per_peer = self.max_concurrent_blocks_per_peer; - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let peers = &self.create_peers(&mut rng); - - scope::run!(ctx, |ctx, s| async { - // Announce peer states. - for (peer_key, peer) in peers { - let last_block = peer.last_block.0 as usize; - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(last_block)); - } - - s.spawn_bg(async { - let mut responses_by_peer: HashMap<_, Vec<_>> = HashMap::new(); - let mut requested_blocks = HashSet::new(); - while requested_blocks.len() < Self::BLOCK_COUNT - 1 { - let Ok(message) = message_receiver.recv(ctx).await else { - return Ok(()); // Test is finished - }; - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message; - - tracing::trace!("Block #{number} requested from {recipient:?}"); - assert!(number <= peers[&recipient].last_block); - - if peers[&recipient].last_block_to_return < number { - tracing::trace!("Dropping request for block #{number} to {recipient:?}"); - continue; - } - - assert!( - requested_blocks.insert(number), - "Block #{number} requested twice from a responsive peer" - ); - let peer_responses = responses_by_peer.entry(recipient).or_default(); - peer_responses.push((number, response)); - assert!(peer_responses.len() <= self.max_concurrent_blocks_per_peer); - if peer_responses.len() == self.max_concurrent_blocks_per_peer { - // Peer is at capacity, respond to a random request in order to progress - let idx = rng.gen_range(0..peer_responses.len()); - let (number, response) = peer_responses.remove(idx); - test_validators.send_block(number, response); - } - - // Respond to some other random requests. - for peer_responses in responses_by_peer.values_mut() { - // Indexes are reversed in order to not be affected by removals. 
- for idx in (0..peer_responses.len()).rev() { - if !rng.gen_bool(self.respond_probability) { - continue; - } - let (number, response) = peer_responses.remove(idx); - test_validators.send_block(number, response); - } - } - } - - // Answer to all remaining responses - for (number, response) in responses_by_peer.into_values().flatten() { - test_validators.send_block(number, response); - } - Ok(()) - }); - - // We advance the clock when a node receives a new block or updates a peer state, - // since in both cases some new blocks may become available for download. - let mut block_numbers = HashSet::with_capacity(Self::BLOCK_COUNT - 1); - while block_numbers.len() < Self::BLOCK_COUNT - 1 { - let peer_event = events_receiver.recv(ctx).await?; - match peer_event { - PeerStateEvent::GotBlock(number) => { - assert!( - block_numbers.insert(number), - "Block #{number} received twice" - ); - clock.advance(BLOCK_SLEEP_INTERVAL); - } - PeerStateEvent::PeerUpdated(_) => { - clock.advance(BLOCK_SLEEP_INTERVAL); - } - PeerStateEvent::PeerDisconnected(_) => { /* Do nothing */ } - _ => panic!("Unexpected peer event: {peer_event:?}"), - } - } - - wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(19)).await?; - Ok(()) - }) - .await - } -} - -const RESPOND_PROBABILITIES: [f64; 5] = [0.0, 0.1, 0.2, 0.5, 0.9]; - -#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))] -#[tokio::test] -async fn requesting_blocks(max_concurrent_blocks_per_peer: usize, respond_probability: f64) { - let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer); - test.respond_probability = respond_probability; - test_peer_states(test.clone()).await; -} - -#[derive(Debug)] -struct DisconnectingPeer; - -#[async_trait] -impl Test for DisconnectingPeer { - const BLOCK_COUNT: usize = 5; - - fn tweak_config(&self, config: &mut Config) { - config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL; - } - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let peer_key = rng.gen::().public(); - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - // Drop the response sender emulating peer disconnect. - message_receiver.recv(ctx).await?; - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key); - - // Check that no new requests are sent (there are no peers to send them to). - clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(message_receiver.try_recv(), None); - - // Re-connect the peer with an updated state. - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - // Ensure that blocks are re-requested. 
- clock.advance(BLOCK_SLEEP_INTERVAL); - - let mut responses = HashMap::with_capacity(2); - for _ in 0..2 { - let message = message_receiver.recv(ctx).await?; - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message; - assert_eq!(recipient, peer_key); - assert!(responses.insert(number.0, response).is_none()); - } - - assert!(responses.contains_key(&1)); - assert!(responses.contains_key(&2)); - // Send one of the responses and drop the other request. - let response = responses.remove(&2).unwrap(); - test_validators.send_block(BlockNumber(2), response); - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(2))); - drop(responses); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key); - - // Check that no new requests are sent (there are no peers to send them to). - clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(message_receiver.try_recv(), None); - - // Re-connect the peer with the same state. - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - clock.advance(BLOCK_SLEEP_INTERVAL); - - let message = message_receiver.recv(ctx).await?; - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message; - assert_eq!(recipient, peer_key); - assert_eq!(number, BlockNumber(1)); - test_validators.send_block(BlockNumber(1), response); - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(1))); - - // Check that no new requests are sent (all blocks are downloaded). 
- clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(message_receiver.try_recv(), None); - - wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(2)).await?; - Ok(()) - } -} - -#[tokio::test] -async fn disconnecting_peer() { - test_peer_states(DisconnectingPeer).await; -} - -#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))] -#[tokio::test] -async fn requesting_blocks_with_failures( - max_concurrent_blocks_per_peer: usize, - respond_probability: f64, -) { - let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer); - test.respond_probability = respond_probability; - test.peer_behavior[0].last_block = BlockNumber(5); - test.peer_behavior[1].last_block = BlockNumber(15); - test_peer_states(test).await; -} - -#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))] -#[tokio::test] -async fn requesting_blocks_with_unreliable_peers( - max_concurrent_blocks_per_peer: usize, - respond_probability: f64, -) { - let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer); - test.respond_probability = respond_probability; - test.peer_behavior[0].last_block_to_return = BlockNumber(5); - test.peer_behavior[1].last_block_to_return = BlockNumber(15); - test_peer_states(test).await; -} - -#[tokio::test] -async fn processing_invalid_sync_states() { - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - let test_validators = TestValidators::new(4, 3, rng); - let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); - let storage = Arc::new(storage); - - let (message_sender, _) = channel::unbounded(); - let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); - - let mut invalid_sync_state = test_validators.sync_state(1); - invalid_sync_state.first_stored_block = test_validators.final_blocks[2].justification.clone(); - assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); - - let mut invalid_sync_state = test_validators.sync_state(1); - invalid_sync_state.last_contiguous_stored_block = - test_validators.final_blocks[2].justification.clone(); - assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); - - let mut invalid_sync_state = test_validators.sync_state(1); - invalid_sync_state - .last_contiguous_stored_block - .message - .proposal - .number = BlockNumber(5); - invalid_sync_state.last_stored_block.message.proposal.number = BlockNumber(5); - assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); - - let other_network = TestValidators::new(4, 2, rng); - let invalid_sync_state = other_network.sync_state(1); - assert!(peer_states.validate_sync_state(invalid_sync_state).is_err()); -} - -#[tokio::test] -async fn processing_invalid_blocks() { - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - let test_validators = TestValidators::new(4, 3, rng); - let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone()); - let storage = Arc::new(storage); - - let (message_sender, _) = channel::unbounded(); - let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config()); - - let invalid_block = &test_validators.final_blocks[0]; - let err = peer_states - .validate_block(BlockNumber(1), invalid_block) - .unwrap_err(); - assert_matches!( - err, - BlockValidationError::NumberMismatch { - requested: BlockNumber(1), - got: BlockNumber(0), - } - ); - - let mut invalid_block = test_validators.final_blocks[1].clone(); - invalid_block.justification = 
test_validators.final_blocks[0].justification.clone(); - let err = peer_states - .validate_block(BlockNumber(1), &invalid_block) - .unwrap_err(); - assert_matches!(err, BlockValidationError::ProposalMismatch { .. }); - - let mut invalid_block = test_validators.final_blocks[1].clone(); - invalid_block.payload = validator::Payload(b"invalid".to_vec()); - let err = peer_states - .validate_block(BlockNumber(1), &invalid_block) - .unwrap_err(); - assert_matches!(err, BlockValidationError::HashMismatch { .. }); - - let other_network = TestValidators::new(4, 2, rng); - let invalid_block = &other_network.final_blocks[1]; - let err = peer_states - .validate_block(BlockNumber(1), invalid_block) - .unwrap_err(); - assert_matches!(err, BlockValidationError::Justification(_)); -} - -#[derive(Debug)] -struct PeerWithFakeSyncState; - -#[async_trait] -impl Test for PeerWithFakeSyncState { - const BLOCK_COUNT: usize = 10; - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - mut events_receiver, - .. - } = handles; - - let peer_key = rng.gen::().public(); - let mut fake_sync_state = test_validators.sync_state(1); - fake_sync_state - .last_contiguous_stored_block - .message - .proposal - .number = BlockNumber(42); - peer_states_handle.update(peer_key.clone(), fake_sync_state); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::InvalidPeerUpdate(key) if key == peer_key); - - clock.advance(BLOCK_SLEEP_INTERVAL); - assert_matches!(events_receiver.try_recv(), None); - Ok(()) - } -} - -#[tokio::test] -async fn receiving_fake_sync_state_from_peer() { - test_peer_states(PeerWithFakeSyncState).await; -} - -#[derive(Debug)] -struct PeerWithFakeBlock; - -#[async_trait] -impl Test for PeerWithFakeBlock { - const BLOCK_COUNT: usize = 10; - - async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { - let TestHandles { - clock, - mut rng, - test_validators, - peer_states_handle, - storage, - mut message_receiver, - mut events_receiver, - } = handles; - - let peer_key = rng.gen::().public(); - peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); - - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, - number, - response, - }) = message_receiver.recv(ctx).await?; - assert_eq!(recipient, peer_key); - assert_eq!(number, BlockNumber(1)); - - let mut fake_block = test_validators.final_blocks[2].clone(); - fake_block.header.number = BlockNumber(1); - response.send(Ok(fake_block)).unwrap(); - - let peer_event = events_receiver.recv(ctx).await?; - assert_matches!( - peer_event, - PeerStateEvent::GotInvalidBlock { - block_number: BlockNumber(1), - peer_key: key, - } if key == peer_key - ); - clock.advance(BLOCK_SLEEP_INTERVAL); - - // The invalid block must not be saved. - assert_matches!(events_receiver.try_recv(), None); - assert!(storage.block(ctx, BlockNumber(1)).await?.is_none()); - - // Since we don't ban misbehaving peers, the node will send a request to the same peer again. - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { - recipient, number, .. 
-        }) = message_receiver.recv(ctx).await?;
-        assert_eq!(recipient, peer_key);
-        assert_eq!(number, BlockNumber(1));
-
-        Ok(())
-    }
-}
-
-#[tokio::test]
-async fn receiving_fake_block_from_peer() {
-    test_peer_states(PeerWithFakeBlock).await;
-}
diff --git a/node/actors/sync_blocks/src/peers/tests/basics.rs b/node/actors/sync_blocks/src/peers/tests/basics.rs
new file mode 100644
index 00000000..984ec1d9
--- /dev/null
+++ b/node/actors/sync_blocks/src/peers/tests/basics.rs
@@ -0,0 +1,517 @@
+//! Basic tests.
+
+use super::*;
+
+#[derive(Debug)]
+struct UpdatingPeerStateWithSingleBlock;
+
+#[async_trait]
+impl Test for UpdatingPeerStateWithSingleBlock {
+    const BLOCK_COUNT: usize = 2;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+        let mut storage_subscriber = storage.subscribe_to_block_writes();
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(1));
+
+        // Emulate the peer sending a correct response.
+        test_validators.send_block(BlockNumber(1), response);
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(1)));
+
+        // Check that the block has been saved locally.
+        let saved_block = *sync::changed(ctx, &mut storage_subscriber).await?;
+        assert_eq!(saved_block, BlockNumber(1));
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn updating_peer_state_with_single_block() {
+    test_peer_states(UpdatingPeerStateWithSingleBlock).await;
+}
+
+#[derive(Debug)]
+struct CancelingBlockRetrieval;
+
+#[async_trait]
+impl Test for CancelingBlockRetrieval {
+    const BLOCK_COUNT: usize = 5;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer.
+        let message = message_receiver.recv(ctx).await?;
+        assert_matches!(
+            message,
+            io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. })
+        );
+
+        // Emulate receiving the block through external means.
+        storage
+            .put_block(ctx, &test_validators.final_blocks[1])
+            .await?;
+        // Retrieval of the block must be canceled.
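+        // (The peer states actor should notice the new block in storage, drop the pending
+        // request, and emit `CanceledBlock` instead of `GotBlock`.)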
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::CanceledBlock(BlockNumber(1)));
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn canceling_block_retrieval() {
+    test_peer_states(CancelingBlockRetrieval).await;
+}
+
+#[derive(Debug)]
+struct FilteringBlockRetrieval;
+
+#[async_trait]
+impl Test for FilteringBlockRetrieval {
+    const BLOCK_COUNT: usize = 5;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+
+        // Emulate receiving the block through external means.
+        storage
+            .put_block(ctx, &test_validators.final_blocks[1])
+            .await?;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer, but only for block #2.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(2));
+
+        assert!(message_receiver.try_recv().is_none());
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn filtering_block_retrieval() {
+    test_peer_states(FilteringBlockRetrieval).await;
+}
+
+#[derive(Debug)]
+struct UpdatingPeerStateWithMultipleBlocks;
+
+impl UpdatingPeerStateWithMultipleBlocks {
+    const MAX_CONCURRENT_BLOCKS: usize = 3;
+}
+
+#[async_trait]
+impl Test for UpdatingPeerStateWithMultipleBlocks {
+    const BLOCK_COUNT: usize = 10;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.max_concurrent_blocks_per_peer = Self::MAX_CONCURRENT_BLOCKS;
+        // ^ We want to test rate limiting for peers
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(
+            peer_key.clone(),
+            test_validators.sync_state(Self::BLOCK_COUNT - 1),
+        );
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let mut requested_blocks = HashMap::with_capacity(Self::MAX_CONCURRENT_BLOCKS);
+        for _ in 1..Self::BLOCK_COUNT {
+            let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                recipient,
+                number,
+                response,
+            }) = message_receiver.recv(ctx).await?;
+
+            tracing::trace!("Received request for block #{number}");
+            assert_eq!(recipient, peer_key);
+            assert!(
+                requested_blocks.insert(number, response).is_none(),
+                "Block #{number} requested twice"
+            );
+
+            if requested_blocks.len() == Self::MAX_CONCURRENT_BLOCKS || rng.gen() {
+                // Answer a random request.
+                let number = *requested_blocks.keys().choose(&mut rng).unwrap();
+                let response = requested_blocks.remove(&number).unwrap();
+                test_validators.send_block(number, response);
+
+                let peer_event = events_receiver.recv(ctx).await?;
+                assert_matches!(peer_event, PeerStateEvent::GotBlock(got) if got == number);
+            }
+            clock.advance(BLOCK_SLEEP_INTERVAL);
+        }
+
+        // Answer all remaining requests.
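+        // (At most `MAX_CONCURRENT_BLOCKS` requests can still be in flight here, since the
+        // per-peer semaphore caps the number of unanswered requests.)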
+        for (number, response) in requested_blocks {
+            test_validators.send_block(number, response);
+            let peer_event = events_receiver.recv(ctx).await?;
+            assert_matches!(peer_event, PeerStateEvent::GotBlock(got) if got == number);
+        }
+
+        let expected_block_number = BlockNumber(Self::BLOCK_COUNT as u64 - 1);
+        wait_for_stored_block(ctx, storage.as_ref(), expected_block_number).await?;
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn updating_peer_state_with_multiple_blocks() {
+    test_peer_states(UpdatingPeerStateWithMultipleBlocks).await;
+}
+
+#[derive(Debug)]
+struct DisconnectingPeer;
+
+#[async_trait]
+impl Test for DisconnectingPeer {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Drop the response sender, emulating a peer disconnect.
+        message_receiver.recv(ctx).await?;
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key);
+
+        // Check that no new requests are sent (there are no peers to send them to).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(message_receiver.try_recv(), None);
+
+        // Re-connect the peer with an updated state.
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+        // Ensure that blocks are re-requested.
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        let mut responses = HashMap::with_capacity(2);
+        for _ in 0..2 {
+            let message = message_receiver.recv(ctx).await?;
+            let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                recipient,
+                number,
+                response,
+            }) = message;
+            assert_eq!(recipient, peer_key);
+            assert!(responses.insert(number.0, response).is_none());
+        }
+
+        assert!(responses.contains_key(&1));
+        assert!(responses.contains_key(&2));
+        // Send one of the responses and drop the other request.
+        let response = responses.remove(&2).unwrap();
+        test_validators.send_block(BlockNumber(2), response);
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(2)));
+        drop(responses);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key);
+
+        // Check that no new requests are sent (there are no peers to send them to).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(message_receiver.try_recv(), None);
+
+        // Re-connect the peer with the same state.
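+        // (Only block #1 should be re-requested: block #2 was received and saved above.)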
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(1));
+        test_validators.send_block(BlockNumber(1), response);
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(1)));
+
+        // Check that no new requests are sent (all blocks are downloaded).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(message_receiver.try_recv(), None);
+
+        wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(2)).await?;
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn disconnecting_peer() {
+    test_peer_states(DisconnectingPeer).await;
+}
+
+#[derive(Debug)]
+struct DownloadingBlocksInGaps {
+    local_block_numbers: Vec<usize>,
+    increase_peer_block_number_during_test: bool,
+}
+
+impl DownloadingBlocksInGaps {
+    fn new(local_block_numbers: &[usize]) -> Self {
+        Self {
+            local_block_numbers: local_block_numbers
+                .iter()
+                .copied()
+                .inspect(|&number| assert!(number > 0 && number < Self::BLOCK_COUNT))
+                .collect(),
+            increase_peer_block_number_during_test: false,
+        }
+    }
+}
+
+#[async_trait]
+impl Test for DownloadingBlocksInGaps {
+    const BLOCK_COUNT: usize = 10;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.max_concurrent_blocks = 1;
+        // ^ Forces the node to download blocks in a deterministic order
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn initialize_storage(
+        &self,
+        ctx: &ctx::Ctx,
+        storage: &dyn WriteBlockStore,
+        test_validators: &TestValidators,
+    ) {
+        for &block_number in &self.local_block_numbers {
+            storage
+                .put_block(ctx, &test_validators.final_blocks[block_number])
+                .await
+                .unwrap();
+        }
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        let mut last_peer_block_number = if self.increase_peer_block_number_during_test {
+            rng.gen_range(1..Self::BLOCK_COUNT)
+        } else {
+            Self::BLOCK_COUNT - 1
+        };
+        peer_states_handle.update(
+            peer_key.clone(),
+            test_validators.sync_state(last_peer_block_number),
+        );
+        wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        let expected_block_numbers =
+            (1..Self::BLOCK_COUNT).filter(|number| !self.local_block_numbers.contains(number));
+
+        // Check that all missing blocks are requested.
+        for expected_number in expected_block_numbers {
+            if expected_number > last_peer_block_number {
+                last_peer_block_number = rng.gen_range(expected_number..Self::BLOCK_COUNT);
+                peer_states_handle.update(
+                    peer_key.clone(),
+                    test_validators.sync_state(last_peer_block_number),
+                );
+                // Wait until the update is processed.
+                wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;
+
+                clock.advance(BLOCK_SLEEP_INTERVAL);
+            }
+
+            let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                recipient,
+                number,
+                response,
+            }) = message_receiver.recv(ctx).await?;
+
+            assert_eq!(recipient, peer_key);
+            assert_eq!(number.0 as usize, expected_number);
+            test_validators.send_block(number, response);
+            wait_for_stored_block(ctx, storage.as_ref(), number).await?;
+            clock.advance(BLOCK_SLEEP_INTERVAL);
+        }
+        Ok(())
+    }
+}
+
+const LOCAL_BLOCK_NUMBERS: [&[usize]; 3] = [&[1, 9], &[3, 5, 6, 8], &[4]];
+
+#[test_casing(6, Product((LOCAL_BLOCK_NUMBERS, [false, true])))]
+#[tokio::test]
+async fn downloading_blocks_in_gaps(
+    local_block_numbers: &[usize],
+    increase_peer_block_number_during_test: bool,
+) {
+    let mut test = DownloadingBlocksInGaps::new(local_block_numbers);
+    test.increase_peer_block_number_during_test = increase_peer_block_number_during_test;
+    test_peer_states(test).await;
+}
+
+#[derive(Debug)]
+struct LimitingGetBlockConcurrency;
+
+#[async_trait]
+impl Test for LimitingGetBlockConcurrency {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.max_concurrent_blocks = 3;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+        let mut storage_subscriber = storage.subscribe_to_block_writes();
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(
+            peer_key.clone(),
+            test_validators.sync_state(Self::BLOCK_COUNT - 1),
+        );
+        wait_for_peer_update(ctx, &mut events_receiver, &peer_key).await?;
+
+        // The actor should request 3 new blocks from the only peer it's currently aware of.
+        // Note that blocks may be queried in any order.
+        let mut message_responses = HashMap::with_capacity(3);
+        for _ in 0..3 {
+            let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                recipient,
+                number,
+                response,
+            }) = message_receiver.recv(ctx).await?;
+            assert_eq!(recipient, peer_key);
+            assert!(message_responses.insert(number.0, response).is_none());
+        }
+        assert_matches!(message_receiver.try_recv(), None);
+        assert_eq!(
+            message_responses.keys().copied().collect::<HashSet<_>>(),
+            HashSet::from([1, 2, 3])
+        );
+
+        // Send a correct response out of order.
+        let response = message_responses.remove(&3).unwrap();
+        test_validators.send_block(BlockNumber(3), response);
+
+        let saved_block = *sync::changed(ctx, &mut storage_subscriber).await?;
+        assert_eq!(saved_block, BlockNumber(3));
+
+        // The actor should now request another block.
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(4));
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn limiting_get_block_concurrency() {
+    test_peer_states(LimitingGetBlockConcurrency).await;
+}
diff --git a/node/actors/sync_blocks/src/peers/tests/fakes.rs b/node/actors/sync_blocks/src/peers/tests/fakes.rs
new file mode 100644
index 00000000..1c59758d
--- /dev/null
+++ b/node/actors/sync_blocks/src/peers/tests/fakes.rs
@@ -0,0 +1,186 @@
+//! Tests focused on handling peers providing fake information to the node.
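+//!
+//! Note that misbehaving peers are not banned: after rejecting an invalid sync state or block,
+//! the node keeps the peer and may re-request the same block from it.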
+
+use super::*;
+
+#[tokio::test]
+async fn processing_invalid_sync_states() {
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+    let test_validators = TestValidators::new(4, 3, rng);
+    let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
+    let storage = Arc::new(storage);
+
+    let (message_sender, _) = channel::unbounded();
+    let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
+
+    // The first stored block is ahead of the last contiguous stored block.
+    let mut invalid_sync_state = test_validators.sync_state(1);
+    invalid_sync_state.first_stored_block = test_validators.final_blocks[2].justification.clone();
+    assert!(peer_states.validate_sync_state(invalid_sync_state).is_err());
+
+    // The last contiguous stored block is ahead of the last stored block.
+    let mut invalid_sync_state = test_validators.sync_state(1);
+    invalid_sync_state.last_contiguous_stored_block =
+        test_validators.final_blocks[2].justification.clone();
+    assert!(peer_states.validate_sync_state(invalid_sync_state).is_err());
+
+    // The advertised block numbers are tampered with and no longer match the signed QCs.
+    let mut invalid_sync_state = test_validators.sync_state(1);
+    invalid_sync_state
+        .last_contiguous_stored_block
+        .message
+        .proposal
+        .number = BlockNumber(5);
+    invalid_sync_state.last_stored_block.message.proposal.number = BlockNumber(5);
+    assert!(peer_states.validate_sync_state(invalid_sync_state).is_err());
+
+    // The sync state comes from a different validator set.
+    let other_network = TestValidators::new(4, 2, rng);
+    let invalid_sync_state = other_network.sync_state(1);
+    assert!(peer_states.validate_sync_state(invalid_sync_state).is_err());
+}
+
+#[tokio::test]
+async fn processing_invalid_blocks() {
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+    let test_validators = TestValidators::new(4, 3, rng);
+    let storage = InMemoryStorage::new(test_validators.final_blocks[0].clone());
+    let storage = Arc::new(storage);
+
+    let (message_sender, _) = channel::unbounded();
+    let (peer_states, _) = PeerStates::new(message_sender, storage, test_validators.test_config());
+
+    // A block with a different number than requested.
+    let invalid_block = &test_validators.final_blocks[0];
+    let err = peer_states
+        .validate_block(BlockNumber(1), invalid_block)
+        .unwrap_err();
+    assert_matches!(
+        err,
+        BlockValidationError::NumberMismatch {
+            requested: BlockNumber(1),
+            got: BlockNumber(0),
+        }
+    );
+
+    // A block with a justification for a different proposal.
+    let mut invalid_block = test_validators.final_blocks[1].clone();
+    invalid_block.justification = test_validators.final_blocks[0].justification.clone();
+    let err = peer_states
+        .validate_block(BlockNumber(1), &invalid_block)
+        .unwrap_err();
+    assert_matches!(err, BlockValidationError::ProposalMismatch { .. });
+
+    // A block with a payload that doesn't match the header hash.
+    let mut invalid_block = test_validators.final_blocks[1].clone();
+    invalid_block.payload = validator::Payload(b"invalid".to_vec());
+    let err = peer_states
+        .validate_block(BlockNumber(1), &invalid_block)
+        .unwrap_err();
+    assert_matches!(err, BlockValidationError::HashMismatch { .. });
+
+    // A block signed by a different validator set.
+    let other_network = TestValidators::new(4, 2, rng);
+    let invalid_block = &other_network.final_blocks[1];
+    let err = peer_states
+        .validate_block(BlockNumber(1), invalid_block)
+        .unwrap_err();
+    assert_matches!(err, BlockValidationError::Justification(_));
+}
+
+#[derive(Debug)]
+struct PeerWithFakeSyncState;
+
+#[async_trait]
+impl Test for PeerWithFakeSyncState {
+    const BLOCK_COUNT: usize = 10;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            mut events_receiver,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        let mut fake_sync_state = test_validators.sync_state(1);
+        fake_sync_state
+            .last_contiguous_stored_block
+            .message
+            .proposal
+            .number = BlockNumber(42);
+        peer_states_handle.update(peer_key.clone(), fake_sync_state);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::InvalidPeerUpdate(key) if key == peer_key);
+
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(events_receiver.try_recv(), None);
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn receiving_fake_sync_state_from_peer() {
+    test_peer_states(PeerWithFakeSyncState).await;
+}
+
+#[derive(Debug)]
+struct PeerWithFakeBlock;
+
+#[async_trait]
+impl Test for PeerWithFakeBlock {
+    const BLOCK_COUNT: usize = 10;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(1));
+
+        let mut fake_block = test_validators.final_blocks[2].clone();
+        fake_block.header.number = BlockNumber(1);
+        response.send(Ok(fake_block)).unwrap();
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(
+            peer_event,
+            PeerStateEvent::GotInvalidBlock {
+                block_number: BlockNumber(1),
+                peer_key: key,
+            } if key == peer_key
+        );
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        // The invalid block must not be saved.
+        assert_matches!(events_receiver.try_recv(), None);
+        assert!(storage.block(ctx, BlockNumber(1)).await?.is_none());
+
+        // Since we don't ban misbehaving peers, the node will send a request to the same peer again.
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(1));
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn receiving_fake_block_from_peer() {
+    test_peer_states(PeerWithFakeBlock).await;
+}
diff --git a/node/actors/sync_blocks/src/peers/tests/mod.rs b/node/actors/sync_blocks/src/peers/tests/mod.rs
new file mode 100644
index 00000000..33f4ba16
--- /dev/null
+++ b/node/actors/sync_blocks/src/peers/tests/mod.rs
@@ -0,0 +1,134 @@
+use super::*;
+use crate::tests::TestValidators;
+use assert_matches::assert_matches;
+use async_trait::async_trait;
+use rand::{rngs::StdRng, seq::IteratorRandom, Rng};
+use std::{collections::HashSet, fmt};
+use test_casing::{test_casing, Product};
+use zksync_concurrency::{testonly::abort_on_panic, time};
+use zksync_consensus_roles::validator;
+use zksync_consensus_storage::{BlockStore, InMemoryStorage, StorageError};
+
+mod basics;
+mod fakes;
+mod multiple_peers;
+mod snapshots;
+
+const TEST_TIMEOUT: time::Duration = time::Duration::seconds(5);
+const BLOCK_SLEEP_INTERVAL: time::Duration = time::Duration::milliseconds(5);
+
+#[derive(Debug)]
+struct TestHandles {
+    clock: ctx::ManualClock,
+    rng: StdRng,
+    test_validators: TestValidators,
+    peer_states_handle: PeerStatesHandle,
+    storage: Arc<InMemoryStorage>,
+    message_receiver: channel::UnboundedReceiver<io::OutputMessage>,
+    events_receiver: channel::UnboundedReceiver<PeerStateEvent>,
+}
+
+#[async_trait]
+trait Test: fmt::Debug + Send + Sync {
+    const BLOCK_COUNT: usize;
+    const GENESIS_BLOCK_NUMBER: usize = 0;
+
+    fn tweak_config(&self, _config: &mut Config) {
+        // Does nothing by default
+    }
+
+    async fn initialize_storage(
+        &self,
+        _ctx: &ctx::Ctx,
+        _storage: &dyn WriteBlockStore,
+        _test_validators: &TestValidators,
+    ) {
+        // Does nothing by default
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()>;
+}
+
+#[instrument(level = "trace", skip(ctx, storage), err)]
+async fn wait_for_stored_block(
+    ctx: &ctx::Ctx,
+    storage: &dyn WriteBlockStore,
+    expected_block_number: BlockNumber,
+) -> ctx::OrCanceled<()> {
+    tracing::trace!("Started waiting for stored block");
+    let mut subscriber = storage.subscribe_to_block_writes();
+    let mut got_block = storage.last_contiguous_block_number(ctx).await.unwrap();
+
+    while got_block < expected_block_number {
+        sync::changed(ctx, &mut subscriber).await?;
+        got_block = storage.last_contiguous_block_number(ctx).await.unwrap();
+    }
+    Ok(())
+}
+
+#[instrument(level = "trace", skip(ctx, events_receiver))]
+async fn wait_for_peer_update(
+    ctx: &ctx::Ctx,
+    events_receiver: &mut channel::UnboundedReceiver<PeerStateEvent>,
+    expected_peer: &node::PublicKey,
+) -> ctx::OrCanceled<()> {
+    loop {
+        let peer_event = events_receiver.recv(ctx).await?;
+        tracing::trace!(?peer_event, "received peer event");
+        match peer_event {
+            PeerStateEvent::PeerUpdated(key) => {
+                assert_eq!(key, *expected_peer);
+                return Ok(());
+            }
+            PeerStateEvent::PeerDisconnected(_) | PeerStateEvent::GotBlock(_) => {
+                // Skip update
+            }
+            _ => panic!("Received unexpected peer event: {peer_event:?}"),
+        }
+    }
+}
+
+#[instrument(level = "trace")]
+async fn test_peer_states<T: Test>(test: T) {
+    abort_on_panic();
+
+    let ctx = &ctx::test_root(&ctx::RealClock).with_timeout(TEST_TIMEOUT);
+    let clock = ctx::ManualClock::new();
+    let ctx = &ctx::test_with_clock(ctx, &clock);
+    let mut rng = ctx.rng();
+    let test_validators = TestValidators::new(4, T::BLOCK_COUNT, &mut rng);
+    let storage =
+        InMemoryStorage::new(test_validators.final_blocks[T::GENESIS_BLOCK_NUMBER].clone());
+    let storage = Arc::new(storage);
+    test.initialize_storage(ctx, storage.as_ref(), &test_validators)
+        .await;
+
+    let (message_sender, message_receiver) = channel::unbounded();
+    let (events_sender, events_receiver) = channel::unbounded();
+    let mut config = test_validators.test_config();
+    test.tweak_config(&mut config);
+    let (mut peer_states, peer_states_handle) =
+        PeerStates::new(message_sender, storage.clone(), config);
+    peer_states.events_sender = Some(events_sender);
+    let test_handles = TestHandles {
+        clock,
+        rng,
+        test_validators,
+        peer_states_handle,
+        storage,
+        message_receiver,
+        events_receiver,
+    };
+
+    scope::run!(ctx, |ctx, s| async {
+        s.spawn_bg(async {
+            peer_states.run(ctx).await.or_else(|err| match err {
+                StorageError::Canceled(_) => Ok(()), // Swallow cancellation errors after the test is finished
+                StorageError::Database(err) => Err(err),
+            })
+        });
+        test.test(ctx, test_handles).await
+    })
+    .await
+    .unwrap();
+}
diff --git a/node/actors/sync_blocks/src/peers/tests/multiple_peers.rs b/node/actors/sync_blocks/src/peers/tests/multiple_peers.rs
new file mode 100644
index 00000000..74bf5480
--- /dev/null
+++ b/node/actors/sync_blocks/src/peers/tests/multiple_peers.rs
@@ -0,0 +1,312 @@
+//! Tests focused on interaction with multiple peers.
+
+use super::*;
+
+#[derive(Debug)]
+struct RequestingBlocksFromTwoPeers;
+
+#[async_trait]
+impl Test for RequestingBlocksFromTwoPeers {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+        config.max_concurrent_blocks = 2;
+        config.max_concurrent_blocks_per_peer = 1;
+        // ^ Necessary for block numbers in tests to be deterministic
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let first_peer = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(first_peer.clone(), test_validators.sync_state(2));
+        wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?;
+
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number: first_peer_block_number,
+            response: first_peer_response,
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, first_peer);
+        assert!(
+            first_peer_block_number == BlockNumber(1) || first_peer_block_number == BlockNumber(2)
+        );
+
+        let second_peer = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(second_peer.clone(), test_validators.sync_state(4));
+        wait_for_peer_update(ctx, &mut events_receiver, &second_peer).await?;
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number: second_peer_block_number,
+            response: second_peer_response,
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, second_peer);
+        assert!(
+            second_peer_block_number == BlockNumber(1)
+                || second_peer_block_number == BlockNumber(2)
+        );
+
+        test_validators.send_block(first_peer_block_number, first_peer_response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
+        // The node shouldn't send more requests to the first peer since they would be beyond
+        // its known latest block number (2).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(message_receiver.try_recv(), None);
+
+        peer_states_handle.update(first_peer.clone(), test_validators.sync_state(4));
+        wait_for_peer_update(ctx, &mut events_receiver, &first_peer).await?;
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        // Now the actor can request blocks #3 and #4 from the first peer.
+
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number: first_peer_block_number,
+            response: first_peer_response,
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, first_peer);
+        assert!(
+            first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4)
+        );
+
+        test_validators.send_block(first_peer_block_number, first_peer_response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number: first_peer_block_number,
+            response: first_peer_response,
+        }) = message_receiver.recv(ctx).await?;
+        assert_eq!(recipient, first_peer);
+        assert!(
+            first_peer_block_number == BlockNumber(3) || first_peer_block_number == BlockNumber(4)
+        );
+
+        test_validators.send_block(second_peer_block_number, second_peer_response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == second_peer_block_number);
+        test_validators.send_block(first_peer_block_number, first_peer_response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(num) if num == first_peer_block_number);
+        // No more blocks should be requested from peers.
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        assert_matches!(message_receiver.try_recv(), None);
+
+        wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(4)).await?;
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn requesting_blocks_from_two_peers() {
+    test_peer_states(RequestingBlocksFromTwoPeers).await;
+}
+
+#[derive(Debug, Clone, Copy)]
+struct PeerBehavior {
+    /// The peer will go offline after this block.
+    last_block: BlockNumber,
+    /// The peer will stop responding after this block, but will still announce `SyncState` updates.
+    /// Logically, should be `<= last_block`.
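+    /// (Requests for blocks past this number are silently dropped, emulating an unresponsive peer.)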
+    last_block_to_return: BlockNumber,
+}
+
+impl Default for PeerBehavior {
+    fn default() -> Self {
+        Self {
+            last_block: BlockNumber(u64::MAX),
+            last_block_to_return: BlockNumber(u64::MAX),
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct RequestingBlocksFromMultiplePeers {
+    peer_behavior: Vec<PeerBehavior>,
+    max_concurrent_blocks_per_peer: usize,
+    respond_probability: f64,
+}
+
+impl RequestingBlocksFromMultiplePeers {
+    fn new(peer_count: usize, max_concurrent_blocks_per_peer: usize) -> Self {
+        Self {
+            peer_behavior: vec![PeerBehavior::default(); peer_count],
+            max_concurrent_blocks_per_peer,
+            respond_probability: 0.0,
+        }
+    }
+
+    fn create_peers(&self, rng: &mut impl Rng) -> HashMap<node::PublicKey, PeerBehavior> {
+        let last_block_number = BlockNumber(Self::BLOCK_COUNT as u64 - 1);
+        let peers = self.peer_behavior.iter().copied().map(|behavior| {
+            let behavior = PeerBehavior {
+                last_block: behavior.last_block.min(last_block_number),
+                last_block_to_return: behavior.last_block_to_return.min(last_block_number),
+            };
+            let peer_key = rng.gen::<node::SecretKey>().public();
+            (peer_key, behavior)
+        });
+        peers.collect()
+    }
+}
+
+#[async_trait]
+impl Test for RequestingBlocksFromMultiplePeers {
+    const BLOCK_COUNT: usize = 20;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+        config.max_concurrent_blocks_per_peer = self.max_concurrent_blocks_per_peer;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            clock,
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+        } = handles;
+
+        let peers = &self.create_peers(&mut rng);
+
+        scope::run!(ctx, |ctx, s| async {
+            // Announce peer states.
+            for (peer_key, peer) in peers {
+                let last_block = peer.last_block.0 as usize;
+                peer_states_handle.update(peer_key.clone(), test_validators.sync_state(last_block));
+            }
+
+            s.spawn_bg(async {
+                let mut responses_by_peer: HashMap<_, Vec<_>> = HashMap::new();
+                let mut requested_blocks = HashSet::new();
+                while requested_blocks.len() < Self::BLOCK_COUNT - 1 {
+                    let Ok(message) = message_receiver.recv(ctx).await else {
+                        return Ok(()); // Test is finished
+                    };
+                    let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                        recipient,
+                        number,
+                        response,
+                    }) = message;
+
+                    tracing::trace!("Block #{number} requested from {recipient:?}");
+                    assert!(number <= peers[&recipient].last_block);
+
+                    if peers[&recipient].last_block_to_return < number {
+                        tracing::trace!("Dropping request for block #{number} to {recipient:?}");
+                        continue;
+                    }
+
+                    assert!(
+                        requested_blocks.insert(number),
+                        "Block #{number} requested twice from a responsive peer"
+                    );
+                    let peer_responses = responses_by_peer.entry(recipient).or_default();
+                    peer_responses.push((number, response));
+                    assert!(peer_responses.len() <= self.max_concurrent_blocks_per_peer);
+                    if peer_responses.len() == self.max_concurrent_blocks_per_peer {
+                        // The peer is at capacity; respond to a random request in order to progress.
+                        let idx = rng.gen_range(0..peer_responses.len());
+                        let (number, response) = peer_responses.remove(idx);
+                        test_validators.send_block(number, response);
+                    }
+
+                    // Respond to some other random requests.
+                    for peer_responses in responses_by_peer.values_mut() {
+                        // Indices are iterated in reverse order so that removals don't invalidate them.
+                        for idx in (0..peer_responses.len()).rev() {
+                            if !rng.gen_bool(self.respond_probability) {
+                                continue;
+                            }
+                            let (number, response) = peer_responses.remove(idx);
+                            test_validators.send_block(number, response);
+                        }
+                    }
+                }
+
+                // Respond to all remaining requests.
+                for (number, response) in responses_by_peer.into_values().flatten() {
+                    test_validators.send_block(number, response);
+                }
+                Ok(())
+            });
+
+            // We advance the clock when a node receives a new block or updates a peer state,
+            // since in both cases some new blocks may become available for download.
+            let mut block_numbers = HashSet::with_capacity(Self::BLOCK_COUNT - 1);
+            while block_numbers.len() < Self::BLOCK_COUNT - 1 {
+                let peer_event = events_receiver.recv(ctx).await?;
+                match peer_event {
+                    PeerStateEvent::GotBlock(number) => {
+                        assert!(
+                            block_numbers.insert(number),
+                            "Block #{number} received twice"
+                        );
+                        clock.advance(BLOCK_SLEEP_INTERVAL);
+                    }
+                    PeerStateEvent::PeerUpdated(_) => {
+                        clock.advance(BLOCK_SLEEP_INTERVAL);
+                    }
+                    PeerStateEvent::PeerDisconnected(_) => { /* Do nothing */ }
+                    _ => panic!("Unexpected peer event: {peer_event:?}"),
+                }
+            }
+
+            wait_for_stored_block(ctx, storage.as_ref(), BlockNumber(19)).await?;
+            Ok(())
+        })
+        .await
+    }
+}
+
+const RESPOND_PROBABILITIES: [f64; 5] = [0.0, 0.1, 0.2, 0.5, 0.9];
+
+#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))]
+#[tokio::test]
+async fn requesting_blocks(max_concurrent_blocks_per_peer: usize, respond_probability: f64) {
+    let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer);
+    test.respond_probability = respond_probability;
+    test_peer_states(test.clone()).await;
+}
+
+#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))]
+#[tokio::test]
+async fn requesting_blocks_with_failures(
+    max_concurrent_blocks_per_peer: usize,
+    respond_probability: f64,
+) {
+    let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer);
+    test.respond_probability = respond_probability;
+    test.peer_behavior[0].last_block = BlockNumber(5);
+    test.peer_behavior[1].last_block = BlockNumber(15);
+    test_peer_states(test).await;
+}
+
+#[test_casing(15, Product(([1, 2, 3], RESPOND_PROBABILITIES)))]
+#[tokio::test]
+async fn requesting_blocks_with_unreliable_peers(
+    max_concurrent_blocks_per_peer: usize,
+    respond_probability: f64,
+) {
+    let mut test = RequestingBlocksFromMultiplePeers::new(3, max_concurrent_blocks_per_peer);
+    test.respond_probability = respond_probability;
+    test.peer_behavior[0].last_block_to_return = BlockNumber(5);
+    test.peer_behavior[1].last_block_to_return = BlockNumber(15);
+    test_peer_states(test).await;
+}
diff --git a/node/actors/sync_blocks/src/peers/tests/snapshots.rs b/node/actors/sync_blocks/src/peers/tests/snapshots.rs
new file mode 100644
index 00000000..c95e8fb2
--- /dev/null
+++ b/node/actors/sync_blocks/src/peers/tests/snapshots.rs
@@ -0,0 +1,319 @@
+//! Tests related to snapshot storage.
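+//!
+//! Here either the node or a peer starts from a snapshot, i.e., its first stored block
+//! has a number greater than 0, so earlier blocks may be missing and must not be requested.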
+
+use super::*;
+use zksync_consensus_network::io::GetBlockError;
+
+#[derive(Debug)]
+struct UpdatingPeerStateWithStorageSnapshot;
+
+#[async_trait]
+impl Test for UpdatingPeerStateWithStorageSnapshot {
+    const BLOCK_COUNT: usize = 5;
+    const GENESIS_BLOCK_NUMBER: usize = 2;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            clock,
+        } = handles;
+        let mut storage_subscriber = storage.subscribe_to_block_writes();
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        for stale_block_number in [1, 2] {
+            peer_states_handle.update(
+                peer_key.clone(),
+                test_validators.sync_state(stale_block_number),
+            );
+            let peer_event = events_receiver.recv(ctx).await?;
+            assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+            // No new block requests should be issued.
+            clock.advance(BLOCK_SLEEP_INTERVAL);
+            sync::yield_now().await;
+            assert!(message_receiver.try_recv().is_none());
+        }
+
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(3));
+
+        // Emulate the peer sending a correct response.
+        test_validators.send_block(BlockNumber(3), response);
+
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(3)));
+
+        // Check that the block has been saved locally.
+        let saved_block = *sync::changed(ctx, &mut storage_subscriber).await?;
+        assert_eq!(saved_block, BlockNumber(3));
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn updating_peer_state_with_storage_snapshot() {
+    test_peer_states(UpdatingPeerStateWithStorageSnapshot).await;
+}
+
+#[derive(Debug)]
+struct FilteringRequestsForSnapshotPeer;
+
+#[async_trait]
+impl Test for FilteringRequestsForSnapshotPeer {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            mut message_receiver,
+            mut events_receiver,
+            clock,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.snapshot_sync_state(2..=2));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // The peer should only be queried for blocks that it actually has (#2 in this case).
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(2));
+
+        // Emulate the peer sending a correct response.
+        test_validators.send_block(BlockNumber(2), response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(2)));
+
+        // No further requests should be made.
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        sync::yield_now().await;
+        assert!(message_receiver.try_recv().is_none());
+
+        // Emulate the peer receiving / producing a new block.
+        peer_states_handle.update(peer_key.clone(), test_validators.snapshot_sync_state(2..=3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response: block3_response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(3));
+
+        // Emulate another peer with full history.
+        let full_peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(full_peer_key.clone(), test_validators.sync_state(3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == full_peer_key);
+
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        // The node should only request block #1 from the new peer: block #3 is already requested
+        // from the snapshot peer, and block #2 is stored locally.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, full_peer_key);
+        assert_eq!(number, BlockNumber(1));
+
+        test_validators.send_block(BlockNumber(1), response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(1)));
+
+        drop(block3_response); // Emulate the first peer disconnecting.
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerDisconnected(key) if key == peer_key);
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+
+        // Now, block #3 will be requested from the peer with full history.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message;
+        assert_eq!(recipient, full_peer_key);
+        assert_eq!(number, BlockNumber(3));
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn filtering_requests_for_snapshot_peer() {
+    test_peer_states(FilteringRequestsForSnapshotPeer).await;
+}
+
+#[derive(Debug)]
+struct PruningPeerHistory;
+
+#[async_trait]
+impl Test for PruningPeerHistory {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            mut message_receiver,
+            mut events_receiver,
+            clock,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response: block1_response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(1));
+
+        // Emulate the peer pruning blocks.
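+        // (Its new sync state reports block #3 as both the first and the last stored block.)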
+        peer_states_handle.update(peer_key.clone(), test_validators.snapshot_sync_state(3..=3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient,
+            number,
+            response,
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(3));
+
+        test_validators.send_block(BlockNumber(3), response);
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::GotBlock(BlockNumber(3)));
+
+        // No new blocks should be requested (the peer has no block #2).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        sync::yield_now().await;
+        assert!(message_receiver.try_recv().is_none());
+
+        block1_response.send(Err(GetBlockError::NotSynced)).unwrap();
+        // Block #1 should not be requested again (the peer no longer has it).
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        sync::yield_now().await;
+        assert!(message_receiver.try_recv().is_none());
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn pruning_peer_history() {
+    test_peer_states(PruningPeerHistory).await;
+}
+
+#[derive(Debug)]
+struct BackfillingPeerHistory;
+
+#[async_trait]
+impl Test for BackfillingPeerHistory {
+    const BLOCK_COUNT: usize = 5;
+
+    fn tweak_config(&self, config: &mut Config) {
+        config.sleep_interval_for_get_block = BLOCK_SLEEP_INTERVAL;
+    }
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            mut message_receiver,
+            mut events_receiver,
+            clock,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.snapshot_sync_state(3..=3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(3));
+
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(3));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        clock.advance(BLOCK_SLEEP_INTERVAL);
+        let mut new_requested_numbers = HashSet::new();
+        for _ in 0..2 {
+            let message = message_receiver.recv(ctx).await?;
+            let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+                recipient,
+                number,
+                ..
+            }) = message;
+            assert_eq!(recipient, peer_key);
+            new_requested_numbers.insert(number);
+        }
+        assert_eq!(
+            new_requested_numbers,
+            HashSet::from([BlockNumber(1), BlockNumber(2)])
+        );
+
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn backfilling_peer_history() {
+    test_peer_states(BackfillingPeerHistory).await;
+}
diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs
index fe0fe2b2..25569cba 100644
--- a/node/actors/sync_blocks/src/tests/mod.rs
+++ b/node/actors/sync_blocks/src/tests/mod.rs
@@ -4,7 +4,7 @@ use rand::{
     distributions::{Distribution, Standard},
     Rng,
 };
-use std::iter;
+use std::{iter, ops};
 use zksync_concurrency::{oneshot, testonly::abort_on_panic, time};
 use zksync_consensus_network::io::{GetBlockError, GetBlockResponse, SyncBlocksRequest};
 use zksync_consensus_roles::validator::{
@@ -81,8 +81,21 @@ impl TestValidators {
     }
 
     pub(crate) fn sync_state(&self, last_block_number: usize) -> SyncState {
-        let first_block = self.final_blocks[0].justification.clone();
-        let last_block = self.final_blocks[last_block_number].justification.clone();
+        self.snapshot_sync_state(1..=last_block_number)
+    }
+
+    pub(crate) fn snapshot_sync_state(
+        &self,
+        block_numbers: ops::RangeInclusive<usize>,
+    ) -> SyncState {
+        assert!(!block_numbers.is_empty());
+
+        let first_block = self.final_blocks[*block_numbers.start()]
+            .justification
+            .clone();
+        let last_block = self.final_blocks[*block_numbers.end()]
+            .justification
+            .clone();
         SyncState {
             first_stored_block: first_block,
             last_contiguous_stored_block: last_block.clone(),
diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs
index ae7879b5..3b213eab 100644
--- a/node/libs/storage/src/rocksdb.rs
+++ b/node/libs/storage/src/rocksdb.rs
@@ -106,7 +106,7 @@ impl RocksdbStorage {
 
         let this = Self {
             inner: RwLock::new(db),
-            cached_last_contiguous_block_number: AtomicU64::new(0),
+            cached_last_contiguous_block_number: AtomicU64::new(genesis_block.header.number.0),
            block_writes_sender: watch::channel(genesis_block.header.number).0,
         };
         if let Some(stored_genesis_block) = this.block(ctx, genesis_block.header.number).await?
{ diff --git a/node/libs/storage/src/tests/mod.rs b/node/libs/storage/src/tests/mod.rs index 7f840de3..2ca0abcb 100644 --- a/node/libs/storage/src/tests/mod.rs +++ b/node/libs/storage/src/tests/mod.rs @@ -3,6 +3,7 @@ use crate::types::ReplicaState; use async_trait::async_trait; use rand::{seq::SliceRandom, Rng}; use std::iter; +use test_casing::test_casing; use zksync_concurrency::ctx; use zksync_consensus_roles::validator::{ testonly::make_block, BlockHeader, BlockNumber, FinalBlock, Payload, @@ -77,26 +78,34 @@ async fn putting_block_for_in_memory_store() { test_put_block(&()).await; } -async fn test_get_missing_block_numbers(store_factory: &impl InitStore) { +async fn test_get_missing_block_numbers(store_factory: &impl InitStore, skip_count: usize) { + assert!(skip_count < 100); + let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let genesis_block = genesis_block(rng); + let mut genesis_block = genesis_block(rng); + let mut blocks = gen_blocks(rng, genesis_block.clone(), 100); + if skip_count > 0 { + genesis_block = blocks[skip_count - 1].clone(); + blocks = blocks[skip_count..].to_vec(); + } + let block_range = BlockNumber(skip_count as u64)..BlockNumber(101); + let block_store = store_factory.init_store(ctx, &genesis_block).await; - let mut blocks = gen_blocks(rng, genesis_block, 100); blocks.shuffle(rng); assert!(block_store - .missing_block_numbers(ctx, BlockNumber(0)..BlockNumber(101)) + .missing_block_numbers(ctx, block_range.clone()) .await .unwrap() .into_iter() .map(|number| number.0) - .eq(1..101)); + .eq(skip_count as u64 + 1..101)); for (i, block) in blocks.iter().enumerate() { block_store.put_block(ctx, block).await.unwrap(); let missing_block_numbers = block_store - .missing_block_numbers(ctx, BlockNumber(0)..BlockNumber(101)) + .missing_block_numbers(ctx, block_range.clone()) .await .unwrap(); let last_contiguous_block_number = @@ -120,7 +129,13 @@ async fn test_get_missing_block_numbers(store_factory: &impl InitStore) { #[tokio::test] async fn getting_missing_block_numbers_for_in_memory_store() { - test_get_missing_block_numbers(&()).await; + test_get_missing_block_numbers(&(), 0).await; +} + +#[test_casing(4, [1, 10, 23, 42])] +#[tokio::test] +async fn getting_missing_block_numbers_for_snapshot(skip_count: usize) { + test_get_missing_block_numbers(&(), skip_count).await; } #[test] diff --git a/node/libs/storage/src/tests/rocksdb.rs b/node/libs/storage/src/tests/rocksdb.rs index 0188143b..26ef9066 100644 --- a/node/libs/storage/src/tests/rocksdb.rs +++ b/node/libs/storage/src/tests/rocksdb.rs @@ -39,5 +39,11 @@ async fn putting_block_for_rocksdb_store() { #[tokio::test] async fn getting_missing_block_numbers_for_rocksdb_store() { - test_get_missing_block_numbers(&TempDir::new().unwrap()).await; + test_get_missing_block_numbers(&TempDir::new().unwrap(), 0).await; +} + +#[test_casing(4, [1, 10, 23, 42])] +#[tokio::test] +async fn getting_missing_block_numbers_for_rocksdb_snapshot(skip_count: usize) { + test_get_missing_block_numbers(&TempDir::new().unwrap(), skip_count).await; } diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs index b53f6d4c..9aa9298f 100644 --- a/node/libs/storage/src/traits.rs +++ b/node/libs/storage/src/traits.rs @@ -19,6 +19,10 @@ pub trait BlockStore: fmt::Debug + Send + Sync { /// Returns the number of the last block in the first contiguous range of blocks stored in this DB. 
     /// If there are no missing blocks, this is equal to the number of [`Self::get_head_block()`];
     /// if there *are* missing blocks, the returned number will be lower.
+    ///
+    /// The returned number cannot underflow the [first block](Self::first_block()) stored in the DB;
+    /// all blocks preceding the first block are ignored when computing this number. For example,
+    /// if the storage contains blocks #5, 6 and 9, this method will return 6.
     async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> StorageResult<BlockNumber>;
 
     /// Gets a block by its number.
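The doc comment above pins down how contiguity interacts with snapshot storage. As a quick reference, here is a minimal, self-contained sketch of the documented rule (not part of the patch; `last_contiguous` is a hypothetical helper over a plain set of stored block numbers), reproducing the example where blocks #5, 6 and 9 yield 6:

```rust
use std::collections::BTreeSet;

/// Illustrative only: the last contiguous block number, counting from the
/// first stored block and ignoring everything before it.
fn last_contiguous(stored: &BTreeSet<u64>) -> Option<u64> {
    let first = *stored.iter().next()?;
    // Walk forward while the next block number is also stored.
    let mut last = first;
    while stored.contains(&(last + 1)) {
        last += 1;
    }
    Some(last)
}

fn main() {
    // The example from the doc comment: blocks #5, 6 and 9 are stored.
    let stored: BTreeSet<u64> = [5, 6, 9].into_iter().collect();
    assert_eq!(last_contiguous(&stored), Some(6));
}
```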