Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus #556

Merged
merged 5 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 205 additions & 45 deletions io/zenoh-transport/src/common/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,154 @@
// ZettaScale Zenoh Team, <[email protected]>
//
macro_rules! stats_struct {
(@field_type ) => {AtomicUsize};
(@field_type $field_type:ident) => {std::sync::Arc<$field_type>};
(@report_field_type ) => {usize};
(@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}};
(@new($parent:expr) ) => {AtomicUsize::new(0)};
(@new($parent:expr) $field_type:ident) => {std::sync::Arc::new($field_type::new($parent))};
(@report_default ) => {0};
(@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}};
(@get $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
}
}
};
(@get $vis:vis $field_name:ident $field_type:ident) => {
paste::paste! {
$vis fn [<get_ $field_name>](&self) -> [<$field_type Report>] {
self.$field_name.report()
}
}
};
(@increment $vis:vis $field_name:ident) => {
paste::paste! {
$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
if let Some(parent) = self.parent.as_ref() {
parent.[<inc_ $field_name>](nb);
}
}
}
};
(@increment $vis:vis $field_name:ident $field_type:ident) => {};
(@openmetrics($stats:expr, $string:expr) $field_name:ident) => {
$string.push_str(stringify!($field_name));
$string.push_str(" ");
$string.push_str($stats.$field_name.to_string().as_str());
$string.push_str("\n");
};
(@openmetrics($stats:expr, $string:expr) $field_name:ident $field_type:ident) => {
$string.push_str(&$stats.$field_name.sub_openmetrics_text(stringify!($field_name)));
};
(@openmetrics_val($stats:expr) $field_name:ident) => {
$stats.$field_name.to_string().as_str()
};
(@openmetrics_val($stats:expr) $field_name:ident $field_type:ident) => {""};
(
$(#[$meta:meta])*
$vis:vis struct $struct_name:ident {

$(
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident,
$(# HELP $help:literal)?
$(# TYPE $type:literal)?
$(#[$field_meta:meta])*
$field_vis:vis $field_name:ident $($field_type:ident)?,
)*
}
) => {
paste::paste! {
$vis struct $struct_name {
parent: Option<std::sync::Arc<$struct_name>>,
$(
$(#[$field_meta:meta])*
$field_vis $field_name: AtomicUsize,
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@field_type $($field_type)?),
)*
}

$(#[$meta])*
$vis struct [<$struct_name Report>] {
$(
$(#[$field_meta:meta])*
$field_vis $field_name: usize,
$(#[$field_meta])*
$field_vis $field_name: stats_struct!(@report_field_type $($field_type)?),
)*
}

impl $struct_name {
$vis fn new(parent: Option<std::sync::Arc<$struct_name>>) -> Self {
$struct_name {
parent: parent.clone(),
$($field_name: stats_struct!(@new(parent.as_ref().map(|p|p.$field_name.clone())) $($field_type)?),)*
}
}

$vis fn report(&self) -> [<$struct_name Report>] {
[<$struct_name Report>] {
$($field_name: self.[<get_ $field_name>](),)*
}
}

$(
$vis fn [<get_ $field_name>](&self) -> usize {
self.$field_name.load(Ordering::Relaxed)
stats_struct!(@get $vis $field_name $($field_type)?);
stats_struct!(@increment $vis $field_name $($field_type)?);
)*
}

impl Default for $struct_name {
fn default() -> Self {
Self {
parent: None,
$($field_name: stats_struct!(@new(None) $($field_type)?),)*
}
}
}

impl [<$struct_name Report>] {
#[allow(dead_code)]
fn sub_openmetrics_text(&self, prefix: &str) -> String {
let mut s = String::new();
$(
s.push_str(prefix);
s.push_str("{space=\"");
s.push_str(stringify!($field_name));
s.push_str("\"} ");
s.push_str(
stats_struct!(@openmetrics_val(self) $field_name $($field_type)?)
);
s.push_str("\n");
)*
s
}

$vis fn [<inc_ $field_name>](&self, nb: usize) {
self.$field_name.fetch_add(nb, Ordering::Relaxed);
$vis fn openmetrics_text(&self) -> String {
let mut s = String::new();
$(
$(
s.push_str("# HELP ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($help);
s.push_str("\n");
)?
$(
s.push_str("# TYPE ");
s.push_str(stringify!($field_name));
s.push_str(" ");
s.push_str($type);
s.push_str("\n");
)?
stats_struct!(@openmetrics(self, s) $field_name $($field_type)?);
)*
s
}
)*
}

impl Default for $struct_name {
fn default() -> $struct_name {
$struct_name {
$($field_name: AtomicUsize::new(0),)*
impl Default for [<$struct_name Report>] {
fn default() -> Self {
Self {
$($field_name: stats_struct!(@report_default $($field_type)?),)*
}
}
}
Expand All @@ -68,44 +169,103 @@ macro_rules! stats_struct {

use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicUsize, Ordering};
stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DiscriminatedStats {
pub user,
pub admin,
}
}

stats_struct! {
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TransportStats {
# HELP "Counter of sent bytes."
# TYPE "counter"
pub tx_bytes,

# HELP "Counter of sent transport messages."
# TYPE "counter"
pub tx_t_msgs,

# HELP "Counter of sent network messages."
# TYPE "counter"
pub tx_n_msgs,

# HELP "Counter of dropped network messages."
# TYPE "counter"
pub tx_n_dropped,
pub tx_z_put_user_msgs,
pub tx_z_put_user_pl_bytes,
pub tx_z_put_admin_msgs,
pub tx_z_put_admin_pl_bytes,
pub tx_z_del_user_msgs,
pub tx_z_del_admin_msgs,
pub tx_z_query_user_msgs,
pub tx_z_query_user_pl_bytes,
pub tx_z_query_admin_msgs,
pub tx_z_query_admin_pl_bytes,
pub tx_z_reply_user_msgs,
pub tx_z_reply_user_pl_bytes,
pub tx_z_reply_admin_msgs,
pub tx_z_reply_admin_pl_bytes,
pub tx_bytes,

# HELP "Counter of sent zenoh put messages."
# TYPE "counter"
pub tx_z_put_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh put message payloads."
# TYPE "counter"
pub tx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh del messages."
# TYPE "counter"
pub tx_z_del_msgs DiscriminatedStats,

# HELP "Counter of sent zenoh query messages."
# TYPE "counter"
pub tx_z_query_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh query message payloads."
# TYPE "counter"
pub tx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of sent zenoh reply messages."
# TYPE "counter"
pub tx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of sent bytes in zenoh reply message payloads."
# TYPE "counter"
pub tx_z_reply_pl_bytes DiscriminatedStats,

# HELP "Counter of received bytes."
# TYPE "counter"
pub rx_bytes,

# HELP "Counter of received transport messages."
# TYPE "counter"
pub rx_t_msgs,

# HELP "Counter of received network messages."
# TYPE "counter"
pub rx_n_msgs,
pub rx_z_put_user_msgs,
pub rx_z_put_user_pl_bytes,
pub rx_z_put_admin_msgs,
pub rx_z_put_admin_pl_bytes,
pub rx_z_del_user_msgs,
pub rx_z_del_admin_msgs,
pub rx_z_query_user_msgs,
pub rx_z_query_user_pl_bytes,
pub rx_z_query_admin_msgs,
pub rx_z_query_admin_pl_bytes,
pub rx_z_reply_user_msgs,
pub rx_z_reply_user_pl_bytes,
pub rx_z_reply_admin_msgs,
pub rx_z_reply_admin_pl_bytes,
pub rx_bytes,

# HELP "Counter of dropped network messages."
# TYPE "counter"
pub rx_n_dropped,

# HELP "Counter of received zenoh put messages."
# TYPE "counter"
pub rx_z_put_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh put message payloads."
# TYPE "counter"
pub rx_z_put_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh del messages."
# TYPE "counter"
pub rx_z_del_msgs DiscriminatedStats,

# HELP "Counter of received zenoh query messages."
# TYPE "counter"
pub rx_z_query_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh query message payloads."
# TYPE "counter"
pub rx_z_query_pl_bytes DiscriminatedStats,

# HELP "Counter of received zenoh reply messages."
# TYPE "counter"
pub rx_z_reply_msgs DiscriminatedStats,

# HELP "Counter of received bytes in zenoh reply message payloads."
# TYPE "counter"
pub rx_z_reply_pl_bytes DiscriminatedStats,
}
}
9 changes: 9 additions & 0 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ pub struct TransportManager {
pub(crate) locator_inspector: zenoh_link::LocatorInspector,
pub(crate) new_unicast_link_sender: NewLinkChannelSender,
pub(crate) tx_executor: TransportExecutor,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
}

impl TransportManager {
Expand All @@ -379,6 +381,8 @@ impl TransportManager {
locator_inspector: Default::default(),
new_unicast_link_sender,
tx_executor: TransportExecutor::new(tx_threads),
#[cfg(feature = "stats")]
stats: std::sync::Arc::new(crate::stats::TransportStats::default()),
};

// @TODO: this should be moved into the unicast module
Expand All @@ -402,6 +406,11 @@ impl TransportManager {
self.config.zid
}

#[cfg(feature = "stats")]
pub fn get_stats(&self) -> std::sync::Arc<crate::stats::TransportStats> {
self.stats.clone()
}

pub async fn close(&self) {
self.close_unicast().await;
self.tx_executor.stop().await;
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ impl TransportMulticastInner {
bail!("Invalid QoS configuration");
}

#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));

let ti = TransportMulticastInner {
manager,
priority_tx: priority_tx.into_boxed_slice().into(),
Expand All @@ -131,7 +134,7 @@ impl TransportMulticastInner {
callback: Arc::new(RwLock::new(None)),
timer: Arc::new(Timer::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
};

let link = TransportLinkMulticast::new(ti.clone(), config.link);
Expand Down
4 changes: 3 additions & 1 deletion io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ impl TransportUnicastLowlatency {
config: TransportConfigUnicast,
link: LinkUnicast,
) -> ZResult<TransportUnicastLowlatency> {
#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));
let t = TransportUnicastLowlatency {
manager,
config,
link: Arc::new(RwLock::new(link)),
callback: Arc::new(SyncRwLock::new(None)),
alive: Arc::new(AsyncMutex::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
handle_keepalive: Arc::new(RwLock::new(None)),
handle_rx: Arc::new(RwLock::new(None)),
};
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-transport/src/unicast/universal/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ impl TransportUnicastUniversal {
c.sync(initial_sn)?;
}

#[cfg(feature = "stats")]
let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone())));

let t = TransportUnicastUniversal {
manager,
config,
Expand All @@ -111,7 +114,7 @@ impl TransportUnicastUniversal {
callback: Arc::new(RwLock::new(None)),
alive: Arc::new(AsyncMutex::new(false)),
#[cfg(feature = "stats")]
stats: Arc::new(TransportStats::default()),
stats,
};

Ok(t)
Expand Down
Loading
Loading