diff --git a/Cargo.lock b/Cargo.lock index 0129941564..0cb5efd249 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5152,6 +5152,7 @@ dependencies = [ "uhlc", "validated_struct", "zenoh-core", + "zenoh-keyexpr", "zenoh-macros", "zenoh-protocol", "zenoh-result", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 8d140ee9ba..6422759952 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -184,6 +184,27 @@ }, }, + // /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config for overwritten values) + // qos: { + // /// Overwrite QoS options for PUT and DELETE messages + // publication: [ + // { + // /// PUT and DELETE messages on key expressions that are included by these key expressions + // /// will have their QoS options overwritten by the given config. + // key_exprs: ["demo/**", "example/key"], + // /// Configurations that will be applied on the publisher. + // /// Options that are supplied here will overwrite the configuration given in Zenoh API + // config: { + // congestion_control: "block", + // priority: "data_high", + // express: true, + // reliability: "best_effort", + // allowed_destination: "remote", + // }, + // }, + // ], + // }, + // /// The declarations aggregation strategy. // aggregation: { // /// A list of key-expressions for which all included subscribers will be aggregated into. diff --git a/commons/zenoh-config/Cargo.toml b/commons/zenoh-config/Cargo.toml index 0a650d5d46..d7e24beae5 100644 --- a/commons/zenoh-config/Cargo.toml +++ b/commons/zenoh-config/Cargo.toml @@ -26,6 +26,7 @@ description = "Internal crate for zenoh." [features] internal = [] transport_tcp = [] +unstable = [] [dependencies] tracing = { workspace = true } @@ -36,6 +37,7 @@ serde_json = { workspace = true } serde_yaml = { workspace = true } validated_struct = { workspace = true, features = ["json5", "json_get"] } zenoh-core = { workspace = true } +zenoh-keyexpr = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-util = { workspace = true } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 51c851b3ab..87c1b9b90d 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -21,6 +21,7 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; +pub mod qos; pub mod wrappers; #[allow(unused_imports)] @@ -30,6 +31,7 @@ use std::{ }; use include::recursive_include; +use qos::PublisherQoSConfList; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; @@ -360,6 +362,14 @@ validated_struct::validator! { /// A list of key-expressions for which all included publishers will be aggregated into. publishers: Vec, }, + + /// Overwrite QoS options for Zenoh messages by key expression (ignores Zenoh API QoS config) + pub qos: #[derive(Default)] + QoSConfig { + /// A list of QoS configurations for PUT and DELETE messages by key expressions + publication: PublisherQoSConfList, + }, + pub transport: #[derive(Default)] TransportConf { pub unicast: TransportUnicastConf { diff --git a/commons/zenoh-config/src/qos.rs b/commons/zenoh-config/src/qos.rs new file mode 100644 index 0000000000..526c02d175 --- /dev/null +++ b/commons/zenoh-config/src/qos.rs @@ -0,0 +1,119 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use serde::{Deserialize, Serialize}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTreeMut, KeBoxTree}; +use zenoh_protocol::core::{key_expr::OwnedKeyExpr, CongestionControl, Reliability}; + +#[derive(Debug, Deserialize, Default, Serialize, Clone)] +pub struct PublisherQoSConfList(pub(crate) Vec); + +impl From for KeBoxTree { + fn from(value: PublisherQoSConfList) -> KeBoxTree { + let mut tree = KeBoxTree::new(); + for conf in value.0 { + for key_expr in conf.key_exprs { + // NOTE: we don't check key_expr unicity + tree.insert(&key_expr, conf.config.clone()); + } + } + tree + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub(crate) struct PublisherQoSConf { + pub key_exprs: Vec, + pub config: PublisherQoSConfig, +} + +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +pub struct PublisherQoSConfig { + pub congestion_control: Option, + pub priority: Option, + pub express: Option, + #[cfg(feature = "unstable")] + pub reliability: Option, + #[cfg(feature = "unstable")] + pub allowed_destination: Option, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum PublisherCongestionControlConf { + Drop, + Block, +} + +impl From for CongestionControl { + fn from(value: PublisherCongestionControlConf) -> Self { + match value { + PublisherCongestionControlConf::Drop => Self::Drop, + PublisherCongestionControlConf::Block => Self::Block, + } + } +} + +impl From for PublisherCongestionControlConf { + fn from(value: CongestionControl) -> Self { + match value { + CongestionControl::Drop => Self::Drop, + CongestionControl::Block => Self::Block, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherPriorityConf { + RealTime = 1, + InteractiveHigh = 2, + InteractiveLow = 3, + DataHigh = 4, + Data = 5, + DataLow = 6, + Background = 7, +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherReliabilityConf { + BestEffort, + Reliable, +} + +impl From for Reliability { + fn from(value: PublisherReliabilityConf) -> Self { + match value { + PublisherReliabilityConf::BestEffort => Self::BestEffort, + PublisherReliabilityConf::Reliable => Self::Reliable, + } + } +} + +impl From for PublisherReliabilityConf { + fn from(value: Reliability) -> Self { + match value { + Reliability::BestEffort => Self::BestEffort, + Reliability::Reliable => Self::Reliable, + } + } +} + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PublisherLocalityConf { + SessionLocal, + Remote, + Any, +} diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index ece72ab5c4..6a998b2962 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -64,7 +64,7 @@ transport_udp = ["zenoh-transport/transport_udp"] transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] transport_vsock = ["zenoh-transport/transport_vsock"] -unstable = ["internal_config", "zenoh-keyexpr/unstable"] +unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable"] internal_config = [] [dependencies] diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index a2515eb284..54070ad607 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -13,7 +13,10 @@ // use std::future::{IntoFuture, Ready}; +use itertools::Itertools; +use zenoh_config::qos::PublisherQoSConfig; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeNode}; use zenoh_protocol::core::CongestionControl; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; @@ -204,7 +207,8 @@ impl Resolvable for PublicationBuilder { impl Wait for PublicationBuilder, PublicationBuilderPut> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, self.kind.payload, @@ -226,7 +230,8 @@ impl Wait for PublicationBuilder, PublicationBuilderPut impl Wait for PublicationBuilder, PublicationBuilderDelete> { #[inline] - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self.publisher = self.publisher.apply_qos_overwrites(); self.publisher.session.0.resolve_put( &self.publisher.key_expr?, ZBytes::new(), @@ -341,6 +346,58 @@ impl QoSBuilderTrait for PublisherBuilder<'_, '_> { } impl PublisherBuilder<'_, '_> { + /// Looks up if any configured QoS overwrites apply on the builder's key expression. + /// Returns a new builder with the overwritten QoS parameters. + pub(crate) fn apply_qos_overwrites(self) -> Self { + let mut qos_overwrites = PublisherQoSConfig::default(); + if let Ok(key_expr) = &self.key_expr { + // get overwritten builder + let state = zread!(self.session.0.state); + let nodes_including = state + .publisher_qos_tree + .nodes_including(key_expr) + .collect_vec(); + for node in &nodes_including { + // Take the first one yielded by the iterator that has overwrites + if let Some(overwrites) = node.weight() { + qos_overwrites = overwrites.clone(); + // log warning if multiple keyexprs include it + if nodes_including.len() > 1 { + tracing::warn!( + "Publisher declared on `{}` which is included by multiple key_exprs in qos config. Using qos config for `{}`", + key_expr, + node.keyexpr(), + ); + } + break; + } + } + } + + Self { + congestion_control: qos_overwrites + .congestion_control + .map(|cc| cc.into()) + .unwrap_or(self.congestion_control), + priority: qos_overwrites + .priority + .map(|p| p.into()) + .unwrap_or(self.priority), + is_express: qos_overwrites.express.unwrap_or(self.is_express), + #[cfg(feature = "unstable")] + reliability: qos_overwrites + .reliability + .map(|r| r.into()) + .unwrap_or(self.reliability), + #[cfg(feature = "unstable")] + destination: qos_overwrites + .allowed_destination + .map(|d| d.into()) + .unwrap_or(self.destination), + ..self + } + } + /// Changes the [`crate::sample::Locality`] applied when routing the data. /// /// This restricts the matching subscribers that will receive the published data to the ones @@ -372,7 +429,8 @@ impl<'b> Resolvable for PublisherBuilder<'_, 'b> { } impl Wait for PublisherBuilder<'_, '_> { - fn wait(self) -> ::To { + fn wait(mut self) -> ::To { + self = self.apply_qos_overwrites(); let mut key_expr = self.key_expr?; if !key_expr.is_fully_optimized(&self.session.0) { key_expr = self.session.declare_keyexpr(key_expr).wait()?; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 78d44fc794..1217a12382 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,6 +22,7 @@ use std::{ use futures::Sink; use tracing::error; +use zenoh_config::qos::PublisherPriorityConf; use zenoh_core::{Resolvable, Resolve, Wait}; use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; @@ -477,6 +478,34 @@ impl TryFrom for Priority { } } +impl From for Priority { + fn from(value: PublisherPriorityConf) -> Self { + match value { + PublisherPriorityConf::RealTime => Self::RealTime, + PublisherPriorityConf::InteractiveHigh => Self::InteractiveHigh, + PublisherPriorityConf::InteractiveLow => Self::InteractiveLow, + PublisherPriorityConf::DataHigh => Self::DataHigh, + PublisherPriorityConf::Data => Self::Data, + PublisherPriorityConf::DataLow => Self::DataLow, + PublisherPriorityConf::Background => Self::Background, + } + } +} + +impl From for PublisherPriorityConf { + fn from(value: Priority) -> Self { + match value { + Priority::RealTime => Self::RealTime, + Priority::InteractiveHigh => Self::InteractiveHigh, + Priority::InteractiveLow => Self::InteractiveLow, + Priority::DataHigh => Self::DataHigh, + Priority::Data => Self::Data, + Priority::DataLow => Self::DataLow, + Priority::Background => Self::Background, + } + } +} + type ProtocolPriority = zenoh_protocol::core::Priority; impl From for ProtocolPriority { fn from(prio: Priority) -> Self { diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 75d6c32b63..27b43b1b89 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -16,7 +16,7 @@ use std::{convert::TryFrom, fmt}; use serde::{Deserialize, Serialize}; -use zenoh_config::wrappers::EntityGlobalId; +use zenoh_config::{qos::PublisherLocalityConf, wrappers::EntityGlobalId}; #[cfg(feature = "unstable")] use zenoh_protocol::core::Reliability; use zenoh_protocol::{ @@ -50,6 +50,26 @@ pub(crate) enum Locality { Any, } +impl From for Locality { + fn from(value: PublisherLocalityConf) -> Self { + match value { + PublisherLocalityConf::SessionLocal => Self::SessionLocal, + PublisherLocalityConf::Remote => Self::Remote, + PublisherLocalityConf::Any => Self::Any, + } + } +} + +impl From for PublisherLocalityConf { + fn from(value: Locality) -> Self { + match value { + Locality::SessionLocal => Self::SessionLocal, + Locality::Remote => Self::Remote, + Locality::Any => Self::Any, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Default)] pub(crate) struct DataInfo { pub kind: SampleKind, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ca18c288db..15b7fb4d7b 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -32,8 +32,9 @@ use uhlc::Timestamp; use uhlc::HLC; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; -use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; +use zenoh_config::{qos::PublisherQoSConfig, unwrap_or_default, wrappers::ZenohId}; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, Wait}; +use zenoh_keyexpr::keyexpr_tree::KeBoxTree; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ declare::{DeclareToken, SubscriberId, TokenId, UndeclareToken}, @@ -150,12 +151,14 @@ pub(crate) struct SessionState { pub(crate) liveliness_queries: HashMap, pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, + pub(crate) publisher_qos_tree: KeBoxTree, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, aggregated_publishers: Vec, + publisher_qos_tree: KeBoxTree, ) -> SessionState { SessionState { primitives: None, @@ -185,6 +188,7 @@ impl SessionState { liveliness_queries: HashMap::new(), aggregated_subscribers, aggregated_publishers, + publisher_qos_tree, } } } @@ -664,9 +668,13 @@ impl Session { ) -> impl Resolve { ResolveClosure::new(move || { let router = runtime.router(); + let config = runtime.config().lock(); + let publisher_qos = config.0.qos().publication().clone(); + drop(config); let state = RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, + publisher_qos.into(), )); let session = Session(Arc::new(SessionInner { weak_counter: Mutex::new(0), diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 4995d05e70..14a3f2b657 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -62,3 +62,163 @@ async fn qos_pubsub() { assert_eq!(sample.congestion_control(), CongestionControl::Block); assert!(!sample.express()); } + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn qos_pubsub_overwrite_config() { + use zenoh::{qos::Reliability, sample::Locality}; + + let qos_config_overwrite = zenoh::Config::from_json5( + r#" + { + qos: { + publication: [ + { + key_exprs: ["test/qos/overwritten", "test/not_applicable/**"], + config: { + congestion_control: "drop", + express: false, + reliability: "best_effort", + allowed_destination: "any", + }, + }, + { + key_exprs: ["test/not_applicable"], + config: { + congestion_control: "drop", + express: false, + reliability: "best_effort", + allowed_destination: "any", + }, + }, + ] + } + } + "#, + ) + .unwrap(); + let session1 = ztimeout!(zenoh::open(qos_config_overwrite)).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + + let subscriber = ztimeout!(session2.declare_subscriber("test/qos/**")).unwrap(); + tokio::time::sleep(SLEEP).await; + + // Session API tests + + // Session API - overwritten PUT + ztimeout!(session1 + .put("test/qos/overwritten", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - overwritten DELETE + ztimeout!(session1 + .delete("test/qos/overwritten") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // Session API - non-overwritten PUT + ztimeout!(session1 + .put("test/qos/no_overwrite", "qos") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Session API - non-overwritten DELETE + ztimeout!(session1 + .delete("test/qos/no_overwrite") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // Publisher API tests + + let overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos/overwritten") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable) + .allowed_destination(Locality::SessionLocal)) + .unwrap(); + + let no_overwrite_config_publisher = ztimeout!(session1 + .declare_publisher("test/qos/no_overwrite") + .congestion_control(CongestionControl::Block) + .priority(Priority::DataLow) + .express(true) + .reliability(Reliability::Reliable)) + .unwrap(); + + // PublisherBuilder API - overwritten PUT + ztimeout!(overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // PublisherBuilder API - overwritten DELETE + ztimeout!(overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Drop); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(!sample.express()); + assert_eq!(sample.reliability(), Reliability::BestEffort); + + // PublisherBuilder API - non-overwritten PUT + ztimeout!(no_overwrite_config_publisher.put("qos")).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); + + // PublisherBuilder API - non-overwritten DELETE + ztimeout!(no_overwrite_config_publisher.delete()).unwrap(); + let sample = ztimeout!(subscriber.recv_async()).unwrap(); + + assert_eq!(sample.congestion_control(), CongestionControl::Block); + assert_eq!(sample.priority(), Priority::DataLow); + assert!(sample.express()); + assert_eq!(sample.reliability(), Reliability::Reliable); +}