Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZEN-520] Add support for query/reply messages to Downsampling #1797

Merged
14 changes: 12 additions & 2 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,18 @@
// /// Optional list of network interfaces messages will be processed on, the rest will be passed as is.
// /// If absent, the rules will be applied to all interfaces, in case of an empty list it means that they will not be applied to any.
// interfaces: [ "wlan0" ],
// /// Data flow messages will be processed on. ("egress" or "ingress")
// flow: "egress",
// /// Optional list of data flows messages will be processed on ("egress" and/or "ingress").
// /// If absent, the rules will be applied to both flows.
// flow: ["ingress", "egress"],
// /// List of message type on which downsampling will be applied. Must not be empty.
// messages: [
// /// Publication (Put and Delete)
// "push",
// /// Get
// "query",
// /// Queryable Reply to a Query
// "reply"
// ],
// /// A list of downsampling rules: key_expression and the maximum frequency in Hertz
// rules: [
// { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 },
Expand Down
14 changes: 12 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ pub enum InterceptorFlow {
Ingress,
}

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum DownsamplingMessage {
Push,
Query,
Reply,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingRuleConf {
/// A list of key-expressions to which the downsampling will be applied.
Expand All @@ -101,10 +109,12 @@ pub struct DownsamplingItemConf {
/// A list of interfaces to which the downsampling will be applied
/// Downsampling will be applied for all interfaces if the parameter is None
pub interfaces: Option<Vec<String>>,
// list of message types on which the downsampling will be applied
pub messages: Vec<DownsamplingMessage>,
/// A list of interfaces to which the downsampling will be applied.
pub rules: Vec<DownsamplingRuleConf>,
/// Downsampling flow direction: egress, ingress
pub flow: InterceptorFlow,
/// Downsampling flow directions: egress and/or ingress
pub flows: Option<Vec<InterceptorFlow>>,
}

#[derive(Serialize, Debug, Deserialize, Clone)]
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/net/routing/interceptor/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use zenoh_keyexpr::{
keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut, KeBoxTree},
};
use zenoh_result::ZResult;

use super::InterfaceEnabled;
type PolicyForSubject = FlowPolicy;

type PolicyMap = HashMap<usize, PolicyForSubject, RandomState>;
Expand Down Expand Up @@ -243,12 +245,6 @@ impl FlowPolicy {
}
}

#[derive(Default, Debug)]
pub struct InterfaceEnabled {
pub ingress: bool,
pub egress: bool,
}

pub struct PolicyEnforcer {
pub(crate) acl_enabled: bool,
pub(crate) default_permission: Permission,
Expand Down
99 changes: 78 additions & 21 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
sync::{Arc, Mutex},
};

use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow};
use zenoh_config::{
DownsamplingItemConf, DownsamplingMessage, DownsamplingRuleConf, InterceptorFlow,
};
use zenoh_core::zlock;
use zenoh_keyexpr::keyexpr_tree::{
impls::KeyedSetProvider, support::UnknownWildness, IKeyExprTree, IKeyExprTreeMut, KeBoxTree,
Expand All @@ -46,7 +48,19 @@
bail!("Invalid Downsampling config: id '{id}' is repeated");
}
}
res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone())));
let mut ds = ds.clone();
// check for undefined flows and initialize them
let flows = ds
.flows
.get_or_insert(vec![InterceptorFlow::Ingress, InterceptorFlow::Egress]);
if flows.is_empty() {
bail!("Invalid Downsampling config: flows list must not be empty");

Check warning on line 57 in zenoh/src/net/routing/interceptor/downsampling.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/interceptor/downsampling.rs#L57

Added line #L57 was not covered by tests
}
// check for empty messages list
if ds.messages.is_empty() {
bail!("Invalid Downsampling config: messages list must not be empty");

Check warning on line 61 in zenoh/src/net/routing/interceptor/downsampling.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/interceptor/downsampling.rs#L61

Added line #L61 was not covered by tests
}
res.push(Box::new(DownsamplingInterceptorFactory::new(ds)));
}

Ok(res)
Expand All @@ -55,15 +69,21 @@
pub struct DownsamplingInterceptorFactory {
interfaces: Option<Vec<String>>,
rules: Vec<DownsamplingRuleConf>,
flow: InterceptorFlow,
flows: InterfaceEnabled,
messages: Arc<DownsamplingFilters>,
}

impl DownsamplingInterceptorFactory {
pub fn new(conf: DownsamplingItemConf) -> Self {
Self {
interfaces: conf.interfaces,
rules: conf.rules,
flow: conf.flow,
flows: conf
.flows
.expect("config flows should be set")
.as_slice()
.into(),
messages: Arc::new(conf.messages.as_slice().into()),
}
}
}
Expand Down Expand Up @@ -91,21 +111,20 @@
}
}
};

