From 59ec90299b4edb975aad844d6864b40dcd50e3be Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 15 Oct 2024 19:36:48 +0800 Subject: [PATCH] refactor: metasrv cannot be cloned (#4834) * refactor: metasrv cannot be cloned * chore: remove MetasrvInstance's clone --- src/meta-srv/src/bootstrap.rs | 16 ++++----- src/meta-srv/src/handler.rs | 1 - src/meta-srv/src/handler/failure_handler.rs | 1 - .../src/handler/region_lease_handler.rs | 1 - src/meta-srv/src/metasrv.rs | 35 +++++++++---------- src/meta-srv/src/metasrv/builder.rs | 6 ++-- src/meta-srv/src/mocks.rs | 11 +++--- src/meta-srv/src/service/admin.rs | 2 +- src/meta-srv/src/service/heartbeat.rs | 9 ++--- tests-integration/src/cluster.rs | 2 +- 10 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 8323a7f8dfad..d5f369a3488d 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -56,9 +56,8 @@ use crate::selector::SelectorType; use crate::service::admin; use crate::{error, Result}; -#[derive(Clone)] pub struct MetasrvInstance { - metasrv: Metasrv, + metasrv: Arc, httpsrv: Arc, @@ -83,8 +82,9 @@ impl MetasrvInstance { .with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?) .build(), ); + let metasrv = Arc::new(metasrv); // put metasrv into plugins for later use - plugins.insert::>(Arc::new(metasrv.clone())); + plugins.insert::>(metasrv.clone()); let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins)) .context(InitExportMetricsTaskSnafu)?; Ok(MetasrvInstance { @@ -178,13 +178,13 @@ pub async fn bootstrap_metasrv_with_router( Ok(()) } -pub fn router(metasrv: Metasrv) -> Router { +pub fn router(metasrv: Arc) -> Router { tonic::transport::Server::builder() .accept_http1(true) // for admin services - .add_service(HeartbeatServer::new(metasrv.clone())) - .add_service(StoreServer::new(metasrv.clone())) - .add_service(ClusterServer::new(metasrv.clone())) - .add_service(ProcedureServiceServer::new(metasrv.clone())) + .add_service(HeartbeatServer::from_arc(metasrv.clone())) + .add_service(StoreServer::from_arc(metasrv.clone())) + .add_service(ClusterServer::from_arc(metasrv.clone())) + .add_service(ProcedureServiceServer::from_arc(metasrv.clone())) .add_service(admin::make_admin_service(metasrv)) } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 5499cb259b2f..15eff85a88fc 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -443,7 +443,6 @@ impl Mailbox for HeartbeatMailbox { } /// The builder to build the group of heartbeat handlers. -#[derive(Clone)] pub struct HeartbeatHandlerGroupBuilder { /// The handler to handle region failure. region_failure_handler: Option, diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 8908881d1714..02f423c4b418 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -21,7 +21,6 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor}; -#[derive(Clone)] pub struct RegionFailureHandler { heartbeat_acceptor: HeartbeatAcceptor, } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 40a554e453e9..de491da37150 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -26,7 +26,6 @@ use crate::metasrv::Context; use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse}; use crate::region::RegionLeaseKeeper; -#[derive(Clone)] pub struct RegionLeaseHandler { region_lease_seconds: u64, region_lease_keeper: RegionLeaseKeeperRef, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 71d15be2a081..5c53a1abbb80 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -16,7 +16,7 @@ pub mod builder; use std::fmt::Display; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use clap::ValueEnum; @@ -337,7 +337,6 @@ impl MetaStateHandler { } } -#[derive(Clone)] pub struct Metasrv { state: StateRef, started: Arc, @@ -353,8 +352,8 @@ pub struct Metasrv { selector: SelectorRef, // The flow selector is used to select a target flownode. flow_selector: SelectorRef, - handler_group: Option, - handler_group_builder: Option, + handler_group: RwLock>, + handler_group_builder: Mutex>, election: Option, procedure_manager: ProcedureManagerRef, mailbox: MailboxRef, @@ -371,15 +370,7 @@ pub struct Metasrv { } impl Metasrv { - pub async fn try_start(&mut self) -> Result<()> { - let builder = self - .handler_group_builder - .take() - .context(error::UnexpectedSnafu { - violated: "expected heartbeat handler group builder", - })?; - self.handler_group = Some(Arc::new(builder.build()?)); - + pub async fn try_start(&self) -> Result<()> { if self .started .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) @@ -389,6 +380,16 @@ impl Metasrv { return Ok(()); } + let handler_group_builder = + self.handler_group_builder + .lock() + .unwrap() + .take() + .context(error::UnexpectedSnafu { + violated: "expected heartbeat handler group builder", + })?; + *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?)); + // Creates default schema if not exists self.table_metadata_manager .init() @@ -567,12 +568,8 @@ impl Metasrv { &self.flow_selector } - pub fn handler_group(&self) -> &Option { - &self.handler_group - } - - pub fn handler_group_builder(&mut self) -> &mut Option { - &mut self.handler_group_builder + pub fn handler_group(&self) -> Option { + self.handler_group.read().unwrap().clone() } pub fn election(&self) -> Option<&ElectionRef> { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 32129b853829..31c17bb8f2e6 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::sync::atomic::AtomicBool; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use client::client_manager::NodeClients; @@ -371,8 +371,8 @@ impl MetasrvBuilder { selector, // TODO(jeremy): We do not allow configuring the flow selector. flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), - handler_group: None, - handler_group_builder: Some(handler_group_builder), + handler_group: RwLock::new(None), + handler_group_builder: Mutex::new(Some(handler_group_builder)), election, procedure_manager, mailbox, diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 4a88701e1a77..318fd16c87f2 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -33,7 +33,7 @@ use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef}; pub struct MockInfo { pub server_addr: String, pub channel_manager: ChannelManager, - pub metasrv: Metasrv, + pub metasrv: Arc, } pub async fn mock_with_memstore() -> MockInfo { @@ -74,16 +74,17 @@ pub async fn mock( None => builder, }; - let mut metasrv = builder.build().await.unwrap(); + let metasrv = builder.build().await.unwrap(); metasrv.try_start().await.unwrap(); let (client, server) = tokio::io::duplex(1024); + let metasrv = Arc::new(metasrv); let service = metasrv.clone(); let _handle = tokio::spawn(async move { tonic::transport::Server::builder() - .add_service(HeartbeatServer::new(service.clone())) - .add_service(StoreServer::new(service.clone())) - .add_service(ProcedureServiceServer::new(service.clone())) + .add_service(HeartbeatServer::from_arc(service.clone())) + .add_service(StoreServer::from_arc(service.clone())) + .add_service(ProcedureServiceServer::from_arc(service.clone())) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index f60461fa7c54..fca8cf329552 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -30,7 +30,7 @@ use tonic::server::NamedService; use crate::metasrv::Metasrv; -pub fn make_admin_service(metasrv: Metasrv) -> Admin { +pub fn make_admin_service(metasrv: Arc) -> Admin { let router = Router::new().route("/health", health::HealthHandler); let router = router.route( diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 2470129be9fd..9569d0e431e3 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -46,12 +46,9 @@ impl heartbeat_server::Heartbeat for Metasrv { ) -> GrpcResult { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); - let handler_group = self - .handler_group() - .clone() - .context(error::UnexpectedSnafu { - violated: "expected heartbeat handlers", - })?; + let handler_group = self.handler_group().context(error::UnexpectedSnafu { + violated: "expected heartbeat handlers", + })?; let ctx = self.new_ctx(); let _handle = common_runtime::spawn_global(async move { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 44e594676923..769dad4c1e6b 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -72,7 +72,7 @@ pub struct GreptimeDbCluster { pub datanode_instances: HashMap, pub kv_backend: KvBackendRef, - pub metasrv: Metasrv, + pub metasrv: Arc, pub frontend: Arc, }