From 43a49379c0f126032f89505789d158b908c62ad6 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 18:18:28 +0100 Subject: [PATCH 1/5] SampleBuilder uses generics --- Cargo.lock | 50 ++--- zenoh/src/queryable.rs | 97 ++-------- zenoh/src/sample/builder.rs | 363 +++++++++++++++--------------------- 3 files changed, 189 insertions(+), 321 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3ea8978b5..9dff82ad80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,9 +165,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.12" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" dependencies = [ "anstyle", "anstyle-parse", @@ -1103,9 +1103,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" dependencies = [ "anstream", "anstyle", @@ -1122,9 +1122,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.3.31" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" +checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" dependencies = [ "serde", ] @@ -1541,9 +1541,9 @@ dependencies = [ [[package]] name = "http" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -1854,9 +1854,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" dependencies = [ "serde", "value-bag", @@ -2865,9 +2865,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" dependencies = [ "log", "ring 0.17.6", @@ -2923,9 +2923,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" +checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" [[package]] name = "rustls-webpki" @@ -3701,9 +3701,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -3743,7 +3743,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.2", + "rustls 0.22.3", "rustls-pki-types", "tokio", ] @@ -4030,9 +4030,9 @@ dependencies = [ [[package]] name = "value-bag" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" +checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" dependencies = [ "value-bag-serde1", "value-bag-sval2", @@ -4040,9 +4040,9 @@ dependencies = [ [[package]] name = "value-bag-serde1" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0b9f3feef403a50d4d67e9741a6d8fc688bcbb4e4f31bd4aab72cc690284394" +checksum = "cc35703541cbccb5278ef7b589d79439fc808ff0b5867195a3230f9a47421d39" dependencies = [ "erased-serde", "serde", @@ -4051,9 +4051,9 @@ dependencies = [ [[package]] name = "value-bag-sval2" -version = "1.4.1" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b24f4146b6f3361e91cbf527d1fb35e9376c3c0cef72ca5ec5af6d640fad7d" +checksum = "285b43c29d0b4c0e65aad24561baee67a1b69dc9be9375d4a85138cbf556f7f8" dependencies = [ "sval", "sval_buffer", @@ -4676,7 +4676,7 @@ dependencies = [ "flume", "futures", "log", - "rustls 0.22.2", + "rustls 0.22.3", "rustls-webpki 0.102.2", "serde", "tokio", @@ -4763,7 +4763,7 @@ dependencies = [ "base64 0.21.4", "futures", "log", - "rustls 0.22.2", + "rustls 0.22.3", "rustls-pemfile 2.0.0", "rustls-pki-types", "rustls-webpki 0.102.2", diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 5df0d73d44..0e977f3def 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -19,8 +19,8 @@ use crate::handlers::{locked, DefaultHandler}; use crate::net::primitives::Primitives; use crate::prelude::*; use crate::sample::builder::{ - DeleteSampleBuilder, PutSampleBuilder, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, - TimestampBuilderTrait, ValueBuilderTrait, + OpDelete, OpPut, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, + ValueBuilderTrait, }; use crate::sample::SourceInfo; use crate::Id; @@ -115,10 +115,10 @@ impl Query { #[inline(always)] #[cfg(feature = "unstable")] #[doc(hidden)] - pub fn reply_sample(&self, sample: Sample) -> ReplySampleBuilder<'_> { - ReplySampleBuilder { + pub fn reply_sample(&self, sample: Sample) -> ReplySample<'_> { + ReplySample { query: self, - sample_builder: sample.into(), + sample, } } @@ -168,7 +168,7 @@ impl Query { IntoKeyExpr: Into>, { let sample_builder = - DeleteSampleBuilder::new(key_expr).with_qos(response::ext::QoSType::RESPONSE.into()); + SampleBuilder::delete(key_expr).qos(response::ext::QoSType::RESPONSE.into()); ReplyDelBuilder { query: self, sample_builder, @@ -214,91 +214,22 @@ impl fmt::Display for Query { } } -pub struct ReplySampleBuilder<'a> { +pub struct ReplySample<'a> { query: &'a Query, - sample_builder: SampleBuilder, + sample: Sample, } -impl<'a> ReplySampleBuilder<'a> { - pub fn put(self, payload: IntoPayload) -> ReplyBuilder<'a> - where - IntoPayload: Into, - { - let builder = ReplyBuilder { - query: self.query, - sample_builder: self.sample_builder.into(), - }; - builder.payload(payload) - } - pub fn delete(self) -> ReplyDelBuilder<'a> { - ReplyDelBuilder { - query: self.query, - sample_builder: self.sample_builder.into(), - } - } -} - -impl TimestampBuilderTrait for ReplySampleBuilder<'_> { - fn timestamp>>(self, timestamp: T) -> Self { - Self { - sample_builder: self.sample_builder.timestamp(timestamp), - ..self - } - } -} - -impl SampleBuilderTrait for ReplySampleBuilder<'_> { - #[cfg(feature = "unstable")] - fn source_info(self, source_info: SourceInfo) -> Self { - Self { - sample_builder: self.sample_builder.source_info(source_info), - ..self - } - } - - #[cfg(feature = "unstable")] - fn attachment>>(self, attachment: T) -> Self { - Self { - sample_builder: self.sample_builder.attachment(attachment), - ..self - } - } -} - -impl QoSBuilderTrait for ReplySampleBuilder<'_> { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - Self { - sample_builder: self.sample_builder.congestion_control(congestion_control), - ..self - } - } - - fn priority(self, priority: Priority) -> Self { - Self { - sample_builder: self.sample_builder.priority(priority), - ..self - } - } - - fn express(self, is_express: bool) -> Self { - Self { - sample_builder: self.sample_builder.express(is_express), - ..self - } - } -} - -impl Resolvable for ReplySampleBuilder<'_> { +impl Resolvable for ReplySample<'_> { type To = ZResult<()>; } -impl SyncResolve for ReplySampleBuilder<'_> { +impl SyncResolve for ReplySample<'_> { fn res_sync(self) -> ::To { - self.query._reply_sample(self.sample_builder.into()) + self.query._reply_sample(self.sample) } } -impl AsyncResolve for ReplySampleBuilder<'_> { +impl AsyncResolve for ReplySample<'_> { type Future = Ready; fn res_async(self) -> Self::Future { @@ -311,7 +242,7 @@ impl AsyncResolve for ReplySampleBuilder<'_> { #[derive(Debug)] pub struct ReplyBuilder<'a> { query: &'a Query, - sample_builder: PutSampleBuilder, + sample_builder: SampleBuilder, } impl TimestampBuilderTrait for ReplyBuilder<'_> { @@ -392,7 +323,7 @@ impl ValueBuilderTrait for ReplyBuilder<'_> { #[derive(Debug)] pub struct ReplyDelBuilder<'a> { query: &'a Query, - sample_builder: DeleteSampleBuilder, + sample_builder: SampleBuilder, } impl TimestampBuilderTrait for ReplyDelBuilder<'_> { diff --git a/zenoh/src/sample/builder.rs b/zenoh/src/sample/builder.rs index 920bd2b7b7..cae58514ff 100644 --- a/zenoh/src/sample/builder.rs +++ b/zenoh/src/sample/builder.rs @@ -12,6 +12,8 @@ // ZettaScale Zenoh Team, // +use std::marker::PhantomData; + use crate::sample::Attachment; use crate::sample::QoS; use crate::sample::QoSBuilder; @@ -63,290 +65,225 @@ pub trait ValueBuilderTrait { } #[derive(Debug)] -pub struct SampleBuilder(Sample); +pub struct OpPut; +#[derive(Debug)] +pub struct OpDelete; +#[derive(Debug)] +pub struct OpAny; -impl SampleBuilder { +#[derive(Debug)] +pub struct SampleBuilder { + sample: Sample, + _t: PhantomData, +} + +impl SampleBuilder { pub fn put( key_expr: IntoKeyExpr, payload: IntoPayload, - ) -> PutSampleBuilder + ) -> SampleBuilder where IntoKeyExpr: Into>, IntoPayload: Into, { - PutSampleBuilder::new(key_expr, payload) + Self { + sample: Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }, + _t: PhantomData::, + } } - pub fn delete(key_expr: IntoKeyExpr) -> DeleteSampleBuilder +} + +impl SampleBuilder { + pub fn delete(key_expr: IntoKeyExpr) -> SampleBuilder where IntoKeyExpr: Into>, { - DeleteSampleBuilder::new(key_expr) + Self { + sample: Sample { + key_expr: key_expr.into(), + payload: Payload::empty(), + kind: SampleKind::Delete, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }, + _t: PhantomData::, + } } +} + +impl SampleBuilder { /// Allows to change keyexpr of [`Sample`] pub fn keyexpr(self, key_expr: IntoKeyExpr) -> Self where IntoKeyExpr: Into>, { - Self(Sample { - key_expr: key_expr.into(), - ..self.0 - }) + Self { + sample: Sample { + key_expr: key_expr.into(), + ..self.sample + }, + _t: PhantomData::, + } } } -impl TimestampBuilderTrait for SampleBuilder { - fn timestamp>>(self, timestamp: T) -> Self { - Self(Sample { - timestamp: timestamp.into(), - ..self.0 - }) +impl TimestampBuilderTrait for SampleBuilder { + fn timestamp>>(self, timestamp: U) -> Self { + Self { + sample: Sample { + timestamp: timestamp.into(), + ..self.sample + }, + _t: PhantomData::, + } } } -impl SampleBuilderTrait for SampleBuilder { +impl SampleBuilderTrait for SampleBuilder { #[zenoh_macros::unstable] fn source_info(self, source_info: SourceInfo) -> Self { - Self(Sample { - source_info, - ..self.0 - }) + Self { + sample: Sample { + source_info, + ..self.sample + }, + _t: PhantomData::, + } } #[zenoh_macros::unstable] - fn attachment>>(self, attachment: T) -> Self { - Self(Sample { - attachment: attachment.into(), - ..self.0 - }) - } -} - -impl QoSBuilderTrait for SampleBuilder { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let qos: QoSBuilder = self.0.qos.into(); - let qos = qos.congestion_control(congestion_control).into(); - Self(Sample { qos, ..self.0 }) - } - fn priority(self, priority: Priority) -> Self { - let qos: QoSBuilder = self.0.qos.into(); - let qos = qos.priority(priority).into(); - Self(Sample { qos, ..self.0 }) - } - fn express(self, is_express: bool) -> Self { - let qos: QoSBuilder = self.0.qos.into(); - let qos = qos.express(is_express).into(); - Self(Sample { qos, ..self.0 }) - } -} - -#[derive(Debug)] -pub struct PutSampleBuilder(SampleBuilder); - -impl From for PutSampleBuilder { - fn from(sample_builder: SampleBuilder) -> Self { - Self(SampleBuilder(Sample { - kind: SampleKind::Put, - ..sample_builder.0 - })) - } -} - -impl PutSampleBuilder { - fn new(key_expr: IntoKeyExpr, payload: IntoPayload) -> Self - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - Self(SampleBuilder::from(Sample { - key_expr: key_expr.into(), - payload: payload.into(), - kind: SampleKind::Put, - encoding: Encoding::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment: None, - })) - } - /// Allows to change keyexpr of [`Sample`] - pub fn keyexpr(self, key_expr: IntoKeyExpr) -> Self - where - IntoKeyExpr: Into>, - { - Self(self.0.keyexpr(key_expr)) - } - // It's convenient to set QoS as a whole for internal usage. For user API there are `congestion_control`, `priority` and `express` methods. - pub(crate) fn qos(self, qos: QoS) -> Self { - Self(SampleBuilder(Sample { qos, ..self.0 .0 })) - } -} - -impl TimestampBuilderTrait for PutSampleBuilder { - fn timestamp>>(self, timestamp: T) -> Self { - Self(self.0.timestamp(timestamp)) + fn attachment>>(self, attachment: U) -> Self { + Self { + sample: Sample { + attachment: attachment.into(), + ..self.sample + }, + _t: PhantomData::, + } } } -impl SampleBuilderTrait for PutSampleBuilder { - #[zenoh_macros::unstable] - fn source_info(self, source_info: SourceInfo) -> Self { - Self(self.0.source_info(source_info)) - } - #[zenoh_macros::unstable] - fn attachment>>(self, attachment: T) -> Self { - Self(self.0.attachment(attachment)) +impl SampleBuilder { + pub fn qos(self, qos: QoS) -> Self { + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } } } -impl QoSBuilderTrait for PutSampleBuilder { +impl QoSBuilderTrait for SampleBuilder { fn congestion_control(self, congestion_control: CongestionControl) -> Self { - Self(self.0.congestion_control(congestion_control)) + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.congestion_control(congestion_control).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } } fn priority(self, priority: Priority) -> Self { - Self(self.0.priority(priority)) + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.priority(priority).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } } fn express(self, is_express: bool) -> Self { - Self(self.0.express(is_express)) + let qos: QoSBuilder = self.sample.qos.into(); + let qos = qos.express(is_express).into(); + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } } } -impl ValueBuilderTrait for PutSampleBuilder { +impl ValueBuilderTrait for SampleBuilder { fn encoding>(self, encoding: T) -> Self { - Self(SampleBuilder(Sample { - encoding: encoding.into(), - ..self.0 .0 - })) + Self { + sample: Sample { + encoding: encoding.into(), + ..self.sample + }, + _t: PhantomData::, + } } fn payload>(self, payload: T) -> Self { - Self(SampleBuilder(Sample { - payload: payload.into(), - ..self.0 .0 - })) + Self { + sample: Sample { + payload: payload.into(), + ..self.sample + }, + _t: PhantomData::, + } } fn value>(self, value: T) -> Self { let Value { payload, encoding } = value.into(); - Self(SampleBuilder(Sample { - payload, - encoding, - ..self.0 .0 - })) - } -} - -#[derive(Debug)] -pub struct DeleteSampleBuilder(SampleBuilder); - -impl From for DeleteSampleBuilder { - fn from(sample_builder: SampleBuilder) -> Self { - Self(SampleBuilder(Sample { - kind: SampleKind::Delete, - ..sample_builder.0 - })) - } -} - -impl DeleteSampleBuilder { - pub fn new(key_expr: IntoKeyExpr) -> Self - where - IntoKeyExpr: Into>, - { - Self(SampleBuilder::from(Sample { - key_expr: key_expr.into(), - payload: Payload::empty(), - kind: SampleKind::Delete, - encoding: Encoding::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment: None, - })) - } - /// Allows to change keyexpr of [`Sample`] - pub fn with_keyexpr(self, key_expr: IntoKeyExpr) -> Self - where - IntoKeyExpr: Into>, - { - Self(self.0.keyexpr(key_expr)) - } - // It's convenient to set QoS as a whole for internal usage. For user API there are `congestion_control`, `priority` and `express` methods. - pub(crate) fn with_qos(self, qos: QoS) -> Self { - Self(SampleBuilder(Sample { qos, ..self.0 .0 })) - } -} - -impl TimestampBuilderTrait for DeleteSampleBuilder { - fn timestamp>>(self, timestamp: T) -> Self { - Self(self.0.timestamp(timestamp)) - } -} - -impl SampleBuilderTrait for DeleteSampleBuilder { - #[zenoh_macros::unstable] - fn source_info(self, source_info: SourceInfo) -> Self { - Self(self.0.source_info(source_info)) - } - #[zenoh_macros::unstable] - fn attachment>>(self, attachment: T) -> Self { - Self(self.0.attachment(attachment)) - } -} - -impl QoSBuilderTrait for DeleteSampleBuilder { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - Self(self.0.congestion_control(congestion_control)) - } - fn priority(self, priority: Priority) -> Self { - Self(self.0.priority(priority)) - } - fn express(self, is_express: bool) -> Self { - Self(self.0.express(is_express)) + Self { + sample: Sample { + payload, + encoding, + ..self.sample + }, + _t: PhantomData::, + } } } -impl From for SampleBuilder { +impl From for SampleBuilder { fn from(sample: Sample) -> Self { - SampleBuilder(sample) + SampleBuilder { + sample, + _t: PhantomData::, + } } } -impl TryFrom for PutSampleBuilder { +impl TryFrom for SampleBuilder { type Error = zresult::Error; fn try_from(sample: Sample) -> Result { if sample.kind != SampleKind::Put { bail!("Sample is not a put sample") } - Ok(Self(SampleBuilder(sample))) + Ok(SampleBuilder { + sample, + _t: PhantomData::, + }) } } -impl TryFrom for DeleteSampleBuilder { +impl TryFrom for SampleBuilder { type Error = zresult::Error; fn try_from(sample: Sample) -> Result { if sample.kind != SampleKind::Delete { bail!("Sample is not a delete sample") } - Ok(Self(SampleBuilder(sample))) - } -} - -impl From for Sample { - fn from(sample_builder: SampleBuilder) -> Self { - sample_builder.0 - } -} - -impl From for Sample { - fn from(put_sample_builder: PutSampleBuilder) -> Self { - put_sample_builder.0 .0 + Ok(SampleBuilder { + sample, + _t: PhantomData::, + }) } } -impl From for Sample { - fn from(delete_sample_builder: DeleteSampleBuilder) -> Self { - delete_sample_builder.0 .0 +impl From> for Sample { + fn from(sample_builder: SampleBuilder) -> Self { + sample_builder.sample } } From 6c305a130043a66ee58f3985eb4f71eb708ff5dc Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 18:35:40 +0100 Subject: [PATCH 2/5] Improve Query builders with generics --- zenoh/src/queryable.rs | 178 +++++++++++++----------------------- zenoh/src/sample/builder.rs | 65 ++++++------- 2 files changed, 99 insertions(+), 144 deletions(-) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 0e977f3def..fea148e6e6 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -19,7 +19,7 @@ use crate::handlers::{locked, DefaultHandler}; use crate::net::primitives::Primitives; use crate::prelude::*; use crate::sample::builder::{ - OpDelete, OpPut, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, + op, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, }; use crate::sample::SourceInfo; @@ -132,18 +132,19 @@ impl Query { &self, key_expr: IntoKeyExpr, payload: IntoPayload, - ) -> ReplyBuilder<'_> + ) -> ReplySampleBuilder<'_, op::Put> where IntoKeyExpr: Into>, IntoPayload: Into, { let sample_builder = SampleBuilder::put(key_expr, payload).qos(response::ext::QoSType::RESPONSE.into()); - ReplyBuilder { + ReplySampleBuilder { query: self, sample_builder, } } + /// Sends a error reply to this Query. /// #[inline(always)] @@ -163,13 +164,16 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply_del(&self, key_expr: IntoKeyExpr) -> ReplyDelBuilder<'_> + pub fn reply_del( + &self, + key_expr: IntoKeyExpr, + ) -> ReplySampleBuilder<'_, op::Delete> where IntoKeyExpr: Into>, { let sample_builder = SampleBuilder::delete(key_expr).qos(response::ext::QoSType::RESPONSE.into()); - ReplyDelBuilder { + ReplySampleBuilder { query: self, sample_builder, } @@ -240,13 +244,13 @@ impl AsyncResolve for ReplySample<'_> { /// A builder returned by [`Query::reply()`](Query::reply) #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct ReplyBuilder<'a> { +pub struct ReplySampleBuilder<'a, T> { query: &'a Query, - sample_builder: SampleBuilder, + sample_builder: SampleBuilder, } -impl TimestampBuilderTrait for ReplyBuilder<'_> { - fn timestamp>>(self, timestamp: T) -> Self { +impl TimestampBuilderTrait for ReplySampleBuilder<'_, T> { + fn timestamp>>(self, timestamp: U) -> Self { Self { sample_builder: self.sample_builder.timestamp(timestamp), ..self @@ -254,7 +258,7 @@ impl TimestampBuilderTrait for ReplyBuilder<'_> { } } -impl SampleBuilderTrait for ReplyBuilder<'_> { +impl SampleBuilderTrait for ReplySampleBuilder<'_, T> { #[cfg(feature = "unstable")] fn source_info(self, source_info: SourceInfo) -> Self { Self { @@ -264,7 +268,7 @@ impl SampleBuilderTrait for ReplyBuilder<'_> { } #[cfg(feature = "unstable")] - fn attachment>>(self, attachment: T) -> Self { + fn attachment>>(self, attachment: U) -> Self { Self { sample_builder: self.sample_builder.attachment(attachment), ..self @@ -272,7 +276,7 @@ impl SampleBuilderTrait for ReplyBuilder<'_> { } } -impl QoSBuilderTrait for ReplyBuilder<'_> { +impl QoSBuilderTrait for ReplySampleBuilder<'_, T> { fn congestion_control(self, congestion_control: CongestionControl) -> Self { Self { sample_builder: self.sample_builder.congestion_control(congestion_control), @@ -295,7 +299,7 @@ impl QoSBuilderTrait for ReplyBuilder<'_> { } } -impl ValueBuilderTrait for ReplyBuilder<'_> { +impl ValueBuilderTrait for ReplySampleBuilder<'_, op::Put> { fn encoding>(self, encoding: T) -> Self { Self { sample_builder: self.sample_builder.encoding(encoding), @@ -318,101 +322,86 @@ impl ValueBuilderTrait for ReplyBuilder<'_> { } } -/// A builder returned by [`Query::reply_del()`](Query::reply) -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] -pub struct ReplyDelBuilder<'a> { - query: &'a Query, - sample_builder: SampleBuilder, +impl<'a, T> Resolvable for ReplySampleBuilder<'a, T> { + type To = ZResult<()>; } -impl TimestampBuilderTrait for ReplyDelBuilder<'_> { - fn timestamp>>(self, timestamp: T) -> Self { - Self { - sample_builder: self.sample_builder.timestamp(timestamp), - ..self - } +impl SyncResolve for ReplySampleBuilder<'_, T> { + fn res_sync(self) -> ::To { + self.query._reply_sample(self.sample_builder.into()) } } -impl SampleBuilderTrait for ReplyDelBuilder<'_> { - #[cfg(feature = "unstable")] - fn source_info(self, source_info: SourceInfo) -> Self { - Self { - sample_builder: self.sample_builder.source_info(source_info), - ..self - } - } +impl<'a, T> AsyncResolve for ReplySampleBuilder<'a, T> { + type Future = Ready; - #[cfg(feature = "unstable")] - fn attachment>>(self, attachment: T) -> Self { - Self { - sample_builder: self.sample_builder.attachment(attachment), - ..self - } + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) } } -impl QoSBuilderTrait for ReplyDelBuilder<'_> { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { +/// A builder returned by [`Query::reply_err()`](Query::reply_err). +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +#[derive(Debug)] +pub struct ReplyErrBuilder<'a> { + query: &'a Query, + value: Value, +} + +impl ValueBuilderTrait for ReplyErrBuilder<'_> { + fn encoding>(self, encoding: T) -> Self { Self { - sample_builder: self.sample_builder.congestion_control(congestion_control), + value: self.value.encoding(encoding), ..self } } - fn priority(self, priority: Priority) -> Self { + fn payload>(self, payload: T) -> Self { Self { - sample_builder: self.sample_builder.priority(priority), + value: self.value.payload(payload), ..self } } - fn express(self, is_express: bool) -> Self { + fn value>(self, value: T) -> Self { Self { - sample_builder: self.sample_builder.express(is_express), + value: value.into(), ..self } } } -/// A builder returned by [`Query::reply_err()`](Query::reply_err). -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] -pub struct ReplyErrBuilder<'a> { - query: &'a Query, - value: Value, -} - -impl<'a> Resolvable for ReplyBuilder<'a> { - type To = ZResult<()>; -} - -impl SyncResolve for ReplyBuilder<'_> { - fn res_sync(self) -> ::To { - self.query._reply_sample(self.sample_builder.into()) - } -} - -impl<'a> Resolvable for ReplyDelBuilder<'a> { +impl<'a> Resolvable for ReplyErrBuilder<'a> { type To = ZResult<()>; } -impl SyncResolve for ReplyDelBuilder<'_> { +impl SyncResolve for ReplyErrBuilder<'_> { fn res_sync(self) -> ::To { - self.query._reply_sample(self.sample_builder.into()) - } -} - -impl<'a> AsyncResolve for ReplyBuilder<'a> { - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) + self.query.inner.primitives.send_response(Response { + rid: self.query.inner.qid, + wire_expr: WireExpr { + scope: 0, + suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), + mapping: Mapping::Sender, + }, + payload: ResponseBody::Err(zenoh::Err { + encoding: self.value.encoding.into(), + ext_sinfo: None, + ext_unknown: vec![], + payload: self.value.payload.into(), + }), + ext_qos: response::ext::QoSType::RESPONSE, + ext_tstamp: None, + ext_respid: Some(response::ext::ResponderIdType { + zid: self.query.inner.zid, + eid: self.query.eid, + }), + }); + Ok(()) } } -impl<'a> AsyncResolve for ReplyDelBuilder<'a> { +impl<'a> AsyncResolve for ReplyErrBuilder<'a> { type Future = Ready; fn res_async(self) -> Self::Future { @@ -477,43 +466,6 @@ impl Query { } } -impl<'a> Resolvable for ReplyErrBuilder<'a> { - type To = ZResult<()>; -} - -impl SyncResolve for ReplyErrBuilder<'_> { - fn res_sync(self) -> ::To { - self.query.inner.primitives.send_response(Response { - rid: self.query.inner.qid, - wire_expr: WireExpr { - scope: 0, - suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()), - mapping: Mapping::Sender, - }, - payload: ResponseBody::Err(zenoh::Err { - encoding: self.value.encoding.into(), - ext_sinfo: None, - ext_unknown: vec![], - payload: self.value.payload.into(), - }), - ext_qos: response::ext::QoSType::RESPONSE, - ext_tstamp: None, - ext_respid: Some(response::ext::ResponderIdType { - zid: self.query.inner.zid, - eid: self.query.eid, - }), - }); - Ok(()) - } -} -impl<'a> AsyncResolve for ReplyErrBuilder<'a> { - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} - pub(crate) struct QueryableState { pub(crate) id: Id, pub(crate) key_expr: WireExpr<'static>, diff --git a/zenoh/src/sample/builder.rs b/zenoh/src/sample/builder.rs index cae58514ff..1ec20209aa 100644 --- a/zenoh/src/sample/builder.rs +++ b/zenoh/src/sample/builder.rs @@ -64,12 +64,16 @@ pub trait ValueBuilderTrait { fn value>(self, value: T) -> Self; } -#[derive(Debug)] -pub struct OpPut; -#[derive(Debug)] -pub struct OpDelete; -#[derive(Debug)] -pub struct OpAny; +pub mod op { + #[derive(Debug)] + pub struct Put; + #[derive(Debug)] + pub struct Delete; + #[derive(Debug)] + pub struct Error; + #[derive(Debug)] + pub struct Any; +} #[derive(Debug)] pub struct SampleBuilder { @@ -77,11 +81,11 @@ pub struct SampleBuilder { _t: PhantomData, } -impl SampleBuilder { +impl SampleBuilder { pub fn put( key_expr: IntoKeyExpr, payload: IntoPayload, - ) -> SampleBuilder + ) -> SampleBuilder where IntoKeyExpr: Into>, IntoPayload: Into, @@ -99,13 +103,13 @@ impl SampleBuilder { #[cfg(feature = "unstable")] attachment: None, }, - _t: PhantomData::, + _t: PhantomData::, } } } -impl SampleBuilder { - pub fn delete(key_expr: IntoKeyExpr) -> SampleBuilder +impl SampleBuilder { + pub fn delete(key_expr: IntoKeyExpr) -> SampleBuilder where IntoKeyExpr: Into>, { @@ -122,7 +126,7 @@ impl SampleBuilder { #[cfg(feature = "unstable")] attachment: None, }, - _t: PhantomData::, + _t: PhantomData::, } } } @@ -141,6 +145,14 @@ impl SampleBuilder { _t: PhantomData::, } } + + // Allows to change qos as a whole of [`Sample`] + pub fn qos(self, qos: QoS) -> Self { + Self { + sample: Sample { qos, ..self.sample }, + _t: PhantomData::, + } + } } impl TimestampBuilderTrait for SampleBuilder { @@ -179,15 +191,6 @@ impl SampleBuilderTrait for SampleBuilder { } } -impl SampleBuilder { - pub fn qos(self, qos: QoS) -> Self { - Self { - sample: Sample { qos, ..self.sample }, - _t: PhantomData::, - } - } -} - impl QoSBuilderTrait for SampleBuilder { fn congestion_control(self, congestion_control: CongestionControl) -> Self { let qos: QoSBuilder = self.sample.qos.into(); @@ -215,14 +218,14 @@ impl QoSBuilderTrait for SampleBuilder { } } -impl ValueBuilderTrait for SampleBuilder { +impl ValueBuilderTrait for SampleBuilder { fn encoding>(self, encoding: T) -> Self { Self { sample: Sample { encoding: encoding.into(), ..self.sample }, - _t: PhantomData::, + _t: PhantomData::, } } fn payload>(self, payload: T) -> Self { @@ -231,7 +234,7 @@ impl ValueBuilderTrait for SampleBuilder { payload: payload.into(), ..self.sample }, - _t: PhantomData::, + _t: PhantomData::, } } fn value>(self, value: T) -> Self { @@ -242,21 +245,21 @@ impl ValueBuilderTrait for SampleBuilder { encoding, ..self.sample }, - _t: PhantomData::, + _t: PhantomData::, } } } -impl From for SampleBuilder { +impl From for SampleBuilder { fn from(sample: Sample) -> Self { SampleBuilder { sample, - _t: PhantomData::, + _t: PhantomData::, } } } -impl TryFrom for SampleBuilder { +impl TryFrom for SampleBuilder { type Error = zresult::Error; fn try_from(sample: Sample) -> Result { if sample.kind != SampleKind::Put { @@ -264,12 +267,12 @@ impl TryFrom for SampleBuilder { } Ok(SampleBuilder { sample, - _t: PhantomData::, + _t: PhantomData::, }) } } -impl TryFrom for SampleBuilder { +impl TryFrom for SampleBuilder { type Error = zresult::Error; fn try_from(sample: Sample) -> Result { if sample.kind != SampleKind::Delete { @@ -277,7 +280,7 @@ impl TryFrom for SampleBuilder { } Ok(SampleBuilder { sample, - _t: PhantomData::, + _t: PhantomData::, }) } } From bca953da3de684228241cbd1c8bc8641945b2b84 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 18:36:58 +0100 Subject: [PATCH 3/5] Reorg sample files --- zenoh/src/sample.rs | 655 -------------------------------------------- 1 file changed, 655 deletions(-) delete mode 100644 zenoh/src/sample.rs diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs deleted file mode 100644 index 2b71105d5e..0000000000 --- a/zenoh/src/sample.rs +++ /dev/null @@ -1,655 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! Sample primitives -use crate::encoding::Encoding; -use crate::payload::Payload; -use crate::prelude::{KeyExpr, Value}; -use crate::sample::builder::{QoSBuilderTrait, ValueBuilderTrait}; -use crate::time::Timestamp; -use crate::Priority; -#[zenoh_macros::unstable] -use serde::Serialize; -use std::{convert::TryFrom, fmt}; -use zenoh_protocol::core::EntityGlobalId; -use zenoh_protocol::network::declare::ext::QoSType; -use zenoh_protocol::{core::CongestionControl, zenoh}; - -pub mod builder; - -pub type SourceSn = u64; - -/// The locality of samples to be received by subscribers or targeted by publishers. -#[zenoh_macros::unstable] -#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)] -pub enum Locality { - SessionLocal, - Remote, - #[default] - Any, -} -#[cfg(not(feature = "unstable"))] -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] -pub(crate) enum Locality { - SessionLocal, - Remote, - #[default] - Any, -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub(crate) struct DataInfo { - pub kind: SampleKind, - pub encoding: Option, - pub timestamp: Option, - pub source_id: Option, - pub source_sn: Option, - pub qos: QoS, -} - -pub(crate) trait DataInfoIntoSample { - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into; -} - -impl DataInfoIntoSample for DataInfo { - // This function is for internal use only. - // Technically it may create invalid sample (e.g. a delete sample with a payload and encoding) - // The test for it is intentionally not added to avoid inserting extra "if" into hot path. - // The correctness of the data should be ensured by the caller. - #[inline] - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - Sample { - key_expr: key_expr.into(), - payload: payload.into(), - kind: self.kind, - encoding: self.encoding.unwrap_or_default(), - timestamp: self.timestamp, - qos: self.qos, - #[cfg(feature = "unstable")] - source_info: SourceInfo { - source_id: self.source_id, - source_sn: self.source_sn, - }, - #[cfg(feature = "unstable")] - attachment, - } - } -} - -impl DataInfoIntoSample for Option { - #[inline] - fn into_sample( - self, - key_expr: IntoKeyExpr, - payload: IntoPayload, - #[cfg(feature = "unstable")] attachment: Option, - ) -> Sample - where - IntoKeyExpr: Into>, - IntoPayload: Into, - { - if let Some(data_info) = self { - data_info.into_sample(key_expr, payload, attachment) - } else { - Sample { - key_expr: key_expr.into(), - payload: payload.into(), - kind: SampleKind::Put, - encoding: Encoding::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment, - } - } - } -} - -/// Informations on the source of a zenoh [`Sample`]. -#[zenoh_macros::unstable] -#[derive(Debug, Clone)] -pub struct SourceInfo { - /// The [`EntityGlobalId`] of the zenoh entity that published the concerned [`Sample`]. - pub source_id: Option, - /// The sequence number of the [`Sample`] from the source. - pub source_sn: Option, -} - -#[test] -#[cfg(feature = "unstable")] -fn source_info_stack_size() { - use crate::{ - sample::{SourceInfo, SourceSn}, - ZenohId, - }; - - assert_eq!(std::mem::size_of::(), 16); - assert_eq!(std::mem::size_of::>(), 17); - assert_eq!(std::mem::size_of::>(), 16); - assert_eq!(std::mem::size_of::(), 17 + 16 + 7); -} - -#[zenoh_macros::unstable] -impl SourceInfo { - pub(crate) fn empty() -> Self { - SourceInfo { - source_id: None, - source_sn: None, - } - } - pub(crate) fn is_empty(&self) -> bool { - self.source_id.is_none() && self.source_sn.is_none() - } -} - -impl From for Option { - fn from(source_info: SourceInfo) -> Option { - if source_info.is_empty() { - None - } else { - Some(zenoh::put::ext::SourceInfoType { - id: source_info.source_id.unwrap_or_default(), - sn: source_info.source_sn.unwrap_or_default() as u32, - }) - } - } -} - -#[zenoh_macros::unstable] -impl From for SourceInfo { - fn from(data_info: DataInfo) -> Self { - SourceInfo { - source_id: data_info.source_id, - source_sn: data_info.source_sn, - } - } -} - -#[zenoh_macros::unstable] -impl From> for SourceInfo { - fn from(data_info: Option) -> Self { - match data_info { - Some(data_info) => data_info.into(), - None => SourceInfo::empty(), - } - } -} - -mod attachment { - #[zenoh_macros::unstable] - use zenoh_buffers::{ - reader::{HasReader, Reader}, - writer::HasWriter, - ZBuf, ZBufReader, ZSlice, - }; - #[zenoh_macros::unstable] - use zenoh_codec::{RCodec, WCodec, Zenoh080}; - #[zenoh_macros::unstable] - use zenoh_protocol::zenoh::ext::AttachmentType; - - /// A builder for [`Attachment`] - #[zenoh_macros::unstable] - #[derive(Debug)] - pub struct AttachmentBuilder { - pub(crate) inner: Vec, - } - #[zenoh_macros::unstable] - impl Default for AttachmentBuilder { - fn default() -> Self { - Self::new() - } - } - #[zenoh_macros::unstable] - impl AttachmentBuilder { - pub fn new() -> Self { - Self { inner: Vec::new() } - } - fn _insert(&mut self, key: &[u8], value: &[u8]) { - let codec = Zenoh080; - let mut writer = self.inner.writer(); - codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure - codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure - } - /// Inserts a key-value pair to the attachment. - /// - /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. - pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( - &mut self, - key: &Key, - value: &Value, - ) { - self._insert(key.as_ref(), value.as_ref()) - } - pub fn build(self) -> Attachment { - Attachment { - inner: self.inner.into(), - } - } - } - #[zenoh_macros::unstable] - impl From for Attachment { - fn from(value: AttachmentBuilder) -> Self { - Attachment { - inner: value.inner.into(), - } - } - } - #[zenoh_macros::unstable] - impl From for Option { - fn from(value: AttachmentBuilder) -> Self { - if value.inner.is_empty() { - None - } else { - Some(value.into()) - } - } - } - - #[zenoh_macros::unstable] - #[derive(Clone)] - pub struct Attachment { - pub(crate) inner: ZBuf, - } - #[zenoh_macros::unstable] - impl Default for Attachment { - fn default() -> Self { - Self::new() - } - } - #[zenoh_macros::unstable] - impl From for AttachmentType { - fn from(this: Attachment) -> Self { - AttachmentType { buffer: this.inner } - } - } - #[zenoh_macros::unstable] - impl From> for Attachment { - fn from(this: AttachmentType) -> Self { - Attachment { inner: this.buffer } - } - } - #[zenoh_macros::unstable] - impl Attachment { - pub fn new() -> Self { - Self { - inner: ZBuf::empty(), - } - } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn len(&self) -> usize { - self.iter().count() - } - pub fn iter(&self) -> AttachmentIterator { - self.into_iter() - } - fn _get(&self, key: &[u8]) -> Option { - self.iter() - .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) - } - pub fn get>(&self, key: &Key) -> Option { - self._get(key.as_ref()) - } - fn _insert(&mut self, key: &[u8], value: &[u8]) { - let codec = Zenoh080; - let mut writer = self.inner.writer(); - codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure - codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure - } - /// Inserts a key-value pair to the attachment. - /// - /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. - /// - /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] - pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( - &mut self, - key: &Key, - value: &Value, - ) { - self._insert(key.as_ref(), value.as_ref()) - } - fn _extend(&mut self, with: Self) -> &mut Self { - for slice in with.inner.zslices().cloned() { - self.inner.push_zslice(slice); - } - self - } - pub fn extend(&mut self, with: impl Into) -> &mut Self { - let with = with.into(); - self._extend(with) - } - } - #[zenoh_macros::unstable] - pub struct AttachmentIterator<'a> { - reader: ZBufReader<'a>, - } - #[zenoh_macros::unstable] - impl<'a> core::iter::IntoIterator for &'a Attachment { - type Item = (ZSlice, ZSlice); - type IntoIter = AttachmentIterator<'a>; - fn into_iter(self) -> Self::IntoIter { - AttachmentIterator { - reader: self.inner.reader(), - } - } - } - #[zenoh_macros::unstable] - impl core::fmt::Debug for Attachment { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{{")?; - for (key, value) in self { - let key = key.as_slice(); - let value = value.as_slice(); - match core::str::from_utf8(key) { - Ok(key) => write!(f, "\"{key}\": ")?, - Err(_) => { - write!(f, "0x")?; - for byte in key { - write!(f, "{byte:02X}")? - } - } - } - match core::str::from_utf8(value) { - Ok(value) => write!(f, "\"{value}\", ")?, - Err(_) => { - write!(f, "0x")?; - for byte in value { - write!(f, "{byte:02X}")? - } - write!(f, ", ")? - } - } - } - write!(f, "}}") - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::Iterator for AttachmentIterator<'a> { - type Item = (ZSlice, ZSlice); - fn next(&mut self) -> Option { - let key = Zenoh080.read(&mut self.reader).ok()?; - let value = Zenoh080.read(&mut self.reader).ok()?; - Some((key, value)) - } - fn size_hint(&self) -> (usize, Option) { - ( - (self.reader.remaining() != 0) as usize, - Some(self.reader.remaining() / 2), - ) - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { - fn from_iter>(iter: T) -> Self { - let codec = Zenoh080; - let mut buffer: Vec = Vec::new(); - let mut writer = buffer.writer(); - for (key, value) in iter { - codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures - codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures - } - Self { inner: buffer } - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { - fn from_iter>(iter: T) -> Self { - AttachmentBuilder::from_iter(iter).into() - } - } -} - -/// The kind of a `Sample`. -#[repr(u8)] -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -pub enum SampleKind { - /// if the `Sample` was issued by a `put` operation. - #[default] - Put = 0, - /// if the `Sample` was issued by a `delete` operation. - Delete = 1, -} - -impl fmt::Display for SampleKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SampleKind::Put => write!(f, "PUT"), - SampleKind::Delete => write!(f, "DELETE"), - } - } -} - -impl TryFrom for SampleKind { - type Error = u64; - fn try_from(kind: u64) -> Result { - match kind { - 0 => Ok(SampleKind::Put), - 1 => Ok(SampleKind::Delete), - _ => Err(kind), - } - } -} - -#[zenoh_macros::unstable] -pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; - -/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields. -pub struct SampleFields { - pub key_expr: KeyExpr<'static>, - pub payload: Payload, - pub kind: SampleKind, - pub encoding: Encoding, - pub timestamp: Option, - pub express: bool, - pub priority: Priority, - pub congestion_control: CongestionControl, - #[cfg(feature = "unstable")] - pub source_info: SourceInfo, - #[cfg(feature = "unstable")] - pub attachment: Option, -} - -impl From for SampleFields { - fn from(sample: Sample) -> Self { - SampleFields { - key_expr: sample.key_expr, - payload: sample.payload, - kind: sample.kind, - encoding: sample.encoding, - timestamp: sample.timestamp, - express: sample.qos.express(), - priority: sample.qos.priority(), - congestion_control: sample.qos.congestion_control(), - #[cfg(feature = "unstable")] - source_info: sample.source_info, - #[cfg(feature = "unstable")] - attachment: sample.attachment, - } - } -} - -/// A zenoh sample. -#[non_exhaustive] -#[derive(Clone, Debug)] -pub struct Sample { - pub(crate) key_expr: KeyExpr<'static>, - pub(crate) payload: Payload, - pub(crate) kind: SampleKind, - pub(crate) encoding: Encoding, - pub(crate) timestamp: Option, - pub(crate) qos: QoS, - - #[cfg(feature = "unstable")] - pub(crate) source_info: SourceInfo, - - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, -} - -impl Sample { - /// Gets the key expression on which this Sample was published. - #[inline] - pub fn key_expr(&self) -> &KeyExpr<'static> { - &self.key_expr - } - - /// Gets the payload of this Sample. - #[inline] - pub fn payload(&self) -> &Payload { - &self.payload - } - - /// Gets the kind of this Sample. - #[inline] - pub fn kind(&self) -> SampleKind { - self.kind - } - - /// Gets the encoding of this sample - #[inline] - pub fn encoding(&self) -> &Encoding { - &self.encoding - } - - /// Gets the timestamp of this Sample. - #[inline] - pub fn timestamp(&self) -> Option<&Timestamp> { - self.timestamp.as_ref() - } - - /// Gets the quality of service settings this Sample was sent with. - #[inline] - pub fn qos(&self) -> &QoS { - &self.qos - } - - /// Gets infos on the source of this Sample. - #[zenoh_macros::unstable] - #[inline] - pub fn source_info(&self) -> &SourceInfo { - &self.source_info - } - - /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. - #[zenoh_macros::unstable] - #[inline] - pub fn attachment(&self) -> Option<&Attachment> { - self.attachment.as_ref() - } -} - -impl From for Value { - fn from(sample: Sample) -> Self { - Value::new(sample.payload).encoding(sample.encoding) - } -} - -/// Structure containing quality of service data -#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] -pub struct QoS { - inner: QoSType, -} - -#[derive(Debug)] -pub struct QoSBuilder(QoS); - -impl From for QoSBuilder { - fn from(qos: QoS) -> Self { - QoSBuilder(qos) - } -} - -impl From for QoS { - fn from(builder: QoSBuilder) -> Self { - builder.0 - } -} - -impl QoSBuilderTrait for QoSBuilder { - fn congestion_control(self, congestion_control: CongestionControl) -> Self { - let mut inner = self.0.inner; - inner.set_congestion_control(congestion_control); - Self(QoS { inner }) - } - - fn priority(self, priority: Priority) -> Self { - let mut inner = self.0.inner; - inner.set_priority(priority.into()); - Self(QoS { inner }) - } - - fn express(self, is_express: bool) -> Self { - let mut inner = self.0.inner; - inner.set_is_express(is_express); - Self(QoS { inner }) - } -} - -impl QoS { - /// Gets priority of the message. - pub fn priority(&self) -> Priority { - match Priority::try_from(self.inner.get_priority()) { - Ok(p) => p, - Err(e) => { - log::trace!( - "Failed to convert priority: {}; replacing with default value", - e.to_string() - ); - Priority::default() - } - } - } - - /// Gets congestion control of the message. - pub fn congestion_control(&self) -> CongestionControl { - self.inner.get_congestion_control() - } - - /// Gets express flag value. If `true`, the message is not batched during transmission, in order to reduce latency. - pub fn express(&self) -> bool { - self.inner.is_express() - } -} - -impl From for QoS { - fn from(qos: QoSType) -> Self { - QoS { inner: qos } - } -} - -impl From for QoSType { - fn from(qos: QoS) -> Self { - qos.inner - } -} From 9d1a5409541831926e70420fdf89006a67b1020c Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 18:37:23 +0100 Subject: [PATCH 4/5] Remove error op struct in SampleBuilder --- zenoh/src/sample/builder.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/zenoh/src/sample/builder.rs b/zenoh/src/sample/builder.rs index 1ec20209aa..7f438d1381 100644 --- a/zenoh/src/sample/builder.rs +++ b/zenoh/src/sample/builder.rs @@ -70,8 +70,6 @@ pub mod op { #[derive(Debug)] pub struct Delete; #[derive(Debug)] - pub struct Error; - #[derive(Debug)] pub struct Any; } From 7904d099ba3d069ecc51b76241ef136678a5e005 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 29 Mar 2024 20:29:43 +0100 Subject: [PATCH 5/5] Add forgotten file --- zenoh/src/sample/mod.rs | 655 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 zenoh/src/sample/mod.rs diff --git a/zenoh/src/sample/mod.rs b/zenoh/src/sample/mod.rs new file mode 100644 index 0000000000..2b71105d5e --- /dev/null +++ b/zenoh/src/sample/mod.rs @@ -0,0 +1,655 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Sample primitives +use crate::encoding::Encoding; +use crate::payload::Payload; +use crate::prelude::{KeyExpr, Value}; +use crate::sample::builder::{QoSBuilderTrait, ValueBuilderTrait}; +use crate::time::Timestamp; +use crate::Priority; +#[zenoh_macros::unstable] +use serde::Serialize; +use std::{convert::TryFrom, fmt}; +use zenoh_protocol::core::EntityGlobalId; +use zenoh_protocol::network::declare::ext::QoSType; +use zenoh_protocol::{core::CongestionControl, zenoh}; + +pub mod builder; + +pub type SourceSn = u64; + +/// The locality of samples to be received by subscribers or targeted by publishers. +#[zenoh_macros::unstable] +#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)] +pub enum Locality { + SessionLocal, + Remote, + #[default] + Any, +} +#[cfg(not(feature = "unstable"))] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub(crate) enum Locality { + SessionLocal, + Remote, + #[default] + Any, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub(crate) struct DataInfo { + pub kind: SampleKind, + pub encoding: Option, + pub timestamp: Option, + pub source_id: Option, + pub source_sn: Option, + pub qos: QoS, +} + +pub(crate) trait DataInfoIntoSample { + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into; +} + +impl DataInfoIntoSample for DataInfo { + // This function is for internal use only. + // Technically it may create invalid sample (e.g. a delete sample with a payload and encoding) + // The test for it is intentionally not added to avoid inserting extra "if" into hot path. + // The correctness of the data should be ensured by the caller. + #[inline] + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into, + { + Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: self.kind, + encoding: self.encoding.unwrap_or_default(), + timestamp: self.timestamp, + qos: self.qos, + #[cfg(feature = "unstable")] + source_info: SourceInfo { + source_id: self.source_id, + source_sn: self.source_sn, + }, + #[cfg(feature = "unstable")] + attachment, + } + } +} + +impl DataInfoIntoSample for Option { + #[inline] + fn into_sample( + self, + key_expr: IntoKeyExpr, + payload: IntoPayload, + #[cfg(feature = "unstable")] attachment: Option, + ) -> Sample + where + IntoKeyExpr: Into>, + IntoPayload: Into, + { + if let Some(data_info) = self { + data_info.into_sample(key_expr, payload, attachment) + } else { + Sample { + key_expr: key_expr.into(), + payload: payload.into(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment, + } + } + } +} + +/// Informations on the source of a zenoh [`Sample`]. +#[zenoh_macros::unstable] +#[derive(Debug, Clone)] +pub struct SourceInfo { + /// The [`EntityGlobalId`] of the zenoh entity that published the concerned [`Sample`]. + pub source_id: Option, + /// The sequence number of the [`Sample`] from the source. + pub source_sn: Option, +} + +#[test] +#[cfg(feature = "unstable")] +fn source_info_stack_size() { + use crate::{ + sample::{SourceInfo, SourceSn}, + ZenohId, + }; + + assert_eq!(std::mem::size_of::(), 16); + assert_eq!(std::mem::size_of::>(), 17); + assert_eq!(std::mem::size_of::>(), 16); + assert_eq!(std::mem::size_of::(), 17 + 16 + 7); +} + +#[zenoh_macros::unstable] +impl SourceInfo { + pub(crate) fn empty() -> Self { + SourceInfo { + source_id: None, + source_sn: None, + } + } + pub(crate) fn is_empty(&self) -> bool { + self.source_id.is_none() && self.source_sn.is_none() + } +} + +impl From for Option { + fn from(source_info: SourceInfo) -> Option { + if source_info.is_empty() { + None + } else { + Some(zenoh::put::ext::SourceInfoType { + id: source_info.source_id.unwrap_or_default(), + sn: source_info.source_sn.unwrap_or_default() as u32, + }) + } + } +} + +#[zenoh_macros::unstable] +impl From for SourceInfo { + fn from(data_info: DataInfo) -> Self { + SourceInfo { + source_id: data_info.source_id, + source_sn: data_info.source_sn, + } + } +} + +#[zenoh_macros::unstable] +impl From> for SourceInfo { + fn from(data_info: Option) -> Self { + match data_info { + Some(data_info) => data_info.into(), + None => SourceInfo::empty(), + } + } +} + +mod attachment { + #[zenoh_macros::unstable] + use zenoh_buffers::{ + reader::{HasReader, Reader}, + writer::HasWriter, + ZBuf, ZBufReader, ZSlice, + }; + #[zenoh_macros::unstable] + use zenoh_codec::{RCodec, WCodec, Zenoh080}; + #[zenoh_macros::unstable] + use zenoh_protocol::zenoh::ext::AttachmentType; + + /// A builder for [`Attachment`] + #[zenoh_macros::unstable] + #[derive(Debug)] + pub struct AttachmentBuilder { + pub(crate) inner: Vec, + } + #[zenoh_macros::unstable] + impl Default for AttachmentBuilder { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl AttachmentBuilder { + pub fn new() -> Self { + Self { inner: Vec::new() } + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + pub fn build(self) -> Attachment { + Attachment { + inner: self.inner.into(), + } + } + } + #[zenoh_macros::unstable] + impl From for Attachment { + fn from(value: AttachmentBuilder) -> Self { + Attachment { + inner: value.inner.into(), + } + } + } + #[zenoh_macros::unstable] + impl From for Option { + fn from(value: AttachmentBuilder) -> Self { + if value.inner.is_empty() { + None + } else { + Some(value.into()) + } + } + } + + #[zenoh_macros::unstable] + #[derive(Clone)] + pub struct Attachment { + pub(crate) inner: ZBuf, + } + #[zenoh_macros::unstable] + impl Default for Attachment { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl From for AttachmentType { + fn from(this: Attachment) -> Self { + AttachmentType { buffer: this.inner } + } + } + #[zenoh_macros::unstable] + impl From> for Attachment { + fn from(this: AttachmentType) -> Self { + Attachment { inner: this.buffer } + } + } + #[zenoh_macros::unstable] + impl Attachment { + pub fn new() -> Self { + Self { + inner: ZBuf::empty(), + } + } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn len(&self) -> usize { + self.iter().count() + } + pub fn iter(&self) -> AttachmentIterator { + self.into_iter() + } + fn _get(&self, key: &[u8]) -> Option { + self.iter() + .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) + } + pub fn get>(&self, key: &Key) -> Option { + self._get(key.as_ref()) + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + /// + /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + fn _extend(&mut self, with: Self) -> &mut Self { + for slice in with.inner.zslices().cloned() { + self.inner.push_zslice(slice); + } + self + } + pub fn extend(&mut self, with: impl Into) -> &mut Self { + let with = with.into(); + self._extend(with) + } + } + #[zenoh_macros::unstable] + pub struct AttachmentIterator<'a> { + reader: ZBufReader<'a>, + } + #[zenoh_macros::unstable] + impl<'a> core::iter::IntoIterator for &'a Attachment { + type Item = (ZSlice, ZSlice); + type IntoIter = AttachmentIterator<'a>; + fn into_iter(self) -> Self::IntoIter { + AttachmentIterator { + reader: self.inner.reader(), + } + } + } + #[zenoh_macros::unstable] + impl core::fmt::Debug for Attachment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{")?; + for (key, value) in self { + let key = key.as_slice(); + let value = value.as_slice(); + match core::str::from_utf8(key) { + Ok(key) => write!(f, "\"{key}\": ")?, + Err(_) => { + write!(f, "0x")?; + for byte in key { + write!(f, "{byte:02X}")? + } + } + } + match core::str::from_utf8(value) { + Ok(value) => write!(f, "\"{value}\", ")?, + Err(_) => { + write!(f, "0x")?; + for byte in value { + write!(f, "{byte:02X}")? + } + write!(f, ", ")? + } + } + } + write!(f, "}}") + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::Iterator for AttachmentIterator<'a> { + type Item = (ZSlice, ZSlice); + fn next(&mut self) -> Option { + let key = Zenoh080.read(&mut self.reader).ok()?; + let value = Zenoh080.read(&mut self.reader).ok()?; + Some((key, value)) + } + fn size_hint(&self) -> (usize, Option) { + ( + (self.reader.remaining() != 0) as usize, + Some(self.reader.remaining() / 2), + ) + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { + fn from_iter>(iter: T) -> Self { + let codec = Zenoh080; + let mut buffer: Vec = Vec::new(); + let mut writer = buffer.writer(); + for (key, value) in iter { + codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures + codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures + } + Self { inner: buffer } + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { + fn from_iter>(iter: T) -> Self { + AttachmentBuilder::from_iter(iter).into() + } + } +} + +/// The kind of a `Sample`. +#[repr(u8)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +pub enum SampleKind { + /// if the `Sample` was issued by a `put` operation. + #[default] + Put = 0, + /// if the `Sample` was issued by a `delete` operation. + Delete = 1, +} + +impl fmt::Display for SampleKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SampleKind::Put => write!(f, "PUT"), + SampleKind::Delete => write!(f, "DELETE"), + } + } +} + +impl TryFrom for SampleKind { + type Error = u64; + fn try_from(kind: u64) -> Result { + match kind { + 0 => Ok(SampleKind::Put), + 1 => Ok(SampleKind::Delete), + _ => Err(kind), + } + } +} + +#[zenoh_macros::unstable] +pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; + +/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields. +pub struct SampleFields { + pub key_expr: KeyExpr<'static>, + pub payload: Payload, + pub kind: SampleKind, + pub encoding: Encoding, + pub timestamp: Option, + pub express: bool, + pub priority: Priority, + pub congestion_control: CongestionControl, + #[cfg(feature = "unstable")] + pub source_info: SourceInfo, + #[cfg(feature = "unstable")] + pub attachment: Option, +} + +impl From for SampleFields { + fn from(sample: Sample) -> Self { + SampleFields { + key_expr: sample.key_expr, + payload: sample.payload, + kind: sample.kind, + encoding: sample.encoding, + timestamp: sample.timestamp, + express: sample.qos.express(), + priority: sample.qos.priority(), + congestion_control: sample.qos.congestion_control(), + #[cfg(feature = "unstable")] + source_info: sample.source_info, + #[cfg(feature = "unstable")] + attachment: sample.attachment, + } + } +} + +/// A zenoh sample. +#[non_exhaustive] +#[derive(Clone, Debug)] +pub struct Sample { + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) payload: Payload, + pub(crate) kind: SampleKind, + pub(crate) encoding: Encoding, + pub(crate) timestamp: Option, + pub(crate) qos: QoS, + + #[cfg(feature = "unstable")] + pub(crate) source_info: SourceInfo, + + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, +} + +impl Sample { + /// Gets the key expression on which this Sample was published. + #[inline] + pub fn key_expr(&self) -> &KeyExpr<'static> { + &self.key_expr + } + + /// Gets the payload of this Sample. + #[inline] + pub fn payload(&self) -> &Payload { + &self.payload + } + + /// Gets the kind of this Sample. + #[inline] + pub fn kind(&self) -> SampleKind { + self.kind + } + + /// Gets the encoding of this sample + #[inline] + pub fn encoding(&self) -> &Encoding { + &self.encoding + } + + /// Gets the timestamp of this Sample. + #[inline] + pub fn timestamp(&self) -> Option<&Timestamp> { + self.timestamp.as_ref() + } + + /// Gets the quality of service settings this Sample was sent with. + #[inline] + pub fn qos(&self) -> &QoS { + &self.qos + } + + /// Gets infos on the source of this Sample. + #[zenoh_macros::unstable] + #[inline] + pub fn source_info(&self) -> &SourceInfo { + &self.source_info + } + + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. + #[zenoh_macros::unstable] + #[inline] + pub fn attachment(&self) -> Option<&Attachment> { + self.attachment.as_ref() + } +} + +impl From for Value { + fn from(sample: Sample) -> Self { + Value::new(sample.payload).encoding(sample.encoding) + } +} + +/// Structure containing quality of service data +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] +pub struct QoS { + inner: QoSType, +} + +#[derive(Debug)] +pub struct QoSBuilder(QoS); + +impl From for QoSBuilder { + fn from(qos: QoS) -> Self { + QoSBuilder(qos) + } +} + +impl From for QoS { + fn from(builder: QoSBuilder) -> Self { + builder.0 + } +} + +impl QoSBuilderTrait for QoSBuilder { + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + let mut inner = self.0.inner; + inner.set_congestion_control(congestion_control); + Self(QoS { inner }) + } + + fn priority(self, priority: Priority) -> Self { + let mut inner = self.0.inner; + inner.set_priority(priority.into()); + Self(QoS { inner }) + } + + fn express(self, is_express: bool) -> Self { + let mut inner = self.0.inner; + inner.set_is_express(is_express); + Self(QoS { inner }) + } +} + +impl QoS { + /// Gets priority of the message. + pub fn priority(&self) -> Priority { + match Priority::try_from(self.inner.get_priority()) { + Ok(p) => p, + Err(e) => { + log::trace!( + "Failed to convert priority: {}; replacing with default value", + e.to_string() + ); + Priority::default() + } + } + } + + /// Gets congestion control of the message. + pub fn congestion_control(&self) -> CongestionControl { + self.inner.get_congestion_control() + } + + /// Gets express flag value. If `true`, the message is not batched during transmission, in order to reduce latency. + pub fn express(&self) -> bool { + self.inner.is_express() + } +} + +impl From for QoS { + fn from(qos: QoSType) -> Self { + QoS { inner: qos } + } +} + +impl From for QoSType { + fn from(qos: QoS) -> Self { + qos.inner + } +}