diff --git a/node/actors/sync_blocks/src/lib.rs b/node/actors/sync_blocks/src/lib.rs index a8028b51..6992470a 100644 --- a/node/actors/sync_blocks/src/lib.rs +++ b/node/actors/sync_blocks/src/lib.rs @@ -13,7 +13,7 @@ use concurrency::{ }; use network::io::SyncState; use std::sync::Arc; -use storage::WriteBlockStore; +use storage::{StorageError, StorageResult, WriteBlockStore}; use tracing::instrument; use utils::pipe::ActorPipe; @@ -72,12 +72,19 @@ impl SyncBlocks { pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let storage = self.message_handler.storage.clone(); - scope::run!(ctx, |ctx, s| async { + let result = scope::run!(ctx, |ctx, s| async { s.spawn_bg(Self::emit_state_updates(ctx, storage, &self.state_sender)); s.spawn_bg(self.peer_states.run(ctx)); self.message_handler.process_messages(ctx).await }) - .await + .await; + + // Since we clearly type cancellation errors, it's easier propagate them up to this entry point, + // rather than catching in the constituent tasks. + result.or_else(|err| match err { + StorageError::Canceled(_) => Ok(()), // Cancellation is not propagated as an error + StorageError::Database(err) => Err(err), + }) } #[instrument(level = "trace", skip_all, err)] @@ -85,7 +92,7 @@ impl SyncBlocks { ctx: &ctx::Ctx, storage: Arc, state_sender: &watch::Sender, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { let mut storage_subscriber = storage.subscribe_to_block_writes(); loop { let state = Self::get_sync_state(ctx, storage.as_ref()).await?; @@ -104,7 +111,7 @@ impl SyncBlocks { async fn get_sync_state( ctx: &ctx::Ctx, storage: &dyn WriteBlockStore, - ) -> anyhow::Result { + ) -> StorageResult { let last_contiguous_block_number = storage.last_contiguous_block_number(ctx).await?; let last_contiguous_stored_block = storage .block(ctx, last_contiguous_block_number) diff --git a/node/actors/sync_blocks/src/message_handler.rs b/node/actors/sync_blocks/src/message_handler.rs index e3779c54..16343134 100644 --- a/node/actors/sync_blocks/src/message_handler.rs +++ b/node/actors/sync_blocks/src/message_handler.rs @@ -5,7 +5,7 @@ use concurrency::ctx::{self, channel}; use network::io::{GetBlockError, GetBlockResponse, SyncBlocksRequest}; use roles::validator::BlockNumber; use std::sync::Arc; -use storage::WriteBlockStore; +use storage::{StorageResult, WriteBlockStore}; use tracing::instrument; /// Inner details of `SyncBlocks` actor allowing to process messages. @@ -22,9 +22,8 @@ pub(crate) struct SyncBlocksMessageHandler { impl SyncBlocksMessageHandler { /// Implements the message processing loop. #[instrument(level = "trace", skip_all, err)] - pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - loop { - let input_message = self.message_receiver.recv(ctx).await?; + pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> StorageResult<()> { + while let Ok(input_message) = self.message_receiver.recv(ctx).await { match input_message { InputMessage::Network(SyncBlocksRequest::UpdatePeerSyncState { peer, @@ -42,6 +41,7 @@ impl SyncBlocksMessageHandler { } } } + Ok(()) } /// Gets a block with the specified `number` from the storage. @@ -52,7 +52,7 @@ impl SyncBlocksMessageHandler { &self, ctx: &ctx::Ctx, number: BlockNumber, - ) -> anyhow::Result { + ) -> StorageResult { Ok(self .storage .block(ctx, number) diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 4485a386..1e2d5da6 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -14,7 +14,7 @@ use roles::{ validator::{BlockHeader, BlockNumber, FinalBlock, PayloadHash}, }; use std::{collections::HashMap, sync::Arc}; -use storage::WriteBlockStore; +use storage::{StorageResult, WriteBlockStore}; use tracing::instrument; mod events; @@ -78,7 +78,7 @@ impl PeerStates { /// 1. Get information about missing blocks from the storage. /// 2. Spawn a task processing `SyncState`s from peers. /// 3. Spawn a task to get each missing block. - pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> { let updates_receiver = self.updates_receiver.take().unwrap(); let storage = self.storage.as_ref(); let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks); @@ -123,7 +123,7 @@ impl PeerStates { ctx: &ctx::Ctx, mut updates_receiver: channel::UnboundedReceiver, new_blocks_sender: watch::Sender, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { loop { let (peer_key, sync_state) = updates_receiver.recv(ctx).await?; let new_last_block_number = self @@ -151,7 +151,7 @@ impl PeerStates { ctx: &ctx::Ctx, peer_key: node::PublicKey, state: SyncState, - ) -> anyhow::Result { + ) -> ctx::OrCanceled { let last_contiguous_stored_block = match self.validate_sync_state(state) { Ok(block_number) => block_number, Err(err) => { @@ -220,7 +220,7 @@ impl PeerStates { block_number: BlockNumber, get_block_permit: sync::SemaphorePermit<'_>, storage: &dyn WriteBlockStore, - ) -> anyhow::Result<()> { + ) -> StorageResult<()> { let block = self.get_block(ctx, block_number).await?; drop(get_block_permit); @@ -236,7 +236,7 @@ impl PeerStates { &self, ctx: &ctx::Ctx, block_number: BlockNumber, - ) -> anyhow::Result { + ) -> ctx::OrCanceled { loop { let Some((peer_key, _permit)) = Self::acquire_peer_permit(&*sync::lock(ctx, &self.peers).await?, block_number) @@ -315,7 +315,7 @@ impl PeerStates { ctx: &ctx::Ctx, recipient: node::PublicKey, number: BlockNumber, - ) -> anyhow::Result> { + ) -> ctx::OrCanceled> { let (response, response_receiver) = oneshot::channel(); let message = SyncBlocksInputMessage::GetBlock { recipient: recipient.clone(), @@ -378,7 +378,7 @@ impl PeerStates { &self, ctx: &ctx::Ctx, peer_key: &node::PublicKey, - ) -> anyhow::Result<()> { + ) -> ctx::OrCanceled<()> { let mut peers = sync::lock(ctx, &self.peers).await?; if let Some(state) = peers.remove(peer_key) { tracing::trace!(?state, "Dropping peer connection state");