diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 7c8d17cbe098..4c3d974c0a06 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -733,6 +733,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Handler not found: {}", name))] + HandlerNotFound { + name: String, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -803,7 +810,8 @@ impl ErrorExt for Error { | Error::InitExportMetricsTask { .. } | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. } - | Error::TomlFormat { .. } => StatusCode::InvalidArguments, + | Error::TomlFormat { .. } + | Error::HandlerNotFound { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 5363b6c548a7..9d82920c08dd 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -232,17 +232,6 @@ pub struct HeartbeatHandlerGroup { } impl HeartbeatHandlerGroup { - pub(crate) fn new(pushers: Pushers) -> Self { - Self { - handlers: vec![], - pushers, - } - } - - fn add_handler(&mut self, handler: impl HeartbeatHandler + 'static) { - self.handlers.push(NameCachedHandler::new(handler)); - } - /// Registers the heartbeat response [`Pusher`] with the given key to the group. pub async fn register_pusher(&self, key: impl AsRef, pusher: Pusher) { let key = key.as_ref(); @@ -458,25 +447,34 @@ pub struct HeartbeatHandlerGroupBuilder { region_failure_handler: Option, /// The handler to handle region lease. - region_lease_handler: RegionLeaseHandler, + region_lease_handler: Option, /// The plugins. plugins: Option, /// The heartbeat response pushers. pushers: Pushers, + + /// The group of heartbeat handlers. + handlers: Vec, } impl HeartbeatHandlerGroupBuilder { - pub fn new(pushers: Pushers, region_lease_handler: RegionLeaseHandler) -> Self { + pub fn new(pushers: Pushers) -> Self { Self { region_failure_handler: None, - region_lease_handler, + region_lease_handler: None, plugins: None, pushers, + handlers: vec![], } } + pub fn with_region_lease_handler(mut self, handler: Option) -> Self { + self.region_lease_handler = handler; + self + } + /// Sets the [`RegionFailureHandler`]. pub fn with_region_failure_handler(mut self, handler: Option) -> Self { self.region_failure_handler = handler; @@ -489,10 +487,10 @@ impl HeartbeatHandlerGroupBuilder { self } - /// Builds the group of heartbeat handlers. - pub fn build(self) -> HeartbeatHandlerGroup { + /// Adds the default handlers. + pub fn add_default_handlers(mut self) -> Self { // Extract the `PublishHeartbeatHandler` from the plugins. - let publish_heartbeat_handler = if let Some(plugins) = self.plugins { + let publish_heartbeat_handler = if let Some(plugins) = self.plugins.as_ref() { plugins .get::() .map(|publish| PublishHeartbeatHandler::new(publish.clone())) @@ -500,39 +498,94 @@ impl HeartbeatHandlerGroupBuilder { None }; - // TODO(weny): Considers classifying handlers - // to make it easier for upper layers to customize handler groups. - let mut group = HeartbeatHandlerGroup::new(self.pushers); - group.add_handler(ResponseHeaderHandler); + self.add_handler_last(ResponseHeaderHandler); // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, // because even if the current meta-server node is no longer the leader it can // still help the datanode to keep lease. - group.add_handler(DatanodeKeepLeaseHandler); - group.add_handler(FlownodeKeepLeaseHandler); - group.add_handler(CheckLeaderHandler); - group.add_handler(OnLeaderStartHandler); - group.add_handler(ExtractStatHandler); - group.add_handler(CollectDatanodeClusterInfoHandler); - group.add_handler(CollectFrontendClusterInfoHandler); - group.add_handler(CollectFlownodeClusterInfoHandler); - group.add_handler(MailboxHandler); - group.add_handler(self.region_lease_handler); - group.add_handler(FilterInactiveRegionStatsHandler); - if let Some(region_failure_handler) = self.region_failure_handler { - group.add_handler(region_failure_handler); + self.add_handler_last(DatanodeKeepLeaseHandler); + self.add_handler_last(FlownodeKeepLeaseHandler); + self.add_handler_last(CheckLeaderHandler); + self.add_handler_last(OnLeaderStartHandler); + self.add_handler_last(ExtractStatHandler); + self.add_handler_last(CollectDatanodeClusterInfoHandler); + self.add_handler_last(CollectFrontendClusterInfoHandler); + self.add_handler_last(CollectFlownodeClusterInfoHandler); + self.add_handler_last(MailboxHandler); + if let Some(region_lease_handler) = self.region_lease_handler.take() { + self.add_handler_last(region_lease_handler); + } + self.add_handler_last(FilterInactiveRegionStatsHandler); + if let Some(region_failure_handler) = self.region_failure_handler.take() { + self.add_handler_last(region_failure_handler); } if let Some(publish_heartbeat_handler) = publish_heartbeat_handler { - group.add_handler(publish_heartbeat_handler); + self.add_handler_last(publish_heartbeat_handler); } - group.add_handler(CollectStatsHandler::default()); + self.add_handler_last(CollectStatsHandler::default()); + + self + } - group + /// Builds the group of heartbeat handlers. + pub fn build(self) -> HeartbeatHandlerGroup { + HeartbeatHandlerGroup { + handlers: self.handlers.into_iter().collect(), + pushers: self.pushers, + } + } + + /// Adds the handler after the specified handler. + pub fn add_handler_after( + &mut self, + target: &'static str, + handler: impl HeartbeatHandler + 'static, + ) -> Result<()> { + if let Some(pos) = self.handlers.iter().position(|x| x.name == target) { + self.handlers + .insert(pos + 1, NameCachedHandler::new(handler)); + return Ok(()); + } + + error::HandlerNotFoundSnafu { name: target }.fail() + } + + /// Adds the handler before the specified handler. + pub fn add_handler_before( + &mut self, + target: &'static str, + handler: impl HeartbeatHandler + 'static, + ) -> Result<()> { + if let Some(pos) = self.handlers.iter().position(|x| x.name == target) { + self.handlers.insert(pos, NameCachedHandler::new(handler)); + return Ok(()); + } + + error::HandlerNotFoundSnafu { name: target }.fail() + } + + /// Replaces the handler with the specified name. + pub fn replace_handler( + &mut self, + target: &'static str, + handler: impl HeartbeatHandler + 'static, + ) -> Result<()> { + if let Some(pos) = self.handlers.iter().position(|x| x.name == target) { + self.handlers[pos] = NameCachedHandler::new(handler); + return Ok(()); + } + + error::HandlerNotFoundSnafu { name: target }.fail() + } + + fn add_handler_last(&mut self, handler: impl HeartbeatHandler + 'static) { + self.handlers.push(NameCachedHandler::new(handler)); } } #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::sync::Arc; use std::time::Duration; @@ -541,17 +594,9 @@ mod tests { use common_meta::sequence::SequenceBuilder; use tokio::sync::mpsc; - use crate::handler::check_leader_handler::CheckLeaderHandler; - use crate::handler::collect_cluster_info_handler::{ - CollectDatanodeClusterInfoHandler, CollectFlownodeClusterInfoHandler, - CollectFrontendClusterInfoHandler, - }; + use super::{HeartbeatHandlerGroupBuilder, Pushers}; + use crate::error; use crate::handler::collect_stats_handler::CollectStatsHandler; - use crate::handler::extract_stat_handler::ExtractStatHandler; - use crate::handler::filter_inactive_region_stats::FilterInactiveRegionStatsHandler; - use crate::handler::keep_lease_handler::{DatanodeKeepLeaseHandler, FlownodeKeepLeaseHandler}; - use crate::handler::mailbox_handler::MailboxHandler; - use crate::handler::on_leader_start_handler::OnLeaderStartHandler; use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; @@ -621,24 +666,212 @@ mod tests { (mailbox, receiver) } - #[tokio::test] - async fn test_handler_name() { - let mut group = HeartbeatHandlerGroup::default(); - group.add_handler(ResponseHeaderHandler); - group.add_handler(DatanodeKeepLeaseHandler); - group.add_handler(FlownodeKeepLeaseHandler); - group.add_handler(CheckLeaderHandler); - group.add_handler(OnLeaderStartHandler); - group.add_handler(ExtractStatHandler); - group.add_handler(CollectDatanodeClusterInfoHandler); - group.add_handler(CollectFrontendClusterInfoHandler); - group.add_handler(CollectFlownodeClusterInfoHandler); - group.add_handler(MailboxHandler); - group.add_handler(FilterInactiveRegionStatsHandler); - group.add_handler(CollectStatsHandler::default()); + #[test] + fn test_handler_group_builder() { + let group = HeartbeatHandlerGroupBuilder::new(Pushers::default()) + .add_default_handlers() + .build(); + + let handlers = group.handlers; + assert_eq!(12, handlers.len()); + + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + #[test] + fn test_handler_group_builder_add_before() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .add_handler_before( + "FilterInactiveRegionStatsHandler", + CollectStatsHandler::default(), + ) + .unwrap(); + + let group = builder.build(); let handlers = group.handlers; + assert_eq!(13, handlers.len()); + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "CollectStatsHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_add_before_first() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .add_handler_before("ResponseHeaderHandler", CollectStatsHandler::default()) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; + assert_eq!(13, handlers.len()); + + let names = [ + "CollectStatsHandler", + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_add_after() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .add_handler_after("MailboxHandler", CollectStatsHandler::default()) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; + assert_eq!(13, handlers.len()); + + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "CollectStatsHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_add_after_last() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .add_handler_after("CollectStatsHandler", ResponseHeaderHandler) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; + assert_eq!(13, handlers.len()); + + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + "ResponseHeaderHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_replace() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .replace_handler("MailboxHandler", CollectStatsHandler::default()) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; + assert_eq!(12, handlers.len()); + + let names = [ + "ResponseHeaderHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "CollectStatsHandler", + "FilterInactiveRegionStatsHandler", + "CollectStatsHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_replace_last() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .replace_handler("CollectStatsHandler", ResponseHeaderHandler) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; assert_eq!(12, handlers.len()); let names = [ @@ -653,6 +886,38 @@ mod tests { "CollectFlownodeClusterInfoHandler", "MailboxHandler", "FilterInactiveRegionStatsHandler", + "ResponseHeaderHandler", + ]; + + for (handler, name) in handlers.iter().zip(names.into_iter()) { + assert_eq!(handler.name, name); + } + } + + #[test] + fn test_handler_group_builder_replace_first() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + builder + .replace_handler("ResponseHeaderHandler", CollectStatsHandler::default()) + .unwrap(); + + let group = builder.build(); + let handlers = group.handlers; + assert_eq!(12, handlers.len()); + + let names = [ + "CollectStatsHandler", + "DatanodeKeepLeaseHandler", + "FlownodeKeepLeaseHandler", + "CheckLeaderHandler", + "OnLeaderStartHandler", + "ExtractStatHandler", + "CollectDatanodeClusterInfoHandler", + "CollectFrontendClusterInfoHandler", + "CollectFlownodeClusterInfoHandler", + "MailboxHandler", + "FilterInactiveRegionStatsHandler", "CollectStatsHandler", ]; @@ -660,4 +925,24 @@ mod tests { assert_eq!(handler.name, name); } } + + #[test] + fn test_handler_group_builder_handler_not_found() { + let mut builder = + HeartbeatHandlerGroupBuilder::new(Pushers::default()).add_default_handlers(); + let err = builder + .add_handler_before("NotExists", CollectStatsHandler::default()) + .unwrap_err(); + assert_matches!(err, error::Error::HandlerNotFound { .. }); + + let err = builder + .add_handler_after("NotExists", CollectStatsHandler::default()) + .unwrap_err(); + assert_matches!(err, error::Error::HandlerNotFound { .. }); + + let err = builder + .replace_handler("NotExists", CollectStatsHandler::default()) + .unwrap_err(); + assert_matches!(err, error::Error::HandlerNotFound { .. }); + } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 662de433abd9..88d7e5bd6d05 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -358,9 +358,11 @@ impl MetasrvBuilder { memory_region_keeper.clone(), ); - HeartbeatHandlerGroupBuilder::new(pushers, region_lease_handler) + HeartbeatHandlerGroupBuilder::new(pushers) .with_plugins(plugins.clone()) .with_region_failure_handler(region_failover_handler) + .with_region_lease_handler(Some(region_lease_handler)) + .add_default_handlers() .build() } };