Skip to content

Commit

Permalink
Make SyncBlocks terminate successfully on cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Nov 6, 2023
1 parent b5bfb26 commit f0621c8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 18 deletions.
17 changes: 12 additions & 5 deletions node/actors/sync_blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use concurrency::{
};
use network::io::SyncState;
use std::sync::Arc;
use storage::WriteBlockStore;
use storage::{StorageError, StorageResult, WriteBlockStore};
use tracing::instrument;
use utils::pipe::ActorPipe;

Expand Down Expand Up @@ -72,20 +72,27 @@ impl SyncBlocks {
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let storage = self.message_handler.storage.clone();

scope::run!(ctx, |ctx, s| async {
let result = scope::run!(ctx, |ctx, s| async {
s.spawn_bg(Self::emit_state_updates(ctx, storage, &self.state_sender));
s.spawn_bg(self.peer_states.run(ctx));
self.message_handler.process_messages(ctx).await
})
.await
.await;

// Since we clearly type cancellation errors, it's easier propagate them up to this entry point,
// rather than catching in the constituent tasks.
result.or_else(|err| match err {
StorageError::Canceled(_) => Ok(()), // Cancellation is not propagated as an error
StorageError::Database(err) => Err(err),
})
}

#[instrument(level = "trace", skip_all, err)]
async fn emit_state_updates(
ctx: &ctx::Ctx,
storage: Arc<dyn WriteBlockStore>,
state_sender: &watch::Sender<SyncState>,
) -> anyhow::Result<()> {
) -> StorageResult<()> {
let mut storage_subscriber = storage.subscribe_to_block_writes();
loop {
let state = Self::get_sync_state(ctx, storage.as_ref()).await?;
Expand All @@ -104,7 +111,7 @@ impl SyncBlocks {
async fn get_sync_state(
ctx: &ctx::Ctx,
storage: &dyn WriteBlockStore,
) -> anyhow::Result<SyncState> {
) -> StorageResult<SyncState> {
let last_contiguous_block_number = storage.last_contiguous_block_number(ctx).await?;
let last_contiguous_stored_block = storage
.block(ctx, last_contiguous_block_number)
Expand Down
10 changes: 5 additions & 5 deletions node/actors/sync_blocks/src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use concurrency::ctx::{self, channel};
use network::io::{GetBlockError, GetBlockResponse, SyncBlocksRequest};
use roles::validator::BlockNumber;
use std::sync::Arc;
use storage::WriteBlockStore;
use storage::{StorageResult, WriteBlockStore};
use tracing::instrument;

/// Inner details of `SyncBlocks` actor allowing to process messages.
Expand All @@ -22,9 +22,8 @@ pub(crate) struct SyncBlocksMessageHandler {
impl SyncBlocksMessageHandler {
/// Implements the message processing loop.
#[instrument(level = "trace", skip_all, err)]
pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
loop {
let input_message = self.message_receiver.recv(ctx).await?;
pub(crate) async fn process_messages(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
while let Ok(input_message) = self.message_receiver.recv(ctx).await {
match input_message {
InputMessage::Network(SyncBlocksRequest::UpdatePeerSyncState {
peer,
Expand All @@ -42,6 +41,7 @@ impl SyncBlocksMessageHandler {
}
}
}
Ok(())
}

/// Gets a block with the specified `number` from the storage.
Expand All @@ -52,7 +52,7 @@ impl SyncBlocksMessageHandler {
&self,
ctx: &ctx::Ctx,
number: BlockNumber,
) -> anyhow::Result<GetBlockResponse> {
) -> StorageResult<GetBlockResponse> {
Ok(self
.storage
.block(ctx, number)
Expand Down
16 changes: 8 additions & 8 deletions node/actors/sync_blocks/src/peers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use roles::{
validator::{BlockHeader, BlockNumber, FinalBlock, PayloadHash},
};
use std::{collections::HashMap, sync::Arc};
use storage::WriteBlockStore;
use storage::{StorageResult, WriteBlockStore};
use tracing::instrument;

mod events;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl PeerStates {
/// 1. Get information about missing blocks from the storage.
/// 2. Spawn a task processing `SyncState`s from peers.
/// 3. Spawn a task to get each missing block.
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> StorageResult<()> {
let updates_receiver = self.updates_receiver.take().unwrap();
let storage = self.storage.as_ref();
let get_block_semaphore = Semaphore::new(self.config.max_concurrent_blocks);
Expand Down Expand Up @@ -123,7 +123,7 @@ impl PeerStates {
ctx: &ctx::Ctx,
mut updates_receiver: channel::UnboundedReceiver<PeerStateUpdate>,
new_blocks_sender: watch::Sender<BlockNumber>,
) -> anyhow::Result<()> {
) -> StorageResult<()> {
loop {
let (peer_key, sync_state) = updates_receiver.recv(ctx).await?;
let new_last_block_number = self
Expand Down Expand Up @@ -151,7 +151,7 @@ impl PeerStates {
ctx: &ctx::Ctx,
peer_key: node::PublicKey,
state: SyncState,
) -> anyhow::Result<BlockNumber> {
) -> ctx::OrCanceled<BlockNumber> {
let last_contiguous_stored_block = match self.validate_sync_state(state) {
Ok(block_number) => block_number,
Err(err) => {
Expand Down Expand Up @@ -220,7 +220,7 @@ impl PeerStates {
block_number: BlockNumber,
get_block_permit: sync::SemaphorePermit<'_>,
storage: &dyn WriteBlockStore,
) -> anyhow::Result<()> {
) -> StorageResult<()> {
let block = self.get_block(ctx, block_number).await?;
drop(get_block_permit);

Expand All @@ -236,7 +236,7 @@ impl PeerStates {
&self,
ctx: &ctx::Ctx,
block_number: BlockNumber,
) -> anyhow::Result<FinalBlock> {
) -> ctx::OrCanceled<FinalBlock> {
loop {
let Some((peer_key, _permit)) =
Self::acquire_peer_permit(&*sync::lock(ctx, &self.peers).await?, block_number)
Expand Down Expand Up @@ -315,7 +315,7 @@ impl PeerStates {
ctx: &ctx::Ctx,
recipient: node::PublicKey,
number: BlockNumber,
) -> anyhow::Result<Option<FinalBlock>> {
) -> ctx::OrCanceled<Option<FinalBlock>> {
let (response, response_receiver) = oneshot::channel();
let message = SyncBlocksInputMessage::GetBlock {
recipient: recipient.clone(),
Expand Down Expand Up @@ -378,7 +378,7 @@ impl PeerStates {
&self,
ctx: &ctx::Ctx,
peer_key: &node::PublicKey,
) -> anyhow::Result<()> {
) -> ctx::OrCanceled<()> {
let mut peers = sync::lock(ctx, &self.peers).await?;
if let Some(state) = peers.remove(peer_key) {
tracing::trace!(?state, "Dropping peer connection state");
Expand Down

0 comments on commit f0621c8

Please sign in to comment.