Skip to content

Commit

Permalink
Improve metrics structure
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Sep 21, 2023
1 parent 2edb8be commit 65f74f8
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 110 deletions.
195 changes: 99 additions & 96 deletions io/zenoh-transport/src/common/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,61 @@
// ZettaScale Zenoh Team, <[email protected]>
//
macro_rules! stats_struct {
(@field_type ) => {AtomicUsize};
(@field_type $field_type:ident) => {std::sync::Arc<$field_type>};
(@report_field_type ) => {usize};
(@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}};
(@new($parent:expr) ) => {AtomicUsize::new(0)};
(@new($parent:expr) $field_type:ident) => {std::sync::Arc::new($field_type::new($parent))};
(@report_default ) => {0};
(@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}};
(@get $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
}
}
};
(@get $vis:vis $field_name:ident $field_type:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> [<$field_type Report>] {
self.$field_name.report()
}
}
};
(@increment $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
if let Some(parent) = self.parent.as_ref() {
parent.[<inc_ $field_name>](nb);
}
}
}
};
(@increment $vis:vis $field_name:ident $field_type:ident) => {};
(@openmetrics($stats:expr, $string:expr) $field_name:ident) => {
$string.push_str(stringify!($field_name));
$string.push_str(" ");
$string.push_str($stats.$field_name.to_string().as_str());
$string.push_str("\n");
};
(@openmetrics($stats:expr, $string:expr) $field_name:ident $field_type:ident) => {
$string.push_str(&$stats.$field_name.sub_openmetrics_text(stringify!($field_name)));
};
(@openmetrics_val($stats:expr) $field_name:ident) => {
$stats.$field_name.to_string().as_str()
};
(@openmetrics_val($stats:expr) $field_name:ident $field_type:ident) => {""};
(
$(#[$meta:meta])*
$vis:vis struct $struct_name:ident {

$(
$(# HELP $field_help:literal)?
$(# TYPE $field_type:literal)?
$(# HELP $help:literal)?
$(# TYPE $type:literal)?
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident,
$field_vis:vis $field_name:ident $($field_type:ident)?,
)*
}
) => {
Expand All @@ -28,23 +75,23 @@ macro_rules! stats_struct {
parent: Option<std::sync::Arc<$struct_name>>,
$(
$(#[$field_meta])*
$field_vis $field_name: AtomicUsize,
$field_vis $field_name: stats_struct!(@field_type $($field_type)?),
)*
}

$(#[$meta])*
$vis struct [<$struct_name Report>] {
$(
$(#[$field_meta])*
$field_vis $field_name: usize,
$field_vis $field_name: stats_struct!(@report_field_type $($field_type)?),
)*
}

impl $struct_name {
$vis fn new(parent: Option<std::sync::Arc<$struct_name>>) -> Self {
$struct_name {
parent,
$($field_name: AtomicUsize::new(0),)*
parent: parent.clone(),
$($field_name: stats_struct!(@new(parent.as_ref().map(|p|p.$field_name.clone())) $($field_type)?),)*
}
}

Expand All @@ -55,50 +102,55 @@ macro_rules! stats_struct {
}

$(
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
}

$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
if let Some(parent) = self.parent.as_ref() {
parent.[<inc_ $field_name>](nb);
}
}
stats_struct!(@get $vis $field_name $($field_type)?);
stats_struct!(@increment $vis $field_name $($field_type)?);
)*
}

impl Default for $struct_name {
fn default() -> Self {
Self {
parent: None,
$($field_name: AtomicUsize::new(0),)*
$($field_name: stats_struct!(@new(None) $($field_type)?),)*
}
}
}

impl [<$struct_name Report>] {
#[allow(dead_code)]
fn sub_openmetrics_text(&self, prefix: &str) -> String {
let mut s = String::new();
$(
s.push_str(prefix);
s.push_str("{space=\"");
s.push_str(stringify!($field_name));
s.push_str("\"} ");
s.push_str(
stats_struct!(@openmetrics_val(self) $field_name $($field_type)?)
);
s.push_str("\n");
)*
s
}

$vis fn openmetrics_text(&self) -> String {
let mut s = String::new();
$(
$(
s.push_str("# HELP ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($field_help);
s.push_str($help);
s.push_str("\n");
)?
$(
s.push_str("# TYPE ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($field_type);
s.push_str($type);
s.push_str("\n");
)?
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str(self.$field_name.to_string().as_str());
s.push_str("\n");
stats_struct!(@openmetrics(self, s) $field_name $($field_type)?);
)*
s
}
Expand All @@ -107,7 +159,7 @@ macro_rules! stats_struct {
impl Default for [<$struct_name Report>] {
fn default() -> Self {
Self {
$($field_name: 0,)*
$($field_name: stats_struct!(@report_default $($field_type)?),)*
}
}
}
Expand All @@ -117,6 +169,14 @@ macro_rules! stats_struct {

use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DiscriminatedStats {
pub user,
pub admin,
}
}

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TransportStats {
Expand All @@ -138,60 +198,31 @@ stats_struct! {

# HELP "Counter of sent zenoh put messages."
# TYPE "counter"
pub tx_z_put_user_msgs,
pub tx_z_put_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh put message payloads."
# TYPE "counter"
pub tx_z_put_user_pl_bytes,

# HELP "Counter of sent zenoh put messages."
# TYPE "counter"
pub tx_z_put_admin_msgs,

# HELP "Counter of sent bytes in zenoh put message payloads."
# TYPE "counter"
pub tx_z_put_admin_pl_bytes,
pub tx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh del messages."
# TYPE "counter"
pub tx_z_del_user_msgs,

# HELP "Counter of sent zenoh del messages."
# TYPE "counter"
pub tx_z_del_admin_msgs,
pub tx_z_del_msgs DiscriminatedStats,

# HELP "Counter of sent zenoh query messages."
# TYPE "counter"
pub tx_z_query_user_msgs,
pub tx_z_query_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh query message payloads."
# TYPE "counter"
pub tx_z_query_user_pl_bytes,

# HELP "Counter of sent zenoh query messages."
# TYPE "counter"
pub tx_z_query_admin_msgs,

# HELP "Counter of sent bytes in zenoh query message payloads."
# TYPE "counter"
pub tx_z_query_admin_pl_bytes,

# HELP "Counter of sent zenoh reply messages."
# TYPE "counter"
pub tx_z_reply_user_msgs,

# HELP "Counter of sent bytes in zenoh reply message payloads."
# TYPE "counter"
pub tx_z_reply_user_pl_bytes,
pub tx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh reply messages."
# TYPE "counter"
pub tx_z_reply_admin_msgs,
pub tx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh reply message payloads."
# TYPE "counter"
pub tx_z_reply_admin_pl_bytes,

pub tx_z_reply_pl_bytes DiscriminatedStats,

# HELP "Counter of received bytes."
# TYPE "counter"
Expand All @@ -211,58 +242,30 @@ stats_struct! {

# HELP "Counter of received zenoh put messages."
# TYPE "counter"
pub rx_z_put_user_msgs,

# HELP "Counter of received bytes in zenoh put message payloads."
# TYPE "counter"
pub rx_z_put_user_pl_bytes,

# HELP "Counter of received zenoh put messages."
# TYPE "counter"
pub rx_z_put_admin_msgs,
pub rx_z_put_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh put message payloads."
# TYPE "counter"
pub rx_z_put_admin_pl_bytes,
pub rx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh del messages."
# TYPE "counter"
pub rx_z_del_user_msgs,

# HELP "Counter of received zenoh del messages."
# TYPE "counter"
pub rx_z_del_admin_msgs,

# HELP "Counter of received zenoh query messages."
# TYPE "counter"
pub rx_z_query_user_msgs,

# HELP "Counter of received bytes in zenoh query message payloads."
# TYPE "counter"
pub rx_z_query_user_pl_bytes,
pub rx_z_del_msgs DiscriminatedStats,

# HELP "Counter of received zenoh query messages."
# TYPE "counter"
pub rx_z_query_admin_msgs,
pub rx_z_query_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh query message payloads."
# TYPE "counter"
pub rx_z_query_admin_pl_bytes,

# HELP "Counter of received zenoh reply messages."
# TYPE "counter"
pub rx_z_reply_user_msgs,

# HELP "Counter of received bytes in zenoh reply message payloads."
# TYPE "counter"
pub rx_z_reply_user_pl_bytes,
pub rx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh reply messages."
# TYPE "counter"
pub rx_z_reply_admin_msgs,
pub rx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh reply message payloads."
# TYPE "counter"
pub rx_z_reply_admin_pl_bytes,
pub rx_z_reply_pl_bytes DiscriminatedStats,
}
}
8 changes: 5 additions & 3 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1732,10 +1732,12 @@ macro_rules! inc_stats {
use zenoh_buffers::SplitBuffer;
match &$body {
PushBody::Put(p) => {
stats.[<inc_ $txrx _z_put_ $space _msgs>](1);
stats.[<inc_ $txrx _z_put_ $space _pl_bytes>](p.payload.len());
stats.[<$txrx _z_put_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_put_pl_bytes>].[<inc_ $space>](p.payload.len());
}
PushBody::Del(_) => {
stats.[<$txrx _z_del_msgs>].[<inc_ $space>](1);
}
PushBody::Del(_) => stats.[<inc_ $txrx _z_del_ $space _msgs>](1),
}
}
}
Expand Down
24 changes: 13 additions & 11 deletions zenoh/src/net/routing/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2002,13 +2002,15 @@ macro_rules! inc_req_stats {
use zenoh_buffers::SplitBuffer;
match &$body {
RequestBody::Put(p) => {
stats.[<inc_ $txrx _z_put_ $space _msgs>](1);
stats.[<inc_ $txrx _z_put_ $space _pl_bytes>](p.payload.len());
stats.[<$txrx _z_put_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_put_pl_bytes>].[<inc_ $space>](p.payload.len());
}
RequestBody::Del(_) => {
stats.[<$txrx _z_del_msgs>].[<inc_ $space>](1);
}
RequestBody::Del(_) => stats.[<inc_ $txrx _z_del_ $space _msgs>](1),
RequestBody::Query(q) => {
stats.[<inc_ $txrx _z_query_ $space _msgs>](1);
stats.[<inc_ $txrx _z_query_ $space _pl_bytes>](
stats.[<$txrx _z_query_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_query_pl_bytes>].[<inc_ $space>](
q.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0),
);
}
Expand All @@ -2032,16 +2034,16 @@ macro_rules! inc_res_stats {
use zenoh_buffers::SplitBuffer;
match &$body {
ResponseBody::Put(p) => {
stats.[<inc_ $txrx _z_put_ $space _msgs>](1);
stats.[<inc_ $txrx _z_put_ $space _pl_bytes>](p.payload.len());
stats.[<$txrx _z_put_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_put_pl_bytes>].[<inc_ $space>](p.payload.len());
}
ResponseBody::Reply(r) => {
stats.[<inc_ $txrx _z_reply_ $space _msgs>](1);
stats.[<inc_ $txrx _z_reply_ $space _pl_bytes>](r.payload.len());
stats.[<$txrx _z_reply_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_reply_pl_bytes>].[<inc_ $space>](r.payload.len());
}
ResponseBody::Err(e) => {
stats.[<inc_ $txrx _z_reply_ $space _msgs>](1);
stats.[<inc_ $txrx _z_reply_ $space _pl_bytes>](
stats.[<$txrx _z_reply_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_reply_pl_bytes>].[<inc_ $space>](
e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0),
);
}
Expand Down

0 comments on commit 65f74f8

Please sign in to comment.