fix: Fix extra GetBlock requests for validators #27

Merged · 4 commits · Nov 10, 2023
Changes from 3 commits
2 changes: 2 additions & 0 deletions node/actors/sync_blocks/src/peers/events.rs
@@ -8,6 +8,8 @@ use zksync_consensus_roles::{node, validator::BlockNumber};
pub(super) enum PeerStateEvent {
/// Node has successfully downloaded the specified block.
GotBlock(BlockNumber),
/// Block retrieval was canceled because the block was persisted by other means.
CanceledBlock(BlockNumber),
/// Received an invalid block from the peer.
GotInvalidBlock {
peer_key: node::PublicKey,
85 changes: 78 additions & 7 deletions node/actors/sync_blocks/src/peers/mod.rs
@@ -42,12 +42,15 @@ impl PeerStatesHandle {
}
}

type PendingBlocks = HashMap<BlockNumber, oneshot::Sender<()>>;

/// View of peers (or more precisely, connections with peers) w.r.t. block syncing.
#[derive(Debug)]
pub(crate) struct PeerStates {
updates_receiver: Option<channel::UnboundedReceiver<PeerStateUpdate>>,
events_sender: Option<channel::UnboundedSender<PeerStateEvent>>,
peers: Mutex<HashMap<node::PublicKey, PeerState>>,
pending_blocks: Mutex<PendingBlocks>,
message_sender: channel::UnboundedSender<io::OutputMessage>,
storage: Arc<dyn WriteBlockStore>,
config: Config,
@@ -65,6 +68,7 @@ impl PeerStates {
updates_receiver: Some(updates_receiver),
events_sender: None,
peers: Mutex::default(),
pending_blocks: Mutex::default(),
message_sender,
storage,
config,
@@ -81,6 +85,7 @@
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
let updates_receiver = self.updates_receiver.take().unwrap();
let storage = self.storage.as_ref();
let blocks_subscriber = storage.subscribe_to_block_writes();
let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks);
let (new_blocks_sender, mut new_blocks_subscriber) = watch::channel(BlockNumber(0));

@@ -93,6 +98,7 @@
new_blocks_sender.send_replace(last_block_number);

s.spawn_bg(self.run_updates(ctx, updates_receiver, new_blocks_sender));
s.spawn_bg(self.cancel_received_block_tasks(ctx, blocks_subscriber));

for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
@@ -101,14 +107,27 @@

loop {
let new_last_block_number = *sync::changed(ctx, &mut new_blocks_subscriber).await?;
let new_block_numbers =
(last_block_number.next().0..=new_last_block_number.0).map(BlockNumber);
let new_block_numbers = last_block_number.next()..new_last_block_number.next();
if new_block_numbers.is_empty() {
continue;
}
tracing::trace!(
?new_block_numbers,
"Filtering block numbers as per storage availability"
);

let missing_blocks = storage
.missing_block_numbers(ctx, new_block_numbers)
.await?;
if missing_blocks.is_empty() {
continue;
}
tracing::trace!(
?missing_blocks,
"Enqueuing requests for getting blocks from peers"
);

for block_number in new_block_numbers {
for block_number in missing_blocks {
let get_block_permit = sync::acquire(ctx, &get_block_semaphore).await?;
s.spawn(self.get_and_save_block(ctx, block_number, get_block_permit, storage));
}
@@ -139,6 +158,31 @@ impl PeerStates {
}
}

/// Cancels pending block retrievals for blocks that appear in storage by other means
/// (e.g., thanks to the consensus algorithm). This works on a best-effort basis; it is not
/// guaranteed that this method will cancel all block retrievals in a timely manner.
Contributor:

why do we need to compromise here?

Contributor Author:

That's the most general approach I've come up with that doesn't make many assumptions as to what could update blocks externally and how these updates would look (e.g., whether it's consensus or some other actor). I guess with the knowledge of consensus we could do marginally better by canceling not only the latest block produced by consensus, but all the preceding blocks as well; is this what you had in mind? If you have any suggestions, I'm happy to listen.
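
The cancellation mechanism the PR settles on is the drop-to-cancel pattern used in `get_and_save_block` below: a `oneshot::Sender` is kept per pending block, and dropping it wakes the fetch task, which then aborts. A minimal sketch of the pattern in plain tokio; the `select!`-based wiring is illustrative and stands in for the project's `scope` / `recv_or_disconnected` machinery:

```rust
use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // One sender per pending block, held by the coordinator
    // (a single pair here for brevity).
    let (stop_sender, stop_receiver) = oneshot::channel::<()>();

    let fetch_task = tokio::spawn(async move {
        tokio::select! {
            // Stand-in for the actual peer request.
            _ = sleep(Duration::from_secs(10)) => "got block from peer",
            // Resolves (with an error) as soon as the sender is dropped;
            // we interpret that as "cancel the fetch".
            _ = stop_receiver => "fetch canceled: block persisted by other means",
        }
    });

    // The block shows up in storage via consensus: drop the sender to cancel.
    drop(stop_sender);
    println!("{}", fetch_task.await.unwrap());
}
```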

Contributor (@pompon0, Nov 9, 2023):

I mean you are using `watch` to signal a block received from an external source, which naturally leads to NOT canceling some block-fetching tasks if multiple blocks are received from an external source at the same time. Also, AFAICT there is a race condition between spawning a fetching routine for a block and receiving this block from an external source.
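
The coalescing this comment refers to is inherent to watch channels: a subscriber that wakes up observes only the latest value, so block numbers written in quick succession can be skipped. A minimal, self-contained illustration with `tokio::sync::watch`, used here as a stand-in for the project's `watch` wrapper; the block numbers are made up:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (block_writes, mut subscriber) = watch::channel(0u64);

    // Three blocks are persisted "at the same time", before the
    // cancellation task gets a chance to run.
    block_writes.send(1).unwrap();
    block_writes.send(2).unwrap();
    block_writes.send(3).unwrap();

    // The subscriber only observes the latest value; blocks #1 and #2
    // are never reported, so their pending fetches would not be canceled.
    subscriber.changed().await.unwrap();
    println!("observed block #{}", *subscriber.borrow());

    // No further notifications are queued for the skipped values.
    assert!(!subscriber.has_changed().unwrap());
}
```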

#[instrument(level = "trace", skip_all, err)]
async fn cancel_received_block_tasks(
&self,
ctx: &ctx::Ctx,
mut subscriber: watch::Receiver<BlockNumber>,
) -> StorageResult<()> {
loop {
let block_number = *sync::changed(ctx, &mut subscriber).await?;
if sync::lock(ctx, &self.pending_blocks)
.await?
.remove(&block_number)
.is_some()
{
tracing::trace!(
%block_number,
"Block persisted using other means; canceling its retrieval"
);
// Retrieval is canceled by dropping the corresponding `oneshot::Sender`.
}
}
}

/// Returns the last trusted block number stored by the peer.
#[instrument(
level = "trace",
@@ -221,13 +265,40 @@ impl PeerStates {
get_block_permit: sync::SemaphorePermit<'_>,
storage: &dyn WriteBlockStore,
) -> StorageResult<()> {
let block = self.get_block(ctx, block_number).await?;
let (stop_sender, stop_receiver) = oneshot::channel();
sync::lock(ctx, &self.pending_blocks)
.await?
.insert(block_number, stop_sender);

let block_result = scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
// Cancel the scope in either of these events:
// - The parent scope is canceled.
// - The `stop_sender` is dropped.
stop_receiver.recv_or_disconnected(ctx).await.ok();
s.cancel();
Ok(())
});
self.get_block(ctx, block_number).await
})
.await;

drop(get_block_permit);
sync::lock(ctx, &self.pending_blocks)
.await?
.remove(&block_number);

if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
if let Ok(block) = block_result {
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::GotBlock(block_number));
}
storage.put_block(ctx, &block).await?;
} else {
tracing::trace!(%block_number, "Getting block canceled");
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::CanceledBlock(block_number));
}
}
storage.put_block(ctx, &block).await?;
Ok(())
}

92 changes: 92 additions & 0 deletions node/actors/sync_blocks/src/peers/tests.rs
@@ -178,6 +178,98 @@ async fn updating_peer_state_with_single_block() {
test_peer_states(UpdatingPeerStateWithSingleBlock).await;
}

#[derive(Debug)]
struct CancelingBlockRetrieval;

#[async_trait]
impl Test for CancelingBlockRetrieval {
const BLOCK_COUNT: usize = 5;

async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
let TestHandles {
mut rng,
test_validators,
peer_states_handle,
storage,
mut message_receiver,
mut events_receiver,
..
} = handles;

let peer_key = rng.gen::<node::SecretKey>().public();
peer_states_handle.update(peer_key.clone(), test_validators.sync_state(1));
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);

// Check that the actor has sent a `get_block` request to the peer
let message = message_receiver.recv(ctx).await?;
assert_matches!(
message,
io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { .. })
);

// Emulate receiving block using external means.
storage
.put_block(ctx, &test_validators.final_blocks[1])
.await?;
// Retrieval of the block must be canceled.
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::CanceledBlock(BlockNumber(1)));
Ok(())
}
}

#[tokio::test]
async fn canceling_block_retrieval() {
test_peer_states(CancelingBlockRetrieval).await;
}

#[derive(Debug)]
struct FilteringBlockRetrieval;

#[async_trait]
impl Test for FilteringBlockRetrieval {
const BLOCK_COUNT: usize = 5;

async fn test(self, ctx: &ctx::Ctx, handles: TestHandles) -> anyhow::Result<()> {
let TestHandles {
mut rng,
test_validators,
peer_states_handle,
storage,
mut message_receiver,
mut events_receiver,
..
} = handles;

// Emulate receiving block using external means.
storage
.put_block(ctx, &test_validators.final_blocks[1])
.await?;

let peer_key = rng.gen::<node::SecretKey>().public();
peer_states_handle.update(peer_key.clone(), test_validators.sync_state(2));
let peer_event = events_receiver.recv(ctx).await?;
assert_matches!(peer_event, PeerStateEvent::PeerUpdated(key) if key == peer_key);

// Check that the actor has sent a `get_block` request to the peer, but only for block #2.
let message = message_receiver.recv(ctx).await?;
let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient, number, ..
}) = message;
assert_eq!(recipient, peer_key);
assert_eq!(number, BlockNumber(2));

assert!(message_receiver.try_recv().is_none());
Ok(())
}
}

#[tokio::test]
async fn filtering_block_retrieval() {
test_peer_states(FilteringBlockRetrieval).await;
}

#[derive(Debug)]
struct UpdatingPeerStateWithMultipleBlocks;
