Skip to content

Commit

Permalink
feat: introduce default customizers (#4831)
Browse files Browse the repository at this point in the history
* feat: introduce `DefaultHeartbeatHandlerGroupBuilderCustomizer` and `DefaultLeadershipChangeNotifierCustomizer`

* chore: code styling
  • Loading branch information
WenyXu authored Oct 15, 2024
1 parent 972c244 commit 3197b8b
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 18 deletions.
24 changes: 22 additions & 2 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ impl Instance {
}
}

pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
&mut self.instance
pub fn get_inner(&self) -> &MetasrvInstance {
&self.instance
}
}

Expand Down Expand Up @@ -90,6 +90,14 @@ impl Command {
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
self.subcmd.load_options(global_options)
}

pub fn config_file(&self) -> &Option<String> {
self.subcmd.config_file()
}

pub fn env_prefix(&self) -> &String {
self.subcmd.env_prefix()
}
}

#[derive(Parser)]
Expand All @@ -109,6 +117,18 @@ impl SubCommand {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
}

fn config_file(&self) -> &Option<String> {
match self {
SubCommand::Start(cmd) => &cmd.config_file,
}
}

fn env_prefix(&self) -> &String {
match self {
SubCommand::Start(cmd) => &cmd.env_prefix,
}
}
}

#[derive(Debug, Default, Parser)]
Expand Down
12 changes: 12 additions & 0 deletions src/common/base/src/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ impl Plugins {
self.read().get::<T>().cloned()
}

pub fn get_or_insert<T, F>(&self, f: F) -> T
where
T: 'static + Send + Sync + Clone,
F: FnOnce() -> T,
{
let mut binding = self.write();
if !binding.contains::<T>() {
binding.insert(f());
}
binding.get::<T>().cloned().unwrap()
}

pub fn map_mut<T: 'static + Send + Sync, F, R>(&self, mapper: F) -> R
where
F: FnOnce(Option<&mut T>) -> R,
Expand Down
26 changes: 23 additions & 3 deletions src/common/meta/src/leadership_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use common_telemetry::{error, info};
Expand All @@ -24,6 +24,8 @@ pub type LeadershipChangeNotifierCustomizerRef = Arc<dyn LeadershipChangeNotifie
/// A trait for customizing the leadership change notifier.
pub trait LeadershipChangeNotifierCustomizer: Send + Sync {
fn customize(&self, notifier: &mut LeadershipChangeNotifier);

fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>);
}

/// A trait for handling leadership change events in a distributed system.
Expand All @@ -45,10 +47,28 @@ pub struct LeadershipChangeNotifier {
listeners: Vec<Arc<dyn LeadershipChangeListener>>,
}

impl LeadershipChangeNotifierCustomizer for LeadershipChangeNotifier {
#[derive(Default)]
pub struct DefaultLeadershipChangeNotifierCustomizer {
listeners: Mutex<Vec<Arc<dyn LeadershipChangeListener>>>,
}

impl DefaultLeadershipChangeNotifierCustomizer {
pub fn new() -> Self {
Self {
listeners: Mutex::new(Vec::new()),
}
}
}

impl LeadershipChangeNotifierCustomizer for DefaultLeadershipChangeNotifierCustomizer {
fn customize(&self, notifier: &mut LeadershipChangeNotifier) {
info!("Customizing leadership change notifier");
notifier.listeners.extend(self.listeners.clone());
let listeners = self.listeners.lock().unwrap().clone();
notifier.listeners.extend(listeners);
}

fn add_listener(&self, listener: Arc<dyn LeadershipChangeListener>) {
self.listeners.lock().unwrap().push(listener);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ impl MetasrvInstance {
self.plugins.clone()
}

pub fn mut_inner(&mut self) -> &mut Metasrv {
&mut self.metasrv
pub fn get_inner(&self) -> &Metasrv {
&self.metasrv
}
}

Expand Down
128 changes: 117 additions & 11 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::collections::{BTreeMap, HashSet};
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use api::v1::meta::mailbox_message::Payload;
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Pushers {
}

#[derive(Clone)]
struct NameCachedHandler {
pub struct NameCachedHandler {
name: &'static str,
handler: Arc<dyn HeartbeatHandler>,
}
Expand Down Expand Up @@ -547,15 +547,27 @@ impl HeartbeatHandlerGroupBuilder {
})
}

fn add_handler_after_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers.insert(pos + 1, handler);
return Ok(());
}

error::HandlerNotFoundSnafu { name: target }.fail()
}

