From 4bb1f4f18447685fd1536e4fddafb0c9a6bb116e Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 11 Oct 2024 20:48:53 +0800 Subject: [PATCH] feat: introduce `LeadershipChangeNotifier` and `LeadershipChangeListener` (#4817) * feat: introduce `LeadershipChangeNotifier` * refactor: use `LeadershipChangeNotifier` * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: adjust log styling --- src/common/meta/src/error.rs | 18 ++- src/common/meta/src/leadership_notifier.rs | 156 +++++++++++++++++++ src/common/meta/src/lib.rs | 1 + src/common/meta/src/wal_options_allocator.rs | 17 ++ src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/metasrv.rs | 62 ++++---- src/meta-srv/src/procedure.rs | 30 ++++ src/meta-srv/src/region/supervisor.rs | 19 +++ 8 files changed, 270 insertions(+), 34 deletions(-) create mode 100644 src/common/meta/src/leadership_notifier.rs diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 849ee28948ef..0e7709df0b68 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -147,6 +147,20 @@ pub enum Error { source: common_procedure::Error, }, + #[snafu(display("Failed to start procedure manager"))] + StartProcedureManager { + #[snafu(implicit)] + location: Location, + source: common_procedure::Error, + }, + + #[snafu(display("Failed to stop procedure manager"))] + StopProcedureManager { + #[snafu(implicit)] + location: Location, + source: common_procedure::Error, + }, + #[snafu(display( "Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}" ))] @@ -715,7 +729,9 @@ impl ErrorExt for Error { SubmitProcedure { source, .. } | QueryProcedure { source, .. } - | WaitProcedure { source, .. } => source.status_code(), + | WaitProcedure { source, .. } + | StartProcedureManager { source, .. } + | StopProcedureManager { source, .. } => source.status_code(), RegisterProcedureLoader { source, .. } => source.status_code(), External { source, .. } => source.status_code(), OperateDatanode { source, .. } => source.status_code(), diff --git a/src/common/meta/src/leadership_notifier.rs b/src/common/meta/src/leadership_notifier.rs new file mode 100644 index 000000000000..4ba65d1adc54 --- /dev/null +++ b/src/common/meta/src/leadership_notifier.rs @@ -0,0 +1,156 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use common_telemetry::error; + +use crate::error::Result; + +pub type LeadershipChangeNotifierCustomizerRef = Arc; + +/// A trait for customizing the leadership change notifier. +pub trait LeadershipChangeNotifierCustomizer: Send + Sync { + fn customize(&self, notifier: &mut LeadershipChangeNotifier); +} + +/// A trait for handling leadership change events in a distributed system. +#[async_trait] +pub trait LeadershipChangeListener: Send + Sync { + /// Returns the listener name. + fn name(&self) -> &str; + + /// Called when the node transitions to the leader role. + async fn on_leader_start(&self) -> Result<()>; + + /// Called when the node transitions to the follower role. + async fn on_leader_stop(&self) -> Result<()>; +} + +/// A notifier for leadership change events. +#[derive(Default)] +pub struct LeadershipChangeNotifier { + listeners: Vec>, +} + +impl LeadershipChangeNotifier { + /// Adds a listener to the notifier. + pub fn add_listener(&mut self, listener: Arc) { + self.listeners.push(listener); + } + + /// Notify all listeners that the node has become a leader. + pub async fn notify_on_leader_start(&self) { + for listener in &self.listeners { + if let Err(err) = listener.on_leader_start().await { + error!( + err; + "Failed to notify listener: {}, event 'on_leader_start'", + listener.name() + ); + } + } + } + + /// Notify all listeners that the node has become a follower. + pub async fn notify_on_leader_stop(&self) { + for listener in &self.listeners { + if let Err(err) = listener.on_leader_stop().await { + error!( + err; + "Failed to notify listener: {}, event: 'on_follower_start'", + listener.name() + ); + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + use super::*; + + struct MockListener { + name: String, + on_leader_start_fn: Option Result<()> + Send + Sync>>, + on_follower_start_fn: Option Result<()> + Send + Sync>>, + } + + #[async_trait::async_trait] + impl LeadershipChangeListener for MockListener { + fn name(&self) -> &str { + &self.name + } + + async fn on_leader_start(&self) -> Result<()> { + if let Some(f) = &self.on_leader_start_fn { + return f(); + } + Ok(()) + } + + async fn on_leader_stop(&self) -> Result<()> { + if let Some(f) = &self.on_follower_start_fn { + return f(); + } + Ok(()) + } + } + + #[tokio::test] + async fn test_leadership_change_notifier() { + let mut notifier = LeadershipChangeNotifier::default(); + let listener1 = Arc::new(MockListener { + name: "listener1".to_string(), + on_leader_start_fn: None, + on_follower_start_fn: None, + }); + let called_on_leader_start = Arc::new(AtomicBool::new(false)); + let called_on_follower_start = Arc::new(AtomicBool::new(false)); + let called_on_leader_start_moved = called_on_leader_start.clone(); + let called_on_follower_start_moved = called_on_follower_start.clone(); + let listener2 = Arc::new(MockListener { + name: "listener2".to_string(), + on_leader_start_fn: Some(Box::new(move || { + called_on_leader_start_moved.store(true, Ordering::Relaxed); + Ok(()) + })), + on_follower_start_fn: Some(Box::new(move || { + called_on_follower_start_moved.store(true, Ordering::Relaxed); + Ok(()) + })), + }); + + notifier.add_listener(listener1); + notifier.add_listener(listener2); + + let listener1 = notifier.listeners.first().unwrap(); + let listener2 = notifier.listeners.get(1).unwrap(); + + assert_eq!(listener1.name(), "listener1"); + assert_eq!(listener2.name(), "listener2"); + + notifier.notify_on_leader_start().await; + assert!(!called_on_follower_start.load(Ordering::Relaxed)); + assert!(called_on_leader_start.load(Ordering::Relaxed)); + + notifier.notify_on_leader_stop().await; + assert!(called_on_follower_start.load(Ordering::Relaxed)); + assert!(called_on_leader_start.load(Ordering::Relaxed)); + } +} diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 1dd658890c3a..158350bc32de 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -32,6 +32,7 @@ pub mod heartbeat; pub mod instruction; pub mod key; pub mod kv_backend; +pub mod leadership_notifier; pub mod lock_key; pub mod metrics; pub mod node_manager; diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs index ba0c6f407fda..283f43b9a8a5 100644 --- a/src/common/meta/src/wal_options_allocator.rs +++ b/src/common/meta/src/wal_options_allocator.rs @@ -17,6 +17,7 @@ pub mod kafka; use std::collections::HashMap; use std::sync::Arc; +use async_trait::async_trait; use common_wal::config::MetasrvWalConfig; use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY}; use snafu::ResultExt; @@ -24,6 +25,7 @@ use store_api::storage::{RegionId, RegionNumber}; use crate::error::{EncodeWalOptionsSnafu, Result}; use crate::kv_backend::KvBackendRef; +use crate::leadership_notifier::LeadershipChangeListener; use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager; /// Allocates wal options in region granularity. @@ -94,6 +96,21 @@ impl WalOptionsAllocator { } } +#[async_trait] +impl LeadershipChangeListener for WalOptionsAllocator { + fn name(&self) -> &str { + "WalOptionsAllocator" + } + + async fn on_leader_start(&self) -> Result<()> { + self.start().await + } + + async fn on_leader_stop(&self) -> Result<()> { + Ok(()) + } +} + /// Allocates a wal options for each region. The allocated wal options is encoded immediately. pub fn allocate_region_wal_options( regions: Vec, diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 717edc92b204..01b48f1da083 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -40,7 +40,6 @@ pub mod selector; pub mod service; pub mod state; pub mod table_meta_alloc; - pub use crate::error::Result; mod greptimedb_telemetry; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index de7d54aa65a2..3cdb64e1e052 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -29,6 +29,9 @@ use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; +use common_meta::leadership_notifier::{ + LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef, +}; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; @@ -56,6 +59,7 @@ use crate::handler::HeartbeatHandlerGroupRef; use crate::lease::lookup_datanode_peer; use crate::lock::DistLockRef; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; +use crate::procedure::ProcedureManagerListenerAdapter; use crate::pubsub::{PublisherRef, SubscriptionManagerRef}; use crate::region::supervisor::RegionSupervisorTickerRef; use crate::selector::{Selector, SelectorType}; @@ -291,17 +295,15 @@ pub type SelectorRef = Arc>; pub struct MetaStateHandler { - procedure_manager: ProcedureManagerRef, - wal_options_allocator: WalOptionsAllocatorRef, subscribe_manager: Option, greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, - region_supervisor_ticker: Option, + leadership_change_notifier: LeadershipChangeNotifier, state: StateRef, } impl MetaStateHandler { - pub async fn on_become_leader(&self) { + pub async fn on_leader_start(&self) { self.state.write().unwrap().next_state(become_leader(false)); if let Err(e) = self.leader_cached_kv_backend.load().await { @@ -310,33 +312,19 @@ impl MetaStateHandler { self.state.write().unwrap().next_state(become_leader(true)); } - if let Some(ticker) = self.region_supervisor_ticker.as_ref() { - ticker.start(); - } - - if let Err(e) = self.procedure_manager.start().await { - error!(e; "Failed to start procedure manager"); - } - - if let Err(e) = self.wal_options_allocator.start().await { - error!(e; "Failed to start wal options allocator"); - } + self.leadership_change_notifier + .notify_on_leader_start() + .await; self.greptimedb_telemetry_task.should_report(true); } - pub async fn on_become_follower(&self) { + pub async fn on_leader_stop(&self) { self.state.write().unwrap().next_state(become_follower()); - // Stops the procedures. - if let Err(e) = self.procedure_manager.stop().await { - error!(e; "Failed to stop procedure manager"); - } - - if let Some(ticker) = self.region_supervisor_ticker.as_ref() { - // Stops the supervisor ticker. - ticker.stop(); - } + self.leadership_change_notifier + .notify_on_leader_stop() + .await; // Suspends reporting. self.greptimedb_telemetry_task.should_report(false); @@ -410,15 +398,25 @@ impl Metasrv { greptimedb_telemetry_task .start() .context(StartTelemetryTaskSnafu)?; - let region_supervisor_ticker = self.region_supervisor_ticker.clone(); + + // Builds leadership change notifier. + let mut leadership_change_notifier = LeadershipChangeNotifier::default(); + leadership_change_notifier.add_listener(self.wal_options_allocator.clone()); + leadership_change_notifier + .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager))); + if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker { + leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _); + } + if let Some(customizer) = self.plugins.get::() { + customizer.customize(&mut leadership_change_notifier); + } + let state_handler = MetaStateHandler { greptimedb_telemetry_task, subscribe_manager, - procedure_manager, - wal_options_allocator: self.wal_options_allocator.clone(), state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), - region_supervisor_ticker, + leadership_change_notifier, }; let _handle = common_runtime::spawn_global(async move { loop { @@ -429,12 +427,12 @@ impl Metasrv { info!("Leader's cache has bean cleared on leader change: {msg}"); match msg { LeaderChangeMessage::Elected(_) => { - state_handler.on_become_leader().await; + state_handler.on_leader_start().await; } LeaderChangeMessage::StepDown(leader) => { error!("Leader :{:?} step down", leader); - state_handler.on_become_follower().await; + state_handler.on_leader_stop().await; } } } @@ -448,7 +446,7 @@ impl Metasrv { } } - state_handler.on_become_follower().await; + state_handler.on_leader_stop().await; }); // Register candidate and keep lease in background. diff --git a/src/meta-srv/src/procedure.rs b/src/meta-srv/src/procedure.rs index 1f430654d224..8e696723e8d5 100644 --- a/src/meta-srv/src/procedure.rs +++ b/src/meta-srv/src/procedure.rs @@ -12,7 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +use async_trait::async_trait; +use common_meta::error::{self, Result}; +use common_meta::leadership_notifier::LeadershipChangeListener; +use common_procedure::ProcedureManagerRef; +use snafu::ResultExt; + pub mod region_migration; #[cfg(test)] mod tests; pub mod utils; + +#[derive(Clone)] +pub struct ProcedureManagerListenerAdapter(pub ProcedureManagerRef); + +#[async_trait] +impl LeadershipChangeListener for ProcedureManagerListenerAdapter { + fn name(&self) -> &str { + "ProcedureManager" + } + + async fn on_leader_start(&self) -> Result<()> { + self.0 + .start() + .await + .context(error::StartProcedureManagerSnafu) + } + + async fn on_leader_stop(&self) -> Result<()> { + self.0 + .stop() + .await + .context(error::StopProcedureManagerSnafu) + } +} diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 80f6c848652b..32da666f2bcd 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -16,10 +16,12 @@ use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; +use async_trait::async_trait; use common_meta::datanode::Stat; use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController}; use common_meta::key::MAINTENANCE_KEY; use common_meta::kv_backend::KvBackendRef; +use common_meta::leadership_notifier::LeadershipChangeListener; use common_meta::peer::PeerLookupServiceRef; use common_meta::{ClusterId, DatanodeId}; use common_runtime::JoinHandle; @@ -129,6 +131,23 @@ pub struct RegionSupervisorTicker { sender: Sender, } +#[async_trait] +impl LeadershipChangeListener for RegionSupervisorTicker { + fn name(&self) -> &'static str { + "RegionSupervisorTicker" + } + + async fn on_leader_start(&self) -> common_meta::error::Result<()> { + self.start(); + Ok(()) + } + + async fn on_leader_stop(&self) -> common_meta::error::Result<()> { + self.stop(); + Ok(()) + } +} + impl RegionSupervisorTicker { pub(crate) fn new(tick_interval: Duration, sender: Sender) -> Self { Self {