diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 1e2d5da6..7ece2520 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -42,12 +42,15 @@ impl PeerStatesHandle { } } +type PendingBlocks = HashMap>; + /// View of peers (or more precisely, connections with peers) w.r.t. block syncing. #[derive(Debug)] pub(crate) struct PeerStates { updates_receiver: Option>, events_sender: Option>, peers: Mutex>, + pending_blocks: Mutex, message_sender: channel::UnboundedSender, storage: Arc, config: Config, @@ -65,6 +68,7 @@ impl PeerStates { updates_receiver: Some(updates_receiver), events_sender: None, peers: Mutex::default(), + pending_blocks: Mutex::default(), message_sender, storage, config, @@ -81,6 +85,7 @@ impl PeerStates { pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> { let updates_receiver = self.updates_receiver.take().unwrap(); let storage = self.storage.as_ref(); + let blocks_subscriber = storage.subscribe_to_block_writes(); let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks); let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0)); @@ -93,6 +98,7 @@ impl PeerStates { new_blocks_sender.send_replace(last_block_number); s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender)); + s.spawn_bg(self.cancel_received_block_tasks(ctx, blocks_subscriber)); for block_number in missing_blocks { let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?; @@ -101,14 +107,27 @@ impl PeerStates { loop { let new_last_block_number = *sync::changed(ctx, &mut new_blocks_subscriber).await?; - let new_block_numbers = - (last_block_number.next().0..=new_last_block_number.0).map(BlockNumber); + let new_block_numbers = last_block_number.next()..new_last_block_number.next(); + if new_block_numbers.is_empty() { + continue; + } tracing::trace!( ?new_block_numbers, + "Filtering block numbers as per storage availability" + ); + + let missing_blocks = storage + .missing_block_numbers(ctx, new_block_numbers) + .await?; + if missing_blocks.is_empty() { + continue; + } + tracing::trace!( + ?missing_blocks, "Enqueuing requests for getting blocks from peers" ); - for block_number in new_block_numbers { + for block_number in missing_blocks { let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?; s.spawn(self.get_and_save_block(ctx, block_number, get_block_permit, storage)); } @@ -139,6 +158,31 @@ impl PeerStates { } } + /// Cancels pending block retrieval for blocks that appear in the storage using other means + /// (e.g., thanks to the consensus algorithm). This works at best-effort basis; it's not guaranteed + /// that this method will timely cancel all block retrievals. + #[instrument(level = "trace", skip_all, err)] + async fn cancel_received_block_tasks( + &self, + ctx: &ctx::Ctx, + mut subscriber: watch::Receiver, + ) -> StorageResult<()> { + loop { + let block_number = *sync::changed(ctx, &mut subscriber).await?; + if sync::lock(ctx, &self.pending_blocks) + .await? + .remove(&block_number) + .is_some() + { + tracing::trace!( + %block_number, + "Block persisted using other means; canceling its retrieval" + ); + // Retrieval is canceled by dropping the corresponding `oneshot::Sender`. + } + } + } + /// Returns the last trusted block number stored by the peer. #[instrument( level = "trace", @@ -221,13 +265,37 @@ impl PeerStates { get_block_permit: sync::SemaphorePermit<'_>, storage: &dyn WriteBlockStore, ) -> StorageResult<()> { - let block = self.get_block(ctx, block_number).await?; + let (stop_sender, stop_receiver) = oneshot::channel(); + sync::lock(ctx, &self.pending_blocks) + .await? + .insert(block_number, stop_sender); + + let block_result = scope::run!(ctx, |ctx, s| async { + s.spawn_bg(async { + // Cancel the scope in either of these events: + // - The parent scope is canceled. + // - The `stop_sender` is dropped. + stop_receiver.recv_or_disconnected(ctx).await.ok(); + s.cancel(); + Ok(()) + }); + self.get_block(ctx, block_number).await + }) + .await; + drop(get_block_permit); + sync::lock(ctx, &self.pending_blocks) + .await? + .remove(&block_number); - if let Some(events_sender) = &self.events_sender { - events_sender.send(PeerStateEvent::GotBlock(block_number)); + if let Ok(block) = block_result { + if let Some(events_sender) = &self.events_sender { + events_sender.send(PeerStateEvent::GotBlock(block_number)); + } + storage.put_block(ctx, &block).await?; + } else { + tracing::trace!(%block_number, "Getting block canceled"); } - storage.put_block(ctx, &block).await?; Ok(()) }