diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index c3a633b0e2..0ae9fb6f31 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -70,6 +70,12 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DownsamplerConf { + pub keyexpr: OwnedKeyExpr, + pub threshold_ms: u64, +} + pub type ValidationFunction = std::sync::Arc< dyn Fn( &str, @@ -399,6 +405,11 @@ validated_struct::validator! { }, }, + /// Configuration of the downsampling. + pub downsampling: #[derive(Default)] + DownsamplingConf { + downsamples: Vec, + }, /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. plugins_search_dirs: Vec, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 2fcfdf27c7..9e7c29f6c9 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -84,6 +84,22 @@ impl Tables { // let queries_default_timeout = // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); + + //TODO(sashacmc): add interceptors config reloading there or incapsulate in the interceptors, but it + //will require interface changes + // + //// config reloading sample: + //let cfg_rx = config.subscribe(); + //task::spawn({ + // async move { + // while let Ok(change) = cfg_rx.recv_async().await { + // let change = change.strip_prefix('/').unwrap_or(&change); + // if !change.starts_with("plugins") { + // continue; + // } + // } + //}); + Tables { zid, whatami, diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs new file mode 100644 index 0000000000..8f17081129 --- /dev/null +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -0,0 +1,126 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) + +use crate::net::routing::interceptor::*; +use crate::KeyExpr; +use std::sync::{Arc, Mutex}; +use zenoh_config::DownsamplerConf; + +pub(crate) struct IngressMsgDownsampler { + conf: DownsamplerConf, + latest_message_timestamp: Arc>, +} + +impl InterceptorTrait for IngressMsgDownsampler { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if let Some(full_expr) = ctx.full_expr() { + match KeyExpr::new(full_expr) { + Ok(keyexpr) => { + if !self.conf.keyexpr.intersects(&keyexpr) { + return Some(ctx); + } + + let timestamp = std::time::Instant::now(); + let mut latest_message_timestamp = + self.latest_message_timestamp.lock().unwrap(); + + if timestamp - *latest_message_timestamp + > std::time::Duration::from_millis(self.conf.threshold_ms) + { + *latest_message_timestamp = timestamp; + log::trace!("Interceptor: Passed threshold, passing."); + Some(ctx) + } else { + log::trace!("Interceptor: Skipped due to threshold."); + None + } + } + Err(_) => { + log::warn!("Interceptor: Wrong KeyExpr, passing."); + Some(ctx) + } + } + } else { + // message has no key expr + Some(ctx) + } + } +} + +impl IngressMsgDownsampler { + pub fn new(conf: DownsamplerConf) -> Self { + // TODO (sashacmc): I need just := 0, but how??? + let zero_ts = + std::time::Instant::now() - std::time::Duration::from_micros(conf.threshold_ms); + Self { + conf, + latest_message_timestamp: Arc::new(Mutex::new(zero_ts)), + } + } +} + +pub(crate) struct EgressMsgDownsampler {} + +impl InterceptorTrait for EgressMsgDownsampler { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + // TODO(sashacmc): Do we need Ergress Downsampler? + Some(ctx) + } +} + +pub struct DownsamplerInterceptor { + conf: DownsamplerConf, +} + +impl DownsamplerInterceptor { + pub fn new(conf: DownsamplerConf) -> Self { + log::debug!("DownsamplerInterceptor enabled: {:?}", conf); + Self { conf } + } +} + +impl InterceptorFactoryTrait for DownsamplerInterceptor { + fn new_transport_unicast( + &self, + transport: &TransportUnicast, + ) -> (Option, Option) { + log::debug!("New transport unicast {:?}", transport); + ( + Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))), + Some(Box::new(EgressMsgDownsampler {})), + ) + } + + fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { + log::debug!("New transport multicast {:?}", transport); + Some(Box::new(EgressMsgDownsampler {})) + } + + fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { + log::debug!("New peer multicast {:?}", transport); + Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))) + } +} diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 22e0e4e549..e5c044116b 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -17,11 +17,15 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) + use super::RoutingContext; use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; +pub mod downsampling; +use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor; + pub(crate) trait InterceptorTrait { fn intercept( &self, @@ -44,11 +48,14 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(_config: &Config) -> Vec { - // Add interceptors here - // TODO build the list of intercetors with the correct order from the config - // vec![Box::new(LoggerInterceptor {})] - vec![] +pub(crate) fn interceptor_factories(config: &Config) -> Vec { + let mut res: Vec = vec![]; + + for ds in config.downsampling().downsamples() { + res.push(Box::new(DownsamplerInterceptor::new(ds.clone()))) + } + res.push(Box::new(LoggerInterceptor {})); + res } pub(crate) struct InterceptorsChain { diff --git a/zenoh/src/net/tests/mod.rs b/zenoh/src/net/tests/mod.rs index b8b42bef12..7f90b72002 100644 --- a/zenoh/src/net/tests/mod.rs +++ b/zenoh/src/net/tests/mod.rs @@ -1 +1,2 @@ +pub(crate) mod interceptors; pub(crate) mod tables; diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs new file mode 100644 index 0000000000..33b8a94d49 --- /dev/null +++ b/zenoh/tests/interceptors.rs @@ -0,0 +1,74 @@ +use std::sync::{Arc, Mutex}; + +#[test] +fn downsampling() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare publisher + let mut config = Config::default(); + config + .insert_json5( + "downsampling/downsamples", + r#" + [ + { + keyexpr: "test/downsamples/r100", + threshold_ms: 100, + }, + { + keyexpr: "test/downsamples/r50", + threshold_ms: 50, + }, + ] + "#, + ) + .unwrap(); + + // declare subscriber + let zenoh_sub = zenoh::open(config).res().unwrap(); + + let count_r100 = Arc::new(Mutex::new(0)); + let count_r100_clone = count_r100.clone(); + + let count_r50 = Arc::new(Mutex::new(0)); + let count_r50_clone = count_r50.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples/*") + .callback(move |sample| { + if sample.key_expr.as_str() == "test/downsamples/r100" { + let mut count = count_r100_clone.lock().unwrap(); + *count += 1; + } else if sample.key_expr.as_str() == "test/downsamples/r50" { + let mut count = count_r50_clone.lock().unwrap(); + *count += 1; + } + }) + .res() + .unwrap(); + + let zenoh_pub = zenoh::open(Config::default()).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples/r100") + .res() + .unwrap(); + + let publisher_r50 = zenoh_pub + .declare_publisher("test/downsamples/r50") + .res() + .unwrap(); + + let ten_millis = std::time::Duration::from_millis(10); + for i in 0..100 { + println!("message {}", i); + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_r50.put(format!("message {}", i)).res().unwrap(); + std::thread::sleep(ten_millis); + } + + // check result count of queries + assert_eq!(*count_r100.lock().unwrap(), 10); + assert_eq!(*count_r50.lock().unwrap(), 20); +}