Skip to content

Commit

Permalink
Sketch cancellation / filtering of GetBlock requests
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Nov 7, 2023
1 parent aaa1093 commit 6b568de
Showing 1 changed file with 75 additions and 7 deletions.
82 changes: 75 additions & 7 deletions node/actors/sync_blocks/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ impl PeerStatesHandle {
}
}

type PendingBlocks = HashMap<BlockNumber, oneshot::Sender<()>>;

/// View of peers (or more precisely, connections with peers) w.r.t. block syncing.
#[derive(Debug)]
pub(crate) struct PeerStates {
updates_receiver: Option<channel::UnboundedReceiver<PeerStateUpdate>>,
events_sender: Option<channel::UnboundedSender<PeerStateEvent>>,
peers: Mutex<HashMap<node::PublicKey, PeerState>>,
pending_blocks: Mutex<PendingBlocks>,
message_sender: channel::UnboundedSender<io::OutputMessage>,
storage: Arc<dyn WriteBlockStore>,
config: Config,
Expand All @@ -65,6 +68,7 @@ impl PeerStates {
updates_receiver: Some(updates_receiver),
events_sender: None,
peers: Mutex::default(),
pending_blocks: Mutex::default(),
message_sender,
storage,
config,
Expand All @@ -81,6 +85,7 @@ impl PeerStates {
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
let updates_receiver = self.updates_receiver.take().unwrap();
let storage = self.storage.as_ref();
let blocks_subscriber = storage.subscribe_to_block_writes();
let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks);
let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0));

Expand All @@ -93,6 +98,7 @@ impl PeerStates {
new_blocks_sender.send_replace(last_block_number);

s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender));
s.spawn_bg(self.cancel_received_block_tasks(ctx, blocks_subscriber));

for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
Expand All @@ -101,14 +107,27 @@ impl PeerStates {

loop {
let new_last_block_number = *sync::changed(ctx, &mut new_blocks_subscriber).await?;
let new_block_numbers =
(last_block_number.next().0..=new_last_block_number.0).map(BlockNumber);
let new_block_numbers = last_block_number.next()..new_last_block_number.next();
if new_block_numbers.is_empty() {
continue;
}
tracing::trace!(
?new_block_numbers,
"Filtering block numbers as per storage availability"
);

let missing_blocks = storage
.missing_block_numbers(ctx, new_block_numbers)
.await?;
if missing_blocks.is_empty() {
continue;
}
tracing::trace!(
?missing_blocks,
"Enqueuing requests for getting blocks from peers"
);

for block_number in new_block_numbers {
for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
s.spawn(self.get_and_save_block(ctx, block_number, get_block_permit, storage));
}
Expand Down Expand Up @@ -139,6 +158,31 @@ impl PeerStates {
}
}

/// Cancels pending block retrieval for blocks that appear in the storage using other means
/// (e.g., thanks to the consensus algorithm). This works at best-effort basis; it's not guaranteed
/// that this method will timely cancel all block retrievals.
#[instrument(level = "trace", skip_all, err)]
async fn cancel_received_block_tasks(
&self,
ctx: &ctx::Ctx,
mut subscriber: watch::Receiver<BlockNumber>,
) -> StorageResult<()> {
loop {
let block_number = *sync::changed(ctx, &mut subscriber).await?;
if sync::lock(ctx, &self.pending_blocks)
.await?
.remove(&block_number)
.is_some()
{
tracing::trace!(
%block_number,
"Block persisted using other means; canceling its retrieval"
);
// Retrieval is canceled by dropping the corresponding `oneshot::Sender`.
}
}
}

/// Returns the last trusted block number stored by the peer.
#[instrument(
level = "trace",
Expand Down Expand Up @@ -221,13 +265,37 @@ impl PeerStates {
get_block_permit: sync::SemaphorePermit<'_>,
storage: &dyn WriteBlockStore,
) -> StorageResult<()> {
let block = self.get_block(ctx, block_number).await?;
let (stop_sender, stop_receiver) = oneshot::channel();
sync::lock(ctx, &self.pending_blocks)
.await?
.insert(block_number, stop_sender);

let block_result = scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
// Cancel the scope in either of these events:
// - The parent scope is canceled.
// - The `stop_sender` is dropped.
stop_receiver.recv_or_disconnected(ctx).await.ok();
s.cancel();
Ok(())
});
self.get_block(ctx, block_number).await
})
.await;

drop(get_block_permit);
sync::lock(ctx, &self.pending_blocks)
.await?
.remove(&block_number);

if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
if let Ok(block) = block_result {
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
}
storage.put_block(ctx, &block).await?;
} else {
tracing::trace!(%block_number, "Getting block canceled");
}
storage.put_block(ctx, &block).await?;
Ok(())
}

Expand Down

0 comments on commit 6b568de

Please sign in to comment.