diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs
index ca4d4845bf298..07ce724ccd0ae 100644
--- a/crates/sui-core/src/checkpoints/mod.rs
+++ b/crates/sui-core/src/checkpoints/mod.rs
@@ -18,8 +18,6 @@ use crate::execution_cache::TransactionCacheRead;
 use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
 use crate::state_accumulator::StateAccumulator;
 use diffy::create_patch;
-use futures::future::{select, Either};
-use futures::FutureExt;
 use itertools::Itertools;
 use mysten_metrics::{monitored_future, monitored_scope, MonitoredFutureExt};
 use parking_lot::Mutex;
@@ -63,11 +61,7 @@ use sui_types::messages_consensus::ConsensusTransactionKey;
 use sui_types::signature::GenericSignature;
 use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
 use sui_types::transaction::{TransactionDataAPI, TransactionKey, TransactionKind};
-use tokio::{
-    sync::{watch, Notify},
-    task::JoinSet,
-    time::timeout,
-};
+use tokio::{sync::Notify, task::JoinSet, time::timeout};
 use tracing::{debug, error, info, instrument, warn};
 use typed_store::traits::{TableSummary, TypedStoreDebug};
 use typed_store::DBMapUtils;
@@ -863,7 +857,6 @@ pub struct CheckpointBuilder {
     effects_store: Arc<dyn TransactionCacheRead>,
     accumulator: Weak<StateAccumulator>,
     output: Box<dyn CheckpointOutput>,
-    exit: watch::Receiver<()>,
     metrics: Arc<CheckpointMetrics>,
     max_transactions_per_checkpoint: usize,
     max_checkpoint_size_bytes: usize,
@@ -873,7 +866,6 @@ pub struct CheckpointAggregator {
     tables: Arc<CheckpointStore>,
     epoch_store: Arc<AuthorityPerEpochStore>,
     notify: Arc<Notify>,
-    exit: watch::Receiver<()>,
     current: Option<CheckpointSignatureAggregator>,
     output: Box<dyn CertifiedCheckpointOutput>,
     state: Arc<AuthorityState>,
@@ -901,7 +893,6 @@ impl CheckpointBuilder {
         effects_store: Arc<dyn TransactionCacheRead>,
         accumulator: Weak<StateAccumulator>,
         output: Box<dyn CheckpointOutput>,
-        exit: watch::Receiver<()>,
         notify_aggregator: Arc<Notify>,
         metrics: Arc<CheckpointMetrics>,
         max_transactions_per_checkpoint: usize,
@@ -915,7 +906,6 @@ impl CheckpointBuilder {
             effects_store,
             accumulator,
             output,
-            exit,
             notify_aggregator,
             metrics,
             max_transactions_per_checkpoint,
@@ -926,26 +916,10 @@ impl CheckpointBuilder {
     async fn run(mut self) {
         info!("Starting CheckpointBuilder");
         loop {
-            // Check whether an exit signal has been received, if so we break the loop.
-            // This gives us a chance to exit, in case checkpoint making keeps failing.
-            match self.exit.has_changed() {
-                Ok(true) | Err(_) => {
-                    break;
-                }
-                Ok(false) => (),
-            };
-
             self.maybe_build_checkpoints().await;

-            match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await {
-                Either::Left(_) => {
-                    // break loop on exit signal
-                    break;
-                }
-                Either::Right(_) => {}
-            }
+            self.notify.notified().await;
         }
-        info!("Shutting down CheckpointBuilder");
     }

     async fn maybe_build_checkpoints(&mut self) {
@@ -1769,7 +1743,6 @@ impl CheckpointAggregator {
         tables: Arc<CheckpointStore>,
         epoch_store: Arc<AuthorityPerEpochStore>,
         notify: Arc<Notify>,
-        exit: watch::Receiver<()>,
         output: Box<dyn CertifiedCheckpointOutput>,
         state: Arc<AuthorityState>,
         metrics: Arc<CheckpointMetrics>,
@@ -1779,7 +1752,6 @@ impl CheckpointAggregator {
             tables,
             epoch_store,
             notify,
-            exit,
             current,
             output,
             state,
@@ -1800,19 +1772,7 @@ impl CheckpointAggregator {
                 continue;
             }

-            match select(
-                self.exit.changed().boxed(),
-                timeout(Duration::from_secs(1), self.notify.notified()).boxed(),
-            )
-            .await
-            {
-                Either::Left(_) => {
-                    // return on exit signal
-                    info!("Shutting down CheckpointAggregator");
-                    return;
-                }
-                Either::Right(_) => {}
-            }
+            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
         }
     }

@@ -2241,19 +2201,13 @@ impl CheckpointService {
         metrics: Arc<CheckpointMetrics>,
         max_transactions_per_checkpoint: usize,
         max_checkpoint_size_bytes: usize,
-    ) -> (
-        Arc<Self>,
-        watch::Sender<()>, /* The exit sender */
-        JoinSet<()>,       /* Handle to tasks */
-    ) {
+    ) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
         info!(
             "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
         );
         let notify_builder = Arc::new(Notify::new());
         let notify_aggregator = Arc::new(Notify::new());

-        let (exit_snd, exit_rcv) = watch::channel(());
-
         let mut tasks = JoinSet::new();

         let builder = CheckpointBuilder::new(
@@ -2264,22 +2218,17 @@ impl CheckpointService {
             effects_store,
             accumulator,
             checkpoint_output,
-            exit_rcv.clone(),
             notify_aggregator.clone(),
             metrics.clone(),
             max_transactions_per_checkpoint,
             max_checkpoint_size_bytes,
         );
-        let epoch_store_clone = epoch_store.clone();
-        tasks.spawn(monitored_future!(async move {
-            let _ = epoch_store_clone.within_alive_epoch(builder.run()).await;
-        }));
+        tasks.spawn(monitored_future!(builder.run()));

         let aggregator = CheckpointAggregator::new(
             checkpoint_store.clone(),
             epoch_store.clone(),
             notify_aggregator.clone(),
-            exit_rcv,
             certified_checkpoint_output,
             state.clone(),
             metrics.clone(),
@@ -2299,7 +2248,7 @@ impl CheckpointService {
             metrics,
         });

-        (service, exit_snd, tasks)
+        (service, tasks)
     }

     #[cfg(test)]
@@ -2412,6 +2361,7 @@ mod tests {
     use super::*;
     use crate::authority::test_authority_builder::TestAuthorityBuilder;
     use futures::future::BoxFuture;
+    use futures::FutureExt as _;
     use shared_crypto::intent::{Intent, IntentScope};
     use std::collections::{BTreeMap, HashMap};
     use std::ops::Deref;
@@ -2543,7 +2493,7 @@ mod tests {
             &epoch_store,
         ));

-        let (checkpoint_service, _exit_sender, _tasks) = CheckpointService::spawn(
+        let (checkpoint_service, _tasks) = CheckpointService::spawn(
             state.clone(),
             checkpoint_store,
             epoch_store.clone(),
diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
index 58b00cf1f7a8e..f14a3eb560a0d 100644
--- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
+++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
@@ -33,7 +33,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<CheckpointService> {
 (10);
-    let (checkpoint_service, _, _) = CheckpointService::spawn(
+    let (checkpoint_service, _) = CheckpointService::spawn(
         state.clone(),
         state.get_checkpoint_store().clone(),
         epoch_store.clone(),
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 9fee10d194e73..55a3c0467695a 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -13,7 +13,6 @@ use arc_swap::ArcSwap;
 use fastcrypto_zkp::bn254::zk_login::JwkId;
 use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
 use futures::TryFutureExt;
-use mysten_common::debug_fatal;
 use prometheus::Registry;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::fmt;
@@ -48,7 +47,6 @@ use tap::tap::TapFallible;
 use tokio::runtime::Handle;
 use tokio::sync::{broadcast, mpsc, watch, Mutex};
 use tokio::task::{JoinHandle, JoinSet};
-use tokio::time::timeout;
 use tower::ServiceBuilder;
 use tracing::{debug, error, warn};
 use tracing::{error_span, info, Instrument};
@@ -150,10 +148,6 @@ pub struct ValidatorComponents {
     consensus_manager: ConsensusManager,
     consensus_store_pruner: ConsensusStorePruner,
     consensus_adapter: Arc<ConsensusAdapter>,
-    // Sending to the channel or dropping this will eventually stop checkpoint tasks.
-    // The receiver side of this channel is copied into each checkpoint service task,
-    // and they are listening to any change to this channel.
-    checkpoint_service_exit: watch::Sender<()>,
     // Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
     checkpoint_service_tasks: JoinSet<()>,
     checkpoint_metrics: Arc<CheckpointMetrics>,
@@ -1291,17 +1285,16 @@ impl SuiNode {
         sui_node_metrics: Arc<SuiNodeMetrics>,
         sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
     ) -> Result<ValidatorComponents> {
-        let (checkpoint_service, checkpoint_service_exit, checkpoint_service_tasks) =
-            Self::start_checkpoint_service(
-                config,
-                consensus_adapter.clone(),
-                checkpoint_store,
-                epoch_store.clone(),
-                state.clone(),
-                state_sync_handle,
-                accumulator,
-                checkpoint_metrics.clone(),
-            );
+        let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
+            config,
+            consensus_adapter.clone(),
+            checkpoint_store,
+            epoch_store.clone(),
+            state.clone(),
+            state_sync_handle,
+            accumulator,
+            checkpoint_metrics.clone(),
+        );

         // create a new map that gets injected into both the consensus handler and the consensus adapter
         // the consensus handler will write values forwarded from consensus, and the consensus adapter
@@ -1378,7 +1371,6 @@ impl SuiNode {
             consensus_manager,
             consensus_store_pruner,
             consensus_adapter,
-            checkpoint_service_exit,
             checkpoint_service_tasks,
             checkpoint_metrics,
             sui_tx_validator_metrics,
@@ -1394,7 +1386,7 @@ impl SuiNode {
         state_sync_handle: state_sync::Handle,
         accumulator: Weak<StateAccumulator>,
         checkpoint_metrics: Arc<CheckpointMetrics>,
-    ) -> (Arc<CheckpointService>, watch::Sender<()>, JoinSet<()>) {
+    ) -> (Arc<CheckpointService>, JoinSet<()>) {
         let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
         let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

@@ -1688,31 +1680,25 @@ impl SuiNode {
             consensus_manager,
             consensus_store_pruner,
             consensus_adapter,
-            checkpoint_service_exit,
             mut checkpoint_service_tasks,
             checkpoint_metrics,
             sui_tx_validator_metrics,
         }) = self.validator_components.lock().await.take()
         {
             info!("Reconfiguring the validator.");
-            // Stop the old checkpoint service and wait for them to finish.
-            let _ = checkpoint_service_exit.send(());
-            let wait_result = timeout(Duration::from_secs(5), async move {
-                while let Some(result) = checkpoint_service_tasks.join_next().await {
-                    if let Err(err) = result {
-                        if err.is_panic() {
-                            std::panic::resume_unwind(err.into_panic());
-                        }
-                        warn!("Error in checkpoint service task: {:?}", err);
+            // Cancel the old checkpoint service tasks.
+            // Waiting for checkpoint builder to finish gracefully is not possible, because it
+            // may wait on transactions while consensus on peers has already shut down.
+            checkpoint_service_tasks.abort_all();
+            while let Some(result) = checkpoint_service_tasks.join_next().await {
+                if let Err(err) = result {
+                    if err.is_panic() {
+                        std::panic::resume_unwind(err.into_panic());
                     }
+                    warn!("Error in checkpoint service task: {:?}", err);
                 }
-            })
-            .await;
-            if wait_result.is_err() {
-                debug_fatal!("Timed out waiting for checkpoint service tasks to finish.");
-            } else {
-                info!("Checkpoint service has shut down.");
             }
+            info!("Checkpoint service has shut down.");

             consensus_manager.shutdown().await;
             info!("Consensus has shut down.");
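
Note on the replacement shutdown pattern: the patch drops the watch-channel exit signal and instead cancels the checkpoint tasks through their JoinSet, then drains join_next() so panics are re-raised instead of being swallowed. The following is a minimal, self-contained sketch of that pattern using the same tokio primitives; the worker function, the one-second timeout, and main are illustrative stand-ins and not code from this patch.

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::Notify;
    use tokio::task::JoinSet;
    use tokio::time::timeout;

    // Stand-in for a checkpoint task: loops forever and only yields at await points,
    // so it stops when its task is aborted.
    async fn worker(notify: Arc<Notify>) {
        loop {
            // ... one round of work ...
            // Wake on notification or after at most one second, mirroring the aggregator loop.
            let _ = timeout(Duration::from_secs(1), notify.notified()).await;
        }
    }

    #[tokio::main]
    async fn main() {
        let notify = Arc::new(Notify::new());
        let mut tasks = JoinSet::new();
        tasks.spawn(worker(notify.clone()));

        // Wake the worker once, the way a checkpoint notification would.
        notify.notify_one();

        // Shutdown: abort instead of signalling an exit channel. Each task is
        // cancelled at its next await point.
        tasks.abort_all();
        while let Some(result) = tasks.join_next().await {
            if let Err(err) = result {
                if err.is_panic() {
                    // Re-raise panics so they are not silently dropped.
                    std::panic::resume_unwind(err.into_panic());
                }
                // A plain cancellation error is the expected outcome here.
            }
        }
        println!("checkpoint-style tasks shut down");
    }

The trade-off, as the patch's own comment notes, is that cancellation replaces graceful completion: tasks must be safe to drop at any await point, which is also why the "Shutting down" log lines and the within_alive_epoch wrapper are removed rather than preserved.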