diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 2723fdf1..bdf21b77 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -68,14 +68,14 @@ impl PeerStates { state: BlockStoreState, ) -> anyhow::Result<()> { use std::collections::hash_map::Entry; - + let last = state.last.header().number; anyhow::ensure!(state.first.header().number <= state.last.header().number); state .last .verify(&self.config.validator_set, self.config.consensus_threshold) .context("state.last.verify()")?; - let mut peers = self.peers.lock().unwrap(); + let mut peers = self.peers.lock().unwrap(); match peers.entry(peer.clone()) { Entry::Occupied(mut e) => e.get_mut().state = state, Entry::Vacant(e) => { @@ -86,13 +86,14 @@ impl PeerStates { }); } } - self.highest_peer_block.send_if_modified(|highest_peer_block| { - if *highest_peer_block >= last { - return false; - } - *highest_peer_block = last; - true - }); + self.highest_peer_block + .send_if_modified(|highest_peer_block| { + if *highest_peer_block >= last { + return false; + } + *highest_peer_block = last; + true + }); Ok(()) } @@ -103,7 +104,10 @@ impl PeerStates { let mut next = self.storage.subscribe().borrow().next(); let mut highest_peer_block = self.highest_peer_block.subscribe(); loop { - sync::wait_for(ctx, &mut highest_peer_block, |highest_peer_block| highest_peer_block >= &next).await?; + sync::wait_for(ctx, &mut highest_peer_block, |highest_peer_block| { + highest_peer_block >= &next + }) + .await?; let permit = sync::acquire(ctx, &sem).await?; let block_number = NoCopy::from(next); next = next.next(); @@ -119,10 +123,12 @@ impl PeerStates { /// Fetches the block from peers and puts it to storage. /// Early exits if the block appeared in storage from other source. async fn fetch_block(&self, ctx: &ctx::Ctx, block_number: BlockNumber) -> ctx::OrCanceled<()> { - scope::run!(ctx, |ctx,s| async { + scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { match self.fetch_block_from_peers(ctx, block_number).await { - Ok(block) => { let _ = self.storage.store_block(ctx,block).await; } + Ok(block) => { + let _ = self.storage.store_block(ctx, block).await; + } Err(ctx::Canceled) => { if let Some(send) = &self.events_sender { send.send(PeerStateEvent::CanceledBlock(block_number)); @@ -132,9 +138,10 @@ impl PeerStates { Ok(()) }); // Cancel fetching as soon as block is queued for storage. - self.storage.wait_until_queued(ctx,block_number).await - }).await?; - self.storage.wait_until_stored(ctx,block_number).await + self.storage.wait_until_queued(ctx, block_number).await + }) + .await?; + self.storage.wait_until_stored(ctx, block_number).await } /// Fetches the block from peers. diff --git a/node/actors/sync_blocks/src/peers/tests/basics.rs b/node/actors/sync_blocks/src/peers/tests/basics.rs index ada87c12..5c552836 100644 --- a/node/actors/sync_blocks/src/peers/tests/basics.rs +++ b/node/actors/sync_blocks/src/peers/tests/basics.rs @@ -80,7 +80,7 @@ impl Test for CancelingBlockRetrieval { .unwrap(); // Check that the actor has sent a `get_block` request to the peer - let io::OutputMessage::Network(..) = message_receiver.recv(ctx).await?; + let io::OutputMessage::Network(..) = message_receiver.recv(ctx).await?; // Emulate receiving block using external means. storage @@ -257,11 +257,13 @@ impl Test for DisconnectingPeer { // Drop the response sender emulating peer disconnect. let msg = message_receiver.recv(ctx).await?; { - let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock{ - recipient, number, .. + let io::OutputMessage::Network(SyncBlocksInputMessage::GetBlock { + recipient, + number, + .. }) = &msg; - assert_eq!(recipient,&peer_key); - assert_eq!(number,&BlockNumber(1)); + assert_eq!(recipient, &peer_key); + assert_eq!(number, &BlockNumber(1)); } drop(msg); diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 014c0f4b..362d64ef 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -183,7 +183,7 @@ async fn getting_blocks() { let protocol_version = validator::ProtocolVersion::EARLIEST; let genesis_block = make_genesis_block(rng, protocol_version); - let (storage, runner) = make_store(ctx, genesis_block.clone()).await; + let (storage, runner) = make_store(ctx, genesis_block.clone()).await; let (actor_pipe, dispatcher_pipe) = pipe::new(); let cfg: Config = rng.gen(); diff --git a/node/libs/storage/src/block_store.rs b/node/libs/storage/src/block_store.rs index f3c746aa..840fe93b 100644 --- a/node/libs/storage/src/block_store.rs +++ b/node/libs/storage/src/block_store.rs @@ -104,7 +104,7 @@ impl BlockStore { /// Constructs a BlockStore. /// BlockStore takes ownership of the passed PersistentBlockStore, /// i.e. caller should modify the underlying persistent storage - /// (add/remove blocks) ONLY through the constructed BlockStore. + /// ONLY through the constructed BlockStore. pub async fn new( ctx: &ctx::Ctx, persistent: Box, @@ -187,7 +187,7 @@ impl BlockStore { inner.queue.push_back(block); true }); - self.wait_until_stored(ctx,number).await + self.wait_until_stored(ctx, number).await } /// Waits until the given block is queued to be stored. diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index f3bca597..b9da51b9 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -1,12 +1,12 @@ //! In-memory storage implementation. use crate::{BlockStoreState, PersistentBlockStore, ReplicaState}; -use std::{collections::BTreeMap, sync::Mutex}; +use std::{collections::VecDeque, sync::Mutex}; use zksync_concurrency::ctx; use zksync_consensus_roles::validator; /// In-memory block store. #[derive(Debug, Default)] -pub struct BlockStore(Mutex>); +pub struct BlockStore(Mutex>); /// In-memory replica store. #[derive(Debug, Default)] @@ -15,7 +15,7 @@ pub struct ReplicaStore(Mutex>); impl BlockStore { /// Creates a new store containing only the specified `genesis_block`. pub fn new(genesis: validator::FinalBlock) -> Self { - Self(Mutex::new([(genesis.header().number, genesis)].into())) + Self(Mutex::new([genesis].into())) } } @@ -27,8 +27,8 @@ impl PersistentBlockStore for BlockStore { return Ok(None); } Ok(Some(BlockStoreState { - first: blocks.first_key_value().unwrap().1.justification.clone(), - last: blocks.last_key_value().unwrap().1.justification.clone(), + first: blocks.front().unwrap().justification.clone(), + last: blocks.back().unwrap().justification.clone(), })) } @@ -37,7 +37,12 @@ impl PersistentBlockStore for BlockStore { _ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result> { - Ok(self.0.lock().unwrap().get(&number).cloned()) + let blocks = self.0.lock().unwrap(); + let Some(front) = blocks.front() else { + return Ok(None); + }; + let idx = number.0 - front.header().number.0; + Ok(blocks.get(idx as usize).cloned()) } async fn store_next_block( @@ -48,12 +53,12 @@ impl PersistentBlockStore for BlockStore { let mut blocks = self.0.lock().unwrap(); let got = block.header().number; if !blocks.is_empty() { - let want = blocks.last_key_value().unwrap().0.next(); + let want = blocks.back().unwrap().header().number.next(); if got != want { return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); } } - blocks.insert(got, block.clone()); + blocks.push_back(block.clone()); Ok(()) } }