From 7efdf2f3bd302dcfb3c53985d87d0a038314af37 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 06:35:36 +0000 Subject: [PATCH 01/16] feat: Use DATANODE_LEASE_SECS from distributed_time_constants for heartbeat pause duration --- src/meta-srv/src/failure_detector.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index bc7f4972fd00..116a5a827b28 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::time::Duration; +use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; /// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)" @@ -82,7 +83,9 @@ impl Default for PhiAccrualFailureDetectorOptions { Self { threshold: 8_f32, min_std_deviation: Duration::from_millis(100), - acceptable_heartbeat_pause: Duration::from_millis(3000), + acceptable_heartbeat_pause: Duration::from_secs( + distributed_time_constants::DATANODE_LEASE_SECS, + ), first_heartbeat_estimate: Duration::from_millis(1000), } } From e5149b226ade78df78ffb4fababd4d3bbd99a007 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 08:06:40 +0000 Subject: [PATCH 02/16] feat: introduce `RegionFailureDetectorController` to manage region failure detectors --- src/meta-srv/src/failure_detector.rs | 4 +- src/meta-srv/src/region/failure_detector.rs | 17 ++- src/meta-srv/src/region/supervisor.rs | 121 +++++++++++++++++++- 3 files changed, 138 insertions(+), 4 deletions(-) diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index 116a5a827b28..ac0df6c2dc13 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -38,7 +38,7 @@ use serde::{Deserialize, Serialize}; /// /// where F is the cumulative 
distribution function of a normal distribution with mean /// and standard deviation estimated from historical heartbeat inter-arrival times. -#[cfg_attr(test, derive(Clone))] +#[cfg_attr(test, derive(Debug, Clone, PartialEq))] pub(crate) struct PhiAccrualFailureDetector { /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but @@ -198,7 +198,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 { /// It is capped by the number of samples specified in `max_sample_size`. /// /// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] struct HeartbeatHistory { /// Number of samples to use for calculation of mean and standard deviation of inter-arrival /// times. diff --git a/src/meta-srv/src/region/failure_detector.rs b/src/meta-srv/src/region/failure_detector.rs index e9c574cadd3e..2821a17328d9 100644 --- a/src/meta-srv/src/region/failure_detector.rs +++ b/src/meta-srv/src/region/failure_detector.rs @@ -51,7 +51,7 @@ impl RegionFailureDetector { } } - /// Returns [PhiAccrualFailureDetector] of the specific ([DatanodeId],[RegionId]). + /// Returns [`PhiAccrualFailureDetector`] of the specific [`Ident`]. pub(crate) fn region_failure_detector( &self, ident: Ident, @@ -61,6 +61,21 @@ impl RegionFailureDetector { .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options)) } + /// Returns A mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`Ident`]. + /// If a detector already exists for the region, it is returned. Otherwise, a new + /// detector is created and initialized with the provided timestamp. 
+ pub(crate) fn maybe_init_region_failure_detector( + &self, + ident: Ident, + ts_millis: i64, + ) -> impl DerefMut + '_ { + self.detectors.entry(ident).or_insert_with(|| { + let mut detector = PhiAccrualFailureDetector::from_options(self.options); + detector.heartbeat(ts_millis); + detector + }) + } + /// Returns a [FailureDetectorEntry] iterator. pub(crate) fn iter(&self) -> impl Iterator + '_ { self.detectors diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 83b264eaa4ba..78d83d679f66 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -29,13 +29,13 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::time::{interval, MissedTickBehavior}; -use super::failure_detector::RegionFailureDetector; use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::node_stat::Stat; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::region_migration::RegionMigrationProcedureTask; +use crate::region::failure_detector::{Ident, RegionFailureDetector}; use crate::selector::SelectorOptions; /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode. @@ -75,6 +75,8 @@ impl From<&Stat> for DatanodeHeartbeat { /// of the supervisor during tests. 
pub(crate) enum Event { Tick, + RegisterFailureDetectors(Vec), + DeregisterFailureDetectors(Vec), HeartbeatArrived(DatanodeHeartbeat), Clear, #[cfg(test)] @@ -87,6 +89,14 @@ impl Debug for Event { Self::Tick => write!(f, "Tick"), Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(), Self::Clear => write!(f, "Clear"), + Self::RegisterFailureDetectors(arg0) => f + .debug_tuple("RegisterFailureDetectors") + .field(arg0) + .finish(), + Self::DeregisterFailureDetectors(arg0) => f + .debug_tuple("DeregisterFailureDetectors") + .field(arg0) + .finish(), #[cfg(test)] Self::Dump(_) => f.debug_struct("Dump").finish(), } @@ -178,6 +188,59 @@ pub struct RegionSupervisor { peer_lookup: PeerLookupServiceRef, } +pub type RegionFailureDetectorControllerRef = Arc; + +#[async_trait::async_trait] +pub trait RegionFailureDetectorController { + /// Registers failure detectors for the given identifiers. + async fn register_failure_detectors(&self, ident: Vec); + + /// Deregisters failure detectors for the given identifiers. + async fn deregister_failure_detectors(&self, ident: Vec); +} + +/// Controller for managing failure detectors for regions. +#[derive(Debug, Clone)] +pub struct RegionFailureDetectorControl { + sender: Sender, +} + +#[async_trait::async_trait] +impl RegionFailureDetectorController for RegionFailureDetectorControl { + async fn register_failure_detectors(&self, ident: Vec) { + if self + .sender + .send(Event::RegisterFailureDetectors(ident)) + .await + .is_err() + { + error!("RegionSupervisor is stop receiving heartbeat"); + } + } + + async fn deregister_failure_detectors(&self, ident: Vec) { + if self + .sender + .send(Event::DeregisterFailureDetectors(ident)) + .await + .is_err() + { + error!("RegionSupervisor is stop receiving heartbeat"); + } + } +} + +/// A noop implementation of [`RegionFailureDetectorController`]. 
+#[derive(Debug, Clone)] +pub struct NoopRegionFailureDetectorControl; + +#[async_trait::async_trait] +impl RegionFailureDetectorController for NoopRegionFailureDetectorControl { + async fn register_failure_detectors(&self, _ident: Vec) {} + + async fn deregister_failure_detectors(&self, _ident: Vec) {} +} + /// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`]. pub(crate) struct HeartbeatAcceptor { sender: Sender, @@ -231,6 +294,13 @@ impl RegionSupervisor { } } + /// Returns the [`RegionFailureDetectorControllerRef`]. + pub(crate) fn failure_detector_controller(&self) -> RegionFailureDetectorControllerRef { + Arc::new(RegionFailureDetectorControl { + sender: self.sender.clone(), + }) + } + /// Returns the [`RegionSupervisorTicker`]. pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef { Arc::new(RegionSupervisorTicker { @@ -248,6 +318,12 @@ impl RegionSupervisor { let regions = self.detect_region_failure(); self.handle_region_failures(regions).await; } + Event::RegisterFailureDetectors(ident) => { + self.register_failure_detectors(ident).await + } + Event::DeregisterFailureDetectors(ident) => { + self.deregister_failure_detectors(ident).await + } Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat), Event::Clear => self.clear(), #[cfg(test)] @@ -259,6 +335,21 @@ impl RegionSupervisor { info!("RegionSupervisor is stopped!"); } + async fn register_failure_detectors(&self, idents: Vec) { + let ts_millis = current_time_millis(); + for ident in idents { + // The corresponding region has `acceptable_heartbeat_pause_millis` to send heartbeat from datanode. 
+ self.failure_detector + .maybe_init_region_failure_detector(ident, ts_millis); + } + } + + async fn deregister_failure_detectors(&self, idents: Vec) { + for ident in idents { + self.failure_detector.remove(&ident) + } + } + async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) { if regions.is_empty() { return; @@ -526,4 +617,32 @@ pub(crate) mod tests { } } } + + #[tokio::test] + async fn test_region_failure_detector_controller() { + let mut supervisor = new_test_supervisor(); + let sender = supervisor.sender(); + let controller = supervisor.failure_detector_controller(); + tokio::spawn(async move { supervisor.run().await }); + let ident = (0, 1, RegionId::new(1, 1)); + controller.register_failure_detectors(vec![ident]).await; + + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + let region_detector = detector.region_failure_detector(ident).clone(); + + // Registers failure detector again + controller.register_failure_detectors(vec![ident]).await; + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + let got = detector.region_failure_detector(ident).clone(); + assert_eq!(region_detector, got); + + controller.deregister_failure_detectors(vec![ident]).await; + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + assert!(rx.await.unwrap().is_empty()); + } } From 11b35d93169596aa714f8a3fe9235b486c0fc710 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 08:36:03 +0000 Subject: [PATCH 03/16] feat: add `RegionFailureDetectorController` to `DdlContext` --- src/cmd/src/standalone.rs | 3 +- src/common/meta/src/ddl.rs | 28 ++++++++++++++-- src/common/meta/src/ddl_manager.rs | 3 +- src/common/meta/src/test_util.rs | 3 +- src/meta-srv/src/metasrv/builder.rs | 48 +++++++++++++++------------ src/meta-srv/src/procedure/utils.rs | 3 +- 
src/meta-srv/src/region/supervisor.rs | 23 +------------ tests-integration/src/standalone.rs | 3 +- 8 files changed, 63 insertions(+), 51 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b48951062fc2..316aa6db7f25 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -25,7 +25,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; +use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; @@ -559,6 +559,7 @@ impl StartCommand { flow_metadata_manager, flow_metadata_allocator, peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager, true, diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3e9443d88d4b..c2051964ad33 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_telemetry::tracing_context::W3cTrace; -use store_api::storage::{RegionNumber, TableId}; +use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::flow_meta::FlowMetadataAllocatorRef; @@ -30,7 +30,7 @@ use crate::peer::PeerLookupServiceRef; use crate::region_keeper::MemoryRegionKeeperRef; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, 
ProcedureStateResponse}; -use crate::ClusterId; +use crate::{ClusterId, DatanodeId}; pub mod alter_logical_tables; pub mod alter_table; @@ -102,6 +102,28 @@ pub struct TableMetadata { pub region_wal_options: HashMap, } +pub type RegionFailureDetectorControllerRef = Arc; + +#[async_trait::async_trait] +pub trait RegionFailureDetectorController: Send + Sync { + /// Registers failure detectors for the given identifiers. + async fn register_failure_detectors(&self, ident: Vec<(ClusterId, DatanodeId, RegionId)>); + + /// Deregisters failure detectors for the given identifiers. + async fn deregister_failure_detectors(&self, ident: Vec<(ClusterId, DatanodeId, RegionId)>); +} + +/// A noop implementation of [`RegionFailureDetectorController`]. +#[derive(Debug, Clone)] +pub struct NoopRegionFailureDetectorControl; + +#[async_trait::async_trait] +impl RegionFailureDetectorController for NoopRegionFailureDetectorControl { + async fn register_failure_detectors(&self, _ident: Vec<(ClusterId, DatanodeId, RegionId)>) {} + + async fn deregister_failure_detectors(&self, _ident: Vec<(ClusterId, DatanodeId, RegionId)>) {} +} + /// The context of ddl. #[derive(Clone)] pub struct DdlContext { @@ -121,4 +143,6 @@ pub struct DdlContext { pub flow_metadata_allocator: FlowMetadataAllocatorRef, /// look up peer by id. pub peer_lookup_service: PeerLookupServiceRef, + /// controller of region failure detector. 
+ pub region_failure_detector_controller: RegionFailureDetectorControllerRef, } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1228de66dab7..567498a38dbc 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -805,7 +805,7 @@ mod tests { use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; - use crate::ddl::DdlContext; + use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -856,6 +856,7 @@ mod tests { flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), true, diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index a68b93597737..44c534dc32d8 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -24,7 +24,7 @@ use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; -use crate::ddl::DdlContext; +use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use crate::error::Result; use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; @@ -182,6 +182,7 @@ pub fn new_ddl_context_with_kv_backend( flow_metadata_allocator, flow_metadata_manager, peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 
3f3a86b2e8bf..74608e0e2b96 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -22,7 +22,7 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::DdlContext; +use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; use common_meta::key::flow::FlowMetadataManager; @@ -282,24 +282,6 @@ impl MetasrvBuilder { }, )); let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); - let ddl_manager = Arc::new( - DdlManager::try_new( - DdlContext { - node_manager, - cache_invalidator, - memory_region_keeper: memory_region_keeper.clone(), - table_metadata_manager: table_metadata_manager.clone(), - table_metadata_allocator: table_metadata_allocator.clone(), - flow_metadata_manager: flow_metadata_manager.clone(), - flow_metadata_allocator: flow_metadata_allocator.clone(), - peer_lookup_service: peer_lookup_service.clone(), - }, - procedure_manager.clone(), - true, - ) - .context(error::InitDdlManagerSnafu)?, - ); - let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( @@ -318,7 +300,7 @@ impl MetasrvBuilder { .fail(); } - let (region_failover_handler, region_supervisor_ticker) = + let (region_failover_handler, region_supervisor_ticker, region_failure_detector_controller) = if options.enable_region_failover && is_remote_wal { let region_supervisor = RegionSupervisor::new( options.failure_detector, @@ -327,17 +309,39 @@ impl MetasrvBuilder { selector.clone(), region_migration_manager.clone(), leader_cached_kv_backend.clone() as _, - peer_lookup_service, + peer_lookup_service.clone(), ); let region_supervisor_ticker = 
region_supervisor.ticker(); + let region_failure_detector_controller = + region_supervisor.failure_detector_controller(); ( Some(RegionFailureHandler::new(region_supervisor)), Some(region_supervisor_ticker), + region_failure_detector_controller as _, ) } else { - (None, None) + (None, None, Arc::new(NoopRegionFailureDetectorControl) as _) }; + let ddl_manager = Arc::new( + DdlManager::try_new( + DdlContext { + node_manager, + cache_invalidator, + memory_region_keeper: memory_region_keeper.clone(), + table_metadata_manager: table_metadata_manager.clone(), + table_metadata_allocator: table_metadata_allocator.clone(), + flow_metadata_manager: flow_metadata_manager.clone(), + flow_metadata_allocator: flow_metadata_allocator.clone(), + peer_lookup_service, + region_failure_detector_controller, + }, + procedure_manager.clone(), + true, + ) + .context(error::InitDdlManagerSnafu)?, + ); + let handler_group = match handler_group { Some(handler_group) => handler_group, None => { diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 2812294781a6..eda1ae7cdf5b 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -114,7 +114,7 @@ pub mod test_data { use common_catalog::consts::MITO2_ENGINE; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; - use common_meta::ddl::DdlContext; + use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; @@ -226,6 +226,7 @@ pub mod test_data { flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::new()), peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } } diff --git a/src/meta-srv/src/region/supervisor.rs 
b/src/meta-srv/src/region/supervisor.rs index 78d83d679f66..ad5b2085f970 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; +use common_meta::ddl::{RegionFailureDetectorController, RegionFailureDetectorControllerRef}; use common_meta::key::MAINTENANCE_KEY; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::PeerLookupServiceRef; @@ -188,17 +189,6 @@ pub struct RegionSupervisor { peer_lookup: PeerLookupServiceRef, } -pub type RegionFailureDetectorControllerRef = Arc; - -#[async_trait::async_trait] -pub trait RegionFailureDetectorController { - /// Registers failure detectors for the given identifiers. - async fn register_failure_detectors(&self, ident: Vec); - - /// Deregisters failure detectors for the given identifiers. - async fn deregister_failure_detectors(&self, ident: Vec); -} - /// Controller for managing failure detectors for regions. #[derive(Debug, Clone)] pub struct RegionFailureDetectorControl { @@ -230,17 +220,6 @@ impl RegionFailureDetectorController for RegionFailureDetectorControl { } } -/// A noop implementation of [`RegionFailureDetectorController`]. -#[derive(Debug, Clone)] -pub struct NoopRegionFailureDetectorControl; - -#[async_trait::async_trait] -impl RegionFailureDetectorController for NoopRegionFailureDetectorControl { - async fn register_failure_detectors(&self, _ident: Vec) {} - - async fn deregister_failure_detectors(&self, _ident: Vec) {} -} - /// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`]. 
pub(crate) struct HeartbeatAcceptor { sender: Sender, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 2e761d52a280..458f51b0948a 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,7 +23,7 @@ use common_config::KvBackendConfig; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; -use common_meta::ddl::DdlContext; +use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; @@ -199,6 +199,7 @@ impl GreptimeDbStandaloneBuilder { flow_metadata_manager, flow_metadata_allocator, peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), register_procedure_loaders, From 833f86b0b4b83f2659fdd8d80c582e860be57a65 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 10:05:52 +0000 Subject: [PATCH 04/16] feat: add `region_failure_detector_controller` to `Context` in region migration --- src/meta-srv/src/handler/failure_handler.rs | 14 +-- src/meta-srv/src/metasrv/builder.rs | 78 +++++++++------ .../src/procedure/region_migration.rs | 8 +- .../procedure/region_migration/test_util.rs | 2 + src/meta-srv/src/region/supervisor.rs | 98 ++++++++----------- 5 files changed, 107 insertions(+), 93 deletions(-) diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index efef360a204a..4024a77af8ff 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -26,8 +26,10 @@ pub struct RegionFailureHandler { } impl RegionFailureHandler { - pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self { - let 
heartbeat_acceptor = region_supervisor.heartbeat_acceptor(); + pub(crate) fn new( + mut region_supervisor: RegionSupervisor, + heartbeat_acceptor: HeartbeatAcceptor, + ) -> Self { info!("Starting region supervisor"); common_runtime::spawn_bg(async move { region_supervisor.run().await }); Self { heartbeat_acceptor } @@ -71,13 +73,13 @@ mod tests { use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::builder::MetasrvBuilder; use crate::region::supervisor::tests::new_test_supervisor; - use crate::region::supervisor::Event; + use crate::region::supervisor::{Event, HeartbeatAcceptor}; #[tokio::test] async fn test_handle_heartbeat() { - let supervisor = new_test_supervisor(); - let sender = supervisor.sender(); - let handler = RegionFailureHandler::new(supervisor); + let (supervisor, sender) = new_test_supervisor(); + let heartbeat_acceptor = HeartbeatAcceptor::new(sender.clone()); + let handler = RegionFailureHandler::new(supervisor, heartbeat_acceptor); let req = &HeartbeatRequest::default(); let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 74608e0e2b96..b328e7c53e9b 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -22,7 +22,9 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; +use common_meta::ddl::{ + DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, +}; use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; use common_meta::key::flow::FlowMetadataManager; @@ -68,7 +70,10 @@ use crate::metasrv::{ use 
crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::pubsub::PublisherRef; -use crate::region::supervisor::{RegionSupervisor, DEFAULT_TICK_INTERVAL}; +use crate::region::supervisor::{ + HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker, + DEFAULT_TICK_INTERVAL, +}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::service::mailbox::MailboxRef; @@ -282,46 +287,59 @@ impl MetasrvBuilder { }, )); let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); + if !is_remote_wal && options.enable_region_failover { + return error::UnexpectedSnafu { + violated: "Region failover is not supported in the local WAL implementation!", + } + .fail(); + } + + let (tx, rx) = tokio::sync::mpsc::channel(1024); + let (region_failure_detector_controller, region_supervisor_ticker): ( + RegionFailureDetectorControllerRef, + Option>, + ) = if options.enable_region_failover && is_remote_wal { + ( + Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _, + Some(Arc::new(RegionSupervisorTicker::new( + DEFAULT_TICK_INTERVAL, + tx.clone(), + ))), + ) + } else { + (Arc::new(NoopRegionFailureDetectorControl) as _, None as _) + }; + let region_migration_manager = Arc::new(RegionMigrationManager::new( procedure_manager.clone(), DefaultContextFactory::new( table_metadata_manager.clone(), memory_region_keeper.clone(), + region_failure_detector_controller.clone(), mailbox.clone(), options.server_addr.clone(), ), )); region_migration_manager.try_start()?; - if !is_remote_wal && options.enable_region_failover { - return error::UnexpectedSnafu { - violated: "Region failover is not supported in the local WAL implementation!", - } - .fail(); - } + let region_failover_handler = if options.enable_region_failover && is_remote_wal { + let region_supervisor = 
RegionSupervisor::new( + rx, + options.failure_detector, + selector_ctx.clone(), + selector.clone(), + region_migration_manager.clone(), + leader_cached_kv_backend.clone() as _, + peer_lookup_service.clone(), + ); - let (region_failover_handler, region_supervisor_ticker, region_failure_detector_controller) = - if options.enable_region_failover && is_remote_wal { - let region_supervisor = RegionSupervisor::new( - options.failure_detector, - DEFAULT_TICK_INTERVAL, - selector_ctx.clone(), - selector.clone(), - region_migration_manager.clone(), - leader_cached_kv_backend.clone() as _, - peer_lookup_service.clone(), - ); - let region_supervisor_ticker = region_supervisor.ticker(); - let region_failure_detector_controller = - region_supervisor.failure_detector_controller(); - ( - Some(RegionFailureHandler::new(region_supervisor)), - Some(region_supervisor_ticker), - region_failure_detector_controller as _, - ) - } else { - (None, None, Arc::new(NoopRegionFailureDetectorControl) as _) - }; + Some(RegionFailureHandler::new( + region_supervisor, + HeartbeatAcceptor::new(tx), + )) + } else { + None + }; let ddl_manager = Arc::new( DdlManager::try_new( diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index da9a5641a5ac..8a5af802dfca 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -29,6 +29,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; +use common_meta::ddl::RegionFailureDetectorControllerRef; use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; @@ -154,15 +155,17 @@ pub struct DefaultContextFactory { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: 
RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, } impl DefaultContextFactory { - /// Returns an [ContextFactoryImpl]. + /// Returns an [`DefaultContextFactory`]. pub fn new( table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, ) -> Self { @@ -170,6 +173,7 @@ impl DefaultContextFactory { volatile_ctx: VolatileContext::default(), table_metadata_manager, opening_region_keeper, + region_failure_detector_controller, mailbox, server_addr, } @@ -183,6 +187,7 @@ impl ContextFactory for DefaultContextFactory { volatile_ctx: self.volatile_ctx, table_metadata_manager: self.table_metadata_manager, opening_region_keeper: self.opening_region_keeper, + region_failure_detector_controller: self.region_failure_detector_controller.clone(), mailbox: self.mailbox, server_addr: self.server_addr, } @@ -195,6 +200,7 @@ pub struct Context { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 2f3638ef9323..6cc8ea12a507 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -20,6 +20,7 @@ use std::time::Duration; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; +use common_meta::ddl::NoopRegionFailureDetectorControl; use common_meta::instruction::{ DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, }; @@ -150,6 +151,7 @@ impl TestingEnv { volatile_ctx: Default::default(), mailbox: self.mailbox_ctx.mailbox().clone(), 
server_addr: self.server_addr.to_string(), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index ad5b2085f970..5d0a2d3feab7 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; -use common_meta::ddl::{RegionFailureDetectorController, RegionFailureDetectorControllerRef}; +use common_meta::ddl::RegionFailureDetectorController; use common_meta::key::MAINTENANCE_KEY; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::PeerLookupServiceRef; @@ -120,6 +120,14 @@ pub struct RegionSupervisorTicker { } impl RegionSupervisorTicker { + pub(crate) fn new(tick_interval: Duration, sender: Sender) -> Self { + Self { + tick_handle: Mutex::new(None), + tick_interval, + sender, + } + } + /// Starts the ticker. pub fn start(&self) { let mut handle = self.tick_handle.lock().unwrap(); @@ -171,12 +179,8 @@ pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); pub struct RegionSupervisor { /// Used to detect the failure of regions. failure_detector: RegionFailureDetector, - /// The interval of tick - tick_interval: Duration, /// Receives [Event]s. receiver: Receiver, - /// [Event] Sender. - sender: Sender, /// The context of [`SelectorRef`] selector_context: SelectorContext, /// Candidate node selector. 
@@ -195,6 +199,12 @@ pub struct RegionFailureDetectorControl { sender: Sender, } +impl RegionFailureDetectorControl { + pub(crate) fn new(sender: Sender) -> Self { + Self { sender } + } +} + #[async_trait::async_trait] impl RegionFailureDetectorController for RegionFailureDetectorControl { async fn register_failure_detectors(&self, ident: Vec) { @@ -226,6 +236,10 @@ pub(crate) struct HeartbeatAcceptor { } impl HeartbeatAcceptor { + pub(crate) fn new(sender: Sender) -> Self { + Self { sender } + } + /// Accepts heartbeats from datanodes. pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) { if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await { @@ -234,30 +248,19 @@ impl HeartbeatAcceptor { } } -#[cfg(test)] -impl RegionSupervisor { - /// Returns the [Event] sender. - pub(crate) fn sender(&self) -> Sender { - self.sender.clone() - } -} - impl RegionSupervisor { pub(crate) fn new( + event_receiver: Receiver, options: PhiAccrualFailureDetectorOptions, - tick_interval: Duration, selector_context: SelectorContext, selector: SelectorRef, region_migration_manager: RegionMigrationManagerRef, kv_backend: KvBackendRef, peer_lookup: PeerLookupServiceRef, ) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(1024); Self { failure_detector: RegionFailureDetector::new(options), - tick_interval, - receiver: rx, - sender: tx, + receiver: event_receiver, selector_context, selector, region_migration_manager, @@ -266,29 +269,6 @@ impl RegionSupervisor { } } - /// Returns the [`HeartbeatAcceptor`]. - pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor { - HeartbeatAcceptor { - sender: self.sender.clone(), - } - } - - /// Returns the [`RegionFailureDetectorControllerRef`]. - pub(crate) fn failure_detector_controller(&self) -> RegionFailureDetectorControllerRef { - Arc::new(RegionFailureDetectorControl { - sender: self.sender.clone(), - }) - } - - /// Returns the [`RegionSupervisorTicker`]. 
- pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef { - Arc::new(RegionSupervisorTicker { - tick_interval: self.tick_interval, - sender: self.sender.clone(), - tick_handle: Mutex::new(None), - }) - } - /// Runs the main loop. pub(crate) async fn run(&mut self) { while let Some(event) = self.receiver.recv().await { @@ -463,22 +443,25 @@ pub(crate) mod tests { use std::sync::{Arc, Mutex}; use std::time::Duration; + use common_meta::ddl::RegionFailureDetectorController; use common_meta::peer::Peer; use common_meta::test_util::NoopPeerLookupService; use common_time::util::current_time_millis; use rand::Rng; use store_api::storage::RegionId; + use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tokio::time::sleep; use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::test_util::TestingEnv; use crate::region::supervisor::{ - DatanodeHeartbeat, Event, RegionSupervisor, RegionSupervisorTicker, + DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor, + RegionSupervisorTicker, }; use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector}; - pub(crate) fn new_test_supervisor() -> RegionSupervisor { + pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender) { let env = TestingEnv::new(); let selector_context = new_test_selector_context(); let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)])); @@ -490,21 +473,25 @@ pub(crate) mod tests { let kv_backend = env.kv_backend(); let peer_lookup = Arc::new(NoopPeerLookupService); - RegionSupervisor::new( - Default::default(), - Duration::from_secs(1), - selector_context, - selector, - region_migration_manager, - kv_backend, - peer_lookup, + let (tx, rx) = tokio::sync::mpsc::channel(1024); + + ( + RegionSupervisor::new( + rx, + Default::default(), + selector_context, + selector, + region_migration_manager, + kv_backend, + peer_lookup, + ), + tx, ) } #[tokio::test] async fn test_heartbeat() { - let 
mut supervisor = new_test_supervisor(); - let sender = supervisor.sender(); + let (mut supervisor, sender) = new_test_supervisor(); tokio::spawn(async move { supervisor.run().await }); sender @@ -599,9 +586,8 @@ pub(crate) mod tests { #[tokio::test] async fn test_region_failure_detector_controller() { - let mut supervisor = new_test_supervisor(); - let sender = supervisor.sender(); - let controller = supervisor.failure_detector_controller(); + let (mut supervisor, sender) = new_test_supervisor(); + let controller = RegionFailureDetectorControl::new(sender.clone()); tokio::spawn(async move { supervisor.run().await }); let ident = (0, 1, RegionId::new(1, 1)); controller.register_failure_detectors(vec![ident]).await; From ba7ed651b0847e2fe7e5f5d5d08bac67b2cd787c Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 10:17:36 +0000 Subject: [PATCH 05/16] feat: register region failure detectors during rollback region migration procedure --- .../rollback_downgraded_region.rs | 22 ++++++++++++++++++- src/meta-srv/src/region/supervisor.rs | 10 +++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index c9253be2d597..7d62b74e4a07 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -50,6 +50,12 @@ impl UpdateMetadata { reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), }); } + let cluster_id = ctx.persistent_ctx.cluster_id; + let datanode_id = ctx.persistent_ctx.from_peer.id; + let region_id = ctx.persistent_ctx.region_id; + ctx.region_failure_detector_controller + .register_failure_detectors(vec![(cluster_id, datanode_id, region_id)]) + .await; 
ctx.remove_table_route_value(); @@ -60,6 +66,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -71,6 +78,7 @@ mod tests { use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; + use crate::region::supervisor::RegionFailureDetectorControl; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) @@ -98,6 +106,8 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + let (tx, mut rx) = tokio::sync::mpsc::channel(8); + ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx)); let table_id = ctx.region_id().table_id(); let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); @@ -161,8 +171,18 @@ mod tests { assert!(ctx.volatile_ctx.table_route.is_none()); assert!(err.is_retryable()); assert!(format!("{err:?}").contains("Failed to update the table route")); - + assert_eq!(rx.len(), 0); state.rollback_downgraded_region(&mut ctx).await.unwrap(); + let event = rx.try_recv().unwrap(); + let idents = event.into_region_failure_detectors(); + assert_eq!( + idents, + vec![( + ctx.persistent_ctx.cluster_id, + from_peer.id, + ctx.persistent_ctx.region_id + )] + ); let table_route = table_metadata_manager .table_route_manager() diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 5d0a2d3feab7..4bc562224ba6 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -84,6 +84,16 @@ pub(crate) enum Event { Dump(tokio::sync::oneshot::Sender), } +#[cfg(test)] +impl Event { + pub(crate) fn into_region_failure_detectors(self) -> 
Vec { + match self { + Self::RegisterFailureDetectors(ident) => ident, + _ => unreachable!(), + } + } +} + impl Debug for Event { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { From 604df22937698633f2593a8e2370417cba92be96 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 10:50:00 +0000 Subject: [PATCH 06/16] feat: deregister region failure detectors during drop table procedure --- src/common/meta/src/ddl/drop_database.rs | 4 ++ .../meta/src/ddl/drop_database/cursor.rs | 3 ++ .../meta/src/ddl/drop_database/executor.rs | 11 +++++- .../meta/src/ddl/drop_database/metadata.rs | 2 + .../meta/src/ddl/drop_database/start.rs | 3 ++ src/common/meta/src/ddl/drop_table.rs | 1 + .../meta/src/ddl/drop_table/executor.rs | 39 ++++++++++++++++++- 7 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index ce62b7d0c316..578e7744f1a6 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -35,6 +35,7 @@ use crate::ddl::DdlContext; use crate::error::Result; use crate::key::table_name::TableNameValue; use crate::lock_key::{CatalogLock, SchemaLock}; +use crate::ClusterId; pub struct DropDatabaseProcedure { /// The context of procedure runtime. @@ -53,6 +54,7 @@ pub(crate) enum DropTableTarget { /// Context of [DropDatabaseProcedure] execution. 
pub(crate) struct DropDatabaseContext { + cluster_id: ClusterId, catalog: String, schema: String, drop_if_exists: bool, @@ -85,6 +87,7 @@ impl DropDatabaseProcedure { Self { runtime_context: context, context: DropDatabaseContext { + cluster_id: 0, catalog, schema, drop_if_exists, @@ -105,6 +108,7 @@ impl DropDatabaseProcedure { Ok(Self { runtime_context, context: DropDatabaseContext { + cluster_id: 0, catalog, schema, drop_if_exists, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index c3dd8a582684..3b25b4202539 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -221,6 +221,7 @@ mod tests { // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -256,6 +257,7 @@ mod tests { // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -284,6 +286,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseCursor::new(DropTableTarget::Physical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 433e7dc96b20..a8d8ed9d1ffc 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -96,10 +96,11 @@ impl State for DropDatabaseExecutor { async fn next( &mut self, ddl_ctx: &DdlContext, - _ctx: &mut DropDatabaseContext, + 
ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)> { self.register_dropping_regions(ddl_ctx)?; - let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true); + let executor = + DropTableExecutor::new(ctx.cluster_id, self.table_name.clone(), self.table_id, true); // Deletes metadata for table permanently. let table_route_value = TableRouteValue::new( self.table_id, @@ -186,6 +187,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -198,6 +200,7 @@ mod tests { } // Execute again let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -238,6 +241,7 @@ mod tests { DropTableTarget::Logical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -250,6 +254,7 @@ mod tests { } // Execute again let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -339,6 +344,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -368,6 +374,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 005806146013..8d338df07c5f 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -118,6 +118,7 @@ mod 
tests { .unwrap(); let mut state = DropDatabaseRemoveMetadata; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, @@ -144,6 +145,7 @@ mod tests { // Schema not exists let mut state = DropDatabaseRemoveMetadata; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index 792eeac8dda1..deeb8ed215ed 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -89,6 +89,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut step = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: false, @@ -104,6 +105,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, @@ -126,6 +128,7 @@ mod tests { .unwrap(); let mut state = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index f3840a7d6774..e2a7adf3cc0e 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -279,6 +279,7 @@ impl DropTableData { fn build_executor(&self) -> DropTableExecutor { DropTableExecutor::new( + self.cluster_id, self.task.table_name(), self.task.table_id, self.task.drop_if_exists, diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 0783ce86ccaf..2e60f9aaf593 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs 
+++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -33,6 +33,7 @@ use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::ClusterId; /// [Control] indicated to the caller whether to go to the next step. #[derive(Debug)] @@ -50,8 +51,14 @@ impl Control { impl DropTableExecutor { /// Returns the [DropTableExecutor]. - pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self { + pub fn new( + cluster_id: ClusterId, + table: TableName, + table_id: TableId, + drop_if_exists: bool, + ) -> Self { Self { + cluster_id, table, table_id, drop_if_exists, @@ -64,6 +71,7 @@ impl DropTableExecutor { /// - Invalidates the cache on the Frontend nodes. /// - Drops the regions on the Datanode nodes. pub struct DropTableExecutor { + cluster_id: ClusterId, table: TableName, table_id: TableId, drop_if_exists: bool, @@ -128,9 +136,33 @@ impl DropTableExecutor { ctx: &DdlContext, table_route_value: &TableRouteValue, ) -> Result<()> { + let ident = if table_route_value.is_physical() { + // Safety: checked. + let regions = table_route_value.region_routes().unwrap(); + let ident = regions + .iter() + .flat_map(|region| { + region + .leader_peer + .as_ref() + .map(|peer| (self.cluster_id, peer.id, region.region.id)) + }) + .collect::>(); + Some(ident) + } else { + None + }; + ctx.table_metadata_manager .destroy_table_metadata(self.table_id, &self.table, table_route_value) - .await + .await?; + // Notifies the region supervisor to remove failure detectors. + if let Some(ident) = ident { + ctx.region_failure_detector_controller + .deregister_failure_detectors(ident) + .await; + } + Ok(()) } /// Restores the table metadata. 
@@ -274,6 +306,7 @@ mod tests { let node_manager = Arc::new(MockDatanodeManager::new(())); let ctx = new_ddl_context(node_manager); let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, true, @@ -283,6 +316,7 @@ mod tests { // Drops a non-exists table let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, false, @@ -292,6 +326,7 @@ mod tests { // Drops a exists table let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, false, From d0f2134c4178654e44d6bc05524af4f9ee6530e9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 27 Jun 2024 10:55:48 +0000 Subject: [PATCH 07/16] feat: register region failure detectors during create table procedure --- src/common/meta/src/ddl/create_table.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index d0b889609a84..1c27fedd253e 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -265,16 +265,35 @@ impl CreateTableProcedure { /// - Failed to create table metadata. async fn on_create_metadata(&mut self) -> Result { let table_id = self.table_id(); + let cluster_id = self.creator.data.cluster_id; let manager = &self.context.table_metadata_manager; let raw_table_info = self.table_info().clone(); // Safety: the region_wal_options must be allocated. let region_wal_options = self.region_wal_options()?.clone(); // Safety: the table_route must be allocated. 
- let table_route = TableRouteValue::Physical(self.table_route()?.clone()); + let physical_table_route = self.table_route()?.clone(); + + let ident = physical_table_route + .region_routes + .iter() + .map(|region| { + ( + cluster_id, + region.leader_peer.as_ref().unwrap().id, + region.region.id, + ) + }) + .collect::>(); + let table_route = TableRouteValue::Physical(physical_table_route); manager .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; + // Notifies region supervisor to detector failures of new created regions. + self.context + .region_failure_detector_controller + .register_failure_detectors(ident) + .await; info!("Created table metadata for table {table_id}"); self.creator.opening_regions.clear(); From e4f318e162125387dd36122100265a7a144f295a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 28 Jun 2024 03:05:08 +0000 Subject: [PATCH 08/16] fix: update meta config --- config/config.md | 2 +- config/metasrv.example.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.md b/config/config.md index 7ff0364ed535..a594e7368074 100644 --- a/config/config.md +++ b/config/config.md @@ -259,7 +259,7 @@ | `failure_detector` | -- | -- | -- | | `failure_detector.threshold` | Float | `8.0` | -- | | `failure_detector.min_std_deviation` | String | `100ms` | -- | -| `failure_detector.acceptable_heartbeat_pause` | String | `3000ms` | -- | +| `failure_detector.acceptable_heartbeat_pause` | String | `10000ms` | -- | | `failure_detector.first_heartbeat_estimate` | String | `1000ms` | -- | | `datanode` | -- | -- | Datanode options. | | `datanode.client` | -- | -- | Datanode client options. 
| diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 71dc48077c9c..1128d274cef2 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -54,7 +54,7 @@ max_metadata_value_size = "1500KiB" [failure_detector] threshold = 8.0 min_std_deviation = "100ms" -acceptable_heartbeat_pause = "3000ms" +acceptable_heartbeat_pause = "10000ms" first_heartbeat_estimate = "1000ms" ## Datanode options. From 6e3bcb2e96e4a9a9fb64079ce9f715a41da1ad09 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 28 Jun 2024 08:02:51 +0000 Subject: [PATCH 09/16] chore: apply suggestions from CR --- src/common/meta/src/ddl.rs | 7 +++++-- src/common/meta/src/ddl/create_table.rs | 5 ++++- src/common/meta/src/ddl/drop_table/executor.rs | 5 ++++- .../update_metadata/rollback_downgraded_region.rs | 4 ++++ 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index c2051964ad33..3c3835134fc0 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -104,13 +104,16 @@ pub struct TableMetadata { pub type RegionFailureDetectorControllerRef = Arc; +/// Used for actively registering Region failure detectors. +/// +/// Ensuring the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode. #[async_trait::async_trait] pub trait RegionFailureDetectorController: Send + Sync { /// Registers failure detectors for the given identifiers. - async fn register_failure_detectors(&self, ident: Vec<(ClusterId, DatanodeId, RegionId)>); + async fn register_failure_detectors(&self, idents: Vec<(ClusterId, DatanodeId, RegionId)>); /// Deregisters failure detectors for the given identifiers. - async fn deregister_failure_detectors(&self, ident: Vec<(ClusterId, DatanodeId, RegionId)>); + async fn deregister_failure_detectors(&self, idents: Vec<(ClusterId, DatanodeId, RegionId)>); } /// A noop implementation of [`RegionFailureDetectorController`]. 
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 1c27fedd253e..fd0e3f34fae3 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -289,7 +289,10 @@ impl CreateTableProcedure { manager .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; - // Notifies region supervisor to detector failures of new created regions. + // Notifies RegionSupervisor to register failure detector of new created regions. + // + // The datanode may crash without sending a heartbeat that contains information about newly created regions, + // which may prevent the RegionSupervisor from detecting failures in these newly created regions. self.context .region_failure_detector_controller .register_failure_detectors(ident) diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 2e60f9aaf593..22153e86b962 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -156,7 +156,10 @@ impl DropTableExecutor { ctx.table_metadata_manager .destroy_table_metadata(self.table_id, &self.table, table_route_value) .await?; - // Notifies the region supervisor to remove failure detectors. + // Notifies the RegionSupervisor to remove failure detectors. + // + // Once the regions were dropped, subsequent heartbeats no longer include these regions. + // Therefore, we should remove the failure detectors for these dropped regions. 
if let Some(ident) = ident { ctx.region_failure_detector_controller .deregister_failure_detectors(ident) diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 7d62b74e4a07..99b91417f3c1 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -53,6 +53,10 @@ impl UpdateMetadata { let cluster_id = ctx.persistent_ctx.cluster_id; let datanode_id = ctx.persistent_ctx.from_peer.id; let region_id = ctx.persistent_ctx.region_id; + // Notifies RegionSupervisor to register failure detectors of failed region. + // + // The original failure detector was removed once the procedure was triggered. + // Now, we need to register the failure detector for the failed region. ctx.region_failure_detector_controller .register_failure_detectors(vec![(cluster_id, datanode_id, region_id)]) .await; From bf1f5cad9ca173f5eef3f1eea7e34626e5e9ca21 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 28 Jun 2024 10:47:07 +0000 Subject: [PATCH 10/16] chore: avoid cloning --- src/meta-srv/src/procedure/region_migration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 8a5af802dfca..9b703afe03a3 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -187,7 +187,7 @@ impl ContextFactory for DefaultContextFactory { volatile_ctx: self.volatile_ctx, table_metadata_manager: self.table_metadata_manager, opening_region_keeper: self.opening_region_keeper, - region_failure_detector_controller: self.region_failure_detector_controller.clone(), + region_failure_detector_controller: self.region_failure_detector_controller, mailbox: self.mailbox, 
server_addr: self.server_addr, } From b240e938ab736a143af807d1a275d1db4c7fd628 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 28 Jun 2024 11:22:02 +0000 Subject: [PATCH 11/16] chore: rename --- src/common/meta/src/ddl/create_table.rs | 4 ++-- src/common/meta/src/ddl/drop_table/executor.rs | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index fd0e3f34fae3..66d9784bc178 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -274,7 +274,7 @@ impl CreateTableProcedure { // Safety: the table_route must be allocated. let physical_table_route = self.table_route()?.clone(); - let ident = physical_table_route + let idents = physical_table_route .region_routes .iter() .map(|region| { @@ -295,7 +295,7 @@ impl CreateTableProcedure { // which may prevent the RegionSupervisor from detecting failures in these newly created regions. self.context .region_failure_detector_controller - .register_failure_detectors(ident) + .register_failure_detectors(idents) .await; info!("Created table metadata for table {table_id}"); diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 22153e86b962..2aa9abf30650 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -136,10 +136,10 @@ impl DropTableExecutor { ctx: &DdlContext, table_route_value: &TableRouteValue, ) -> Result<()> { - let ident = if table_route_value.is_physical() { + let idents = if table_route_value.is_physical() { // Safety: checked. 
let regions = table_route_value.region_routes().unwrap(); - let ident = regions + let idents = regions .iter() .flat_map(|region| { region @@ -148,7 +148,7 @@ impl DropTableExecutor { .map(|peer| (self.cluster_id, peer.id, region.region.id)) }) .collect::>(); - Some(ident) + Some(idents) } else { None }; @@ -160,7 +160,7 @@ impl DropTableExecutor { // // Once the regions were dropped, subsequent heartbeats no longer include these regions. // Therefore, we should remove the failure detectors for these dropped regions. - if let Some(ident) = ident { + if let Some(ident) = idents { ctx.region_failure_detector_controller .deregister_failure_detectors(ident) .await; From 94e24fe870e47ebeb2717aeecded1252b45842a8 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 28 Jun 2024 14:39:07 +0000 Subject: [PATCH 12/16] chore: reduce the size of the test --- tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index 65b600ea4184..57332d5a0621 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -84,9 +84,9 @@ impl Arbitrary<'_> for FuzzInput { let seed = u.int_in_range(u64::MIN..=u64::MAX)?; let mut rng = ChaChaRng::seed_from_u64(seed); let columns = rng.gen_range(2..64); - let rows = rng.gen_range(2..4096); + let rows = rng.gen_range(2..2048); let tables = rng.gen_range(1..64); - let inserts = rng.gen_range(2..16); + let inserts = rng.gen_range(2..8); Ok(FuzzInput { columns, rows, @@ -264,7 +264,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { let mut rng = ChaCha20Rng::seed_from_u64(input.seed); info!("Generates {} tables", input.tables); let exprs = generate_create_exprs(input.tables, input.columns, &mut rng)?; - let parallelism = 8; + let parallelism = 4; let table_ctxs 
= exprs .iter() .map(|expr| Arc::new(TableContext::from(expr))) From 62d10761975df93cdfb2aa86cddfd57721dd9907 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jul 2024 03:04:18 +0000 Subject: [PATCH 13/16] chore: apply suggestions from CR --- src/common/meta/src/ddl.rs | 32 ++++++++++++++--- src/common/meta/src/ddl/create_table.rs | 28 +++++---------- .../meta/src/ddl/drop_table/executor.rs | 35 ++++++------------- src/common/meta/src/ddl/utils.rs | 19 ++++++++++ .../src/procedure/region_migration.rs | 14 ++++++++ .../rollback_downgraded_region.rs | 11 +----- src/meta-srv/src/region/failure_detector.rs | 21 ++++++----- src/meta-srv/src/region/supervisor.rs | 33 ++++++++--------- 8 files changed, 106 insertions(+), 87 deletions(-) diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 3c3835134fc0..60cf7b2a691d 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -104,16 +104,18 @@ pub struct TableMetadata { pub type RegionFailureDetectorControllerRef = Arc; +pub type DetectingRegion = (ClusterId, DatanodeId, RegionId); + /// Used for actively registering Region failure detectors. /// /// Ensuring the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode. #[async_trait::async_trait] pub trait RegionFailureDetectorController: Send + Sync { /// Registers failure detectors for the given identifiers. - async fn register_failure_detectors(&self, idents: Vec<(ClusterId, DatanodeId, RegionId)>); + async fn register_failure_detectors(&self, idents: Vec); /// Deregisters failure detectors for the given identifiers. - async fn deregister_failure_detectors(&self, idents: Vec<(ClusterId, DatanodeId, RegionId)>); + async fn deregister_failure_detectors(&self, idents: Vec); } /// A noop implementation of [`RegionFailureDetectorController`]. 
@@ -122,9 +124,9 @@ pub struct NoopRegionFailureDetectorControl; #[async_trait::async_trait] impl RegionFailureDetectorController for NoopRegionFailureDetectorControl { - async fn register_failure_detectors(&self, _ident: Vec<(ClusterId, DatanodeId, RegionId)>) {} + async fn register_failure_detectors(&self, _ident: Vec) {} - async fn deregister_failure_detectors(&self, _ident: Vec<(ClusterId, DatanodeId, RegionId)>) {} + async fn deregister_failure_detectors(&self, _ident: Vec) {} } /// The context of ddl. @@ -149,3 +151,25 @@ pub struct DdlContext { /// controller of region failure detector. pub region_failure_detector_controller: RegionFailureDetectorControllerRef, } + +impl DdlContext { + /// Notifies the RegionSupervisor to register failure detector of new created regions. + /// + /// The datanode may crash without sending a heartbeat that contains information about newly created regions, + /// which may prevent the RegionSupervisor from detecting failures in these newly created regions. + pub async fn register_failure_detectors(&self, detecting_regions: Vec) { + self.region_failure_detector_controller + .register_failure_detectors(detecting_regions) + .await; + } + + /// Notifies the RegionSupervisor to remove failure detectors. + /// + /// Once the regions were dropped, subsequent heartbeats no longer include these regions. + /// Therefore, we should remove the failure detectors for these dropped regions. 
+ async fn deregister_failure_detectors(&self, detecting_regions: Vec) { + self.region_failure_detector_controller + .deregister_failure_detectors(detecting_regions) + .await; + } +} diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 66d9784bc178..1d171f595e44 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -33,7 +33,10 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path}; +use crate::ddl::utils::{ + add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error, + region_storage_path, +}; use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; use crate::error::{self, Result}; use crate::key::table_name::TableNameKey; @@ -273,29 +276,16 @@ impl CreateTableProcedure { let region_wal_options = self.region_wal_options()?.clone(); // Safety: the table_route must be allocated. let physical_table_route = self.table_route()?.clone(); - - let idents = physical_table_route - .region_routes - .iter() - .map(|region| { - ( - cluster_id, - region.leader_peer.as_ref().unwrap().id, - region.region.id, - ) - }) - .collect::>(); + let detecting_regions = convert_region_routes_to_detecting_regions( + cluster_id, + &physical_table_route.region_routes, + ); let table_route = TableRouteValue::Physical(physical_table_route); manager .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; - // Notifies RegionSupervisor to register failure detector of new created regions. - // - // The datanode may crash without sending a heartbeat that contains information about newly created regions, - // which may prevent the RegionSupervisor from detecting failures in these newly created regions. 
self.context - .region_failure_detector_controller - .register_failure_detectors(idents) + .register_failure_detectors(detecting_regions) .await; info!("Created table metadata for table {table_id}"); diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 2aa9abf30650..6088d87a3f60 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -26,7 +26,7 @@ use table::metadata::TableId; use table::table_name::TableName; use crate::cache_invalidator::Context; -use crate::ddl::utils::add_peer_context_if_needed; +use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::CacheIdent; @@ -136,34 +136,19 @@ impl DropTableExecutor { ctx: &DdlContext, table_route_value: &TableRouteValue, ) -> Result<()> { - let idents = if table_route_value.is_physical() { + ctx.table_metadata_manager + .destroy_table_metadata(self.table_id, &self.table, table_route_value) + .await?; + + let detecting_regions = if table_route_value.is_physical() { // Safety: checked. let regions = table_route_value.region_routes().unwrap(); - let idents = regions - .iter() - .flat_map(|region| { - region - .leader_peer - .as_ref() - .map(|peer| (self.cluster_id, peer.id, region.region.id)) - }) - .collect::>(); - Some(idents) + convert_region_routes_to_detecting_regions(self.cluster_id, regions) } else { - None + vec![] }; - - ctx.table_metadata_manager - .destroy_table_metadata(self.table_id, &self.table, table_route_value) - .await?; - // Notifies the RegionSupervisor to remove failure detectors. - // - // Once the regions were dropped, subsequent heartbeats no longer include these regions. - // Therefore, we should remove the failure detectors for these dropped regions. 
- if let Some(ident) = idents { - ctx.region_failure_detector_controller - .deregister_failure_detectors(ident) - .await; + if !detecting_regions.is_empty() { + ctx.deregister_failure_detectors(detecting_regions).await; } Ok(()) } diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index de6171d4efcf..36cb97338655 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -19,11 +19,14 @@ use snafu::{ensure, location, Location, OptionExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use table::metadata::TableId; +use crate::ddl::DetectingRegion; use crate::error::{Error, Result, TableNotFoundSnafu, UnsupportedSnafu}; use crate::key::table_name::TableNameKey; use crate::key::TableMetadataManagerRef; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::RegionRoute; +use crate::ClusterId; /// Adds [Peer] context if the error is unretryable. pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error { @@ -126,3 +129,19 @@ pub async fn get_physical_table_id( .get_physical_table_id(logical_table_id) .await } + +/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`]. +pub fn convert_region_routes_to_detecting_regions( + cluster_id: ClusterId, + region_routes: &[RegionRoute], +) -> Vec { + region_routes + .iter() + .flat_map(|route| { + route + .leader_peer + .as_ref() + .map(|peer| (cluster_id, peer.id, route.region.id)) + }) + .collect::>() +} diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 9b703afe03a3..3edea35c0707 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -242,6 +242,20 @@ impl Context { Ok(table_route_value.as_ref().unwrap()) } + /// Notifies the RegionSupervisor to register failure detectors of failed region. 
+ /// + /// The original failure detector was removed once the procedure was triggered. + /// Now, we need to register the failure detector for the failed region. + pub async fn register_failure_detectors(&self) { + let cluster_id = self.persistent_ctx.cluster_id; + let datanode_id = self.persistent_ctx.from_peer.id; + let region_id = self.persistent_ctx.region_id; + + self.region_failure_detector_controller + .register_failure_detectors(vec![(cluster_id, datanode_id, region_id)]) + .await; + } + /// Removes the `table_route` of [VolatileContext], returns true if any. pub fn remove_table_route_value(&mut self) -> bool { let value = self.volatile_ctx.table_route.take(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 99b91417f3c1..fe457274cdc1 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -50,17 +50,8 @@ impl UpdateMetadata { reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), }); } - let cluster_id = ctx.persistent_ctx.cluster_id; - let datanode_id = ctx.persistent_ctx.from_peer.id; - let region_id = ctx.persistent_ctx.region_id; - // Notifies RegionSupervisor to register failure detectors of failed region. - // - // The original failure detector was removed once the procedure was triggered. - // Now, we need to register the failure detector for the failed region. 
- ctx.region_failure_detector_controller - .register_failure_detectors(vec![(cluster_id, datanode_id, region_id)]) - .await; + ctx.register_failure_detectors().await; ctx.remove_table_route_value(); Ok(()) diff --git a/src/meta-srv/src/region/failure_detector.rs b/src/meta-srv/src/region/failure_detector.rs index 2821a17328d9..006602206a1c 100644 --- a/src/meta-srv/src/region/failure_detector.rs +++ b/src/meta-srv/src/region/failure_detector.rs @@ -14,27 +14,24 @@ use std::ops::DerefMut; -use common_meta::{ClusterId, DatanodeId}; +use common_meta::ddl::DetectingRegion; use dashmap::mapref::multiple::RefMulti; use dashmap::DashMap; -use store_api::storage::RegionId; use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions}; -pub(crate) type Ident = (ClusterId, DatanodeId, RegionId); - /// Detects the region failures. pub(crate) struct RegionFailureDetector { options: PhiAccrualFailureDetectorOptions, - detectors: DashMap, + detectors: DashMap, } pub(crate) struct FailureDetectorEntry<'a> { - e: RefMulti<'a, Ident, PhiAccrualFailureDetector>, + e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>, } impl FailureDetectorEntry<'_> { - pub(crate) fn region_ident(&self) -> &Ident { + pub(crate) fn region_ident(&self) -> &DetectingRegion { self.e.key() } @@ -54,7 +51,7 @@ impl RegionFailureDetector { /// Returns [`PhiAccrualFailureDetector`] of the specific [`Ident`]. pub(crate) fn region_failure_detector( &self, - ident: Ident, + ident: DetectingRegion, ) -> impl DerefMut + '_ { self.detectors .entry(ident) @@ -66,7 +63,7 @@ impl RegionFailureDetector { /// detector is created and initialized with the provided timestamp. pub(crate) fn maybe_init_region_failure_detector( &self, - ident: Ident, + ident: DetectingRegion, ts_millis: i64, ) -> impl DerefMut + '_ { self.detectors.entry(ident).or_insert_with(|| { @@ -84,7 +81,7 @@ impl RegionFailureDetector { } /// Removes the specific [PhiAccrualFailureDetector] if exists. 
- pub(crate) fn remove(&self, ident: &Ident) { + pub(crate) fn remove(&self, ident: &DetectingRegion) { self.detectors.remove(ident); } @@ -95,7 +92,7 @@ impl RegionFailureDetector { /// Returns true if the specific `ident` exists. #[cfg(test)] - pub(crate) fn contains(&self, ident: &Ident) -> bool { + pub(crate) fn contains(&self, ident: &DetectingRegion) -> bool { self.detectors.contains_key(ident) } @@ -125,6 +122,8 @@ impl RegionFailureDetector { #[cfg(test)] mod tests { + use store_api::storage::RegionId; + use super::*; #[test] diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 4bc562224ba6..3204e210cba0 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; -use common_meta::ddl::RegionFailureDetectorController; +use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; use common_meta::key::MAINTENANCE_KEY; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::PeerLookupServiceRef; @@ -36,7 +36,7 @@ use crate::handler::node_stat::Stat; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::region_migration::RegionMigrationProcedureTask; -use crate::region::failure_detector::{Ident, RegionFailureDetector}; +use crate::region::failure_detector::RegionFailureDetector; use crate::selector::SelectorOptions; /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode. @@ -76,8 +76,8 @@ impl From<&Stat> for DatanodeHeartbeat { /// of the supervisor during tests. 
pub(crate) enum Event { Tick, - RegisterFailureDetectors(Vec), - DeregisterFailureDetectors(Vec), + RegisterFailureDetectors(Vec), + DeregisterFailureDetectors(Vec), HeartbeatArrived(DatanodeHeartbeat), Clear, #[cfg(test)] @@ -86,7 +86,7 @@ pub(crate) enum Event { #[cfg(test)] impl Event { - pub(crate) fn into_region_failure_detectors(self) -> Vec { + pub(crate) fn into_region_failure_detectors(self) -> Vec { match self { Self::RegisterFailureDetectors(ident) => ident, _ => unreachable!(), @@ -217,25 +217,23 @@ impl RegionFailureDetectorControl { #[async_trait::async_trait] impl RegionFailureDetectorController for RegionFailureDetectorControl { - async fn register_failure_detectors(&self, ident: Vec) { - if self + async fn register_failure_detectors(&self, ident: Vec) { + if let Err(err) = self .sender .send(Event::RegisterFailureDetectors(ident)) .await - .is_err() { - error!("RegionSupervisor is stop receiving heartbeat"); + error!(err; "RegionSupervisor is stop receiving heartbeat"); } } - async fn deregister_failure_detectors(&self, ident: Vec) { - if self + async fn deregister_failure_detectors(&self, ident: Vec) { + if let Err(err) = self .sender .send(Event::DeregisterFailureDetectors(ident)) .await - .is_err() { - error!("RegionSupervisor is stop receiving heartbeat"); + error!(err; "RegionSupervisor is stop receiving heartbeat"); } } } @@ -252,8 +250,8 @@ impl HeartbeatAcceptor { /// Accepts heartbeats from datanodes. 
pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) { - if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await { - error!(e; "RegionSupervisor is stop receiving heartbeat"); + if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await { + error!(err; "RegionSupervisor is stop receiving heartbeat"); } } } @@ -304,7 +302,7 @@ impl RegionSupervisor { info!("RegionSupervisor is stopped!"); } - async fn register_failure_detectors(&self, idents: Vec) { + async fn register_failure_detectors(&self, idents: Vec) { let ts_millis = current_time_millis(); for ident in idents { // The corresponding region has `acceptable_heartbeat_pause_millis` to send heartbeat from datanode. @@ -313,7 +311,7 @@ impl RegionSupervisor { } } - async fn deregister_failure_detectors(&self, idents: Vec) { + async fn deregister_failure_detectors(&self, idents: Vec) { for ident in idents { self.failure_detector.remove(&ident) } @@ -482,7 +480,6 @@ pub(crate) mod tests { )); let kv_backend = env.kv_backend(); let peer_lookup = Arc::new(NoopPeerLookupService); - let (tx, rx) = tokio::sync::mpsc::channel(1024); ( From 40b95f3283477d102400cf75bbce451f0555a1e3 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jul 2024 03:11:36 +0000 Subject: [PATCH 14/16] chore: move channel initialization into `RegionSupervisor::channel` --- src/meta-srv/src/metasrv/builder.rs | 2 +- src/meta-srv/src/region/supervisor.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b328e7c53e9b..3c039fc136b9 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -294,7 +294,7 @@ impl MetasrvBuilder { .fail(); } - let (tx, rx) = tokio::sync::mpsc::channel(1024); + let (tx, rx) = RegionSupervisor::channel(); let (region_failure_detector_controller, region_supervisor_ticker): ( RegionFailureDetectorControllerRef, Option>, diff --git 
a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 3204e210cba0..fccb84fdfcb7 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -257,6 +257,11 @@ impl HeartbeatAcceptor { } impl RegionSupervisor { + /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages. + pub(crate) fn channel() -> (Sender, Receiver) { + tokio::sync::mpsc::channel(1024) + } + pub(crate) fn new( event_receiver: Receiver, options: PhiAccrualFailureDetectorOptions, @@ -480,7 +485,7 @@ pub(crate) mod tests { )); let kv_backend = env.kv_backend(); let peer_lookup = Arc::new(NoopPeerLookupService); - let (tx, rx) = tokio::sync::mpsc::channel(1024); + let (tx, rx) = RegionSupervisor::channel(); ( RegionSupervisor::new( From 32f22d7fb54123ae30dac7dc39eadfb75ebe9532 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jul 2024 03:16:21 +0000 Subject: [PATCH 15/16] chore: minor refactor --- src/common/meta/src/ddl.rs | 8 ++++---- src/common/meta/src/ddl/drop_table/executor.rs | 4 +--- src/meta-srv/src/region/supervisor.rs | 12 ++++++------ 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index 60cf7b2a691d..008153a94284 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -112,10 +112,10 @@ pub type DetectingRegion = (ClusterId, DatanodeId, RegionId); #[async_trait::async_trait] pub trait RegionFailureDetectorController: Send + Sync { /// Registers failure detectors for the given identifiers. - async fn register_failure_detectors(&self, idents: Vec); + async fn register_failure_detectors(&self, detecting_regions: Vec); /// Deregisters failure detectors for the given identifiers. - async fn deregister_failure_detectors(&self, idents: Vec); + async fn deregister_failure_detectors(&self, detecting_regions: Vec); } /// A noop implementation of [`RegionFailureDetectorController`]. 
@@ -124,9 +124,9 @@ pub struct NoopRegionFailureDetectorControl; #[async_trait::async_trait] impl RegionFailureDetectorController for NoopRegionFailureDetectorControl { - async fn register_failure_detectors(&self, _ident: Vec) {} + async fn register_failure_detectors(&self, _detecting_regions: Vec) {} - async fn deregister_failure_detectors(&self, _ident: Vec) {} + async fn deregister_failure_detectors(&self, _detecting_regions: Vec) {} } /// The context of ddl. diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 6088d87a3f60..3848eeb4fc54 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -147,9 +147,7 @@ impl DropTableExecutor { } else { vec![] }; - if !detecting_regions.is_empty() { - ctx.deregister_failure_detectors(detecting_regions).await; - } + ctx.deregister_failure_detectors(detecting_regions).await; Ok(()) } diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index fccb84fdfcb7..8f73c8316409 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -307,18 +307,18 @@ impl RegionSupervisor { info!("RegionSupervisor is stopped!"); } - async fn register_failure_detectors(&self, idents: Vec) { + async fn register_failure_detectors(&self, detecting_regions: Vec) { let ts_millis = current_time_millis(); - for ident in idents { + for region in detecting_regions { // The corresponding region has `acceptable_heartbeat_pause_millis` to send heartbeat from datanode. 
self.failure_detector - .maybe_init_region_failure_detector(ident, ts_millis); + .maybe_init_region_failure_detector(region, ts_millis); } } - async fn deregister_failure_detectors(&self, idents: Vec) { - for ident in idents { - self.failure_detector.remove(&ident) + async fn deregister_failure_detectors(&self, detecting_regions: Vec) { + for region in detecting_regions { + self.failure_detector.remove(&region) } } From cef5b6556a7a786a064d999ec86f51468a39423f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 1 Jul 2024 04:15:28 +0000 Subject: [PATCH 16/16] chore: rename ident --- .../rollback_downgraded_region.rs | 4 +- src/meta-srv/src/region/failure_detector.rs | 28 ++++++------- src/meta-srv/src/region/supervisor.rs | 42 +++++++++++-------- 3 files changed, 41 insertions(+), 33 deletions(-) diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index fe457274cdc1..8da1bbb0dbc9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -169,9 +169,9 @@ mod tests { assert_eq!(rx.len(), 0); state.rollback_downgraded_region(&mut ctx).await.unwrap(); let event = rx.try_recv().unwrap(); - let idents = event.into_region_failure_detectors(); + let detecting_regions = event.into_region_failure_detectors(); assert_eq!( - idents, + detecting_regions, vec![( ctx.persistent_ctx.cluster_id, from_peer.id, diff --git a/src/meta-srv/src/region/failure_detector.rs b/src/meta-srv/src/region/failure_detector.rs index 006602206a1c..8533d27f30ac 100644 --- a/src/meta-srv/src/region/failure_detector.rs +++ b/src/meta-srv/src/region/failure_detector.rs @@ -48,25 +48,25 @@ impl RegionFailureDetector { } } - /// Returns [`PhiAccrualFailureDetector`] of the specific [`Ident`]. 
+ /// Returns [`PhiAccrualFailureDetector`] of the specific [`DetectingRegion`]. pub(crate) fn region_failure_detector( &self, - ident: DetectingRegion, + detecting_region: DetectingRegion, ) -> impl DerefMut + '_ { self.detectors - .entry(ident) + .entry(detecting_region) .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options)) } - /// Returns A mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`Ident`]. + /// Returns A mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`]. /// If a detector already exists for the region, it is returned. Otherwise, a new /// detector is created and initialized with the provided timestamp. pub(crate) fn maybe_init_region_failure_detector( &self, - ident: DetectingRegion, + detecting_region: DetectingRegion, ts_millis: i64, ) -> impl DerefMut + '_ { - self.detectors.entry(ident).or_insert_with(|| { + self.detectors.entry(detecting_region).or_insert_with(|| { let mut detector = PhiAccrualFailureDetector::from_options(self.options); detector.heartbeat(ts_millis); detector @@ -81,8 +81,8 @@ impl RegionFailureDetector { } /// Removes the specific [PhiAccrualFailureDetector] if exists. - pub(crate) fn remove(&self, ident: &DetectingRegion) { - self.detectors.remove(ident); + pub(crate) fn remove(&self, region: &DetectingRegion) { + self.detectors.remove(region); } /// Removes all [PhiAccrualFailureDetector]s. @@ -90,10 +90,10 @@ impl RegionFailureDetector { self.detectors.clear() } - /// Returns true if the specific `ident` exists. + /// Returns true if the specific [`DetectingRegion`] exists. 
#[cfg(test)] - pub(crate) fn contains(&self, ident: &DetectingRegion) -> bool { - self.detectors.contains_key(ident) + pub(crate) fn contains(&self, region: &DetectingRegion) -> bool { + self.detectors.contains_key(region) } /// Returns the length @@ -129,9 +129,9 @@ mod tests { #[test] fn test_default_failure_detector_container() { let container = RegionFailureDetector::new(Default::default()); - let ident = (0, 2, RegionId::new(1, 1)); - let _ = container.region_failure_detector(ident); - assert!(container.contains(&ident)); + let detecting_region = (0, 2, RegionId::new(1, 1)); + let _ = container.region_failure_detector(detecting_region); + assert!(container.contains(&detecting_region)); { let mut iter = container.iter(); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 8f73c8316409..8c7dff9b3df8 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -88,7 +88,7 @@ pub(crate) enum Event { impl Event { pub(crate) fn into_region_failure_detectors(self) -> Vec { match self { - Self::RegisterFailureDetectors(ident) => ident, + Self::RegisterFailureDetectors(detecting_regions) => detecting_regions, _ => unreachable!(), } } @@ -217,20 +217,20 @@ impl RegionFailureDetectorControl { #[async_trait::async_trait] impl RegionFailureDetectorController for RegionFailureDetectorControl { - async fn register_failure_detectors(&self, ident: Vec) { + async fn register_failure_detectors(&self, detecting_regions: Vec) { if let Err(err) = self .sender - .send(Event::RegisterFailureDetectors(ident)) + .send(Event::RegisterFailureDetectors(detecting_regions)) .await { error!(err; "RegionSupervisor is stop receiving heartbeat"); } } - async fn deregister_failure_detectors(&self, ident: Vec) { + async fn deregister_failure_detectors(&self, detecting_regions: Vec) { if let Err(err) = self .sender - .send(Event::DeregisterFailureDetectors(ident)) + 
.send(Event::DeregisterFailureDetectors(detecting_regions)) .await { error!(err; "RegionSupervisor is stop receiving heartbeat"); @@ -290,11 +290,11 @@ impl RegionSupervisor { let regions = self.detect_region_failure(); self.handle_region_failures(regions).await; } - Event::RegisterFailureDetectors(ident) => { - self.register_failure_detectors(ident).await + Event::RegisterFailureDetectors(detecting_regions) => { + self.register_failure_detectors(detecting_regions).await } - Event::DeregisterFailureDetectors(ident) => { - self.deregister_failure_detectors(ident).await + Event::DeregisterFailureDetectors(detecting_regions) => { + self.deregister_failure_detectors(detecting_regions).await } Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat), Event::Clear => self.clear(), @@ -439,8 +439,10 @@ impl RegionSupervisor { /// Updates the state of corresponding failure detectors. fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) { for region_id in heartbeat.regions { - let ident = (heartbeat.cluster_id, heartbeat.datanode_id, region_id); - let mut detector = self.failure_detector.region_failure_detector(ident); + let detecting_region = (heartbeat.cluster_id, heartbeat.datanode_id, region_id); + let mut detector = self + .failure_detector + .region_failure_detector(detecting_region); detector.heartbeat(heartbeat.timestamp); } } @@ -601,23 +603,29 @@ pub(crate) mod tests { let (mut supervisor, sender) = new_test_supervisor(); let controller = RegionFailureDetectorControl::new(sender.clone()); tokio::spawn(async move { supervisor.run().await }); - let ident = (0, 1, RegionId::new(1, 1)); - controller.register_failure_detectors(vec![ident]).await; + let detecting_region = (0, 1, RegionId::new(1, 1)); + controller + .register_failure_detectors(vec![detecting_region]) + .await; let (tx, rx) = oneshot::channel(); sender.send(Event::Dump(tx)).await.unwrap(); let detector = rx.await.unwrap(); - let region_detector = 
detector.region_failure_detector(ident).clone(); + let region_detector = detector.region_failure_detector(detecting_region).clone(); // Registers failure detector again - controller.register_failure_detectors(vec![ident]).await; + controller + .register_failure_detectors(vec![detecting_region]) + .await; let (tx, rx) = oneshot::channel(); sender.send(Event::Dump(tx)).await.unwrap(); let detector = rx.await.unwrap(); - let got = detector.region_failure_detector(ident).clone(); + let got = detector.region_failure_detector(detecting_region).clone(); assert_eq!(region_detector, got); - controller.deregister_failure_detectors(vec![ident]).await; + controller + .deregister_failure_detectors(vec![detecting_region]) + .await; let (tx, rx) = oneshot::channel(); sender.send(Event::Dump(tx)).await.unwrap(); assert!(rx.await.unwrap().is_empty());