Skip to content

Commit

Permalink
Merge branch 'main' into aov-bft-373-use-packagelinks-trick-for-protobuf-compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli authored Nov 10, 2023
2 parents 72a3a10 + 04c9639 commit cd08b9c
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 7 deletions.
2 changes: 2 additions & 0 deletions node/actors/sync_blocks/src/peers/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use zksync_consensus_roles::{node, validator::BlockNumber};
pub(super) enum PeerStateEvent {
/// Node has successfully downloaded the specified block.
GotBlock(BlockNumber),
/// Block retrieval was canceled due to block getting persisted using other means.
CanceledBlock(BlockNumber),
/// Received an invalid block from the peer.
GotInvalidBlock {
peer_key: node::PublicKey,
Expand Down
85 changes: 78 additions & 7 deletions node/actors/sync_blocks/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ impl PeerStatesHandle {
}
}

/// Map of in-flight block retrievals, keyed by block number. Dropping a stored
/// `oneshot::Sender` (e.g., by removing the entry) cancels the corresponding
/// retrieval task, which listens on the paired receiver.
type PendingBlocks = HashMap<BlockNumber, oneshot::Sender<()>>;

/// View of peers (or more precisely, connections with peers) w.r.t. block syncing.
#[derive(Debug)]
pub(crate) struct PeerStates {
updates_receiver: Option<channel::UnboundedReceiver<PeerStateUpdate>>,
events_sender: Option<channel::UnboundedSender<PeerStateEvent>>,
peers: Mutex<HashMap<node::PublicKey, PeerState>>,
pending_blocks: Mutex<PendingBlocks>,
message_sender: channel::UnboundedSender<io::OutputMessage>,
storage: Arc<dyn WriteBlockStore>,
config: Config,
Expand All @@ -65,6 +68,7 @@ impl PeerStates {
updates_receiver: Some(updates_receiver),
events_sender: None,
peers: Mutex::default(),
pending_blocks: Mutex::default(),
message_sender,
storage,
config,
Expand All @@ -81,6 +85,7 @@ impl PeerStates {
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
let updates_receiver = self.updates_receiver.take().unwrap();
let storage = self.storage.as_ref();
let blocks_subscriber = storage.subscribe_to_block_writes();
let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks);
let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0));

Expand All @@ -93,6 +98,7 @@ impl PeerStates {
new_blocks_sender.send_replace(last_block_number);

s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender));
s.spawn_bg(self.cancel_received_block_tasks(ctx, blocks_subscriber));

for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
Expand All @@ -101,14 +107,27 @@ impl PeerStates {

loop {
let new_last_block_number = *sync::changed(ctx, &mut new_blocks_subscriber).await?;
let new_block_numbers =
(last_block_number.next().0..=new_last_block_number.0).map(BlockNumber);
let new_block_numbers = last_block_number.next()..new_last_block_number.next();
if new_block_numbers.is_empty() {
continue;
}
tracing::trace!(
?new_block_numbers,
"Filtering block numbers as per storage availability"
);

let missing_blocks = storage
.missing_block_numbers(ctx, new_block_numbers)
.await?;
if missing_blocks.is_empty() {
continue;
}
tracing::trace!(
?missing_blocks,
"Enqueuing requests for getting blocks from peers"
);

for block_number in new_block_numbers {
for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
s.spawn(self.get_and_save_block(ctx, block_number, get_block_permit, storage));
}
Expand Down Expand Up @@ -139,6 +158,31 @@ impl PeerStates {
}
}

/// Watches block writes in the storage and cancels the pending retrieval (if any)
/// for each newly persisted block, since such blocks were obtained by other means
/// (e.g., via the consensus algorithm). Works on a best-effort basis: a retrieval
/// may still complete before the cancellation is observed.
#[instrument(level = "trace", skip_all, err)]
async fn cancel_received_block_tasks(
    &self,
    ctx: &ctx::Ctx,
    mut subscriber: watch::Receiver<BlockNumber>,
) -> StorageResult<()> {
    loop {
        let block_number = *sync::changed(ctx, &mut subscriber).await?;
        // Removing the entry drops its `oneshot::Sender`, which is what actually
        // cancels the retrieval task. The lock guard is released before logging.
        let was_pending = sync::lock(ctx, &self.pending_blocks)
            .await?
            .remove(&block_number)
            .is_some();
        if was_pending {
            tracing::trace!(
                %block_number,
                "Block persisted using other means; canceling its retrieval"
            );
        }
    }
}

/// Returns the last trusted block number stored by the peer.
#[instrument(
level = "trace",
Expand Down Expand Up @@ -221,13 +265,40 @@ impl PeerStates {
get_block_permit: sync::SemaphorePermit<'_>,
storage: &dyn WriteBlockStore,
) -> StorageResult<()> {
let block = self.get_block(ctx, block_number).await?;
let (stop_sender, stop_receiver) = oneshot::channel();
sync::lock(ctx, &self.pending_blocks)
.await?
.insert(block_number, stop_sender);

let block_result = scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
// Cancel the scope in either of these events:
// - The parent scope is canceled.
// - The `stop_sender` is dropped.
stop_receiver.recv_or_disconnected(ctx).await.ok();
s.cancel();
Ok(())
});
self.get_block(ctx, block_number).await
})
.await;

