From d20d2ec0a799daeb97356cb5b50431b29d25313d Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 7 Nov 2023 11:38:43 +0200 Subject: [PATCH] Test cancellation / filtering of `GetBlock` --- node/actors/sync_blocks/src/peers/events.rs | 2 + node/actors/sync_blocks/src/peers/mod.rs | 3 + node/actors/sync_blocks/src/peers/tests.rs | 92 +++++++++++++++++++++ 3 files changed, 97 insertions(+) diff --git a/node/actors/sync_blocks/src/peers/events.rs b/node/actors/sync_blocks/src/peers/events.rs index 97980d2a..c6757372 100644 --- a/node/actors/sync_blocks/src/peers/events.rs +++ b/node/actors/sync_blocks/src/peers/events.rs @@ -8,6 +8,8 @@ use roles::{node, validator::BlockNumber}; pub(super) enum PeerStateEvent { /// Node has successfully downloaded the specified block. GotBlock(BlockNumber), + /// Block retrieval was canceled due to block getting persisted using other means. + CanceledBlock(BlockNumber), /// Received an invalid block from the peer. GotInvalidBlock { peer_key: node::PublicKey, diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 7ece2520..8b7203fc 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -295,6 +295,9 @@ impl PeerStates { storage.put_block(ctx, &block).await?; } else { tracing::trace!(%block_number, "Getting block canceled"); + if let Some(events_sender) = &self.events_sender { + events_sender.send(PeerStateEvent::CanceledBlock(block_number)); + } } Ok(()) } diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 6eea24fa..4bc03de3 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -178,6 +178,98 @@ async fn updating_peer_state_with_single_block() { test_peer_states(UpdatingPeerStateWithSingleBlock).await; } +#[derive(Debug)] +struct CancelingBlockRetrieval; + +#[async_trait] +impl Test for CancelingBlockRetrieval { + const BLOCK_COUNT: usize = 5; + + async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { + let TestHandles { + mut rng, + test_validators, + peer_states_handle, + storage, + mut message_receiver, + mut events_receiver, + .. + } = handles; + + let peer_key = rng.gen::().public(); + peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1)); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); + + // Check that the actor has sent a `get_block` request to the peer + let message = message_receiver.recv(ctx).await?; + assert_matches!( + message, + io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. }) + ); + + // Emulate receiving block using external means. + storage + .put_block(ctx, &test_validators.final_blocks[1]) + .await?; + // Retrieval of the block must be canceled. + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::CanceledBlock(BlockNumber(1))); + Ok(()) + } +} + +#[tokio::test] +async fn canceling_block_retrieval() { + test_peer_states(CancelingBlockRetrieval).await; +} + +#[derive(Debug)] +struct FilteringBlockRetrieval; + +#[async_trait] +impl Test for FilteringBlockRetrieval { + const BLOCK_COUNT: usize = 5; + + async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> { + let TestHandles { + mut rng, + test_validators, + peer_states_handle, + storage, + mut message_receiver, + mut events_receiver, + .. + } = handles; + + // Emulate receiving block using external means. + storage + .put_block(ctx, &test_validators.final_blocks[1]) + .await?; + + let peer_key = rng.gen::().public(); + peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2)); + let peer_event = events_receiver.recv(ctx).await?; + assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key); + + // Check that the actor has sent `get_block` request to the peer, but only for block #2. + let message = message_receiver.recv(ctx).await?; + let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { + recipient, number, .. + }) = message; + assert_eq!(recipient, peer_key); + assert_eq!(number, BlockNumber(2)); + + assert!(message_receiver.try_recv().is_none()); + Ok(()) + } +} + +#[tokio::test] +async fn filtering_block_retrieval() { + test_peer_states(FilteringBlockRetrieval).await; +} + #[derive(Debug)] struct UpdatingPeerStateWithMultipleBlocks;