Skip to content

Commit

Permalink
applied comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Jan 4, 2024
1 parent f4d530b commit 10b3595
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 31 deletions.
37 changes: 22 additions & 15 deletions node/actors/sync_blocks/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ impl PeerStates {
state: BlockStoreState,
) -> anyhow::Result<()> {
use std::collections::hash_map::Entry;

let last = state.last.header().number;
anyhow::ensure!(state.first.header().number <= state.last.header().number);
state
.last
.verify(&self.config.validator_set, self.config.consensus_threshold)
.context("state.last.verify()")?;
let mut peers = self.peers.lock().unwrap();
let mut peers = self.peers.lock().unwrap();
match peers.entry(peer.clone()) {
Entry::Occupied(mut e) => e.get_mut().state = state,
Entry::Vacant(e) => {
Expand All @@ -86,13 +86,14 @@ impl PeerStates {
});
}
}
self.highest_peer_block.send_if_modified(|highest_peer_block| {
if *highest_peer_block >= last {
return false;
}
*highest_peer_block = last;
true
});
self.highest_peer_block
.send_if_modified(|highest_peer_block| {
if *highest_peer_block >= last {
return false;
}
*highest_peer_block = last;
true
});
Ok(())
}

Expand All @@ -103,7 +104,10 @@ impl PeerStates {
let mut next = self.storage.subscribe().borrow().next();
let mut highest_peer_block = self.highest_peer_block.subscribe();
loop {
sync::wait_for(ctx, &mut highest_peer_block, |highest_peer_block| highest_peer_block >= &next).await?;
sync::wait_for(ctx, &mut highest_peer_block, |highest_peer_block| {
highest_peer_block >= &next
})
.await?;
let permit = sync::acquire(ctx, &sem).await?;
let block_number = NoCopy::from(next);
next = next.next();
Expand All @@ -119,10 +123,12 @@ impl PeerStates {
/// Fetches the block from peers and puts it to storage.
/// Early exits if the block appeared in storage from other source.
async fn fetch_block(&self, ctx: &ctx::Ctx, block_number: BlockNumber) -> ctx::OrCanceled<()> {
scope::run!(ctx, |ctx,s| async {
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(async {
match self.fetch_block_from_peers(ctx, block_number).await {
Ok(block) => { let _ = self.storage.store_block(ctx,block).await; }
Ok(block) => {
let _ = self.storage.store_block(ctx, block).await;
}
Err(ctx::Canceled) => {
if let Some(send) = &self.events_sender {
send.send(PeerStateEvent::CanceledBlock(block_number));
Expand All @@ -132,9 +138,10 @@ impl PeerStates {
Ok(())
});
// Cancel fetching as soon as block is queued for storage.
self.storage.wait_until_queued(ctx,block_number).await
}).await?;
self.storage.wait_until_stored(ctx,block_number).await
self.storage.wait_until_queued(ctx, block_number).await
})
.await?;
self.storage.wait_until_stored(ctx, block_number).await
}

/// Fetches the block from peers.
Expand Down
12 changes: 7 additions & 5 deletions node/actors/sync_blocks/src/peers/tests/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Test for CancelingBlockRetrieval {
.unwrap();

// Check that the actor has sent a `get_block` request to the peer
let io::OutputMessage::Network(..) = message_receiver.recv(ctx).await?;
let io::OutputMessage::Network(..) = message_receiver.recv(ctx).await?;

// Emulate receiving block using external means.
storage
Expand Down Expand Up @@ -257,11 +257,13 @@ impl Test for DisconnectingPeer {
// Drop the response sender emulating peer disconnect.
let msg = message_receiver.recv(ctx).await?;
{
let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock{
recipient, number, ..
let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock {
recipient,
number,
..
}) = &msg;
assert_eq!(recipient,&peer_key);
assert_eq!(number,&BlockNumber(1));
assert_eq!(recipient, &peer_key);
assert_eq!(number, &BlockNumber(1));
}
drop(msg);

Expand Down
2 changes: 1 addition & 1 deletion node/actors/sync_blocks/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ async fn getting_blocks() {
let protocol_version = validator::ProtocolVersion::EARLIEST;
let genesis_block = make_genesis_block(rng, protocol_version);

let (storage, runner) = make_store(ctx, genesis_block.clone()).await;
let (storage, runner) = make_store(ctx, genesis_block.clone()).await;
let (actor_pipe, dispatcher_pipe) = pipe::new();

let cfg: Config = rng.gen();
Expand Down
4 changes: 2 additions & 2 deletions node/libs/storage/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl BlockStore {
/// Constructs a BlockStore.
/// BlockStore takes ownership of the passed PersistentBlockStore,
/// i.e. caller should modify the underlying persistent storage
/// (add/remove blocks) ONLY through the constructed BlockStore.
/// ONLY through the constructed BlockStore.
pub async fn new(
ctx: &ctx::Ctx,
persistent: Box<dyn PersistentBlockStore>,
Expand Down Expand Up @@ -187,7 +187,7 @@ impl BlockStore {
inner.queue.push_back(block);
true
});
self.wait_until_stored(ctx,number).await
self.wait_until_stored(ctx, number).await
}

/// Waits until the given block is queued to be stored.
Expand Down
21 changes: 13 additions & 8 deletions node/libs/storage/src/testonly/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! In-memory storage implementation.
use crate::{BlockStoreState, PersistentBlockStore, ReplicaState};
use std::{collections::BTreeMap, sync::Mutex};
use std::{collections::VecDeque, sync::Mutex};
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;

/// In-memory block store.
#[derive(Debug, Default)]
pub struct BlockStore(Mutex<BTreeMap<validator::BlockNumber, validator::FinalBlock>>);
pub struct BlockStore(Mutex<VecDeque<validator::FinalBlock>>);

/// In-memory replica store.
#[derive(Debug, Default)]
Expand All @@ -15,7 +15,7 @@ pub struct ReplicaStore(Mutex<Option<ReplicaState>>);
impl BlockStore {
/// Creates a new store containing only the specified `genesis_block`.
pub fn new(genesis: validator::FinalBlock) -> Self {
Self(Mutex::new([(genesis.header().number, genesis)].into()))
Self(Mutex::new([genesis].into()))
}
}

Expand All @@ -27,8 +27,8 @@ impl PersistentBlockStore for BlockStore {
return Ok(None);
}
Ok(Some(BlockStoreState {
first: blocks.first_key_value().unwrap().1.justification.clone(),
last: blocks.last_key_value().unwrap().1.justification.clone(),
first: blocks.front().unwrap().justification.clone(),
last: blocks.back().unwrap().justification.clone(),
}))
}

Expand All @@ -37,7 +37,12 @@ impl PersistentBlockStore for BlockStore {
_ctx: &ctx::Ctx,
number: validator::BlockNumber,
) -> ctx::Result<Option<validator::FinalBlock>> {
Ok(self.0.lock().unwrap().get(&number).cloned())
let blocks = self.0.lock().unwrap();
let Some(front) = blocks.front() else {
return Ok(None);
};
let idx = number.0 - front.header().number.0;
Ok(blocks.get(idx as usize).cloned())
}

async fn store_next_block(
Expand All @@ -48,12 +53,12 @@ impl PersistentBlockStore for BlockStore {
let mut blocks = self.0.lock().unwrap();
let got = block.header().number;
if !blocks.is_empty() {
let want = blocks.last_key_value().unwrap().0.next();
let want = blocks.back().unwrap().header().number.next();
if got != want {
return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into());
}
}
blocks.insert(got, block.clone());
blocks.push_back(block.clone());
Ok(())
}
}
Expand Down

0 comments on commit 10b3595

Please sign in to comment.