drop(get_block_permit);
sync::lock(ctx, &self.pending_blocks)
.await?
.remove(&block_number);

if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
if let Ok(block) = block_result {
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
}
storage.put_block(ctx, &block).await?;
} else {
tracing::trace!(%block_number, "Getting block canceled");
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::CanceledBlock(block_number));
}
}
storage.put_block(ctx, &block).await?;
Ok(())
}

Expand Down
92 changes: 92 additions & 0 deletions node/actors/sync_blocks/src/peers/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,98 @@ async fn updating_peer_state_with_single_block() {
test_peer_states(UpdatingPeerStateWithSingleBlock).await;
}

/// Test checking that an in-flight block request is canceled once the block
/// appears in the storage via other means (see `test` impl below).
#[derive(Debug)]
struct CancelingBlockRetrieval;

#[async_trait]
impl Test for CancelingBlockRetrieval {
    const BLOCK_COUNT: usize = 5;

    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
        let TestHandles {
            mut rng,
            test_validators,
            peer_states_handle,
            storage,
            mut message_receiver,
            mut events_receiver,
            ..
        } = handles;

        // Register a peer that advertises block #1.
        let peer_key = rng.gen::<node::SecretKey>().public();
        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
        let event = events_receiver.recv(ctx).await?;
        assert_matches!(event, PeerStateEvent::PeerUpdated(key) if key == peer_key);

        // The actor should react by requesting the block from the peer.
        let message = message_receiver.recv(ctx).await?;
        assert_matches!(
            message,
            io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. })
        );

        // Persist the same block directly, emulating receipt via other means
        // (e.g., the consensus algorithm).
        storage
            .put_block(ctx, &test_validators.final_blocks[1])
            .await?;

        // The in-flight request must now be canceled rather than completed.
        let event = events_receiver.recv(ctx).await?;
        assert_matches!(event, PeerStateEvent::CanceledBlock(BlockNumber(1)));
        Ok(())
    }
}

// Entry point running the `CancelingBlockRetrieval` scenario in the shared
// peer-states test harness.
#[tokio::test]
async fn canceling_block_retrieval() {
    test_peer_states(CancelingBlockRetrieval).await;
}

/// Test checking that blocks already present in the storage are not requested
/// from peers; only genuinely missing blocks are (see `test` impl below).
#[derive(Debug)]
struct FilteringBlockRetrieval;

#[async_trait]
impl Test for FilteringBlockRetrieval {
    const BLOCK_COUNT: usize = 5;

    async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
        let TestHandles {
            mut rng,
            test_validators,
            peer_states_handle,
            storage,
            mut message_receiver,
            mut events_receiver,
            ..
        } = handles;

        // Persist block #1 out of band, before any peer is known.
        storage
            .put_block(ctx, &test_validators.final_blocks[1])
            .await?;

        // Register a peer claiming to have blocks up to #2.
        let peer_key = rng.gen::<node::SecretKey>().public();
        peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
        let event = events_receiver.recv(ctx).await?;
        assert_matches!(event, PeerStateEvent::PeerUpdated(key) if key == peer_key);

        // Only the missing block #2 should be requested; block #1 is filtered
        // out because it is already in the storage.
        let message = message_receiver.recv(ctx).await?;
        let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
            recipient, number, ..
        }) = message;
        assert_eq!(recipient, peer_key);
        assert_eq!(number, BlockNumber(2));

        // No further requests should have been issued.
        assert!(message_receiver.try_recv().is_none());
        Ok(())
    }
}

// Entry point running the `FilteringBlockRetrieval` scenario in the shared
// peer-states test harness.
#[tokio::test]
async fn filtering_block_retrieval() {
    test_peer_states(FilteringBlockRetrieval).await;
}

#[derive(Debug)]
struct UpdatingPeerStateWithMultipleBlocks;

Expand Down

0 comments on commit cd08b9c

Please sign in to comment.