diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 2389770f5231..757bb273416e 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -48,6 +48,10 @@ impl Instance { _guard: guard, } } + + pub fn mut_inner(&mut self) -> &mut MetasrvInstance { + &mut self.instance + } } #[async_trait] diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d4a0d823c90f..8e1185e38936 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -653,7 +653,7 @@ impl StartCommand { } } -struct StandaloneInformationExtension { +pub struct StandaloneInformationExtension { region_server: RegionServer, procedure_manager: ProcedureManagerRef, start_time_ms: u64, diff --git a/src/common/meta/src/leadership_notifier.rs b/src/common/meta/src/leadership_notifier.rs index 4ba65d1adc54..0fed50d3880d 100644 --- a/src/common/meta/src/leadership_notifier.rs +++ b/src/common/meta/src/leadership_notifier.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_trait::async_trait; -use common_telemetry::error; +use common_telemetry::{error, info}; use crate::error::Result; @@ -45,6 +45,13 @@ pub struct LeadershipChangeNotifier { listeners: Vec>, } +impl LeadershipChangeNotifierCustomizer for LeadershipChangeNotifier { + fn customize(&self, notifier: &mut LeadershipChangeNotifier) { + info!("Customizing leadership change notifier"); + notifier.listeners.extend(self.listeners.clone()); + } +} + impl LeadershipChangeNotifier { /// Adds a listener to the notifier. pub fn add_listener(&mut self, listener: Arc) { diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index d10b22c610a8..79773498cfc8 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -148,6 +148,10 @@ impl MetasrvInstance { pub fn plugins(&self) -> Plugins { self.plugins.clone() } + + pub fn mut_inner(&mut self) -> &mut Metasrv { + &mut self.metasrv + } } pub async fn bootstrap_metasrv_with_router( diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index b45a1ac7e3ad..ce75ff5ff181 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -209,15 +209,16 @@ impl Pushers { } } +#[derive(Clone)] struct NameCachedHandler { name: &'static str, - handler: Box, + handler: Arc, } impl NameCachedHandler { fn new(handler: impl HeartbeatHandler + 'static) -> Self { let name = handler.name(); - let handler = Box::new(handler); + let handler = Arc::new(handler); Self { name, handler } } } @@ -225,7 +226,7 @@ impl NameCachedHandler { pub type HeartbeatHandlerGroupRef = Arc; /// The group of heartbeat handlers. -#[derive(Default)] +#[derive(Default, Clone)] pub struct HeartbeatHandlerGroup { handlers: Vec, pushers: Pushers, @@ -442,6 +443,7 @@ impl Mailbox for HeartbeatMailbox { } /// The builder to build the group of heartbeat handlers. +#[derive(Clone)] pub struct HeartbeatHandlerGroupBuilder { /// The handler to handle region failure. region_failure_handler: Option, @@ -540,7 +542,7 @@ impl HeartbeatHandlerGroupBuilder { } Ok(HeartbeatHandlerGroup { - handlers: self.handlers.into_iter().collect(), + handlers: self.handlers, pushers: self.pushers, }) } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 02f423c4b418..8908881d1714 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -21,6 +21,7 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor}; +#[derive(Clone)] pub struct RegionFailureHandler { heartbeat_acceptor: HeartbeatAcceptor, } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index de491da37150..40a554e453e9 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -26,6 +26,7 @@ use crate::metasrv::Context; use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse}; use crate::region::RegionLeaseKeeper; +#[derive(Clone)] pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c43eec60d456..71d15be2a081 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -44,18 +44,18 @@ use common_wal::config::MetasrvWalConfig; use serde::{Deserialize, Serialize}; use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; use tokio::sync::broadcast::error::RecvError; use crate::cluster::MetaPeerClientRef; use crate::election::{Election, LeaderChangeMessage}; use crate::error::{ - InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu, - StopProcedureManagerSnafu, + self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu, + StartTelemetryTaskSnafu, StopProcedureManagerSnafu, }; use crate::failure_detector::PhiAccrualFailureDetectorOptions; -use crate::handler::HeartbeatHandlerGroupRef; +use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef}; use crate::lease::lookup_datanode_peer; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; use crate::procedure::ProcedureManagerListenerAdapter; @@ -353,7 +353,8 @@ pub struct Metasrv { selector: SelectorRef, // The flow selector is used to select a target flownode. flow_selector: SelectorRef, - handler_group: HeartbeatHandlerGroupRef, + handler_group: Option, + handler_group_builder: Option, election: Option, procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, @@ -370,7 +371,15 @@ pub struct Metasrv { } impl Metasrv { - pub async fn try_start(&self) -> Result<()> { + pub async fn try_start(&mut self) -> Result<()> { + let builder = self + .handler_group_builder + .take() + .context(error::UnexpectedSnafu { + violated: "expected heartbeat handler group builder", + })?; + self.handler_group = Some(Arc::new(builder.build()?)); + if self .started .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) @@ -558,10 +567,14 @@ impl Metasrv { &self.flow_selector } - pub fn handler_group(&self) -> &HeartbeatHandlerGroupRef { + pub fn handler_group(&self) -> &Option { &self.handler_group } + pub fn handler_group_builder(&mut self) -> &mut Option { + &mut self.handler_group_builder + } + pub fn election(&self) -> Option<&ElectionRef> { self.election.as_ref() } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index db01b3ec9ddf..32129b853829 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -48,9 +48,7 @@ use crate::flow_meta_alloc::FlowPeerAllocator; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::failure_handler::RegionFailureHandler; use crate::handler::region_lease_handler::RegionLeaseHandler; -use crate::handler::{ - HeartbeatHandlerGroup, HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers, -}; +use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers}; use crate::lease::MetaPeerLookupService; use crate::metasrv::{ ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, @@ -74,7 +72,7 @@ pub struct MetasrvBuilder { kv_backend: Option, in_memory: Option, selector: Option, - handler_group: Option, + handler_group_builder: Option, election: Option, meta_peer_client: Option, node_manager: Option, @@ -88,7 +86,7 @@ impl MetasrvBuilder { kv_backend: None, in_memory: None, selector: None, - handler_group: None, + handler_group_builder: None, meta_peer_client: None, election: None, options: None, @@ -118,8 +116,11 @@ impl MetasrvBuilder { self } - pub fn heartbeat_handler(mut self, handler_group: HeartbeatHandlerGroup) -> Self { - self.handler_group = Some(handler_group); + pub fn heartbeat_handler( + mut self, + handler_group_builder: HeartbeatHandlerGroupBuilder, + ) -> Self { + self.handler_group_builder = Some(handler_group_builder); self } @@ -161,7 +162,7 @@ impl MetasrvBuilder { kv_backend, in_memory, selector, - handler_group, + handler_group_builder, node_manager, plugins, table_metadata_allocator, @@ -338,8 +339,8 @@ impl MetasrvBuilder { .context(error::InitDdlManagerSnafu)?, ); - let handler_group = match handler_group { - Some(handler_group) => handler_group, + let handler_group_builder = match handler_group_builder { + Some(handler_group_builder) => handler_group_builder, None => { let region_lease_handler = RegionLeaseHandler::new( distributed_time_constants::REGION_LEASE_SECS, @@ -352,7 +353,6 @@ impl MetasrvBuilder { .with_region_failure_handler(region_failover_handler) .with_region_lease_handler(Some(region_lease_handler)) .add_default_handlers() - .build()? } }; @@ -371,7 +371,8 @@ impl MetasrvBuilder { selector, // TODO(jeremy): We do not allow configuring the flow selector. flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), - handler_group: Arc::new(handler_group), + handler_group: None, + handler_group_builder: Some(handler_group_builder), election, procedure_manager, mailbox, diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 014d524927d0..4a88701e1a77 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -74,7 +74,7 @@ pub async fn mock( None => builder, }; - let metasrv = builder.build().await.unwrap(); + let mut metasrv = builder.build().await.unwrap(); metasrv.try_start().await.unwrap(); let (client, server) = tokio::io::duplex(1024); diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 32da666f2bcd..c6db67c76130 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -258,6 +258,7 @@ impl RegionFailureDetectorController for RegionFailureDetectorControl { } /// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`]. +#[derive(Clone)] pub(crate) struct HeartbeatAcceptor { sender: Sender, } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 569d6a8089b8..2470129be9fd 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -23,6 +23,7 @@ use api::v1::meta::{ use common_telemetry::{debug, error, info, warn}; use futures::StreamExt; use once_cell::sync::OnceCell; +use snafu::OptionExt; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio_stream::wrappers::ReceiverStream; @@ -45,7 +46,13 @@ impl heartbeat_server::Heartbeat for Metasrv { ) -> GrpcResult { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); - let handler_group = self.handler_group().clone(); + let handler_group = self + .handler_group() + .clone() + .context(error::UnexpectedSnafu { + violated: "expected heartbeat handlers", + })?; + let ctx = self.new_ctx(); let _handle = common_runtime::spawn_global(async move { let mut pusher_key = None;