From 659c3757c298074a5e074c6afca78a22cf0f0cae Mon Sep 17 00:00:00 2001 From: fys Date: Thu, 3 Aug 2023 11:43:02 +0800 Subject: [PATCH] cr --- src/meta-srv/src/metasrv/builder.rs | 30 ++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 70ad5044dab6..de041ab70039 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -21,7 +21,6 @@ use common_grpc::channel_manager::ChannelConfig; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; -use tokio::sync::mpsc::Sender; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; @@ -42,9 +41,7 @@ use crate::metasrv::{ }; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; -use crate::pubsub::{ - DefaultPublish, DefaultSubscribeManager, Message, PublishRef, SubscribeManagerRef, -}; +use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::lease_based::LeaseBasedSelector; use crate::sequence::Sequence; use crate::service::mailbox::MailboxRef; @@ -65,6 +62,7 @@ pub struct MetaSrvBuilder { lock: Option, metadata_service: Option, datanode_clients: Option>, + pubsub: Option<(PublishRef, SubscribeManagerRef)>, } impl MetaSrvBuilder { @@ -80,6 +78,7 @@ impl MetaSrvBuilder { lock: None, metadata_service: None, datanode_clients: None, + pubsub: None, } } @@ -133,6 +132,11 @@ impl MetaSrvBuilder { self } + pub fn pubsub(mut self, publish: PublishRef, subscribe_manager: SubscribeManagerRef) -> Self { + self.pubsub = Some((publish, subscribe_manager)); + self + } + pub async fn build(self) -> Result { let started = Arc::new(AtomicBool::new(false)); @@ -147,6 +151,7 @@ impl MetaSrvBuilder { lock, metadata_service, datanode_clients, + pubsub, } = self; let options = options.unwrap_or_default(); @@ -174,7 +179,6 @@ impl MetaSrvBuilder { &table_metadata_manager, ); let _ = ddl_manager.try_start(); - let (publish, sub_manager) = build_publish(); let handler_group = match handler_group { Some(handler_group) => handler_group, @@ -221,9 +225,11 @@ impl MetaSrvBuilder { } group.add_handler(RegionLeaseHandler::default()).await; group.add_handler(PersistStatsHandler::default()).await; - group - .add_handler(PublishHeartbeatHandler::new(publish.clone())) - .await; + if let Some((publish, _)) = pubsub.as_ref() { + group + .add_handler(PublishHeartbeatHandler::new(publish.clone())) + .await; + } group } }; @@ -246,7 +252,7 @@ impl MetaSrvBuilder { ddl_manager, table_metadata_manager, greptimedb_telemerty_task: get_greptimedb_telemetry_task(meta_peer_client).await, - pubsub: Some((publish, sub_manager)), + pubsub, }) } } @@ -325,12 +331,6 @@ fn build_ddl_manager( )) } -fn build_publish() -> (PublishRef, SubscribeManagerRef) { - let sub_manager = Arc::new(DefaultSubscribeManager::>::default()); - let publish = Arc::new(DefaultPublish::new(sub_manager.clone())); - (publish, sub_manager) -} - impl Default for MetaSrvBuilder { fn default() -> Self { Self::new()