From d3da6465eeec5b377efe6926e412b6fb956eba65 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 9 Feb 2024 11:11:41 +0100 Subject: [PATCH] ConsolidationMode moved into the API --- commons/zenoh-protocol/src/zenoh/query.rs | 14 +++-------- zenoh-ext/src/subscriber_ext.rs | 2 +- zenoh/src/liveliness.rs | 2 +- zenoh/src/query.rs | 30 +++++++++++++++++++++-- zenoh/src/session.rs | 2 +- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 7432840492..ff137856eb 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::{common::ZExtUnknown, core::ConsolidationMode}; +use crate::common::ZExtUnknown; use alloc::{string::String, vec::Vec}; /// The kind of consolidation. @@ -38,6 +38,8 @@ pub enum Consolidation { } impl Consolidation { + pub const DEFAULT: Self = Self::Auto; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::prelude::SliceRandom; @@ -55,16 +57,6 @@ impl Consolidation { } } -impl From for Consolidation { - fn from(val: ConsolidationMode) -> Self { - match val { - ConsolidationMode::None => Consolidation::None, - ConsolidationMode::Monotonic => Consolidation::Monotonic, - ConsolidationMode::Latest => Consolidation::Latest, - } - } -} - /// # Query message /// /// ```text diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index e007c70504..8fd809495d 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -338,7 +338,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> origin: Locality::default(), query_selector: None, query_target: QueryTarget::default(), - query_consolidation: QueryConsolidation::default(), + query_consolidation: QueryConsolidation::DEFAULT, query_accept_replies: ReplyKeyExpr::MatchingQuery, query_timeout: Duration::from_secs(10), handler: self.handler, diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 0883041bb7..a41ef4ebc7 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -748,7 +748,7 @@ where &self.key_expr?.into(), &Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)), QueryTarget::default(), - QueryConsolidation::default(), + QueryConsolidation::DEFAULT, Locality::default(), self.timeout, None, diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index c4f3fb35e9..6e820ad4ee 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -23,13 +23,38 @@ use std::collections::HashMap; use std::future::Ready; use std::time::Duration; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; +use zenoh_protocol::zenoh::query::Consolidation; use zenoh_result::ZResult; /// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get). pub use zenoh_protocol::core::QueryTarget; /// The kind of consolidation. -pub use zenoh_protocol::core::ConsolidationMode; +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub enum ConsolidationMode { + /// No consolidation applied: multiple samples may be received for the same key-timestamp. + None, + /// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp + /// has already been sent with the same key. + /// + /// This optimizes latency while potentially reducing bandwidth. + /// + /// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already + /// been observed with the same key. + Monotonic, + /// Holds back samples to only send the set of samples that had the highest timestamp for their key. + Latest, +} + +impl From for Consolidation { + fn from(val: ConsolidationMode) -> Self { + match val { + ConsolidationMode::None => Consolidation::None, + ConsolidationMode::Monotonic => Consolidation::Monotonic, + ConsolidationMode::Latest => Consolidation::Latest, + } + } +} /// The operation: either manual or automatic. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -45,6 +70,7 @@ pub struct QueryConsolidation { } impl QueryConsolidation { + pub const DEFAULT: Self = Self::AUTO; /// Automatic query consolidation strategy selection. pub const AUTO: Self = Self { mode: Mode::Auto }; @@ -72,7 +98,7 @@ impl From for QueryConsolidation { impl Default for QueryConsolidation { fn default() -> Self { - QueryConsolidation::AUTO + Self::DEFAULT } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 580e11cd75..ba5d23c0e3 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -776,7 +776,7 @@ impl Session { selector, scope: Ok(None), target: QueryTarget::default(), - consolidation: QueryConsolidation::default(), + consolidation: QueryConsolidation::DEFAULT, destination: Locality::default(), timeout, value: None,