From 04c9639a15868ed659d47e43dde9d332a865fc7c Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Fri, 10 Nov 2023 11:11:29 +0200
Subject: [PATCH] fix: Fix extra `GetBlock` requests for validators (#27)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# What ❔

If the `SyncBlocks` actor runs on a validator node, the node previously produced unnecessary `GetBlock` requests to peers. This happened because the `SyncBlocks` logic assumed that new blocks are only added from within the actor. This PR lifts that assumption: it filters out `GetBlock` requests for blocks that are already persisted, and cancels pending requests in reaction to new blocks being put into the storage.

## Why ❔

Extra `GetBlock` requests waste network traffic and peers' processing time.
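For reviewers: the cancellation mechanism boils down to "drop a `oneshot::Sender` to cancel the matching in-flight request". Below is a minimal, self-contained sketch of that idea, using plain `tokio` primitives rather than this repo's `zksync_concurrency` wrappers; `pending_blocks` and `fetch_block` are illustrative names, not the actual API.

```rust
use std::{collections::HashMap, time::Duration};
use tokio::sync::oneshot;

/// Illustrative stand-in for a `GetBlock` request sent to a peer.
async fn fetch_block(number: u64) -> String {
    tokio::time::sleep(Duration::from_secs(60)).await;
    format!("block #{number}")
}

#[tokio::main]
async fn main() {
    // Pending retrievals keyed by block number (cf. `PendingBlocks` in the diff).
    let mut pending_blocks: HashMap<u64, oneshot::Sender<()>> = HashMap::new();
    let (stop_sender, stop_receiver) = oneshot::channel::<()>();
    pending_blocks.insert(1, stop_sender);

    let retrieval = tokio::spawn(async move {
        tokio::select! {
            // The receiver resolves (with a disconnect error) as soon as the
            // sender is dropped, which aborts the in-flight request.
            _ = stop_receiver => println!("retrieval of block #1 canceled"),
            block = fetch_block(1) => println!("received {block} from a peer"),
        }
    });

    // The block gets persisted by other means (e.g., by consensus): removing
    // the map entry drops the sender, which cancels the retrieval.
    pending_blocks.remove(&1);
    retrieval.await.unwrap();
}
```

In the actual change below, the same pattern is implemented with a `sync::lock`-guarded `pending_blocks` map and a `scope` that is canceled once `recv_or_disconnected()` on the receiver returns.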
---
 node/actors/sync_blocks/src/peers/events.rs |  2 +
 node/actors/sync_blocks/src/peers/mod.rs    | 85 +++++++++++++++++--
 node/actors/sync_blocks/src/peers/tests.rs  | 92 +++++++++++++++++++++
 3 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/node/actors/sync_blocks/src/peers/events.rs b/node/actors/sync_blocks/src/peers/events.rs
index 186099be..0a4409ff 100644
--- a/node/actors/sync_blocks/src/peers/events.rs
+++ b/node/actors/sync_blocks/src/peers/events.rs
@@ -8,6 +8,8 @@ use zksync_consensus_roles::{node, validator::BlockNumber};
 pub(super) enum PeerStateEvent {
     /// Node has successfully downloaded the specified block.
     GotBlock(BlockNumber),
+    /// Block retrieval was canceled because the block was persisted using other means.
+    CanceledBlock(BlockNumber),
     /// Received an invalid block from the peer.
     GotInvalidBlock {
         peer_key: node::PublicKey,
diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs
index 43d98246..7639db94 100644
--- a/node/actors/sync_blocks/src/peers/mod.rs
+++ b/node/actors/sync_blocks/src/peers/mod.rs
@@ -42,12 +42,15 @@ impl PeerStatesHandle {
     }
 }
 
+type PendingBlocks = HashMap<BlockNumber, oneshot::Sender<()>>;
+
 /// View of peers (or more precisely, connections with peers) w.r.t. block syncing.
 #[derive(Debug)]
 pub(crate) struct PeerStates {
     updates_receiver: Option<channel::UnboundedReceiver<PeerStateUpdate>>,
     events_sender: Option<channel::UnboundedSender<PeerStateEvent>>,
     peers: Mutex<HashMap<node::PublicKey, PeerState>>,
+    pending_blocks: Mutex<PendingBlocks>,
     message_sender: channel::UnboundedSender<io::OutputMessage>,
     storage: Arc<dyn WriteBlockStore>,
     config: Config,
@@ -65,6 +68,7 @@ impl PeerStates {
             updates_receiver: Some(updates_receiver),
             events_sender: None,
             peers: Mutex::default(),
+            pending_blocks: Mutex::default(),
             message_sender,
             storage,
             config,
@@ -81,6 +85,7 @@ impl PeerStates {
     pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
         let updates_receiver = self.updates_receiver.take().unwrap();
         let storage = self.storage.as_ref();
+        let blocks_subscriber = storage.subscribe_to_block_writes();
         let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks);
         let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0));
@@ -93,6 +98,7 @@ impl PeerStates {
             new_blocks_sender.send_replace(last_block_number);
 
             s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender));
+            s.spawn_bg(self.cancel_received_block_tasks(ctx, blocks_subscriber));
 
             for block_number in missing_blocks {
                 let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
@@ -101,14 +107,27 @@ impl PeerStates {
             loop {
                 let new_last_block_number = *sync::changed(ctx, &mut new_blocks_subscriber).await?;
-                let new_block_numbers =
-                    (last_block_number.next().0..=new_last_block_number.0).map(BlockNumber);
+                let new_block_numbers = last_block_number.next()..new_last_block_number.next();
+                if new_block_numbers.is_empty() {
+                    continue;
+                }
                 tracing::trace!(
                     ?new_block_numbers,
+                    "Filtering block numbers as per storage availability"
+                );
+
+                let missing_blocks = storage
+                    .missing_block_numbers(ctx, new_block_numbers)
+                    .await?;
+                if missing_blocks.is_empty() {
+                    continue;
+                }
+                tracing::trace!(
+                    ?missing_blocks,
                     "Enqueuing requests for getting blocks from peers"
                 );
-                for block_number in new_block_numbers {
+                for block_number in missing_blocks {
                     let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
                     s.spawn(self.get_and_save_block(ctx, block_number, get_block_permit, storage));
                 }
                 last_block_number = new_last_block_number;
             }
@@ -139,6 +158,31 @@ impl PeerStates {
         }
     }
 
+    /// Cancels pending block retrievals for blocks that appear in the storage using other means
+    /// (e.g., thanks to the consensus algorithm). This works on a best-effort basis; it is not
+    /// guaranteed that all block retrievals will be canceled in a timely manner.
+    #[instrument(level = "trace", skip_all, err)]
+    async fn cancel_received_block_tasks(
+        &self,
+        ctx: &ctx::Ctx,
+        mut subscriber: watch::Receiver<BlockNumber>,
+    ) -> StorageResult<()> {
+        loop {
+            let block_number = *sync::changed(ctx, &mut subscriber).await?;
+            if sync::lock(ctx, &self.pending_blocks)
+                .await?
+                .remove(&block_number)
+                .is_some()
+            {
+                tracing::trace!(
+                    %block_number,
+                    "Block persisted using other means; canceling its retrieval"
+                );
+                // Retrieval is canceled by dropping the corresponding `oneshot::Sender`.
+            }
+        }
+    }
+
     /// Returns the last trusted block number stored by the peer.
     #[instrument(
         level = "trace",
@@ -221,13 +265,40 @@ impl PeerStates {
         get_block_permit: sync::SemaphorePermit<'_>,
         storage: &dyn WriteBlockStore,
     ) -> StorageResult<()> {
-        let block = self.get_block(ctx, block_number).await?;
+        let (stop_sender, stop_receiver) = oneshot::channel();
+        sync::lock(ctx, &self.pending_blocks)
+            .await?
+            .insert(block_number, stop_sender);
+
+        let block_result = scope::run!(ctx, |ctx, s| async {
+            s.spawn_bg(async {
+                // Cancel the scope in either of these events:
+                // - The parent scope is canceled.
+                // - The `stop_sender` is dropped.
+                stop_receiver.recv_or_disconnected(ctx).await.ok();
+                s.cancel();
+                Ok(())
+            });
+            self.get_block(ctx, block_number).await
+        })
+        .await;
+
+        drop(get_block_permit);
+        sync::lock(ctx, &self.pending_blocks)
+            .await?
+            .remove(&block_number);
 
-        if let Some(events_sender) = &self.events_sender {
-            events_sender.send(PeerStateEvent::GotBlock(block_number));
+        if let Ok(block) = block_result {
+            if let Some(events_sender) = &self.events_sender {
+                events_sender.send(PeerStateEvent::GotBlock(block_number));
+            }
+            storage.put_block(ctx, &block).await?;
+        } else {
+            tracing::trace!(%block_number, "Getting block canceled");
+            if let Some(events_sender) = &self.events_sender {
+                events_sender.send(PeerStateEvent::CanceledBlock(block_number));
+            }
         }
-        storage.put_block(ctx, &block).await?;
         Ok(())
     }
diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs
index 74ac69db..f4fe1e15 100644
--- a/node/actors/sync_blocks/src/peers/tests.rs
+++ b/node/actors/sync_blocks/src/peers/tests.rs
@@ -178,6 +178,98 @@ async fn updating_peer_state_with_single_block() {
     test_peer_states(UpdatingPeerStateWithSingleBlock).await;
 }
 
+#[derive(Debug)]
+struct CancelingBlockRetrieval;
+
+#[async_trait]
+impl Test for CancelingBlockRetrieval {
+    const BLOCK_COUNT: usize = 5;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer.
+        let message = message_receiver.recv(ctx).await?;
+        assert_matches!(
+            message,
+            io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. })
+        );
+
+        // Emulate receiving the block using external means.
+        storage
+            .put_block(ctx, &test_validators.final_blocks[1])
+            .await?;
+        // Retrieval of the block must be canceled.
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::CanceledBlock(BlockNumber(1)));
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn canceling_block_retrieval() {
+    test_peer_states(CancelingBlockRetrieval).await;
+}
+
+#[derive(Debug)]
+struct FilteringBlockRetrieval;
+
+#[async_trait]
+impl Test for FilteringBlockRetrieval {
+    const BLOCK_COUNT: usize = 5;
+
+    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
+        let TestHandles {
+            mut rng,
+            test_validators,
+            peer_states_handle,
+            storage,
+            mut message_receiver,
+            mut events_receiver,
+            ..
+        } = handles;
+
+        // Emulate receiving the block using external means.
+        storage
+            .put_block(ctx, &test_validators.final_blocks[1])
+            .await?;
+
+        let peer_key = rng.gen::<node::SecretKey>().public();
+        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
+        let peer_event = events_receiver.recv(ctx).await?;
+        assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);
+
+        // Check that the actor has sent a `get_block` request to the peer, but only for block #2.
+        let message = message_receiver.recv(ctx).await?;
+        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
+            recipient, number, ..
+        }) = message;
+        assert_eq!(recipient, peer_key);
+        assert_eq!(number, BlockNumber(2));
+
+        assert!(message_receiver.try_recv().is_none());
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn filtering_block_retrieval() {
+    test_peer_states(FilteringBlockRetrieval).await;
+}
+
 #[derive(Debug)]
 struct UpdatingPeerStateWithMultipleBlocks;