Skip to content

Commit

Permalink
Merge branch 'protocol_changes' into encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 2, 2024
2 parents bb7ba70 + e41f768 commit 86e8b00
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 57 deletions.
2 changes: 0 additions & 2 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ where
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
self.write(&mut *writer, v)
}
Expand All @@ -58,7 +57,6 @@ where
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok(c)
Expand Down
16 changes: 5 additions & 11 deletions commons/zenoh-protocol/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub enum Consolidation {
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
/// Remove the duplicates of any samples based on the their timestamp.
Unique,
// Remove the duplicates of any samples based on the their timestamp.
// Unique,
}

impl Consolidation {
Expand All @@ -45,15 +45,9 @@ impl Consolidation {
use rand::prelude::SliceRandom;
let mut rng = rand::thread_rng();

*[
Self::None,
Self::Monotonic,
Self::Latest,
Self::Unique,
Self::Auto,
]
.choose(&mut rng)
.unwrap()
*[Self::None, Self::Monotonic, Self::Latest, Self::Auto]
.choose(&mut rng)
.unwrap()
}
}

Expand Down
45 changes: 8 additions & 37 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,13 @@ use std::collections::HashMap;
use std::future::Ready;
use std::time::Duration;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::zenoh::query::Consolidation;
use zenoh_result::ZResult;

/// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get).
pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType;

/// The kind of consolidation.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum ConsolidationMode {
/// No consolidation applied: multiple samples may be received for the same key-timestamp.
None,
/// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp
/// has already been sent with the same key.
///
/// This optimizes latency while potentially reducing bandwidth.
///
/// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already
/// been observed with the same key.
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
}

impl From<ConsolidationMode> for Consolidation {
fn from(val: ConsolidationMode) -> Self {
match val {
ConsolidationMode::None => Consolidation::None,
ConsolidationMode::Monotonic => Consolidation::Monotonic,
ConsolidationMode::Latest => Consolidation::Latest,
}
}
}
pub type ConsolidationMode = zenoh_protocol::zenoh::query::Consolidation;

/// The operation: either manual or automatic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -65,30 +40,26 @@ pub enum Mode<T> {
/// The replies consolidation strategy to apply on replies to a [`get`](Session::get).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueryConsolidation {
pub(crate) mode: Mode<ConsolidationMode>,
pub(crate) mode: ConsolidationMode,
}

impl QueryConsolidation {
pub const DEFAULT: Self = Self::AUTO;
/// Automatic query consolidation strategy selection.
pub const AUTO: Self = Self { mode: Mode::Auto };
pub const AUTO: Self = Self {
mode: ConsolidationMode::Auto,
};

pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self {
Self {
mode: Mode::Manual(mode),
}
Self { mode }
}

/// Returns the requested [`ConsolidationMode`].
pub fn mode(&self) -> Mode<ConsolidationMode> {
pub fn mode(&self) -> ConsolidationMode {
self.mode
}
}
impl From<Mode<ConsolidationMode>> for QueryConsolidation {
fn from(mode: Mode<ConsolidationMode>) -> Self {
Self { mode }
}
}

impl From<ConsolidationMode> for QueryConsolidation {
fn from(mode: ConsolidationMode) -> Self {
Self::from_mode(mode)
Expand Down
7 changes: 5 additions & 2 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,6 @@ impl SyncResolve for ReplyBuilder<'_> {
let mut ext_attachment = None;
#[cfg(feature = "unstable")]
{
data_info.source_id = source_info.source_id;
data_info.source_sn = source_info.source_sn;
if let Some(attachment) = attachment {
ext_attachment = Some(attachment.into());
}
Expand All @@ -223,6 +221,11 @@ impl SyncResolve for ReplyBuilder<'_> {
}};
}

#[cfg(feature = "unstable")]
{
data_info.source_id = source_info.source_id;
data_info.source_sn = source_info.source_sn;
}
let ext_sinfo = if data_info.source_id.is_some() || data_info.source_sn.is_some() {
Some(zenoh::put::ext::SourceInfoType {
zid: data_info.source_id.unwrap_or_default(),
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1731,14 +1731,14 @@ impl Session {
log::trace!("get({}, {:?}, {:?})", selector, target, consolidation);
let mut state = zwrite!(self.state);
let consolidation = match consolidation.mode {
Mode::Auto => {
ConsolidationMode::Auto => {
if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) {
ConsolidationMode::None
} else {
ConsolidationMode::Latest
}
}
Mode::Manual(mode) => mode,
mode => mode,
};
let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst);
let nb_final = match destination {
Expand Down Expand Up @@ -1811,7 +1811,7 @@ impl Session {
ext_budget: None,
ext_timeout: Some(timeout),
payload: RequestBody::Query(zenoh_protocol::zenoh::Query {
consolidation: consolidation.into(),
consolidation,
parameters: selector.parameters().to_string(),
ext_sinfo: None,
ext_body: value.as_ref().map(|v| query::ext::QueryBodyType {
Expand All @@ -1832,7 +1832,7 @@ impl Session {
selector.parameters(),
qid,
target,
consolidation.into(),
consolidation,
value.as_ref().map(|v| query::ext::QueryBodyType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -2444,7 +2444,7 @@ impl Primitives for Session {
}
}
}
ConsolidationMode::Latest => {
Consolidation::Auto | ConsolidationMode::Latest => {
match query.replies.as_ref().unwrap().get(
new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(),
) {
Expand Down

0 comments on commit 86e8b00

Please sign in to comment.