From d5377601dd7c709d6966068152061d9235a0b61f Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 23 Oct 2023 11:03:59 +0300 Subject: [PATCH] Fix cancellation detection with storage tasks --- node/actors/consensus/src/replica/state_machine.rs | 14 +++++++++----- node/actors/sync_blocks/src/peers/tests.rs | 2 +- node/actors/sync_blocks/src/tests/end_to_end.rs | 2 +- node/actors/sync_blocks/src/tests/mod.rs | 4 ++-- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/node/actors/consensus/src/replica/state_machine.rs b/node/actors/consensus/src/replica/state_machine.rs index 334f62bf..24577982 100644 --- a/node/actors/consensus/src/replica/state_machine.rs +++ b/node/actors/consensus/src/replica/state_machine.rs @@ -5,7 +5,7 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, }; -use storage::ReplicaStateStore; +use storage::{ReplicaStateStore, StorageError}; use tracing::{instrument, warn}; /// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible @@ -120,12 +120,16 @@ impl StateMachine { block_proposal_cache: self.block_proposal_cache.clone(), }; - scope::run_blocking!(ctx, |ctx, s| { + let store_result = scope::run_blocking!(ctx, |ctx, s| { let backup_future = self.storage.put_replica_state(ctx, &backup); s.spawn(backup_future).join(ctx).block()?; Ok(()) - }) - .expect("Failed backing up replica state"); - // ^ We don't know how to recover from DB errors, so panicking is the only option so far. + }); + match store_result { + Ok(()) => { /* Everything went fine */ } + Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"), + Err(StorageError::Database(err)) => panic!("Failed storing replica state: {err}"), + // ^ We don't know how to recover from DB errors, so panicking is the only option so far. + } } } diff --git a/node/actors/sync_blocks/src/peers/tests.rs b/node/actors/sync_blocks/src/peers/tests.rs index 210fe1b1..5175a5c6 100644 --- a/node/actors/sync_blocks/src/peers/tests.rs +++ b/node/actors/sync_blocks/src/peers/tests.rs @@ -95,7 +95,7 @@ async fn test_peer_states(test: T) { scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { peer_states.run(ctx).await.or_else(|err| { - if err.is::() { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err) diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 75a25632..99b22fdc 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -254,7 +254,7 @@ async fn test_sync_blocks(test: T) { .unwrap_err(); tracing::trace!(?key, "Node task completed"); - if err.is::() { + if err.root_cause().is::() { Ok(()) // Test has successfully completed } else { Err(err) diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 0d2f3ca0..35061374 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -123,7 +123,7 @@ async fn subscribing_to_state_updates() { scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { actor.run(ctx).await.or_else(|err| { - if err.is::() { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err) @@ -221,7 +221,7 @@ async fn getting_blocks() { scope::run!(ctx, |ctx, s| async { s.spawn_bg(async { actor.run(ctx).await.or_else(|err| { - if err.is::() { + if err.root_cause().is::() { Ok(()) // Swallow cancellation errors after the test is finished } else { Err(err)