diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index 55f25cd5ea..efac7b5671 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -39,7 +39,6 @@ where Consolidation::None => 1, Consolidation::Monotonic => 2, Consolidation::Latest => 3, - Consolidation::Unique => 4, }; self.write(&mut *writer, v) } @@ -58,7 +57,6 @@ where 1 => Consolidation::None, 2 => Consolidation::Monotonic, 3 => Consolidation::Latest, - 4 => Consolidation::Unique, _ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown }; Ok(c) diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index ac53b963f5..f1baaebe20 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -33,8 +33,8 @@ pub enum Consolidation { Monotonic, /// Holds back samples to only send the set of samples that had the highest timestamp for their key. Latest, - /// Remove the duplicates of any samples based on the their timestamp. - Unique, + // Remove the duplicates of any samples based on the their timestamp. + // Unique, } impl Consolidation { @@ -45,15 +45,9 @@ impl Consolidation { use rand::prelude::SliceRandom; let mut rng = rand::thread_rng(); - *[ - Self::None, - Self::Monotonic, - Self::Latest, - Self::Unique, - Self::Auto, - ] - .choose(&mut rng) - .unwrap() + *[Self::None, Self::Monotonic, Self::Latest, Self::Auto] + .choose(&mut rng) + .unwrap() } } diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index 7a7a867cd8..a848913c7a 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -22,38 +22,13 @@ use std::collections::HashMap; use std::future::Ready; use std::time::Duration; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; -use zenoh_protocol::zenoh::query::Consolidation; use zenoh_result::ZResult; /// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get). pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType; /// The kind of consolidation. -#[derive(Debug, Clone, PartialEq, Eq, Copy)] -pub enum ConsolidationMode { - /// No consolidation applied: multiple samples may be received for the same key-timestamp. - None, - /// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp - /// has already been sent with the same key. - /// - /// This optimizes latency while potentially reducing bandwidth. - /// - /// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already - /// been observed with the same key. - Monotonic, - /// Holds back samples to only send the set of samples that had the highest timestamp for their key. - Latest, -} - -impl From for Consolidation { - fn from(val: ConsolidationMode) -> Self { - match val { - ConsolidationMode::None => Consolidation::None, - ConsolidationMode::Monotonic => Consolidation::Monotonic, - ConsolidationMode::Latest => Consolidation::Latest, - } - } -} +pub type ConsolidationMode = zenoh_protocol::zenoh::query::Consolidation; /// The operation: either manual or automatic. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -65,30 +40,26 @@ pub enum Mode { /// The replies consolidation strategy to apply on replies to a [`get`](Session::get). #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct QueryConsolidation { - pub(crate) mode: Mode, + pub(crate) mode: ConsolidationMode, } impl QueryConsolidation { pub const DEFAULT: Self = Self::AUTO; /// Automatic query consolidation strategy selection. - pub const AUTO: Self = Self { mode: Mode::Auto }; + pub const AUTO: Self = Self { + mode: ConsolidationMode::Auto, + }; pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self { - Self { - mode: Mode::Manual(mode), - } + Self { mode } } /// Returns the requested [`ConsolidationMode`]. - pub fn mode(&self) -> Mode { + pub fn mode(&self) -> ConsolidationMode { self.mode } } -impl From> for QueryConsolidation { - fn from(mode: Mode) -> Self { - Self { mode } - } -} + impl From for QueryConsolidation { fn from(mode: ConsolidationMode) -> Self { Self::from_mode(mode) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 5acd4c542a..fc7174ad4c 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -213,8 +213,6 @@ impl SyncResolve for ReplyBuilder<'_> { let mut ext_attachment = None; #[cfg(feature = "unstable")] { - data_info.source_id = source_info.source_id; - data_info.source_sn = source_info.source_sn; if let Some(attachment) = attachment { ext_attachment = Some(attachment.into()); } @@ -223,6 +221,11 @@ impl SyncResolve for ReplyBuilder<'_> { }}; } + #[cfg(feature = "unstable")] + { + data_info.source_id = source_info.source_id; + data_info.source_sn = source_info.source_sn; + } let ext_sinfo = if data_info.source_id.is_some() || data_info.source_sn.is_some() { Some(zenoh::put::ext::SourceInfoType { zid: data_info.source_id.unwrap_or_default(), diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index c09b9ae358..aa99cd7105 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1731,14 +1731,14 @@ impl Session { log::trace!("get({}, {:?}, {:?})", selector, target, consolidation); let mut state = zwrite!(self.state); let consolidation = match consolidation.mode { - Mode::Auto => { + ConsolidationMode::Auto => { if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) { ConsolidationMode::None } else { ConsolidationMode::Latest } } - Mode::Manual(mode) => mode, + mode => mode, }; let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst); let nb_final = match destination { @@ -1811,7 +1811,7 @@ impl Session { ext_budget: None, ext_timeout: Some(timeout), payload: RequestBody::Query(zenoh_protocol::zenoh::Query { - consolidation: consolidation.into(), + consolidation, parameters: selector.parameters().to_string(), ext_sinfo: None, ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { @@ -1832,7 +1832,7 @@ impl Session { selector.parameters(), qid, target, - consolidation.into(), + consolidation, value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, @@ -2444,7 +2444,7 @@ impl Primitives for Session { } } } - ConsolidationMode::Latest => { + Consolidation::Auto | ConsolidationMode::Latest => { match query.replies.as_ref().unwrap().get( new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(), ) {