Commit

all tests pass
pompon0 committed Dec 23, 2023
1 parent 12e6277 commit 0f72363
Showing 10 changed files with 476 additions and 345 deletions.
2 changes: 1 addition & 1 deletion node/actors/sync_blocks/src/config.rs
@@ -36,7 +36,7 @@ impl Config {
Ok(Self {
validator_set,
consensus_threshold,
max_concurrent_blocks: 10,
max_concurrent_blocks: 20,
max_concurrent_blocks_per_peer: 5,
sleep_interval_for_get_block: time::Duration::seconds(10),
})
43 changes: 28 additions & 15 deletions node/actors/sync_blocks/src/lib.rs
@@ -2,14 +2,12 @@
//!
//! This crate contains an actor implementing block syncing among nodes, which is tied to the gossip
//! network RPCs.
use crate::{
io::{InputMessage, OutputMessage},
};
use crate::io::{InputMessage, OutputMessage};
use std::sync::Arc;
use zksync_concurrency::{ctx, scope, error::Wrap as _};
use zksync_consensus_storage::{BlockStoreState,BlockStore};
use zksync_consensus_utils::pipe::ActorPipe;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_consensus_network::io::{GetBlockError, SyncBlocksRequest};
use zksync_consensus_storage::{BlockStore, BlockStoreState};
use zksync_consensus_utils::pipe::ActorPipe;

mod config;
pub mod io;
@@ -27,9 +25,9 @@ impl Config {
ctx: &ctx::Ctx,
mut pipe: ActorPipe<InputMessage, OutputMessage>,
storage: Arc<BlockStore>,
) -> anyhow::Result<()> {
let peer_states = PeerStates::new(self, storage.clone(), pipe.send);
let result : ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
let result: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async { Ok(peer_states.run_block_fetcher(ctx).await?) });
loop {
match pipe.recv.recv(ctx).await? {
@@ -38,21 +36,36 @@
state,
response,
}) => {
let res = peer_states.update(&peer, BlockStoreState{
first: state.first_stored_block,
last: state.last_stored_block,
});
let res = peer_states.update(
&peer,
BlockStoreState {
first: state.first_stored_block,
last: state.last_stored_block,
},
);
if let Err(err) = res {
tracing::info!(%err, ?peer, "peer_states.update()");
}
response.send(()).ok();
}
InputMessage::Network(SyncBlocksRequest::GetBlock { block_number, response }) => {
response.send(storage.block(ctx,block_number).await.wrap("storage.block()")?.ok_or(GetBlockError::NotSynced)).ok();
InputMessage::Network(SyncBlocksRequest::GetBlock {
block_number,
response,
}) => {
response
.send(
storage
.block(ctx, block_number)
.await
.wrap("storage.block()")?
.ok_or(GetBlockError::NotSynced),
)
.ok();
}
}
}
}).await;
})
.await;

// Since we clearly type cancellation errors, it's easier to propagate them up to this entry point,
// rather than catching them in the constituent tasks.
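The tail of this function is collapsed in the diff, but the comment above states the design: cancellation errors are strongly typed, so they are propagated up to this entry point and flattened there instead of being caught in each task. A hypothetical sketch of that final conversion, assuming the two-variant ctx::Error enum (Canceled/Internal) from zksync_concurrency; the actual collapsed lines may differ:

// Illustrative sketch, not part of this commit: flatten typed
// cancellation into success at the actor boundary, surfacing only
// internal errors to the caller.
match result {
    Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
    Err(ctx::Error::Internal(err)) => Err(err),
}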
103 changes: 66 additions & 37 deletions node/actors/sync_blocks/src/peers/mod.rs
@@ -6,16 +6,15 @@ use anyhow::Context as _;
use std::{collections::HashMap, sync::Arc, sync::Mutex};
use zksync_concurrency::{
ctx::{self, channel},
oneshot, scope,
sync,
oneshot, scope, sync,
};
use zksync_consensus_utils::no_copy::NoCopy;
use zksync_consensus_network::io::{SyncBlocksInputMessage};
use zksync_consensus_network::io::SyncBlocksInputMessage;
use zksync_consensus_roles::{
node,
validator::{BlockNumber, FinalBlock},
};
use zksync_consensus_storage::{BlockStore,BlockStoreState};
use zksync_consensus_storage::{BlockStore, BlockStoreState};
use zksync_consensus_utils::no_copy::NoCopy;

mod events;
#[cfg(test)]
@@ -33,7 +32,7 @@ pub(crate) struct PeerStates {
config: Config,
storage: Arc<BlockStore>,
message_sender: channel::UnboundedSender<io::OutputMessage>,

peers: Mutex<HashMap<node::PublicKey, PeerState>>,
highest: sync::watch::Sender<BlockNumber>,
events_sender: Option<channel::UnboundedSender<PeerStateEvent>>,
@@ -57,7 +56,11 @@ impl PeerStates {
}
}

pub(crate) fn update(&self, peer: &node::PublicKey, state: BlockStoreState) -> anyhow::Result<()> {
pub(crate) fn update(
&self,
peer: &node::PublicKey,
state: BlockStoreState,
) -> anyhow::Result<()> {
let last = state.last.header().number;
anyhow::ensure!(state.first.header().number <= state.last.header().number);
state
@@ -69,10 +72,12 @@
use std::collections::hash_map::Entry;
match peers.entry(peer.clone()) {
Entry::Occupied(mut e) => e.get_mut().state = state,
Entry::Vacant(e) => { e.insert(PeerState {
state,
get_block_semaphore: Arc::new(sync::Semaphore::new(permits)),
}); }
Entry::Vacant(e) => {
e.insert(PeerState {
state,
get_block_semaphore: Arc::new(sync::Semaphore::new(permits)),
});
}
}
self.highest.send_if_modified(|highest| {
if *highest >= last {
@@ -100,27 +105,30 @@ impl PeerStates {
self.fetch_block(ctx, block_number.into_inner()).await
});
}
}).await
})
.await
}

/// Fetches the block from peers and puts it into storage.
/// Exits early if the block appears in storage from another source.
async fn fetch_block(&self, ctx: &ctx::Ctx, block_number: BlockNumber) -> ctx::OrCanceled<()> {
scope::run!(ctx, |ctx,s| async {
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
let res = self.fetch_block_from_peers(ctx,block_number).await;
if let Some(events_sender) = &self.events_sender {
events_sender.send(match res {
Ok(()) => PeerStateEvent::GotBlock(block_number),
Err(ctx::Canceled) => PeerStateEvent::CanceledBlock(block_number),
});
if let Err(ctx::Canceled) = self.fetch_block_from_peers(ctx, block_number).await {
if let Some(send) = &self.events_sender {
send.send(PeerStateEvent::CanceledBlock(block_number));
}
}
Ok(())
});
// Wait for the block to appear in storage (from any source).
sync::wait_for(ctx, &mut self.storage.subscribe(), |state| state.next() > block_number).await?;
sync::wait_for(ctx, &mut self.storage.subscribe(), |state| {
state.next() > block_number
})
.await?;
Ok(())
}).await
})
.await
}

/// Fetches the block from peers and puts it into storage.
@@ -129,25 +137,39 @@
ctx: &ctx::Ctx,
number: BlockNumber,
) -> ctx::OrCanceled<()> {
loop {
let Some((peer, _permit)) = self.try_acquire_peer_permit(number) else {
while ctx.is_active() {
let Some((peer, permit)) = self.try_acquire_peer_permit(number) else {
let sleep_interval = self.config.sleep_interval_for_get_block;
ctx.sleep(sleep_interval).await?;
continue;
};
match self.fetch_block_from_peer(ctx, &peer, number).await {
Ok(block) => return self.storage.queue_block(ctx, block).await,
let res = self.fetch_block_from_peer(ctx, &peer, number).await;
drop(permit);
match res {
Ok(block) => {
if let Some(send) = &self.events_sender {
send.send(PeerStateEvent::GotBlock(number));
}
return self.storage.queue_block(ctx, block).await;
}
Err(ctx::Error::Canceled(_)) => {
tracing::info!(%number, ?peer, "get_block() call canceled");
}
Err(err) => {
tracing::info!(%err, "get_block({peer:?},{number}) failed, dropping peer");
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::RpcFailed { peer_key: peer.clone(), block_number: number });
tracing::info!(%err, %number, ?peer, "get_block() failed");
if let Some(send) = &self.events_sender {
send.send(PeerStateEvent::RpcFailed {
peer_key: peer.clone(),
block_number: number,
});
}
self.drop_peer(&peer);
}
}
}
Err(ctx::Canceled.into())
}

/// Fetches a block from the specified peer.
async fn fetch_block_from_peer(
&self,
@@ -162,22 +184,28 @@
response,
};
self.message_sender.send(message.into());
let block = response_receiver.recv_or_disconnected(ctx)
let block = response_receiver
.recv_or_disconnected(ctx)
.await?
.context("no response")?
.context("RPC error")?;
if block.header().number != number {
return Err(anyhow::anyhow!(
"block does not have requested number (requested: {number}, got: {})",
block.header().number
).into());
)
.into());
}
block.validate(&self.config.validator_set, self.config.consensus_threshold)
block
.validate(&self.config.validator_set, self.config.consensus_threshold)
.context("block.validate()")?;
Ok(block)
}

fn try_acquire_peer_permit(&self, block_number: BlockNumber) -> Option<(node::PublicKey, sync::OwnedSemaphorePermit)> {
fn try_acquire_peer_permit(
&self,
block_number: BlockNumber,
) -> Option<(node::PublicKey, sync::OwnedSemaphorePermit)> {
let peers = self.peers.lock().unwrap();
let mut peers_with_no_permits = vec![];
let eligible_peers_info = peers.iter().filter(|(peer_key, state)| {
@@ -206,7 +234,6 @@ impl PeerStates {
Some((peer_key.clone(), permit))
} else {
tracing::debug!(
%block_number,
?peers_with_no_permits,
"No peers to query block #{block_number}"
);
@@ -216,8 +243,10 @@

/// Drops peer state.
fn drop_peer(&self, peer: &node::PublicKey) {
if self.peers.lock().unwrap().remove(peer).is_none() { return }
tracing::trace!(?peer, "Dropping peer state");
if self.peers.lock().unwrap().remove(peer).is_none() {
return;
}
tracing::debug!(?peer, "Dropping peer state");
if let Some(events_sender) = &self.events_sender {
events_sender.send(PeerStateEvent::PeerDropped(peer.clone()));
}
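Stepping back: the reworked fetch_block above leans on three primitives that recur through this file: scope::run! with a background task, a watch channel updated via send_if_modified, and sync::wait_for blocking on a predicate. The background fetch is canceled as soon as the main task finishes, which is exactly how fetch_block exits early when the block arrives from another source. Below is a self-contained sketch of that shape; it assumes zksync_concurrency's sync module exposes Tokio-style watch channels (consistent with the Sender and send_if_modified usage above), and the Demo type, the plain u64 block number, and the simulated fetch are hypothetical:

use zksync_concurrency::{ctx, scope, sync, time};

// Illustrative sketch, not part of this commit.
struct Demo {
    // Mirrors the `highest: sync::watch::Sender<BlockNumber>` field of
    // PeerStates, with u64 standing in for BlockNumber.
    highest: sync::watch::Sender<u64>,
}

impl Demo {
    /// Races a (simulated) fetch against the watch channel; scope::run!
    /// cancels the background task once the main task returns.
    async fn wait_for_block(&self, ctx: &ctx::Ctx, number: u64) -> ctx::OrCanceled<()> {
        scope::run!(ctx, |ctx, s| async {
            s.spawn_bg(async {
                // Simulated fetch: pretend the block arrives after a
                // short delay, then publish it.
                ctx.sleep(time::Duration::milliseconds(50)).await?;
                self.highest.send_if_modified(|h| {
                    if *h < number {
                        *h = number;
                        return true;
                    }
                    false
                });
                Ok(())
            });
            // Same wait as in fetch_block: resolve once the observed
            // state covers `number`, regardless of who produced it.
            sync::wait_for(ctx, &mut self.highest.subscribe(), |h| *h >= number).await?;
            Ok(())
        })
        .await
    }
}

Separately, note the drop(permit) now added in fetch_block_from_peers: the per-peer semaphore permit is released as soon as the RPC returns, so a slow storage.queue_block call no longer counts against that peer's concurrency budget.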
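fetch_block_from_peer also shows the crate's request/response idiom: the outgoing message carries a oneshot sender, and the caller awaits recv_or_disconnected on the paired receiver. A generic requester-side sketch, assuming oneshot::channel() constructs the pair (consistent with the oneshot import above); make_req and send are stand-ins for building SyncBlocksInputMessage::GetBlock and pushing it onto message_sender:

use anyhow::Context as _;
use zksync_concurrency::{ctx, oneshot};

// Illustrative sketch, not part of this commit. `Resp` stands in for
// Result<FinalBlock, GetBlockError>.
async fn request<Req, Resp>(
    ctx: &ctx::Ctx,
    make_req: impl FnOnce(oneshot::Sender<Resp>) -> Req,
    send: impl FnOnce(Req),
) -> ctx::Result<Resp> {
    let (response, response_receiver) = oneshot::channel();
    send(make_req(response));
    // The first `?` propagates context cancellation; context() turns a
    // dropped sender (responder gone) into an internal error.
    Ok(response_receiver
        .recv_or_disconnected(ctx)
        .await?
        .context("no response")?)
}

Mapping disconnection to an error rather than a panic keeps peer failures recoverable, which is what lets the retry loop in fetch_block_from_peers log the failure, drop the peer, and move on.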
(Diffs for the remaining 7 changed files are not rendered here.)
