Skip to content

Commit

Permalink
feat: introduce LeadershipChangeNotifier and `LeadershipChangeListe…
Browse files Browse the repository at this point in the history
…ner` (#4817)

* feat: introduce `LeadershipChangeNotifier`

* refactor: use `LeadershipChangeNotifier`

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: adjust log styling
  • Loading branch information
WenyXu authored Oct 11, 2024
1 parent 0f907ef commit 4bb1f4f
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 34 deletions.
18 changes: 17 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ pub enum Error {
source: common_procedure::Error,
},

#[snafu(display("Failed to start procedure manager"))]
StartProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::Error,
},

#[snafu(display("Failed to stop procedure manager"))]
StopProcedureManager {
#[snafu(implicit)]
location: Location,
source: common_procedure::Error,
},

#[snafu(display(
"Failed to get procedure output, procedure id: {procedure_id}, error: {err_msg}"
))]
Expand Down Expand Up @@ -715,7 +729,9 @@ impl ErrorExt for Error {

SubmitProcedure { source, .. }
| QueryProcedure { source, .. }
| WaitProcedure { source, .. } => source.status_code(),
| WaitProcedure { source, .. }
| StartProcedureManager { source, .. }
| StopProcedureManager { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),
Expand Down
156 changes: 156 additions & 0 deletions src/common/meta/src/leadership_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::error;

use crate::error::Result;

pub type LeadershipChangeNotifierCustomizerRef = Arc<dyn LeadershipChangeNotifierCustomizer>;

/// A trait for customizing the leadership change notifier.
pub trait LeadershipChangeNotifierCustomizer: Send + Sync {
fn customize(&self, notifier: &mut LeadershipChangeNotifier);
}

/// A trait for handling leadership change events in a distributed system.
#[async_trait]
pub trait LeadershipChangeListener: Send + Sync {
/// Returns the listener name.
fn name(&self) -> &str;

/// Called when the node transitions to the leader role.
async fn on_leader_start(&self) -> Result<()>;

/// Called when the node transitions to the follower role.
async fn on_leader_stop(&self) -> Result<()>;
}

/// A notifier for leadership change events.
#[derive(Default)]
pub struct LeadershipChangeNotifier {
listeners: Vec<Arc<dyn LeadershipChangeListener>>,
}

impl LeadershipChangeNotifier {
/// Adds a listener to the notifier.
pub fn add_listener(&mut self, listener: Arc<dyn LeadershipChangeListener>) {
self.listeners.push(listener);
}

/// Notify all listeners that the node has become a leader.
pub async fn notify_on_leader_start(&self) {
for listener in &self.listeners {
if let Err(err) = listener.on_leader_start().await {
error!(
err;
"Failed to notify listener: {}, event 'on_leader_start'",
listener.name()
);
}
}
}

/// Notify all listeners that the node has become a follower.
pub async fn notify_on_leader_stop(&self) {
for listener in &self.listeners {
if let Err(err) = listener.on_leader_stop().await {
error!(
err;
"Failed to notify listener: {}, event: 'on_follower_start'",
listener.name()
);
}
}
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use super::*;

struct MockListener {
name: String,
on_leader_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
on_follower_start_fn: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
}

#[async_trait::async_trait]
impl LeadershipChangeListener for MockListener {
fn name(&self) -> &str {
&self.name
}

async fn on_leader_start(&self) -> Result<()> {
if let Some(f) = &self.on_leader_start_fn {
return f();
}
Ok(())
}

async fn on_leader_stop(&self) -> Result<()> {
if let Some(f) = &self.on_follower_start_fn {
return f();
}
Ok(())
}
}

#[tokio::test]
async fn test_leadership_change_notifier() {
let mut notifier = LeadershipChangeNotifier::default();
let listener1 = Arc::new(MockListener {
name: "listener1".to_string(),
on_leader_start_fn: None,
on_follower_start_fn: None,
});
let called_on_leader_start = Arc::new(AtomicBool::new(false));
let called_on_follower_start = Arc::new(AtomicBool::new(false));
let called_on_leader_start_moved = called_on_leader_start.clone();
let called_on_follower_start_moved = called_on_follower_start.clone();
let listener2 = Arc::new(MockListener {
name: "listener2".to_string(),
on_leader_start_fn: Some(Box::new(move || {
called_on_leader_start_moved.store(true, Ordering::Relaxed);
Ok(())
})),
on_follower_start_fn: Some(Box::new(move || {
called_on_follower_start_moved.store(true, Ordering::Relaxed);
Ok(())
})),
});

notifier.add_listener(listener1);
notifier.add_listener(listener2);

let listener1 = notifier.listeners.first().unwrap();
let listener2 = notifier.listeners.get(1).unwrap();

assert_eq!(listener1.name(), "listener1");
assert_eq!(listener2.name(), "listener2");

notifier.notify_on_leader_start().await;
assert!(!called_on_follower_start.load(Ordering::Relaxed));
assert!(called_on_leader_start.load(Ordering::Relaxed));

notifier.notify_on_leader_stop().await;
assert!(called_on_follower_start.load(Ordering::Relaxed));
assert!(called_on_leader_start.load(Ordering::Relaxed));
}
}
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod heartbeat;
pub mod instruction;
pub mod key;
pub mod kv_backend;
pub mod leadership_notifier;
pub mod lock_key;
pub mod metrics;
pub mod node_manager;
Expand Down
17 changes: 17 additions & 0 deletions src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ pub mod kafka;
use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_wal::config::MetasrvWalConfig;
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use snafu::ResultExt;
use store_api::storage::{RegionId, RegionNumber};

use crate::error::{EncodeWalOptionsSnafu, Result};
use crate::kv_backend::KvBackendRef;
use crate::leadership_notifier::LeadershipChangeListener;
use crate::wal_options_allocator::kafka::topic_manager::TopicManager as KafkaTopicManager;

/// Allocates wal options in region granularity.
Expand Down Expand Up @@ -94,6 +96,21 @@ impl WalOptionsAllocator {
}
}

#[async_trait]
impl LeadershipChangeListener for WalOptionsAllocator {
fn name(&self) -> &str {
"WalOptionsAllocator"
}

async fn on_leader_start(&self) -> Result<()> {
self.start().await
}

async fn on_leader_stop(&self) -> Result<()> {
Ok(())
}
}

/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
pub fn allocate_region_wal_options(
regions: Vec<RegionNumber>,
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub mod selector;
pub mod service;
pub mod state;
pub mod table_meta_alloc;

pub use crate::error::Result;

mod greptimedb_telemetry;
Expand Down
62 changes: 30 additions & 32 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
Expand Down Expand Up @@ -56,6 +59,7 @@ use crate::handler::HeartbeatHandlerGroupRef;
use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
Expand Down Expand Up @@ -291,17 +295,15 @@ pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

pub struct MetaStateHandler {
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
subscribe_manager: Option<SubscriptionManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
leadership_change_notifier: LeadershipChangeNotifier,
state: StateRef,
}

impl MetaStateHandler {
pub async fn on_become_leader(&self) {
pub async fn on_leader_start(&self) {
self.state.write().unwrap().next_state(become_leader(false));

if let Err(e) = self.leader_cached_kv_backend.load().await {
Expand All @@ -310,33 +312,19 @@ impl MetaStateHandler {
self.state.write().unwrap().next_state(become_leader(true));
}

if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
ticker.start();
}

if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}

if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");
}
self.leadership_change_notifier
.notify_on_leader_start()
.await;

self.greptimedb_telemetry_task.should_report(true);
}

pub async fn on_become_follower(&self) {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());

// Stops the procedures.
if let Err(e) = self.procedure_manager.stop().await {
error!(e; "Failed to stop procedure manager");
}

if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
// Stops the supervisor ticker.
ticker.stop();
}
self.leadership_change_notifier
.notify_on_leader_stop()
.await;

// Suspends reporting.
self.greptimedb_telemetry_task.should_report(false);
Expand Down Expand Up @@ -410,15 +398,25 @@ impl Metasrv {
greptimedb_telemetry_task
.start()
.context(StartTelemetryTaskSnafu)?;
let region_supervisor_ticker = self.region_supervisor_ticker.clone();

// Builds leadership change notifier.
let mut leadership_change_notifier = LeadershipChangeNotifier::default();
leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
leadership_change_notifier
.add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
}
if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
customizer.customize(&mut leadership_change_notifier);
}

let state_handler = MetaStateHandler {
greptimedb_telemetry_task,
subscribe_manager,
procedure_manager,
wal_options_allocator: self.wal_options_allocator.clone(),
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
region_supervisor_ticker,
leadership_change_notifier,
};
let _handle = common_runtime::spawn_global(async move {
loop {
Expand All @@ -429,12 +427,12 @@ impl Metasrv {
info!("Leader's cache has bean cleared on leader change: {msg}");
match msg {
LeaderChangeMessage::Elected(_) => {
state_handler.on_become_leader().await;
state_handler.on_leader_start().await;
}
LeaderChangeMessage::StepDown(leader) => {
error!("Leader :{:?} step down", leader);

state_handler.on_become_follower().await;
state_handler.on_leader_stop().await;
}
}
}
Expand All @@ -448,7 +446,7 @@ impl Metasrv {
}
}

state_handler.on_become_follower().await;
state_handler.on_leader_stop().await;
});

// Register candidate and keep lease in background.
Expand Down
Loading

0 comments on commit 4bb1f4f

Please sign in to comment.