Skip to content

Commit

Permalink
feat: add handler execution timer (GreptimeTeam#1791)
Browse files Browse the repository at this point in the history
* feat: add handler execution timer

* fix: by cr
  • Loading branch information
fengjiachun authored Jun 20, 2023
1 parent 2dd86b6 commit 1703e93
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
65 changes: 55 additions & 10 deletions src/meta-srv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use api::v1::meta::{
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
use common_meta::instruction::{Instruction, InstructionReply};
use common_telemetry::{debug, info, warn};
use common_telemetry::{debug, info, timer, warn};
use dashmap::DashMap;
pub use failure_handler::RegionFailureHandler;
pub use keep_lease_handler::KeepLeaseHandler;
use metrics::{decrement_gauge, increment_gauge};
pub use on_leader_start::OnLeaderStartHandler;
pub use on_leader_start_handler::OnLeaderStartHandler;
pub use persist_stats_handler::PersistStatsHandler;
pub use response_header_handler::ResponseHeaderHandler;
use snafu::{OptionExt, ResultExt};
Expand All @@ -40,7 +40,7 @@ use tokio::sync::{oneshot, Notify, RwLock};
use self::node_stat::Stat;
use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu};
use crate::metasrv::Context;
use crate::metrics::METRIC_META_HEARTBEAT_CONNECTION_NUM;
use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM};
use crate::sequence::Sequence;
use crate::service::mailbox::{
BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId,
Expand All @@ -52,7 +52,7 @@ pub(crate) mod failure_handler;
mod keep_lease_handler;
pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start;
mod on_leader_start_handler;
mod persist_stats_handler;
pub(crate) mod region_lease_handler;
mod response_header_handler;
Expand All @@ -61,6 +61,12 @@ mod response_header_handler;
pub trait HeartbeatHandler: Send + Sync {
fn is_acceptable(&self, role: Role) -> bool;

fn name(&self) -> &'static str {
let type_name = std::any::type_name::<Self>();
// short name
type_name.split("::").last().unwrap_or(type_name)
}

async fn handle(
&self,
req: &HeartbeatRequest,
Expand Down Expand Up @@ -171,9 +177,22 @@ impl Pushers {
}
}

struct NameCachedHandler {
name: &'static str,
handler: Box<dyn HeartbeatHandler>,
}

impl NameCachedHandler {
fn new(handler: impl HeartbeatHandler + 'static) -> Self {
let name = handler.name();
let handler = Box::new(handler);
Self { name, handler }
}
}

#[derive(Clone, Default)]
pub struct HeartbeatHandlerGroup {
handlers: Arc<RwLock<Vec<Box<dyn HeartbeatHandler>>>>,
handlers: Arc<RwLock<Vec<NameCachedHandler>>>,
pushers: Pushers,
}

Expand All @@ -187,7 +206,7 @@ impl HeartbeatHandlerGroup {

pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
let mut handlers = self.handlers.write().await;
handlers.push(Box::new(handler));
handlers.push(NameCachedHandler::new(handler));
}

pub async fn register(&self, key: impl AsRef<str>, pusher: Pusher) {
Expand Down Expand Up @@ -223,13 +242,14 @@ impl HeartbeatHandlerGroup {
err_msg: format!("invalid role: {:?}", req.header),
})?;

for h in handlers.iter() {
for NameCachedHandler { name, handler } in handlers.iter() {
if ctx.is_skip_all() {
break;
}

if h.is_acceptable(role) {
h.handle(&req, &mut ctx, &mut acc).await?;
if handler.is_acceptable(role) {
let _timer = timer!(METRIC_META_HANDLER_EXECUTE, &[("name", *name)]);
handler.handle(&req, &mut ctx, &mut acc).await?;
}
}
let header = std::mem::take(&mut acc.header);
Expand Down Expand Up @@ -383,7 +403,11 @@ mod tests {
use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION};
use tokio::sync::mpsc;

use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher};
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
OnLeaderStartHandler, PersistStatsHandler, Pusher, ResponseHeaderHandler,
};
use crate::sequence::Sequence;
use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef};
use crate::service::store::memory::MemStore;
Expand Down Expand Up @@ -452,4 +476,25 @@ mod tests {

(mailbox, receiver)
}

#[tokio::test]
async fn test_handler_name() {
let group = HeartbeatHandlerGroup::default();
group.add_handler(ResponseHeaderHandler::default()).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(MailboxHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;

let handlers = group.handlers.read().await;

assert_eq!(6, handlers.len());
assert_eq!("ResponseHeaderHandler", handlers[0].handler.name());
assert_eq!("CheckLeaderHandler", handlers[1].handler.name());
assert_eq!("OnLeaderStartHandler", handlers[2].handler.name());
assert_eq!("CollectStatsHandler", handlers[3].handler.name());
assert_eq!("MailboxHandler", handlers[4].handler.name());
assert_eq!("PersistStatsHandler", handlers[5].handler.name());
}
}
File renamed without changes.
1 change: 1 addition & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";

0 comments on commit 1703e93

Please sign in to comment.