match self.flow {
InterceptorFlow::Ingress => (
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
(
self.flows.ingress.then(|| {
Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.messages.clone(),
self.rules.clone(),
)))),
None,
),
InterceptorFlow::Egress => (
None,
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
))) as IngressInterceptor
}),
self.flows.egress.then(|| {
Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.messages.clone(),
self.rules.clone(),
)))),
),
}
))) as EgressInterceptor
}),
)
}

fn new_transport_multicast(
Expand All @@ -120,16 +139,52 @@
}
}

#[derive(Debug, Default, Clone)]
pub(crate) struct DownsamplingFilters {
push: bool,
query: bool,
reply: bool,
}

impl From<&[DownsamplingMessage]> for DownsamplingFilters {
fn from(value: &[DownsamplingMessage]) -> Self {
let mut res = Self::default();
for v in value {
match v {
DownsamplingMessage::Push => res.push = true,
DownsamplingMessage::Query => res.query = true,
DownsamplingMessage::Reply => res.reply = true,
}
}
res
}
}

struct Timestate {
pub threshold: tokio::time::Duration,
pub latest_message_timestamp: tokio::time::Instant,
}

pub(crate) struct DownsamplingInterceptor {
filtered_messages: Arc<DownsamplingFilters>,
ke_id: Arc<Mutex<KeBoxTree<usize, UnknownWildness, KeyedSetProvider>>>,
ke_state: Arc<Mutex<HashMap<usize, Timestate>>>,
}

impl DownsamplingInterceptor {
fn is_msg_filtered(&self, ctx: &RoutingContext<NetworkMessage>) -> bool {
match ctx.msg.body {
NetworkBody::Push(_) => self.filtered_messages.push,
NetworkBody::Request(_) => self.filtered_messages.query,
NetworkBody::Response(_) => self.filtered_messages.reply,
NetworkBody::ResponseFinal(_) => false,
NetworkBody::Interest(_) => false,

Check warning on line 181 in zenoh/src/net/routing/interceptor/downsampling.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/interceptor/downsampling.rs#L181

Added line #L181 was not covered by tests
NetworkBody::Declare(_) => false,
NetworkBody::OAM(_) => false,
}
}
}

impl InterceptorTrait for DownsamplingInterceptor {
fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>> {
let ke_id = zlock!(self.ke_id);
Expand All @@ -146,7 +201,7 @@
ctx: RoutingContext<NetworkMessage>,
cache: Option<&Box<dyn Any + Send + Sync>>,
) -> Option<RoutingContext<NetworkMessage>> {
if matches!(ctx.msg.body, NetworkBody::Push(_)) {
if self.is_msg_filtered(&ctx) {
if let Some(cache) = cache {
if let Some(id) = cache.downcast_ref::<Option<usize>>() {
if let Some(id) = id {
Expand Down Expand Up @@ -177,7 +232,7 @@
const NANOS_PER_SEC: f64 = 1_000_000_000.0;

impl DownsamplingInterceptor {
pub fn new(rules: Vec<DownsamplingRuleConf>) -> Self {
pub fn new(messages: Arc<DownsamplingFilters>, rules: Vec<DownsamplingRuleConf>) -> Self {
let mut ke_id = KeBoxTree::default();
let mut ke_state = HashMap::default();
for (id, rule) in rules.into_iter().enumerate() {
Expand All @@ -197,12 +252,14 @@
},
);
tracing::debug!(
"New downsampler rule enabled: key_expr={:?}, threshold={:?}",
"New downsampler rule enabled: key_expr={:?}, threshold={:?}, messages={:?}",

Check warning on line 255 in zenoh/src/net/routing/interceptor/downsampling.rs

View check run for this annotation

Codecov / codecov/patch

zenoh/src/net/routing/interceptor/downsampling.rs#L255

Added line #L255 was not covered by tests
rule.key_expr,
threshold
threshold,
messages,
);
}
Self {
filtered_messages: messages,
ke_id: Arc::new(Mutex::new(ke_id)),
ke_state: Arc::new(Mutex::new(ke_state)),
}
Expand Down
24 changes: 23 additions & 1 deletion zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use access_control::acl_interceptor_factories;
mod authorization;
use std::any::Any;

use zenoh_config::Config;
use zenoh_config::{Config, InterceptorFlow};
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};
Expand All @@ -35,6 +35,28 @@ use crate::api::key_expr::KeyExpr;
pub mod downsampling;
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;

#[derive(Default, Debug)]
pub struct InterfaceEnabled {
pub ingress: bool,
pub egress: bool,
}

impl From<&[InterceptorFlow]> for InterfaceEnabled {
fn from(value: &[InterceptorFlow]) -> Self {
let mut res = Self {
ingress: false,
egress: false,
};
for v in value {
match v {
InterceptorFlow::Egress => res.egress = true,
InterceptorFlow::Ingress => res.ingress = true,
}
}
res
}
}

pub(crate) trait InterceptorTrait {
fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>>;

Expand Down
Loading