Skip to content

Commit

Permalink
Prometheus (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Sep 22, 2023
1 parent 0a09208 commit 46c8fa1
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 84 deletions.
250 changes: 205 additions & 45 deletions io/zenoh-transport/src/common/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,154 @@
// ZettaScale Zenoh Team, <[email protected]>
//
macro_rules! stats_struct {
(@field_type ) => {AtomicUsize};
(@field_type $field_type:ident) => {std::sync::Arc<$field_type>};
(@report_field_type ) => {usize};
(@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}};
(@new($parent:expr) ) => {AtomicUsize::new(0)};
(@new($parent:expr) $field_type:ident) => {std::sync::Arc::new($field_type::new($parent))};
(@report_default ) => {0};
(@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}};
(@get $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
}
}
};
(@get $vis:vis $field_name:ident $field_type:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> [<$field_type Report>] {
self.$field_name.report()
}
}
};
(@increment $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
if let Some(parent) = self.parent.as_ref() {
parent.[<inc_ $field_name>](nb);
}
}
}
};
(@increment $vis:vis $field_name:ident $field_type:ident) => {};
(@openmetrics($stats:expr, $string:expr) $field_name:ident) => {
$string.push_str(stringify!($field_name));
$string.push_str(" ");
$string.push_str($stats.$field_name.to_string().as_str());
$string.push_str("\n");
};
(@openmetrics($stats:expr, $string:expr) $field_name:ident $field_type:ident) => {
$string.push_str(&$stats.$field_name.sub_openmetrics_text(stringify!($field_name)));
};
(@openmetrics_val($stats:expr) $field_name:ident) => {
$stats.$field_name.to_string().as_str()
};
(@openmetrics_val($stats:expr) $field_name:ident $field_type:ident) => {""};
(
$(#[$meta:meta])*
$vis:vis struct $struct_name:ident {

$(
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident,
$(# HELP $help:literal)?
$(# TYPE $type:literal)?
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident $($field_type:ident)?,
)*
}
) => {
paste::paste! {
$vis struct $struct_name {
parent: Option<std::sync::Arc<$struct_name>>,
$(
$(#[$field_meta:meta])*
$field_vis $field_name: AtomicUsize,
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@field_type $($field_type)?),
)*
}

$(#[$meta])*
$vis struct [<$struct_name Report>] {
$(
$(#[$field_meta:meta])*
$field_vis $field_name: usize,
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@report_field_type $($field_type)?),
)*
}

impl $struct_name {
$vis fn new(parent: Option<std::sync::Arc<$struct_name>>) -> Self {
$struct_name {
parent: parent.clone(),
$($field_name: stats_struct!(@new(parent.as_ref().map(|p|p.$field_name.clone())) $($field_type)?),)*
}
}

$vis fn report(&self) -> [<$struct_name Report>] {
[<$struct_name Report>] {
$($field_name: self.[<get_ $field_name>](),)*
}
}

$(
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
stats_struct!(@get $vis $field_name $($field_type)?);
stats_struct!(@increment $vis $field_name $($field_type)?);
)*
}

impl Default for $struct_name {
fn default() -> Self {
Self {
parent: None,
$($field_name: stats_struct!(@new(None) $($field_type)?),)*
}
}
}

impl [<$struct_name Report>] {
#[allow(dead_code)]
fn sub_openmetrics_text(&self, prefix: &str) -> String {
let mut s = String::new();
$(
s.push_str(prefix);
s.push_str("{space=\"");
s.push_str(stringify!($field_name));
s.push_str("\"} ");
s.push_str(
stats_struct!(@openmetrics_val(self) $field_name $($field_type)?)
);
s.push_str("\n");
)*
s
}

$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
$vis fn openmetrics_text(&self) -> String {
let mut s = String::new();
$(
$(
s.push_str("# HELP ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($help);
s.push_str("\n");
)?
$(
s.push_str("# TYPE ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($type);
s.push_str("\n");
)?
stats_struct!(@openmetrics(self, s) $field_name $($field_type)?);
)*
s
}
)*
}

impl Default for $struct_name {
fn default() -> $struct_name {
$struct_name {
$($field_name: AtomicUsize::new(0),)*
impl Default for [<$struct_name Report>] {
fn default() -> Self {
Self {
$($field_name: stats_struct!(@report_default $($field_type)?),)*
}
}
}
Expand All @@ -68,44 +169,103 @@ macro_rules! stats_struct {

use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DiscriminatedStats {
pub user,
pub admin,
}
}

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TransportStats {
# HELP "Counter of sent bytes."
# TYPE "counter"
pub tx_bytes,

# HELP "Counter of sent transport messages."
# TYPE "counter"
pub tx_t_msgs,

# HELP "Counter of sent network messages."
# TYPE "counter"
pub tx_n_msgs,

# HELP "Counter of dropped network messages."
# TYPE "counter"
pub tx_n_dropped,
pub tx_z_put_user_msgs,
pub tx_z_put_user_pl_bytes,
pub tx_z_put_admin_msgs,
pub tx_z_put_admin_pl_bytes,
pub tx_z_del_user_msgs,
pub tx_z_del_admin_msgs,
pub tx_z_query_user_msgs,
pub tx_z_query_user_pl_bytes,
pub tx_z_query_admin_msgs,
pub tx_z_query_admin_pl_bytes,
pub tx_z_reply_user_msgs,
pub tx_z_reply_user_pl_bytes,
pub tx_z_reply_admin_msgs,
pub tx_z_reply_admin_pl_bytes,
pub tx_bytes,

# HELP "Counter of sent zenoh put messages."
# TYPE "counter"
pub tx_z_put_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh put message payloads."
# TYPE "counter"
pub tx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh del messages."
# TYPE "counter"
pub tx_z_del_msgs DiscriminatedStats,

# HELP "Counter of sent zenoh query messages."
# TYPE "counter"
pub tx_z_query_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh query message payloads."
# TYPE "counter"
pub tx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh reply messages."
# TYPE "counter"
pub tx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh reply message payloads."
# TYPE "counter"
pub tx_z_reply_pl_bytes DiscriminatedStats,

# HELP "Counter of received bytes."
# TYPE "counter"
pub rx_bytes,

# HELP "Counter of received transport messages."
# TYPE "counter"
pub rx_t_msgs,

# HELP "Counter of received network messages."
# TYPE "counter"
pub rx_n_msgs,
pub rx_z_put_user_msgs,
pub rx_z_put_user_pl_bytes,
pub rx_z_put_admin_msgs,
pub rx_z_put_admin_pl_bytes,
pub rx_z_del_user_msgs,
pub rx_z_del_admin_msgs,
pub rx_z_query_user_msgs,
pub rx_z_query_user_pl_bytes,
pub rx_z_query_admin_msgs,
pub rx_z_query_admin_pl_bytes,
pub rx_z_reply_user_msgs,
pub rx_z_reply_user_pl_bytes,
pub rx_z_reply_admin_msgs,
pub rx_z_reply_admin_pl_bytes,
pub rx_bytes,

# HELP "Counter of dropped network messages."
# TYPE "counter"
pub rx_n_dropped,

# HELP "Counter of received zenoh put messages."
# TYPE "counter"
pub rx_z_put_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh put message payloads."
# TYPE "counter"
pub rx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh del messages."
# TYPE "counter"
pub rx_z_del_msgs DiscriminatedStats,

# HELP "Counter of received zenoh query messages."
# TYPE "counter"
pub rx_z_query_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh query message payloads."
# TYPE "counter"
pub rx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh reply messages."
# TYPE "counter"
pub rx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh reply message payloads."
# TYPE "counter"
pub rx_z_reply_pl_bytes DiscriminatedStats,
}
}
9 changes: 9 additions & 0 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ pub struct TransportManager {
pub(crate) locator_inspector: zenoh_link::LocatorInspector,
pub(crate) new_unicast_link_sender: NewLinkChannelSender,
pub(crate) tx_executor: TransportExecutor,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

impl TransportManager {
Expand All @@ -379,6 +381,8 @@ impl TransportManager {
locator_inspector: Default::default(),
new_unicast_link_sender,
tx_executor: TransportExecutor::new(tx_threads),
#[cfg(feature = "stats")]
stats: std::sync::Arc::new(crate::stats::TransportStats::default()),
};

// @TODO: this should be moved into the unicast module
Expand All @@ -402,6 +406,11 @@ impl TransportManager {
self.config.zid
}

#[cfg(feature = "stats")]
pub fn get_stats(&self) -> std::sync::Arc<crate::stats::TransportStats> {
self.stats.clone()
}

pub async fn close(&self) {
self.close_unicast().await;
self.tx_executor.stop().await;
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ impl TransportMulticastInner {
bail!("Invalid QoS configuration");
}

#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));

let ti = TransportMulticastInner {
manager,
priority_tx: priority_tx.into_boxed_slice().into(),
Expand All @@ -131,7 +134,7 @@ impl TransportMulticastInner {
callback: Arc::new(RwLock::new(None)),
timer: Arc::new(Timer::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
};

let link = TransportLinkMulticast::new(ti.clone(), config.link);
Expand Down
4 changes: 3 additions & 1 deletion io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ impl TransportUnicastLowlatency {
config: TransportConfigUnicast,
link: LinkUnicast,
) -> ZResult<TransportUnicastLowlatency> {
#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));
let t = TransportUnicastLowlatency {
manager,
config,
link: Arc::new(RwLock::new(link)),
callback: Arc::new(SyncRwLock::new(None)),
alive: Arc::new(AsyncMutex::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
handle_keepalive: Arc::new(RwLock::new(None)),
handle_rx: Arc::new(RwLock::new(None)),
};
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ impl TransportUnicastUniversal {
c.sync(initial_sn)?;
}

#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));

let t = TransportUnicastUniversal {
manager,
config,
Expand All @@ -111,7 +114,7 @@ impl TransportUnicastUniversal {
callback: Arc::new(RwLock::new(None)),
alive: Arc::new(AsyncMutex::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
};

Ok(t)
Expand Down
Loading

0 comments on commit 46c8fa1

Please sign in to comment.