diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index fa95f16d6..addf29734 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -271,8 +271,18 @@ // /// Optional list of network interfaces messages will be processed on, the rest will be passed as is. // /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any. // interfaces: [ "wlan0" ], - // /// Data flow messages will be processed on. ("egress" or "ingress") - // flow: "egress", + // /// Optional list of data flows messages will be processed on ("egress" and/or "ingress"). + // /// If absent, the rules will be applied to both flows. + // flow: ["ingress", "egress"], + // /// List of message type on which downsampling will be applied. Must not be empty. + // messages: [ + // /// Publication (Put and Delete) + // "push", + // /// Get + // "query", + // /// Queryable Reply to a Query + // "reply" + // ], // /// A list of downsampling rules: key_expression and the maximum frequency in Hertz // rules: [ // { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 }, diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index aeeebc065..06216f5c0 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -85,6 +85,14 @@ pub enum InterceptorFlow { Ingress, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum DownsamplingMessage { + Push, + Query, + Reply, +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplingRuleConf { /// A list of key-expressions to which the downsampling will be applied. @@ -101,10 +109,12 @@ pub struct DownsamplingItemConf { /// A list of interfaces to which the downsampling will be applied /// Downsampling will be applied for all interfaces if the parameter is None pub interfaces: Option>, + // list of message types on which the downsampling will be applied + pub messages: Vec, /// A list of interfaces to which the downsampling will be applied. pub rules: Vec, - /// Downsampling flow direction: egress, ingress - pub flow: InterceptorFlow, + /// Downsampling flow directions: egress and/or ingress + pub flows: Option>, } #[derive(Serialize, Debug, Deserialize, Clone)] diff --git a/zenoh/src/net/routing/interceptor/authorization.rs b/zenoh/src/net/routing/interceptor/authorization.rs index 9959fece0..5ef62ae79 100644 --- a/zenoh/src/net/routing/interceptor/authorization.rs +++ b/zenoh/src/net/routing/interceptor/authorization.rs @@ -30,6 +30,8 @@ use zenoh_keyexpr::{ keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, KeBoxTree}, }; use zenoh_result::ZResult; + +use super::InterfaceEnabled; type PolicyForSubject = FlowPolicy; type PolicyMap = HashMap; @@ -243,12 +245,6 @@ impl FlowPolicy { } } -#[derive(Default, Debug)] -pub struct InterfaceEnabled { - pub ingress: bool, - pub egress: bool, -} - pub struct PolicyEnforcer { pub(crate) acl_enabled: bool, pub(crate) default_permission: Permission, diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 60af846bb..fc23a3b9d 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -23,7 +23,9 @@ use std::{ sync::{Arc, Mutex}, }; -use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; +use zenoh_config::{ + DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow, +}; use zenoh_core::zlock; use zenoh_keyexpr::keyexpr_tree::{ impls::KeyedSetProvider, support::UnknownWildness, IKeyExprTree, IKeyExprTreeMut, KeBoxTree, @@ -46,7 +48,19 @@ pub(crate) fn downsampling_interceptor_factories( bail!("Invalid Downsampling config: id '{id}' is repeated"); } } - res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone()))); + let mut ds = ds.clone(); + // check for undefined flows and initialize them + let flows = ds + .flows + .get_or_insert(vec![InterceptorFlow::Ingress, InterceptorFlow::Egress]); + if flows.is_empty() { + bail!("Invalid Downsampling config: flows list must not be empty"); + } + // check for empty messages list + if ds.messages.is_empty() { + bail!("Invalid Downsampling config: messages list must not be empty"); + } + res.push(Box::new(DownsamplingInterceptorFactory::new(ds))); } Ok(res) @@ -55,7 +69,8 @@ pub(crate) fn downsampling_interceptor_factories( pub struct DownsamplingInterceptorFactory { interfaces: Option>, rules: Vec, - flow: InterceptorFlow, + flows: InterfaceEnabled, + messages: Arc, } impl DownsamplingInterceptorFactory { @@ -63,7 +78,12 @@ impl DownsamplingInterceptorFactory { Self { interfaces: conf.interfaces, rules: conf.rules, - flow: conf.flow, + flows: conf + .flows + .expect("config flows should be set") + .as_slice() + .into(), + messages: Arc::new(conf.messages.as_slice().into()), } } } @@ -91,21 +111,20 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { } } }; - - match self.flow { - InterceptorFlow::Ingress => ( - Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + ( + self.flows.ingress.then(|| { + Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.messages.clone(), self.rules.clone(), - )))), - None, - ), - InterceptorFlow::Egress => ( - None, - Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + ))) as IngressInterceptor + }), + self.flows.egress.then(|| { + Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new( + self.messages.clone(), self.rules.clone(), - )))), - ), - } + ))) as EgressInterceptor + }), + ) } fn new_transport_multicast( @@ -120,16 +139,52 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { } } +#[derive(Debug, Default, Clone)] +pub(crate) struct DownsamplingFilters { + push: bool, + query: bool, + reply: bool, +} + +impl From<&[DownsamplingMessage]> for DownsamplingFilters { + fn from(value: &[DownsamplingMessage]) -> Self { + let mut res = Self::default(); + for v in value { + match v { + DownsamplingMessage::Push => res.push = true, + DownsamplingMessage::Query => res.query = true, + DownsamplingMessage::Reply => res.reply = true, + } + } + res + } +} + struct Timestate { pub threshold: tokio::time::Duration, pub latest_message_timestamp: tokio::time::Instant, } pub(crate) struct DownsamplingInterceptor { + filtered_messages: Arc, ke_id: Arc>>, ke_state: Arc>>, } +impl DownsamplingInterceptor { + fn is_msg_filtered(&self, ctx: &RoutingContext) -> bool { + match ctx.msg.body { + NetworkBody::Push(_) => self.filtered_messages.push, + NetworkBody::Request(_) => self.filtered_messages.query, + NetworkBody::Response(_) => self.filtered_messages.reply, + NetworkBody::ResponseFinal(_) => false, + NetworkBody::Interest(_) => false, + NetworkBody::Declare(_) => false, + NetworkBody::OAM(_) => false, + } + } +} + impl InterceptorTrait for DownsamplingInterceptor { fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option> { let ke_id = zlock!(self.ke_id); @@ -146,7 +201,7 @@ impl InterceptorTrait for DownsamplingInterceptor { ctx: RoutingContext, cache: Option<&Box>, ) -> Option> { - if matches!(ctx.msg.body, NetworkBody::Push(_)) { + if self.is_msg_filtered(&ctx) { if let Some(cache) = cache { if let Some(id) = cache.downcast_ref::>() { if let Some(id) = id { @@ -177,7 +232,7 @@ impl InterceptorTrait for DownsamplingInterceptor { const NANOS_PER_SEC: f64 = 1_000_000_000.0; impl DownsamplingInterceptor { - pub fn new(rules: Vec) -> Self { + pub fn new(messages: Arc, rules: Vec) -> Self { let mut ke_id = KeBoxTree::default(); let mut ke_state = HashMap::default(); for (id, rule) in rules.into_iter().enumerate() { @@ -197,12 +252,14 @@ impl DownsamplingInterceptor { }, ); tracing::debug!( - "New downsampler rule enabled: key_expr={:?}, threshold={:?}", + "New downsampler rule enabled: key_expr={:?}, threshold={:?}, messages={:?}", rule.key_expr, - threshold + threshold, + messages, ); } Self { + filtered_messages: messages, ke_id: Arc::new(Mutex::new(ke_id)), ke_state: Arc::new(Mutex::new(ke_state)), } diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 1e8ee6e61..69b8dd60c 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -24,7 +24,7 @@ use access_control::acl_interceptor_factories; mod authorization; use std::any::Any; -use zenoh_config::Config; +use zenoh_config::{Config, InterceptorFlow}; use zenoh_protocol::network::NetworkMessage; use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; @@ -35,6 +35,28 @@ use crate::api::key_expr::KeyExpr; pub mod downsampling; use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories; +#[derive(Default, Debug)] +pub struct InterfaceEnabled { + pub ingress: bool, + pub egress: bool, +} + +impl From<&[InterceptorFlow]> for InterfaceEnabled { + fn from(value: &[InterceptorFlow]) -> Self { + let mut res = Self { + ingress: false, + egress: false, + }; + for v in value { + match v { + InterceptorFlow::Egress => res.egress = true, + InterceptorFlow::Ingress => res.ingress = true, + } + } + res + } +} + pub(crate) trait InterceptorTrait { fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option>; diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 0c0d5d298..fd863a456 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -23,8 +23,10 @@ use std::{ }, }; -use zenoh::{key_expr::KeyExpr, Config, Wait}; -use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; +use zenoh::{key_expr::KeyExpr, query::ConsolidationMode, Config, Wait}; +use zenoh_config::{ + DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow, +}; // Tokio's time granularity on different platforms #[cfg(target_os = "windows")] @@ -40,40 +42,40 @@ fn build_config( ds_config: Vec, flow: InterceptorFlow, ) -> (Config, Config) { - let mut pub_config = Config::default(); - pub_config + let mut sender_config = Config::default(); + sender_config .scouting .multicast .set_enabled(Some(false)) .unwrap(); - let mut sub_config = Config::default(); - sub_config + let mut receiver_config = Config::default(); + receiver_config .scouting .multicast .set_enabled(Some(false)) .unwrap(); - sub_config + receiver_config .listen .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); - pub_config + sender_config .connect .endpoints .set(vec![locator.parse().unwrap()]) .unwrap(); match flow { - InterceptorFlow::Egress => pub_config.set_downsampling(ds_config).unwrap(), - InterceptorFlow::Ingress => sub_config.set_downsampling(ds_config).unwrap(), + InterceptorFlow::Egress => sender_config.set_downsampling(ds_config).unwrap(), + InterceptorFlow::Ingress => receiver_config.set_downsampling(ds_config).unwrap(), }; - (pub_config, sub_config) + (sender_config, receiver_config) } -fn downsampling_test( +fn downsampling_pub_sub_test( pub_config: Config, sub_config: Config, ke_prefix: &str, @@ -152,8 +154,9 @@ fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let ds_config = DownsamplingItemConf { id: None, - flow, + flows: Some(vec![flow]), interfaces: None, + messages: vec![DownsamplingMessage::Push], rules: vec![ DownsamplingRuleConf { key_expr: ke_10hz.clone().into(), @@ -186,7 +189,7 @@ fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { let (pub_config, sub_config) = build_config(locator, vec![ds_config], flow); - downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); + downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[test] @@ -208,8 +211,9 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { let ds_config = vec![ DownsamplingItemConf { id: Some("someid".to_string()), - flow, + flows: Some(vec![flow]), interfaces: Some(vec!["lo".to_string(), "lo0".to_string()]), + messages: vec![DownsamplingMessage::Push], rules: vec![DownsamplingRuleConf { key_expr: ke_10hz.clone().into(), freq: 10.0, @@ -217,8 +221,9 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { }, DownsamplingItemConf { id: None, - flow, + flows: Some(vec![flow]), interfaces: Some(vec!["some_unknown_interface".to_string()]), + messages: vec![DownsamplingMessage::Push], rules: vec![DownsamplingRuleConf { key_expr: ke_no_effect.clone().into(), freq: 10.0, @@ -240,7 +245,7 @@ fn downsampling_by_interface_impl(flow: InterceptorFlow) { let (pub_config, sub_config) = build_config(locator, ds_config, flow); - downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); + downsampling_pub_sub_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[cfg(unix)] @@ -263,7 +268,8 @@ fn downsampling_config_error_wrong_strategy() { r#" [ { - flow: "down", + flows: ["down"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }, { key_expr: "test/downsamples_by_keyexp/r50", freq: 20, } @@ -290,14 +296,16 @@ fn downsampling_config_error_repeated_id() { [ { id: "REPEATED", - flow: "egress", + flows: ["egress"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }, ], }, { id: "REPEATED", - flow: "ingress", + flows: ["ingress"], + messages: ["push"], rules: [ { key_expr: "test/downsamples_by_keyexp/r50", freq: 20, } ], @@ -309,3 +317,121 @@ fn downsampling_config_error_repeated_id() { zenoh::open(config).wait().unwrap(); } + +fn downsampling_query_reply_test( + query_config: Config, + queryable_config: Config, + queryable_ke: &str, + reply_counter: Arc, + nb_queries: usize, +) { + let query_session = zenoh::open(query_config).wait().unwrap(); + let queryable_session = zenoh::open(queryable_config).wait().unwrap(); + + let response_ke = queryable_ke.to_owned(); + queryable_session + .declare_queryable(queryable_ke) + .callback(move |query| { + query.reply(&response_ke, "".as_bytes()).wait().unwrap(); + }) + .background() + .wait() + .unwrap(); + + std::thread::sleep(std::time::Duration::from_secs(1)); + + let mut handles = Vec::new(); + for _ in 0..nb_queries { + let queryable_ke = queryable_ke.to_owned(); + let query_session = query_session.clone(); + let reply_counter = reply_counter.clone(); + let handle = std::thread::spawn(move || { + let query = query_session + .get(&queryable_ke) + .consolidation(ConsolidationMode::None) + .timeout(std::time::Duration::from_secs(5)) + .wait() + .unwrap(); + while let Ok(reply) = query.recv() { + if reply.into_result().is_ok() { + reply_counter.fetch_add(1, Ordering::SeqCst); + } + } + }); + handles.push(handle); + } + for handle in handles { + handle.join().unwrap(); + } + query_session.close().wait().unwrap(); + queryable_session.close().wait().unwrap(); +} + +fn downsampling_query_rate_test(flow: InterceptorFlow) { + let queryable_ke = "test/downsamples_query"; + let locator = "tcp/127.0.0.1:31448"; + + let ds_config = DownsamplingItemConf { + id: None, + flows: Some(vec![flow]), + interfaces: None, + messages: vec![DownsamplingMessage::Query], + rules: vec![DownsamplingRuleConf { + key_expr: queryable_ke.try_into().unwrap(), + freq: 0.01, + }], + }; + + let (query_config, queryable_config) = build_config(locator, vec![ds_config], flow); + let reply_counter = Arc::new(AtomicUsize::new(0)); + downsampling_query_reply_test( + query_config, + queryable_config, + queryable_ke, + reply_counter.clone(), + 2, + ); + assert!(reply_counter.load(Ordering::SeqCst) == 1); +} + +fn downsampling_reply_rate_test(flow: InterceptorFlow) { + let queryable_ke = "test/downsamples_reply"; + let locator = "tcp/127.0.0.1:31449"; + + let ds_config = DownsamplingItemConf { + id: None, + flows: Some(vec![flow]), + interfaces: None, + messages: vec![DownsamplingMessage::Reply], + rules: vec![DownsamplingRuleConf { + key_expr: queryable_ke.try_into().unwrap(), + freq: 0.01, + }], + }; + + let (queryable_config, query_config) = build_config(locator, vec![ds_config], flow); + let reply_counter = Arc::new(AtomicUsize::new(0)); + downsampling_query_reply_test( + query_config, + queryable_config, + queryable_ke, + reply_counter.clone(), + 2, + ); + + assert!(reply_counter.load(Ordering::SeqCst) == 1); +} + +#[test] +fn downsampling_query_test() { + zenoh::init_log_from_env_or("error"); + downsampling_query_rate_test(InterceptorFlow::Ingress); + downsampling_query_rate_test(InterceptorFlow::Egress); +} + +#[test] +fn downsampling_reply_test() { + zenoh::init_log_from_env_or("error"); + downsampling_reply_rate_test(InterceptorFlow::Ingress); + downsampling_reply_rate_test(InterceptorFlow::Egress); +}