diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 84acb376c463..b2727a8ef226 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -25,12 +25,12 @@ use api::v1::meta::{ pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; use common_meta::instruction::{Instruction, InstructionReply}; -use common_telemetry::{debug, info, warn}; +use common_telemetry::{debug, info, timer, warn}; use dashmap::DashMap; pub use failure_handler::RegionFailureHandler; pub use keep_lease_handler::KeepLeaseHandler; use metrics::{decrement_gauge, increment_gauge}; -pub use on_leader_start::OnLeaderStartHandler; +pub use on_leader_start_handler::OnLeaderStartHandler; pub use persist_stats_handler::PersistStatsHandler; pub use response_header_handler::ResponseHeaderHandler; use snafu::{OptionExt, ResultExt}; @@ -40,7 +40,7 @@ use tokio::sync::{oneshot, Notify, RwLock}; use self::node_stat::Stat; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::metasrv::Context; -use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM; +use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; use crate::sequence::Sequence; use crate::service::mailbox::{ BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId, @@ -52,7 +52,7 @@ pub(crate) mod failure_handler; mod keep_lease_handler; pub mod mailbox_handler; pub mod node_stat; -mod on_leader_start; +mod on_leader_start_handler; mod persist_stats_handler; pub(crate) mod region_lease_handler; mod response_header_handler; @@ -61,6 +61,12 @@ mod response_header_handler; pub trait HeartbeatHandler: Send + Sync { fn is_acceptable(&self, role: Role) -> bool; + fn name(&self) -> &'static str { + let type_name = std::any::type_name::(); + // short name + type_name.split("::").last().unwrap_or(type_name) + } + async fn handle( &self, req: &HeartbeatRequest, @@ -171,9 +177,22 @@ impl Pushers { } } +struct NameCachedHandler { + name: &'static str, + handler: Box, +} + +impl NameCachedHandler { + fn new(handler: impl HeartbeatHandler + 'static) -> Self { + let name = handler.name(); + let handler = Box::new(handler); + Self { name, handler } + } +} + #[derive(Clone, Default)] pub struct HeartbeatHandlerGroup { - handlers: Arc>>>, + handlers: Arc>>, pushers: Pushers, } @@ -187,7 +206,7 @@ impl HeartbeatHandlerGroup { pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) { let mut handlers = self.handlers.write().await; - handlers.push(Box::new(handler)); + handlers.push(NameCachedHandler::new(handler)); } pub async fn register(&self, key: impl AsRef, pusher: Pusher) { @@ -223,13 +242,14 @@ impl HeartbeatHandlerGroup { err_msg: format!("invalid role: {:?}", req.header), })?; - for h in handlers.iter() { + for NameCachedHandler { name, handler } in handlers.iter() { if ctx.is_skip_all() { break; } - if h.is_acceptable(role) { - h.handle(&req, &mut ctx, &mut acc).await?; + if handler.is_acceptable(role) { + let _timer = timer!(METRIC_META_HANDLER_EXECUTE, &[("name", *name)]); + handler.handle(&req, &mut ctx, &mut acc).await?; } } let header = std::mem::take(&mut acc.header); @@ -383,7 +403,11 @@ mod tests { use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; use tokio::sync::mpsc; - use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; + use crate::handler::mailbox_handler::MailboxHandler; + use crate::handler::{ + CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox, + OnLeaderStartHandler, PersistStatsHandler, Pusher, ResponseHeaderHandler, + }; use crate::sequence::Sequence; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; use crate::service::store::memory::MemStore; @@ -452,4 +476,25 @@ mod tests { (mailbox, receiver) } + + #[tokio::test] + async fn test_handler_name() { + let group = HeartbeatHandlerGroup::default(); + group.add_handler(ResponseHeaderHandler::default()).await; + group.add_handler(CheckLeaderHandler::default()).await; + group.add_handler(OnLeaderStartHandler::default()).await; + group.add_handler(CollectStatsHandler::default()).await; + group.add_handler(MailboxHandler::default()).await; + group.add_handler(PersistStatsHandler::default()).await; + + let handlers = group.handlers.read().await; + + assert_eq!(6, handlers.len()); + assert_eq!("ResponseHeaderHandler", handlers[0].handler.name()); + assert_eq!("CheckLeaderHandler", handlers[1].handler.name()); + assert_eq!("OnLeaderStartHandler", handlers[2].handler.name()); + assert_eq!("CollectStatsHandler", handlers[3].handler.name()); + assert_eq!("MailboxHandler", handlers[4].handler.name()); + assert_eq!("PersistStatsHandler", handlers[5].handler.name()); + } } diff --git a/src/meta-srv/src/handler/on_leader_start.rs b/src/meta-srv/src/handler/on_leader_start_handler.rs similarity index 100% rename from src/meta-srv/src/handler/on_leader_start.rs rename to src/meta-srv/src/handler/on_leader_start_handler.rs diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index f468c4fef55a..cac65989917a 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -17,3 +17,4 @@ pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; +pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";