diff --git a/io/zenoh-transport/src/common/stats.rs b/io/zenoh-transport/src/common/stats.rs index 287018aeb0..f095a58273 100644 --- a/io/zenoh-transport/src/common/stats.rs +++ b/io/zenoh-transport/src/common/stats.rs @@ -12,32 +12,89 @@ // ZettaScale Zenoh Team, // macro_rules! stats_struct { + (@field_type ) => {AtomicUsize}; + (@field_type $field_type:ident) => {std::sync::Arc<$field_type>}; + (@report_field_type ) => {usize}; + (@report_field_type $field_type:ident) => {paste::paste! {[<$field_type Report>]}}; + (@new($parent:expr) ) => {AtomicUsize::new(0)}; + (@new($parent:expr) $field_type:ident) => {std::sync::Arc::new($field_type::new($parent))}; + (@report_default ) => {0}; + (@report_default $field_type:ident) => {paste::paste! {[<$field_type Report>]::default()}}; + (@get $vis:vis $field_name:ident) => { + paste::paste! { + $vis fn [](&self) -> usize { + self.$field_name.load(Ordering::Relaxed) + } + } + }; + (@get $vis:vis $field_name:ident $field_type:ident) => { + paste::paste! { + $vis fn [](&self) -> [<$field_type Report>] { + self.$field_name.report() + } + } + }; + (@increment $vis:vis $field_name:ident) => { + paste::paste! { + $vis fn [](&self, nb: usize) { + self.$field_name.fetch_add(nb, Ordering::Relaxed); + if let Some(parent) = self.parent.as_ref() { + parent.[](nb); + } + } + } + }; + (@increment $vis:vis $field_name:ident $field_type:ident) => {}; + (@openmetrics($stats:expr, $string:expr) $field_name:ident) => { + $string.push_str(stringify!($field_name)); + $string.push_str(" "); + $string.push_str($stats.$field_name.to_string().as_str()); + $string.push_str("\n"); + }; + (@openmetrics($stats:expr, $string:expr) $field_name:ident $field_type:ident) => { + $string.push_str(&$stats.$field_name.sub_openmetrics_text(stringify!($field_name))); + }; + (@openmetrics_val($stats:expr) $field_name:ident) => { + $stats.$field_name.to_string().as_str() + }; + (@openmetrics_val($stats:expr) $field_name:ident $field_type:ident) => {""}; ( $(#[$meta:meta])* $vis:vis struct $struct_name:ident { + $( - $(#[$field_meta:meta])* - $field_vis:vis $field_name:ident, + $(# HELP $help:literal)? + $(# TYPE $type:literal)? + $(#[$field_meta:meta])* + $field_vis:vis $field_name:ident $($field_type:ident)?, )* } ) => { paste::paste! { $vis struct $struct_name { + parent: Option>, $( - $(#[$field_meta:meta])* - $field_vis $field_name: AtomicUsize, + $(#[$field_meta])* + $field_vis $field_name: stats_struct!(@field_type $($field_type)?), )* } $(#[$meta])* $vis struct [<$struct_name Report>] { $( - $(#[$field_meta:meta])* - $field_vis $field_name: usize, + $(#[$field_meta])* + $field_vis $field_name: stats_struct!(@report_field_type $($field_type)?), )* } impl $struct_name { + $vis fn new(parent: Option>) -> Self { + $struct_name { + parent: parent.clone(), + $($field_name: stats_struct!(@new(parent.as_ref().map(|p|p.$field_name.clone())) $($field_type)?),)* + } + } + $vis fn report(&self) -> [<$struct_name Report>] { [<$struct_name Report>] { $($field_name: self.[](),)* @@ -45,20 +102,64 @@ macro_rules! stats_struct { } $( - $vis fn [](&self) -> usize { - self.$field_name.load(Ordering::Relaxed) + stats_struct!(@get $vis $field_name $($field_type)?); + stats_struct!(@increment $vis $field_name $($field_type)?); + )* + } + + impl Default for $struct_name { + fn default() -> Self { + Self { + parent: None, + $($field_name: stats_struct!(@new(None) $($field_type)?),)* + } + } + } + + impl [<$struct_name Report>] { + #[allow(dead_code)] + fn sub_openmetrics_text(&self, prefix: &str) -> String { + let mut s = String::new(); + $( + s.push_str(prefix); + s.push_str("{space=\""); + s.push_str(stringify!($field_name)); + s.push_str("\"} "); + s.push_str( + stats_struct!(@openmetrics_val(self) $field_name $($field_type)?) + ); + s.push_str("\n"); + )* + s } - $vis fn [](&self, nb: usize) { - self.$field_name.fetch_add(nb, Ordering::Relaxed); + $vis fn openmetrics_text(&self) -> String { + let mut s = String::new(); + $( + $( + s.push_str("# HELP "); + s.push_str(stringify!($field_name)); + s.push_str(" "); + s.push_str($help); + s.push_str("\n"); + )? + $( + s.push_str("# TYPE "); + s.push_str(stringify!($field_name)); + s.push_str(" "); + s.push_str($type); + s.push_str("\n"); + )? + stats_struct!(@openmetrics(self, s) $field_name $($field_type)?); + )* + s } - )* } - impl Default for $struct_name { - fn default() -> $struct_name { - $struct_name { - $($field_name: AtomicUsize::new(0),)* + impl Default for [<$struct_name Report>] { + fn default() -> Self { + Self { + $($field_name: stats_struct!(@report_default $($field_type)?),)* } } } @@ -68,44 +169,103 @@ macro_rules! stats_struct { use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicUsize, Ordering}; +stats_struct! { + #[derive(Clone, Debug, Deserialize, Serialize)] + pub struct DiscriminatedStats { + pub user, + pub admin, + } +} + stats_struct! { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct TransportStats { + # HELP "Counter of sent bytes." + # TYPE "counter" + pub tx_bytes, + + # HELP "Counter of sent transport messages." + # TYPE "counter" pub tx_t_msgs, + + # HELP "Counter of sent network messages." + # TYPE "counter" pub tx_n_msgs, + + # HELP "Counter of dropped network messages." + # TYPE "counter" pub tx_n_dropped, - pub tx_z_put_user_msgs, - pub tx_z_put_user_pl_bytes, - pub tx_z_put_admin_msgs, - pub tx_z_put_admin_pl_bytes, - pub tx_z_del_user_msgs, - pub tx_z_del_admin_msgs, - pub tx_z_query_user_msgs, - pub tx_z_query_user_pl_bytes, - pub tx_z_query_admin_msgs, - pub tx_z_query_admin_pl_bytes, - pub tx_z_reply_user_msgs, - pub tx_z_reply_user_pl_bytes, - pub tx_z_reply_admin_msgs, - pub tx_z_reply_admin_pl_bytes, - pub tx_bytes, + # HELP "Counter of sent zenoh put messages." + # TYPE "counter" + pub tx_z_put_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh put message payloads." + # TYPE "counter" + pub tx_z_put_pl_bytes DiscriminatedStats, + + # HELP "Counter of sent zenoh del messages." + # TYPE "counter" + pub tx_z_del_msgs DiscriminatedStats, + + # HELP "Counter of sent zenoh query messages." + # TYPE "counter" + pub tx_z_query_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh query message payloads." + # TYPE "counter" + pub tx_z_query_pl_bytes DiscriminatedStats, + + # HELP "Counter of sent zenoh reply messages." + # TYPE "counter" + pub tx_z_reply_msgs DiscriminatedStats, + + # HELP "Counter of sent bytes in zenoh reply message payloads." + # TYPE "counter" + pub tx_z_reply_pl_bytes DiscriminatedStats, + + # HELP "Counter of received bytes." + # TYPE "counter" + pub rx_bytes, + + # HELP "Counter of received transport messages." + # TYPE "counter" pub rx_t_msgs, + + # HELP "Counter of received network messages." + # TYPE "counter" pub rx_n_msgs, - pub rx_z_put_user_msgs, - pub rx_z_put_user_pl_bytes, - pub rx_z_put_admin_msgs, - pub rx_z_put_admin_pl_bytes, - pub rx_z_del_user_msgs, - pub rx_z_del_admin_msgs, - pub rx_z_query_user_msgs, - pub rx_z_query_user_pl_bytes, - pub rx_z_query_admin_msgs, - pub rx_z_query_admin_pl_bytes, - pub rx_z_reply_user_msgs, - pub rx_z_reply_user_pl_bytes, - pub rx_z_reply_admin_msgs, - pub rx_z_reply_admin_pl_bytes, - pub rx_bytes, + + # HELP "Counter of dropped network messages." + # TYPE "counter" + pub rx_n_dropped, + + # HELP "Counter of received zenoh put messages." + # TYPE "counter" + pub rx_z_put_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh put message payloads." + # TYPE "counter" + pub rx_z_put_pl_bytes DiscriminatedStats, + + # HELP "Counter of received zenoh del messages." + # TYPE "counter" + pub rx_z_del_msgs DiscriminatedStats, + + # HELP "Counter of received zenoh query messages." + # TYPE "counter" + pub rx_z_query_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh query message payloads." + # TYPE "counter" + pub rx_z_query_pl_bytes DiscriminatedStats, + + # HELP "Counter of received zenoh reply messages." + # TYPE "counter" + pub rx_z_reply_msgs DiscriminatedStats, + + # HELP "Counter of received bytes in zenoh reply message payloads." + # TYPE "counter" + pub rx_z_reply_pl_bytes DiscriminatedStats, } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index c6a4ed5781..6847b12dd8 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -358,6 +358,8 @@ pub struct TransportManager { pub(crate) locator_inspector: zenoh_link::LocatorInspector, pub(crate) new_unicast_link_sender: NewLinkChannelSender, pub(crate) tx_executor: TransportExecutor, + #[cfg(feature = "stats")] + pub(crate) stats: Arc, } impl TransportManager { @@ -379,6 +381,8 @@ impl TransportManager { locator_inspector: Default::default(), new_unicast_link_sender, tx_executor: TransportExecutor::new(tx_threads), + #[cfg(feature = "stats")] + stats: std::sync::Arc::new(crate::stats::TransportStats::default()), }; // @TODO: this should be moved into the unicast module @@ -402,6 +406,11 @@ impl TransportManager { self.config.zid } + #[cfg(feature = "stats")] + pub fn get_stats(&self) -> std::sync::Arc { + self.stats.clone() + } + pub async fn close(&self) { self.close_unicast().await; self.tx_executor.stop().await; diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index 9d89fabc4b..c4412447cf 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -122,6 +122,9 @@ impl TransportMulticastInner { bail!("Invalid QoS configuration"); } + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); + let ti = TransportMulticastInner { manager, priority_tx: priority_tx.into_boxed_slice().into(), @@ -131,7 +134,7 @@ impl TransportMulticastInner { callback: Arc::new(RwLock::new(None)), timer: Arc::new(Timer::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, }; let link = TransportLinkMulticast::new(ti.clone(), config.link); diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index f00876a2e8..ea97aa143b 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -74,6 +74,8 @@ impl TransportUnicastLowlatency { config: TransportConfigUnicast, link: LinkUnicast, ) -> ZResult { + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); let t = TransportUnicastLowlatency { manager, config, @@ -81,7 +83,7 @@ impl TransportUnicastLowlatency { callback: Arc::new(SyncRwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, handle_keepalive: Arc::new(RwLock::new(None)), handle_rx: Arc::new(RwLock::new(None)), }; diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 5a9d115494..5c17b36827 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -102,6 +102,9 @@ impl TransportUnicastUniversal { c.sync(initial_sn)?; } + #[cfg(feature = "stats")] + let stats = Arc::new(TransportStats::new(Some(manager.get_stats().clone()))); + let t = TransportUnicastUniversal { manager, config, @@ -111,7 +114,7 @@ impl TransportUnicastUniversal { callback: Arc::new(RwLock::new(None)), alive: Arc::new(AsyncMutex::new(false)), #[cfg(feature = "stats")] - stats: Arc::new(TransportStats::default()), + stats, }; Ok(t) diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 3f6795505e..b544b3899f 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -43,6 +43,7 @@ const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = lazy_static::lazy_static! { static ref LONG_VERSION: String = format!("{} built with {}", GIT_VERSION, env!("RUSTC_VERSION")); } +const RAW_KEY: &str = "_raw"; fn value_to_json(value: Value) -> String { // @TODO: transcode to JSON when implemented in Value @@ -108,6 +109,14 @@ async fn to_json(results: flume::Receiver) -> String { format!("[\n{values}\n]\n") } +async fn to_json_response(results: flume::Receiver) -> Response { + response( + StatusCode::Ok, + Mime::from_str("application/json").unwrap(), + &to_json(results).await, + ) +} + fn sample_to_html(sample: Sample) -> String { format!( "
{}
\n
{}
\n", @@ -138,6 +147,28 @@ async fn to_html(results: flume::Receiver) -> String { format!("
\n{values}\n
\n") } +async fn to_html_response(results: flume::Receiver) -> Response { + response(StatusCode::Ok, "text/html", &to_html(results).await) +} + +async fn to_raw_response(results: flume::Receiver) -> Response { + match results.recv_async().await { + Ok(reply) => match reply.sample { + Ok(sample) => response( + StatusCode::Ok, + sample.value.encoding.to_string().as_ref(), + String::from_utf8_lossy(&sample.payload.contiguous()).as_ref(), + ), + Err(value) => response( + StatusCode::Ok, + value.encoding.to_string().as_ref(), + String::from_utf8_lossy(&value.payload.contiguous()).as_ref(), + ), + }, + Err(_) => response(StatusCode::Ok, "", ""), + } +} + fn method_to_kind(method: Method) -> SampleKind { match method { Method::Put => SampleKind::Put, @@ -146,13 +177,15 @@ fn method_to_kind(method: Method) -> SampleKind { } } -fn response(status: StatusCode, content_type: Mime, body: &str) -> Response { - Response::builder(status) +fn response(status: StatusCode, content_type: impl TryInto, body: &str) -> Response { + let mut builder = Response::builder(status) .header("content-length", body.len().to_string()) .header("Access-Control-Allow-Origin", "*") - .content_type(content_type) - .body(body) - .build() + .body(body); + if let Ok(mime) = content_type.try_into() { + builder = builder.content_type(mime); + } + builder.build() } zenoh_plugin_trait::declare_plugin!(RestPlugin); @@ -347,7 +380,7 @@ async fn query(req: Request<(Arc, String)>) -> tide::Result { Err(e) => { return Ok(response( StatusCode::BadRequest, - Mime::from_str("text/plain").unwrap(), + "text/plain", &e.to_string(), )) } @@ -363,6 +396,7 @@ async fn query(req: Request<(Arc, String)>) -> tide::Result { } else { QueryConsolidation::from(zenoh::query::ConsolidationMode::Latest) }; + let raw = selector.decode().any(|(k, _)| k.as_ref() == RAW_KEY); match req .state() .0 @@ -372,23 +406,17 @@ async fn query(req: Request<(Arc, String)>) -> tide::Result { .await { Ok(receiver) => { - if first_accept == "text/html" { - Ok(response( - StatusCode::Ok, - Mime::from_str("text/html").unwrap(), - &to_html(receiver).await, - )) + if raw { + Ok(to_raw_response(receiver).await) + } else if first_accept == "text/html" { + Ok(to_html_response(receiver).await) } else { - Ok(response( - StatusCode::Ok, - Mime::from_str("application/json").unwrap(), - &to_json(receiver).await, - )) + Ok(to_json_response(receiver).await) } } Err(e) => Ok(response( StatusCode::InternalServerError, - Mime::from_str("text/plain").unwrap(), + "text/plain", &e.to_string(), )), } @@ -404,7 +432,7 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result { return Ok(response( StatusCode::BadRequest, - Mime::from_str("text/plain").unwrap(), + "text/plain", &e.to_string(), )) } @@ -427,14 +455,14 @@ async fn write(mut req: Request<(Arc, String)>) -> tide::Result Ok(Response::new(StatusCode::Ok)), Err(e) => Ok(response( StatusCode::InternalServerError, - Mime::from_str("text/plain").unwrap(), + "text/plain", &e.to_string(), )), } } Err(e) => Ok(response( StatusCode::NoContent, - Mime::from_str("text/plain").unwrap(), + "text/plain", &e.to_string(), )), } diff --git a/zenoh/src/net/routing/pubsub.rs b/zenoh/src/net/routing/pubsub.rs index f45b8029e3..3deba60260 100644 --- a/zenoh/src/net/routing/pubsub.rs +++ b/zenoh/src/net/routing/pubsub.rs @@ -1732,10 +1732,12 @@ macro_rules! inc_stats { use zenoh_buffers::SplitBuffer; match &$body { PushBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + } + PushBody::Del(_) => { + stats.[<$txrx _z_del_msgs>].[](1); } - PushBody::Del(_) => stats.[](1), } } } diff --git a/zenoh/src/net/routing/queries.rs b/zenoh/src/net/routing/queries.rs index e2608a0066..191c0a8071 100644 --- a/zenoh/src/net/routing/queries.rs +++ b/zenoh/src/net/routing/queries.rs @@ -2002,13 +2002,15 @@ macro_rules! inc_req_stats { use zenoh_buffers::SplitBuffer; match &$body { RequestBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + } + RequestBody::Del(_) => { + stats.[<$txrx _z_del_msgs>].[](1); } - RequestBody::Del(_) => stats.[](1), RequestBody::Query(q) => { - stats.[](1); - stats.[]( + stats.[<$txrx _z_query_msgs>].[](1); + stats.[<$txrx _z_query_pl_bytes>].[]( q.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } @@ -2032,16 +2034,16 @@ macro_rules! inc_res_stats { use zenoh_buffers::SplitBuffer; match &$body { ResponseBody::Put(p) => { - stats.[](1); - stats.[](p.payload.len()); + stats.[<$txrx _z_put_msgs>].[](1); + stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); } ResponseBody::Reply(r) => { - stats.[](1); - stats.[](r.payload.len()); + stats.[<$txrx _z_reply_msgs>].[](1); + stats.[<$txrx _z_reply_pl_bytes>].[](r.payload.len()); } ResponseBody::Err(e) => { - stats.[](1); - stats.[]( + stats.[<$txrx _z_reply_msgs>].[](1); + stats.[<$txrx _z_reply_pl_bytes>].[]( e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 23bb47d9b9..0eb099a098 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -72,6 +72,10 @@ impl AdminSpace { let mut handlers: HashMap<_, Handler> = HashMap::new(); handlers.insert(root_key.clone(), Arc::new(router_data)); + handlers.insert( + format!("@/router/{zid_str}/metrics").try_into().unwrap(), + Arc::new(router_metrics), + ); handlers.insert( format!("@/router/{zid_str}/linkstate/routers") .try_into() @@ -450,7 +454,8 @@ fn router_data(context: &AdminContext, query: Query) { .map(transport_to_json) .collect(); - let json = json!({ + #[allow(unused_mut)] + let mut json = json!({ "zid": context.zid_str, "version": context.version, "metadata": context.metadata, @@ -458,6 +463,19 @@ fn router_data(context: &AdminContext, query: Query) { "sessions": transports, "plugins": plugins, }); + + #[cfg(feature = "stats")] + { + let stats = crate::prelude::Parameters::decode(&query.selector()) + .any(|(k, v)| k.as_ref() == "_stats" && v != "false"); + if stats { + json.as_object_mut().unwrap().insert( + "stats".to_string(), + json!(transport_mgr.get_stats().report()), + ); + } + } + log::trace!("AdminSpace router_data: {:?}", json); if let Err(e) = query .reply(Ok(Sample::new( @@ -471,6 +489,40 @@ fn router_data(context: &AdminContext, query: Query) { } } +fn router_metrics(context: &AdminContext, query: Query) { + let reply_key: OwnedKeyExpr = format!("@/router/{}/metrics", context.zid_str) + .try_into() + .unwrap(); + #[allow(unused_mut)] + let mut metrics = format!( + r#"# HELP zenoh_build Informations about zenoh. +# TYPE zenoh_build gauge +zenoh_build{{version="{}"}} 1 +"#, + context.version + ); + + #[cfg(feature = "stats")] + metrics.push_str( + &context + .runtime + .manager() + .get_stats() + .report() + .openmetrics_text(), + ); + + if let Err(e) = query + .reply(Ok(Sample::new( + reply_key, + Value::from(metrics.as_bytes().to_vec()).encoding(KnownEncoding::TextPlain.into()), + ))) + .res() + { + log::error!("Error sending AdminSpace reply: {:?}", e); + } +} + fn routers_linkstate_data(context: &AdminContext, query: Query) { let reply_key: OwnedKeyExpr = format!("@/router/{}/linkstate/routers", context.zid_str) .try_into()