/// Adds the handler after the specified handler.
pub fn add_handler_after(
&mut self,
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
self.add_handler_after_inner(target, NameCachedHandler::new(handler))
}

fn add_handler_before_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers
.insert(pos + 1, NameCachedHandler::new(handler));
self.handlers.insert(pos, handler);
return Ok(());
}

Expand All @@ -568,8 +580,12 @@ impl HeartbeatHandlerGroupBuilder {
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
self.add_handler_before_inner(target, NameCachedHandler::new(handler))
}

fn replace_handler_inner(&mut self, target: &str, handler: NameCachedHandler) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers.insert(pos, NameCachedHandler::new(handler));
self.handlers[pos] = handler;
return Ok(());
}

Expand All @@ -582,25 +598,115 @@ impl HeartbeatHandlerGroupBuilder {
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Result<()> {
if let Some(pos) = self.handlers.iter().position(|x| x.name == target) {
self.handlers[pos] = NameCachedHandler::new(handler);
return Ok(());
}
self.replace_handler_inner(target, NameCachedHandler::new(handler))
}

error::HandlerNotFoundSnafu { name: target }.fail()
fn add_handler_last_inner(&mut self, handler: NameCachedHandler) {
self.handlers.push(handler);
}

fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) {
self.handlers.push(NameCachedHandler::new(handler));
self.add_handler_last_inner(NameCachedHandler::new(handler));
}
}

pub type HeartbeatHandlerGroupBuilderCustomizerRef =
Arc<dyn HeartbeatHandlerGroupBuilderCustomizer>;

pub enum CustomizeHeartbeatGroupAction {
AddHandlerAfter {
target: String,
handler: NameCachedHandler,
},
AddHandlerBefore {
target: String,
handler: NameCachedHandler,
},
ReplaceHandler {
target: String,
handler: NameCachedHandler,
},
AddHandlerLast {
handler: NameCachedHandler,
},
}

impl CustomizeHeartbeatGroupAction {
pub fn new_add_handler_after(
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Self {
Self::AddHandlerAfter {
target: target.to_string(),
handler: NameCachedHandler::new(handler),
}
}

pub fn new_add_handler_before(
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Self {
Self::AddHandlerBefore {
target: target.to_string(),
handler: NameCachedHandler::new(handler),
}
}

pub fn new_replace_handler(
target: &'static str,
handler: impl HeartbeatHandler + 'static,
) -> Self {
Self::ReplaceHandler {
target: target.to_string(),
handler: NameCachedHandler::new(handler),
}
}

pub fn new_add_handler_last(handler: impl HeartbeatHandler + 'static) -> Self {
Self::AddHandlerLast {
handler: NameCachedHandler::new(handler),
}
}
}

/// The customizer of the [`HeartbeatHandlerGroupBuilder`].
pub trait HeartbeatHandlerGroupBuilderCustomizer: Send + Sync {
fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()>;

fn add_action(&self, action: CustomizeHeartbeatGroupAction);
}

#[derive(Default)]
pub struct DefaultHeartbeatHandlerGroupBuilderCustomizer {
actions: Mutex<Vec<CustomizeHeartbeatGroupAction>>,
}

impl HeartbeatHandlerGroupBuilderCustomizer for DefaultHeartbeatHandlerGroupBuilderCustomizer {
fn customize(&self, builder: &mut HeartbeatHandlerGroupBuilder) -> Result<()> {
info!("Customizing the heartbeat handler group builder");
let mut actions = self.actions.lock().unwrap();
for action in actions.drain(..) {
match action {
CustomizeHeartbeatGroupAction::AddHandlerAfter { target, handler } => {
builder.add_handler_after_inner(&target, handler)?;
}
CustomizeHeartbeatGroupAction::AddHandlerBefore { target, handler } => {
builder.add_handler_before_inner(&target, handler)?;
}
CustomizeHeartbeatGroupAction::ReplaceHandler { target, handler } => {
builder.replace_handler_inner(&target, handler)?;
}
CustomizeHeartbeatGroupAction::AddHandlerLast { handler } => {
builder.add_handler_last_inner(handler);
}
}
}
Ok(())
}

fn add_action(&self, action: CustomizeHeartbeatGroupAction) {
self.actions.lock().unwrap().push(action);
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/handler/filter_inactive_region_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::error::Result;
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;

pub const NAME: &str = "FilterInactiveRegionStatsHandler";

pub struct FilterInactiveRegionStatsHandler;

#[async_trait]
Expand Down

0 comments on commit 3197b8b

Please sign in to comment.