From 7d02e04e2f024c5cd71dbcfe9b19d3a9fdada1fe Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Tue, 2 Apr 2024 01:23:37 +0200 Subject: [PATCH] unfinished --- Cargo.lock | 50 ++-- zenoh/src/api.rs | 2 +- zenoh/src/api/key_expr.rs | 2 +- zenoh/src/api/publication.rs | 31 +-- zenoh/src/api/query.rs | 5 - zenoh/src/api/queryable.rs | 27 +- zenoh/src/api/sample.rs | 446 -------------------------------- zenoh/src/api/sample/builder.rs | 24 +- zenoh/src/api/sample/mod.rs | 4 - zenoh/src/api/session.rs | 5 +- zenoh/src/api/subscriber.rs | 6 +- zenoh/src/lib.rs | 8 +- 12 files changed, 60 insertions(+), 550 deletions(-) delete mode 100644 zenoh/src/api/sample.rs diff --git a/Cargo.lock b/Cargo.lock index f22e3c9a06..3cbed04284 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,9 +165,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.13" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" dependencies = [ "anstyle", "anstyle-parse", @@ -1103,9 +1103,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.3" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" dependencies = [ "anstream", "anstyle", @@ -1122,9 +1122,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.4.4" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" +checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" dependencies = [ "serde", ] @@ -1541,9 +1541,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", "fnv", @@ -1854,9 +1854,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ "serde", "value-bag", @@ -2865,9 +2865,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.3" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring 0.17.6", @@ -2923,9 +2923,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.1" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" [[package]] name = "rustls-webpki" @@ -3701,9 +3701,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -3743,7 +3743,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.3", + "rustls 0.22.2", "rustls-pki-types", "tokio", ] @@ -4030,9 +4030,9 @@ dependencies = [ [[package]] name = "value-bag" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" dependencies = [ "value-bag-serde1", "value-bag-sval2", @@ -4040,9 +4040,9 @@ dependencies = [ [[package]] name = "value-bag-serde1" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc35703541cbccb5278ef7b589d79439fc808ff0b5867195a3230f9a47421d39" +checksum = "b0b9f3feef403a50d4d67e9741a6d8fc688bcbb4e4f31bd4aab72cc690284394" dependencies = [ "erased-serde", "serde", @@ -4051,9 +4051,9 @@ dependencies = [ [[package]] name = "value-bag-sval2" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "285b43c29d0b4c0e65aad24561baee67a1b69dc9be9375d4a85138cbf556f7f8" +checksum = "30b24f4146b6f3361e91cbf527d1fb35e9376c3c0cef72ca5ec5af6d640fad7d" dependencies = [ "sval", "sval_buffer", @@ -4670,7 +4670,7 @@ dependencies = [ "flume", "futures", "log", - "rustls 0.22.3", + "rustls 0.22.2", "rustls-webpki 0.102.2", "serde", "tokio", @@ -4757,7 +4757,7 @@ dependencies = [ "base64 0.21.4", "futures", "log", - "rustls 0.22.3", + "rustls 0.22.2", "rustls-pemfile 2.0.0", "rustls-pki-types", "rustls-webpki 0.102.2", diff --git a/zenoh/src/api.rs b/zenoh/src/api.rs index 2585a682da..00f9b20c7c 100644 --- a/zenoh/src/api.rs +++ b/zenoh/src/api.rs @@ -30,6 +30,6 @@ pub mod queryable; pub mod sample; mod scouting; pub mod selector; -mod subscriber; +pub mod subscriber; mod time; pub mod value; diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index ec0d4b419c..9cf4983d7c 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -54,7 +54,7 @@ use std::{ str::FromStr, }; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; -use zenoh_keyexpr::{keyexpr, OwnedKemtExpr}; +use zenoh_keyexpr::{keyexpr, OwnedKeyExpr}; use zenoh_protocol::{ core::{key_expr::canon::Canonizable, ExprId, WireExpr}, network::{declare, DeclareBody, Mapping, UndeclareKeyExpr}, diff --git a/zenoh/src/api/publication.rs b/zenoh/src/api/publication.rs index 322da4f766..1b08e9bf8d 100644 --- a/zenoh/src/api/publication.rs +++ b/zenoh/src/api/publication.rs @@ -13,18 +13,14 @@ // //! Publishing primitives. -<<<<<<< HEAD:zenoh/src/api/publication.rs -======= -use crate::net::primitives::Primitives; -use crate::prelude::*; ->>>>>>> sample_api_rework:zenoh/src/publication.rs #[zenoh_macros::unstable] use crate::api::sample::attachment::Attachment; #[zenoh_macros::unstable] use crate::api::sample::SourceInfo; use crate::api::{ encoding::Encoding, - handlers::{locked, Callback, DefaultHandler, IntoHandler}, + handlers::locked, + handlers::DefaultHandler, key_expr::{KeyExpr, KeyExprInner}, payload::Payload, sample::{ @@ -33,24 +29,18 @@ use crate::api::{ }, session::{SessionRef, Undeclarable}, value::Value, - Id, }; use crate::net::primitives::Primitives; use std::future::Ready; -use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, SyncResolve}; +use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, Result as ZResult, SyncResolve}; use zenoh_keyexpr::keyexpr; -use zenoh_protocol::core::EntityGlobalId; -use zenoh_protocol::core::EntityId; -use zenoh_protocol::network::push::ext; -use zenoh_protocol::network::Mapping; -use zenoh_protocol::network::Push; -use zenoh_protocol::zenoh::Del; -use zenoh_protocol::zenoh::PushBody; -use zenoh_protocol::zenoh::Put; -use zenoh_result::ZResult; - -/// The kind of congestion control. -pub use zenoh_protocol::core::CongestionControl; +use zenoh_protocol::{ + core::CongestionControl, + network::Mapping, + zenoh::{Del, Put}, +}; +use zenoh_protocol::{core::EntityGlobalId, network::Push}; +use zenoh_protocol::{network::push::ext, zenoh::PushBody}; #[derive(Debug, Clone)] pub struct PublicationBuilderPut { @@ -246,6 +236,7 @@ use std::convert::TryFrom; use std::convert::TryInto; use std::pin::Pin; use std::task::{Context, Poll}; +use zenoh_protocol::core::EntityId; use zenoh_result::Error; #[zenoh_macros::unstable] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index e5db9bbeef..44fb0bfa93 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -13,11 +13,6 @@ // //! Query primitives. -<<<<<<< HEAD:zenoh/src/api/query.rs -======= -use crate::handlers::{locked, Callback, DefaultHandler}; -use crate::prelude::*; ->>>>>>> sample_api_rework:zenoh/src/query.rs #[zenoh_macros::unstable] use crate::api::sample::SourceInfo; use crate::api::{ diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 6872fa0e2d..9d1c1c75ff 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -13,7 +13,6 @@ // //! Queryable primitives. -<<<<<<< HEAD:zenoh/src/api/queryable.rs #[zenoh_macros::unstable] use crate::api::query::ReplyKeyExpr; #[zenoh_macros::unstable] @@ -27,8 +26,8 @@ use crate::api::{ sample::{ attachment::Attachment, builder::{ - DeleteSampleBuilder, PutSampleBuilder, QoSBuilderTrait, SampleBuilder, - SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, + QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, + ValueBuilderTrait, }, Locality, Sample, SampleKind, }, @@ -38,20 +37,6 @@ use crate::api::{ Id, }; use crate::net::primitives::Primitives; -======= - -use crate::encoding::Encoding; -use crate::handlers::{locked, DefaultHandler}; -use crate::net::primitives::Primitives; -use crate::prelude::*; -use crate::sample::builder::SampleBuilder; -use crate::sample::{QoSBuilder, SourceInfo}; -use crate::Id; -use crate::SessionRef; -use crate::Undeclarable; -#[cfg(feature = "unstable")] -use crate::{query::ReplyKeyExpr, sample::Attachment}; ->>>>>>> sample_api_rework:zenoh/src/queryable.rs use std::fmt; use std::future::Ready; use std::ops::Deref; @@ -68,7 +53,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use super::query::_REPLY_KEY_EXPR_ANY_SEL_PARAM; +use super::{query::_REPLY_KEY_EXPR_ANY_SEL_PARAM, sample::QoSBuilder}; pub(crate) struct QueryInner { /// The key expression of this Query. @@ -176,6 +161,7 @@ impl Query { encoding: Encoding::default(), }, timestamp: None, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, } @@ -214,6 +200,7 @@ impl Query { qos: response::ext::QoSType::RESPONSE.into(), kind: ReplyBuilderDelete, timestamp: None, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, } @@ -283,8 +270,8 @@ impl AsyncResolve for ReplySample<'_> { #[derive(Debug)] pub struct ReplyBuilderPut { - payload: super::Payload, - encoding: super::Encoding, + payload: Payload, + encoding: Encoding, } #[derive(Debug)] pub struct ReplyBuilderDelete; diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs deleted file mode 100644 index c7ec2cf4cb..0000000000 --- a/zenoh/src/api/sample.rs +++ /dev/null @@ -1,446 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! Sample primitives -use crate::api::{ - encoding::Encoding, - key_expr::KeyExpr, - payload::Payload, - publication::Priority, - sample::builder::{QoSBuilderTrait, ValueBuilderTrait}, - time::Timestamp, - value::Value, -}; -#[zenoh_macros::unstable] -use serde::Serialize; -use std::{convert::TryFrom, fmt}; -use zenoh_protocol::core::EntityGlobalId; -use zenoh_protocol::network::declare::ext::QoSType; -use zenoh_protocol::{core::CongestionControl, zenoh}; - -use self::attachment::Attachment; - -pub mod attachment; -pub mod builder; - -pub type SourceSn = u64; - -/// The locality of samples to be received by subscribers or targeted by publishers. -#[zenoh_macros::unstable] -#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)] -pub enum Locality { - SessionLocal, - Remote, - #[default] - Any, -} -#[cfg(not(feature = "unstable"))] -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub(crate) enum Locality { - SessionLocal, - Remote, - #[default] - Any, -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub(crate) struct DataInfo { - pub kind: SampleKind, - pub encoding: Option, - pub timestamp: Option, - pub source_id: Option, - pub source_sn: Option, - pub qos: QoS, -} - -pub(crate) trait DataInfoIntoSample { - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into; -} - -impl DataInfoIntoSample for DataInfo { - // This function is for internal use only. - // Technically it may create invalid sample (e.g. a delete sample with a payload and encoding) - // The test for it is intentionally not added to avoid inserting extra "if" into hot path. - // The correctness of the data should be ensured by the caller. - #[inline] - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - Sample { - key_expr: key_expr.into(), - payload: payload.into(), - kind: self.kind, - encoding: self.encoding.unwrap_or_default(), - timestamp: self.timestamp, - qos: self.qos, - #[cfg(feature = "unstable")] - source_info: SourceInfo { - source_id: self.source_id, - source_sn: self.source_sn, - }, - #[cfg(feature = "unstable")] - attachment, - } - } -} - -impl DataInfoIntoSample for Option { - #[inline] - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - if let Some(data_info) = self { - data_info.into_sample( - key_expr, - payload, - #[cfg(feature = "unstable")] - attachment, - ) - } else { - Sample { - key_expr: key_expr.into(), - payload: payload.into(), - kind: SampleKind::Put, - encoding: Encoding::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment, - } - } - } -} - -/// Informations on the source of a zenoh [`Sample`]. -#[zenoh_macros::unstable] -#[derive(Debug, Clone)] -pub struct SourceInfo { - /// The [`EntityGlobalId`] of the zenoh entity that published the concerned [`Sample`]. - pub source_id: Option, - /// The sequence number of the [`Sample`] from the source. - pub source_sn: Option, -} - -#[test] -#[cfg(feature = "unstable")] -fn source_info_stack_size() { - use zenoh_protocol::core::ZenohId; - - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::>(), 17); - assert_eq!(std::mem::size_of::>(), 16); - assert_eq!(std::mem::size_of::(), 17 + 16 + 7); -} - -#[zenoh_macros::unstable] -impl SourceInfo { - pub(crate) fn empty() -> Self { - SourceInfo { - source_id: None, - source_sn: None, - } - } - pub(crate) fn is_empty(&self) -> bool { - self.source_id.is_none() && self.source_sn.is_none() - } -} - -<<<<<<<< HEAD:zenoh/src/api/sample.rs -#[cfg(feature = "unstable")] -======== -#[zenoh_macros::unstable] ->>>>>>>> sample_api_rework:zenoh/src/api/sample/mod.rs -impl From for Option { - fn from(source_info: SourceInfo) -> Option { - if source_info.is_empty() { - None - } else { - Some(zenoh::put::ext::SourceInfoType { - id: source_info.source_id.unwrap_or_default(), - sn: source_info.source_sn.unwrap_or_default() as u32, - }) - } - } -} - -#[zenoh_macros::unstable] -impl From for SourceInfo { - fn from(data_info: DataInfo) -> Self { - SourceInfo { - source_id: data_info.source_id, - source_sn: data_info.source_sn, - } - } -} - -#[zenoh_macros::unstable] -impl From> for SourceInfo { - fn from(data_info: Option) -> Self { - match data_info { - Some(data_info) => data_info.into(), - None => SourceInfo::empty(), - } - } -} - -/// The kind of a `Sample`. -#[repr(u8)] -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -pub enum SampleKind { - /// if the `Sample` was issued by a `put` operation. - #[default] - Put = 0, - /// if the `Sample` was issued by a `delete` operation. - Delete = 1, -} - -impl fmt::Display for SampleKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SampleKind::Put => write!(f, "PUT"), - SampleKind::Delete => write!(f, "DELETE"), - } - } -} - -impl TryFrom for SampleKind { - type Error = u64; - fn try_from(kind: u64) -> Result { - match kind { - 0 => Ok(SampleKind::Put), - 1 => Ok(SampleKind::Delete), - _ => Err(kind), - } - } -} - -/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields. -pub struct SampleFields { - pub key_expr: KeyExpr<'static>, - pub payload: Payload, - pub kind: SampleKind, - pub encoding: Encoding, - pub timestamp: Option, - pub express: bool, - pub priority: Priority, - pub congestion_control: CongestionControl, - #[cfg(feature = "unstable")] - pub source_info: SourceInfo, - #[cfg(feature = "unstable")] - pub attachment: Option, -} - -impl From for SampleFields { - fn from(sample: Sample) -> Self { - SampleFields { - key_expr: sample.key_expr, - payload: sample.payload, - kind: sample.kind, - encoding: sample.encoding, - timestamp: sample.timestamp, - express: sample.qos.express(), - priority: sample.qos.priority(), - congestion_control: sample.qos.congestion_control(), - #[cfg(feature = "unstable")] - source_info: sample.source_info, - #[cfg(feature = "unstable")] - attachment: sample.attachment, - } - } -} - -/// A zenoh sample. -#[non_exhaustive] -#[derive(Clone, Debug)] -pub struct Sample { - pub(crate) key_expr: KeyExpr<'static>, - pub(crate) payload: Payload, - pub(crate) kind: SampleKind, - pub(crate) encoding: Encoding, - pub(crate) timestamp: Option, - pub(crate) qos: QoS, - - #[cfg(feature = "unstable")] - pub(crate) source_info: SourceInfo, - - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, -} - -impl Sample { - /// Gets the key expression on which this Sample was published. - #[inline] - pub fn key_expr(&self) -> &KeyExpr<'static> { - &self.key_expr - } - - /// Gets the payload of this Sample. - #[inline] - pub fn payload(&self) -> &Payload { - &self.payload - } - - /// Gets the kind of this Sample. - #[inline] - pub fn kind(&self) -> SampleKind { - self.kind - } - - /// Gets the encoding of this sample - #[inline] - pub fn encoding(&self) -> &Encoding { - &self.encoding - } - - /// Gets the timestamp of this Sample. - #[inline] - pub fn timestamp(&self) -> Option<&Timestamp> { - self.timestamp.as_ref() - } - - /// Gets the quality of service settings this Sample was sent with. - #[inline] - pub fn qos(&self) -> &QoS { - &self.qos - } - - /// Gets infos on the source of this Sample. - #[zenoh_macros::unstable] - #[inline] - pub fn source_info(&self) -> &SourceInfo { - &self.source_info - } - - /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. - #[zenoh_macros::unstable] - #[inline] - pub fn attachment(&self) -> Option<&Attachment> { - self.attachment.as_ref() - } -} - -impl From for Value { - fn from(sample: Sample) -> Self { - Value::new(sample.payload).encoding(sample.encoding) - } -} - -/// Structure containing quality of service data -#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] -pub struct QoS { - inner: QoSType, -} - -#[derive(Debug)] -pub struct QoSBuilder(QoS); - -impl From for QoSBuilder { - fn from(qos: QoS) -> Self { - QoSBuilder(qos) - } -} - -impl From for QoSBuilder { - fn from(qos: QoSType) -> Self { - QoSBuilder(QoS { inner: qos }) - } -} - -impl From for QoS { - fn from(builder: QoSBuilder) -> Self { - builder.0 - } -} - -impl QoSBuilderTrait for QoSBuilder { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let mut inner = self.0.inner; - inner.set_congestion_control(congestion_control); - Self(QoS { inner }) - } - - fn priority(self, priority: Priority) -> Self { - let mut inner = self.0.inner; - inner.set_priority(priority.into()); - Self(QoS { inner }) - } - - fn express(self, is_express: bool) -> Self { - let mut inner = self.0.inner; - inner.set_is_express(is_express); - Self(QoS { inner }) - } -} - -impl QoS { - /// Gets priority of the message. - pub fn priority(&self) -> Priority { - match Priority::try_from(self.inner.get_priority()) { - Ok(p) => p, - Err(e) => { - log::trace!( - "Failed to convert priority: {}; replacing with default value", - e.to_string() - ); - Priority::default() - } - } - } - - /// Gets congestion control of the message. - pub fn congestion_control(&self) -> CongestionControl { - self.inner.get_congestion_control() - } - - /// Gets express flag value. If `true`, the message is not batched during transmission, in order to reduce latency. - pub fn express(&self) -> bool { - self.inner.is_express() - } -} - -impl From for QoS { - fn from(qos: QoSType) -> Self { - QoS { inner: qos } - } -} - -impl From for QoSType { - fn from(qos: QoS) -> Self { - qos.inner - } -} diff --git a/zenoh/src/api/sample/builder.rs b/zenoh/src/api/sample/builder.rs index 776c6111ac..6f20c18391 100644 --- a/zenoh/src/api/sample/builder.rs +++ b/zenoh/src/api/sample/builder.rs @@ -1,3 +1,5 @@ +use std::marker::PhantomData; + // // Copyright (c) 2024 ZettaScale Technology // @@ -11,7 +13,6 @@ // Contributors: // ZettaScale Zenoh Team, // -<<<<<<< HEAD:zenoh/src/api/sample/builder.rs #[zenoh_macros::unstable] use crate::api::sample::Attachment; #[zenoh_macros::unstable] @@ -21,33 +22,14 @@ use crate::api::{ key_expr::KeyExpr, payload::Payload, publication::Priority, + sample::SampleKind, sample::{QoS, QoSBuilder, Sample}, value::Value, }; -======= - -use std::marker::PhantomData; - -#[cfg(feature = "unstable")] -use crate::sample::Attachment; -use crate::sample::QoS; -use crate::sample::QoSBuilder; -#[cfg(feature = "unstable")] -use crate::sample::SourceInfo; -use crate::Encoding; -use crate::KeyExpr; -use crate::Payload; -use crate::Priority; -use crate::Sample; -use crate::SampleKind; -use crate::Value; ->>>>>>> sample_api_rework:zenoh/src/sample/builder.rs use uhlc::Timestamp; use zenoh_core::zresult; use zenoh_protocol::core::CongestionControl; -use super::SampleKind; - pub trait QoSBuilderTrait { /// Change the `congestion_control` to apply when routing the data. fn congestion_control(self, congestion_control: CongestionControl) -> Self; diff --git a/zenoh/src/api/sample/mod.rs b/zenoh/src/api/sample/mod.rs index c7ec2cf4cb..8bcba3af4e 100644 --- a/zenoh/src/api/sample/mod.rs +++ b/zenoh/src/api/sample/mod.rs @@ -180,11 +180,7 @@ impl SourceInfo { } } -<<<<<<<< HEAD:zenoh/src/api/sample.rs -#[cfg(feature = "unstable")] -======== #[zenoh_macros::unstable] ->>>>>>>> sample_api_rework:zenoh/src/api/sample/mod.rs impl From for Option { fn from(source_info: SourceInfo) -> Option { if source_info.is_empty() { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 8bfd3460bd..0bcc32b335 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -53,8 +53,8 @@ use zenoh_core::Resolvable; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; #[cfg(feature = "unstable")] use zenoh_protocol::network::declare::SubscriberId; -use zenoh_protocol::network::AtomicRequestId; use zenoh_protocol::network::RequestId; +use zenoh_protocol::network::{request, AtomicRequestId}; use zenoh_protocol::zenoh::reply::ReplyBody; use zenoh_protocol::zenoh::Del; use zenoh_protocol::zenoh::Put; @@ -69,8 +69,7 @@ use zenoh_protocol::{ subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, }, - ext, - request::{self, ext::TargetType, Request}, + request::{ext::TargetType, Request}, Mapping, Push, Response, ResponseFinal, }, zenoh::{ diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index e33c05bb0f..752072b804 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -13,6 +13,8 @@ // //! Subscribing primitives. +#[cfg(not(feature = "unstable"))] +pub use crate::api::query::Mode; use crate::api::{ handlers::{locked, Callback, DefaultHandler, IntoHandler}, key_expr::KeyExpr, @@ -27,12 +29,10 @@ use std::sync::Arc; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; #[cfg(feature = "unstable")] use zenoh_protocol::core::EntityGlobalId; +pub use zenoh_protocol::core::Reliability; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; use zenoh_result::ZResult; -/// The kind of reliability. -pub use zenoh_protocol::core::Reliability; - pub(crate) struct SubscriberState { pub(crate) id: Id, pub(crate) remote_id: Id, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 77a4d3b641..0e1b83ead9 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -140,7 +140,7 @@ pub mod sample { builder::{ QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, }, - Sample, SampleKind, + QoSBuilder, Sample, SampleKind, }, value::Value, }; @@ -160,6 +160,7 @@ pub mod queryable { } pub mod query { + pub use crate::api::query::Mode; pub use crate::api::query::Reply; } @@ -167,6 +168,11 @@ pub mod publication { pub use crate::api::publication::Priority; } +pub mod subscriber { + pub use crate::api::subscriber::FlumeSubscriber; + pub use crate::api::subscriber::Subscriber; +} + pub mod handlers { pub use crate::api::handlers::IntoHandler; pub use crate::api::handlers::RingBuffer;