diff --git a/io/zenoh-transport/src/common/stats.rs b/io/zenoh-transport/src/common/stats.rs index 287018aeb0..f095a58273 100644 --- a/io/zenoh-transport/src/common/stats.rs +++ b/io/zenoh-transport/src/common/stats.rs @@ -12,32 +12,89 @@ // ZettaScale Zenoh Team, // macro_rules! stats_struct { + (@field_type ) => {AtomicUsize}; + (@field_type $field_type:ident) => {std::sync::Arc<$field_type>}; + (@report_field_type ) => {usize}; + (@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}}; + (@new($parent:expr) ) => {AtomicUsize::new(0)}; + (@new($parent:expr) $field_type:ident) => {std::sync::Arc::new($field_type::new($parent))}; + (@report_default ) => {0}; + (@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}}; + (@get $vis:vis $field_name:ident) => { + paste::paste! { + $vis fn [](&self) -> usize { + self.$field_name.load(Ordering::Relaxed) + } + } + }; + (@get $vis:vis $field_name:ident $field_type:ident) => { + paste::paste! { + $vis fn [](&self) -> [<$field_type Report>] { + self.$field_name.report() + } + } + }; + (@increment $vis:vis $field_name:ident) => { + paste::paste! { + $vis fn [](&self, nb: usize) { + self.$field_name.fetch_add(nb, Ordering::Relaxed); + if let Some(parent) = self.parent.as_ref() { + parent.[](nb); + } + } + } + }; + (@increment $vis:vis $field_name:ident $field_type:ident) => {}; + (@openmetrics($stats:expr, $string:expr) $field_name:ident) => { + $string.push_str(stringify!($field_name)); + $string.push_str(" "); + $string.push_str($stats.$field_name.to_string().as_str()); + $string.push_str("\n"); + }; + (@openmetrics($stats:expr, $string:expr) $field_name:ident $field_type:ident) => { + $string.push_str(&$stats.$field_name.sub_openmetrics_text(stringify!($field_name))); + }; + (@openmetrics_val($stats:expr) $field_name:ident) => { + $stats.$field_name.to_string().as_str() + }; + (@openmetrics_val($stats:expr) $field_name:ident $field_type:ident) => {""}; ( $(#[$meta:meta])* $vis:vis struct $struct_name:ident { + $( - $(#[$field_meta:meta])* - $field_vis:vis $field_name:ident, + $(# HELP $help:literal)? + $(# TYPE $type:literal)? + $(#[$field_meta:meta])* + $field_vis:vis $field_name:ident $($field_type:ident)?, )* } ) => { paste::paste! { $vis struct $struct_name { + parent: Option>, $( - $(#[$field_meta:meta])* - $field_vis $field_name: AtomicUsize, + $(#[$field_meta])* + $field_vis $field_name: stats_struct!(@field_type $($field_type)?), )* } $(#[$meta])* $vis struct [<$struct_name Report>] { $( - $(#[$field_meta:meta])* - $field_vis $field_name: usize, + $(#[$field_meta])* + $field_vis $field_name: stats_struct!(@report_field_type $($field_type)?), )* } impl $struct_name { + $vis fn new(parent: Option>) -> Self { + $struct_name { + parent: parent.clone(), + $($field_name: stats_struct!(@new(parent.as_ref().map(|p|p.$field_name.clone())) $($field_type)?),)* + } + } + $vis fn report(&self) -> [<$struct_name Report>] { [<$struct_name Report>] { $($field_name: self.[](),)* @@ -45,20 +102,64 @@ macro_rules! stats_struct { } $( - $vis fn [](&self) -> usize { - self.$field_name.load(Ordering::Relaxed) + stats_struct!(@get $vis $field_name $($field_type)?); + stats_struct!(@increment $vis $field_name $($field_type)?); + )* + } + + impl Default for $struct_name { + fn default() -> Self { + Self { + parent: None, + $($field_name: stats_struct!(@new(None) $($field_type)?),)* + } + } + } + + impl [<$struct_name Report>] { + #[allow(dead_code)] + fn sub_openmetrics_text(&self, prefix: &str) -> String { + let mut s = String::new(); + $( + s.push_str(prefix); + s.push_str("{space=\""); + s.push_str(stringify!($field_name)); + s.push_str("\"} "); + s.push_str( + stats_struct!(@openmetrics_val(self) $field_name $($field_type)?) + ); + s.push_str("\n"); + )* + s } - $vis fn [](&self, nb: usize) { - self.$field_name.fetch_add(nb, Ordering::Relaxed); + $vis fn openmetrics_text(&self) -> String { + let mut s = String::new(); + $( + $( + s.push_str("# HELP "); + s.push_str(stringify!($field_name)); + s.push_str(" "); + s.push_str($help); + s.push_str("\n"); + )? + $( + s.push_str("# TYPE "); + s.push_str(stringify!($field_name)); + s.push_str(" "); + s.push_str($type); + s.push_str("\n"); + )? + stats_struct!(@openmetrics(self, s) $field_name $($field_type)?); + )* + s } - )* } - impl Default for $struct_name { - fn default() -> $struct_name { - $struct_name { - $($field_name: AtomicUsize::new(0),)* + impl Default for [<$struct_name Report>] { + fn default() -> Self { + Self { + $($field_name: stats_struct!(@report_default $($field_type)?),)* } } } @@ -68,44 +169,103 @@ macro_rules! stats_struct { use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicUsize, Ordering}; +stats_struct! { + #[derive(Clone, Debug, Deserialize, Serialize)] + pub struct DiscriminatedStats { + pub user, + pub admin, + } +} + stats_struct! { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TransportStats { + # HELP "Counter of sent bytes." + # TYPE "counter" + pub tx_bytes, + + # HELP "Counter of sent transport messages." + # TYPE "counter" pub tx_t_msgs, + + # HELP "Counter of sent network messages." + # TYPE "counter" pub tx_n_msgs, + + # HELP "Counter of dropped network messages." + # TYPE "counter" pub tx_n_dropped, - pub tx_z_put_user_msgs, - pub tx_z_put_user_pl_bytes, - pub tx_z_put_admin_msgs, - pub tx_z_put_admin_pl_bytes, - pub tx_z_del_user_msgs, - pub tx_z_del_admin_msgs, - pub tx_z_query_user_msgs, - pub tx_z_query_user_pl_bytes, - pub tx_z_query_admin_msgs, - pub tx_z_query_admin_pl_bytes, - pub tx_z_reply_user_msgs, - pub tx_z_reply_user_pl_bytes, - pub tx_z_reply_admin_msgs, - pub tx_z_reply_admin_pl_bytes, - pub tx_bytes, + # HELP "Counter of sent zenoh put messages." + # TYPE "counter" + pub tx_z_put_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh put message payloads." + # TYPE "counter" + pub tx_z_put_pl_bytes DiscriminatedStats, + + # HELP "Counter of sent zenoh del messages." + # TYPE "counter" + pub tx_z_del_msgs DiscriminatedStats, + + # HELP "Counter of sent zenoh query messages." + # TYPE "counter" + pub tx_z_query_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh query message payloads." + # TYPE "counter" + pub tx_z_query_pl_bytes DiscriminatedStats, + + # HELP "Counter of sent zenoh reply messages." + # TYPE "counter" + pub tx_z_reply_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh reply message payloads." + # TYPE "counter" + pub tx_z_reply_pl_bytes DiscriminatedStats, + + # HELP "Counter of received bytes." + # TYPE "counter" + pub rx_bytes, + + # HELP "Counter of received transport messages." + # TYPE "counter" pub rx_t_msgs, + + # HELP "Counter of received network messages." + # TYPE "counter" pub rx_n_msgs, - pub rx_z_put_user_msgs, - pub rx_z_put_user_pl_bytes, - pub rx_z_put_admin_msgs, - pub rx_z_put_admin_pl_bytes, - pub rx_z_del_user_msgs, - pub rx_z_del_admin_msgs, - pub rx_z_query_user_msgs, - pub rx_z_query_user_pl_bytes, - pub rx_z_query_admin_msgs, - pub rx_z_query_admin_pl_bytes, - pub rx_z_reply_user_msgs, - pub rx_z_reply_user_pl_bytes, - pub rx_z_reply_admin_msgs, - pub rx_z_reply_admin_pl_bytes, - pub rx_bytes, + + # HELP "Counter of dropped network messages." + # TYPE "counter" + pub rx_n_dropped, + + # HELP "Counter of received zenoh put messages." + # TYPE "counter" + pub rx_z_put_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh put message payloads." + # TYPE "counter" + pub rx_z_put_pl_bytes DiscriminatedStats, + + # HELP "Counter of received zenoh del messages." + # TYPE "counter" + pub rx_z_del_msgs DiscriminatedStats, + + # HELP "Counter of received zenoh query messages." + # TYPE "counter" + pub rx_z_query_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh query message payloads." + # TYPE "counter" + pub rx_z_query_pl_bytes DiscriminatedStats, + + # HELP "Counter of received zenoh reply messages." + # TYPE "counter" + pub rx_z_reply_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh reply message payloads." + # TYPE "counter" + pub rx_z_reply_pl_bytes DiscriminatedStats, } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index c6a4ed5781..6847b12dd8 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -358,6 +358,8 @@ pub struct TransportManager { pub(crate) locator_inspector: zenoh_link::LocatorInspector, pub(crate) new_unicast_link_sender: NewLinkChannelSender, pub(crate) tx_executor: TransportExecutor, + #[cfg(feature = "stats")] + pub(crate) stats: Arc, } impl TransportManager { @@ -379,6 +381,8 @@ impl TransportManager { locator_inspector: Default::default(), new_unicast_link_sender, tx_executor: TransportExecutor::new(tx_threads), + #[cfg(feature = "stats")] + stats: std::sync::Arc::new(crate::stats::TransportStats::default()), }; // @TODO: this should be moved into the unicast module @@ -402,6 +406,11 @@ impl TransportManager { self.config.zid } + #[cfg(feature = "stats")] + pub fn get_stats(&self) -> std::sync::Arc { + self.stats.clone() + } + pub async fn close(&self) { self.close_unicast().await; self.tx_executor.stop().await; diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 9d89fabc4b..c4412447cf 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -122,6 +122,9 @@ impl TransportMulticastInner { bail!("Invalid QoS configuration"); } + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); + let ti = TransportMulticastInner { manager, priority_tx: priority_tx.into_boxed_slice().into(), @@ -131,7 +134,7 @@ impl TransportMulticastInner { callback: Arc::new(RwLock::new(None)), timer: Arc::new(Timer::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, }; let link = TransportLinkMulticast::new(ti.clone(), config.link); diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index f00876a2e8..ea97aa143b 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -74,6 +74,8 @@ impl TransportUnicastLowlatency { config: TransportConfigUnicast, link: LinkUnicast, ) -> ZResult { + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); let t = TransportUnicastLowlatency { manager, config, @@ -81,7 +83,7 @@ impl TransportUnicastLowlatency { callback: Arc::new(SyncRwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, handle_keepalive: Arc::new(RwLock::new(None)), handle_rx: Arc::new(RwLock::new(None)), }; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 5a9d115494..5c17b36827 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -102,6 +102,9 @@ impl TransportUnicastUniversal { c.sync(initial_sn)?; } + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); + let t = TransportUnicastUniversal { manager, config, @@ -111,7 +114,7 @@ impl TransportUnicastUniversal { callback: Arc::new(RwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, }; Ok(t) diff --git a/zenoh/src/net/routing/pubsub.rs b/zenoh/src/net/routing/pubsub.rs index f45b8029e3..3deba60260 100644 --- a/zenoh/src/net/routing/pubsub.rs +++ b/zenoh/src/net/routing/pubsub.rs @@ -1732,10 +1732,12 @@ macro_rules! inc_stats { use zenoh_buffers::SplitBuffer; match &$body { PushBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + } + PushBody::Del(_) => { + stats.[<$txrx _z_del_msgs>].[](1); } - PushBody::Del(_) => stats.[](1), } } } diff --git a/zenoh/src/net/routing/queries.rs b/zenoh/src/net/routing/queries.rs index e2608a0066..191c0a8071 100644 --- a/zenoh/src/net/routing/queries.rs +++ b/zenoh/src/net/routing/queries.rs @@ -2002,13 +2002,15 @@ macro_rules! inc_req_stats { use zenoh_buffers::SplitBuffer; match &$body { RequestBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + } + RequestBody::Del(_) => { + stats.[<$txrx _z_del_msgs>].[](1); } - RequestBody::Del(_) => stats.[](1), RequestBody::Query(q) => { - stats.[](1); - stats.[]( + stats.[<$txrx _z_query_msgs>].[](1); + stats.[<$txrx _z_query_pl_bytes>].[]( q.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } @@ -2032,16 +2034,16 @@ macro_rules! inc_res_stats { use zenoh_buffers::SplitBuffer; match &$body { ResponseBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); } ResponseBody::Reply(r) => { - stats.[](1); - stats.[](r.payload.len()); + stats.[<$txrx _z_reply_msgs>].[](1); + stats.[<$txrx _z_reply_pl_bytes>].[](r.payload.len()); } ResponseBody::Err(e) => { - stats.[](1); - stats.[]( + stats.[<$txrx _z_reply_msgs>].[](1); + stats.[<$txrx _z_reply_pl_bytes>].[]( e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 23bb47d9b9..0eb099a098 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -72,6 +72,10 @@ impl AdminSpace { let mut handlers: HashMap<_, Handler> = HashMap::new(); handlers.insert(root_key.clone(), Arc::new(router_data)); + handlers.insert( + format!("@/router/{zid_str}/metrics").try_into().unwrap(), + Arc::new(router_metrics), + ); handlers.insert( format!("@/router/{zid_str}/linkstate/routers") .try_into() @@ -450,7 +454,8 @@ fn router_data(context: &AdminContext, query: Query) { .map(transport_to_json) .collect(); - let json = json!({ + #[allow(unused_mut)] + let mut json = json!({ "zid": context.zid_str, "version": context.version, "metadata": context.metadata, @@ -458,6 +463,19 @@ fn router_data(context: &AdminContext, query: Query) { "sessions": transports, "plugins": plugins, }); + + #[cfg(feature = "stats")] + { + let stats = crate::prelude::Parameters::decode(&query.selector()) + .any(|(k, v)| k.as_ref() == "_stats" && v != "false"); + if stats { + json.as_object_mut().unwrap().insert( + "stats".to_string(), + json!(transport_mgr.get_stats().report()), + ); + } + } + log::trace!("AdminSpace router_data: {:?}", json); if let Err(e) = query .reply(Ok(Sample::new( @@ -471,6 +489,40 @@ fn router_data(context: &AdminContext, query: Query) { } } +fn router_metrics(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!("@/router/{}/metrics", context.zid_str) + .try_into() + .unwrap(); + #[allow(unused_mut)] + let mut metrics = format!( + r#"# HELP zenoh_build Informations about zenoh. +# TYPE zenoh_build gauge +zenoh_build{{version="{}"}} 1 +"#, + context.version + ); + + #[cfg(feature = "stats")] + metrics.push_str( + &context + .runtime + .manager() + .get_stats() + .report() + .openmetrics_text(), + ); + + if let Err(e) = query + .reply(Ok(Sample::new( + reply_key, + Value::from(metrics.as_bytes().to_vec()).encoding(KnownEncoding::TextPlain.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } +} + fn routers_linkstate_data(context: &AdminContext, query: Query) { let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/routers", context.zid_str) .try_into()