From ab349b2e91ee2fce1b0776526f6bb26af26a3b76 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Mon, 1 Apr 2024 11:46:29 +0200 Subject: [PATCH] support of TryIntoKeyexpr --- zenoh/src/key_expr.rs | 2 +- zenoh/src/queryable.rs | 144 ++++++++++++++++++++++++++++------------ zenoh/src/sample/mod.rs | 6 ++ 3 files changed, 107 insertions(+), 45 deletions(-) diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index aaa1d13724..d2bfb5bcfe 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -185,7 +185,7 @@ impl<'a> KeyExpr<'a> { /// # Safety /// Key Expressions must follow some rules to be accepted by a Zenoh network. /// Messages addressed with invalid key expressions will be dropped. - pub unsafe fn from_str_uncheckend(s: &'a str) -> Self { + pub unsafe fn from_str_unchecked(s: &'a str) -> Self { keyexpr::from_str_unchecked(s).into() } diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index c2a5557440..37c3a2303a 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -19,10 +19,9 @@ use crate::handlers::{locked, DefaultHandler}; use crate::net::primitives::Primitives; use crate::prelude::*; use crate::sample::builder::{ - op, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, - ValueBuilderTrait, + QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, }; -use crate::sample::SourceInfo; +use crate::sample::{QoSBuilder, SourceInfo}; use crate::Id; use crate::SessionRef; use crate::Undeclarable; @@ -132,17 +131,23 @@ impl Query { &self, key_expr: TryIntoKeyExpr, payload: IntoPayload, - ) -> ReplySampleBuilder<'_, op::Put> + ) -> ReplyBuilder<'_, 'b, ReplyBuilderPut> where TryIntoKeyExpr: TryInto>, >>::Error: Into, IntoPayload: Into, { - let sample_builder = - SampleBuilder::put(key_expr, payload).qos(response::ext::QoSType::RESPONSE.into()); - ReplySampleBuilder { + ReplyBuilder { query: self, - sample_builder, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderPut { + payload: payload.into(), + encoding: Encoding::default(), + }, + timestamp: None, + source_info: SourceInfo::empty(), + attachment: None, } } @@ -165,19 +170,22 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply_del( + pub fn reply_del<'b, TryIntoKeyExpr>( &self, - key_expr: IntoKeyExpr, - ) -> ReplySampleBuilder<'_, op::Delete> + key_expr: TryIntoKeyExpr, + ) -> ReplyBuilder<'_, 'b, ReplyBuilderDelete> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - let sample_builder = - SampleBuilder::delete(key_expr).qos(response::ext::QoSType::RESPONSE.into()); - ReplySampleBuilder { + ReplyBuilder { query: self, - sample_builder, + key_expr: key_expr.try_into().map_err(Into::into), + qos: response::ext::QoSType::RESPONSE.into(), + kind: ReplyBuilderDelete, + timestamp: None, + source_info: SourceInfo::empty(), + attachment: None, } } @@ -243,28 +251,45 @@ impl AsyncResolve for ReplySample<'_> { } } -/// A builder returned by [`Query::reply()`](Query::reply) +#[derive(Debug)] +pub struct ReplyBuilderPut { + payload: super::Payload, + encoding: super::Encoding, +} +#[derive(Debug)] +pub struct ReplyBuilderDelete; + +/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del) #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct ReplySampleBuilder<'a, T> { +pub struct ReplyBuilder<'a, 'b, T> { query: &'a Query, - sample_builder: SampleBuilder, + key_expr: ZResult>, + kind: T, + timestamp: Option, + qos: QoSBuilder, + + #[cfg(feature = "unstable")] + source_info: SourceInfo, + + #[cfg(feature = "unstable")] + attachment: Option, } -impl TimestampBuilderTrait for ReplySampleBuilder<'_, T> { +impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { fn timestamp>>(self, timestamp: U) -> Self { Self { - sample_builder: self.sample_builder.timestamp(timestamp), + timestamp: timestamp.into(), ..self } } } -impl SampleBuilderTrait for ReplySampleBuilder<'_, T> { +impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { #[cfg(feature = "unstable")] fn source_info(self, source_info: SourceInfo) -> Self { Self { - sample_builder: self.sample_builder.source_info(source_info), + source_info, ..self } } @@ -272,69 +297,100 @@ impl SampleBuilderTrait for ReplySampleBuilder<'_, T> { #[cfg(feature = "unstable")] fn attachment>>(self, attachment: U) -> Self { Self { - sample_builder: self.sample_builder.attachment(attachment), + attachment: attachment.into(), ..self } } } -impl QoSBuilderTrait for ReplySampleBuilder<'_, T> { +impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { fn congestion_control(self, congestion_control: CongestionControl) -> Self { - Self { - sample_builder: self.sample_builder.congestion_control(congestion_control), - ..self - } + let qos = self.qos.congestion_control(congestion_control); + Self { qos, ..self } } fn priority(self, priority: Priority) -> Self { - Self { - sample_builder: self.sample_builder.priority(priority), - ..self - } + let qos = self.qos.priority(priority); + Self { qos, ..self } } fn express(self, is_express: bool) -> Self { - Self { - sample_builder: self.sample_builder.express(is_express), - ..self - } + let qos = self.qos.express(is_express); + Self { qos, ..self } } } -impl ValueBuilderTrait for ReplySampleBuilder<'_, op::Put> { +impl ValueBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { fn encoding>(self, encoding: T) -> Self { Self { - sample_builder: self.sample_builder.encoding(encoding), + kind: ReplyBuilderPut { + encoding: encoding.into(), + ..self.kind + }, ..self } } fn payload>(self, payload: T) -> Self { Self { - sample_builder: self.sample_builder.payload(payload), + kind: ReplyBuilderPut { + payload: payload.into(), + ..self.kind + }, ..self } } fn value>(self, value: T) -> Self { let Value { payload, encoding } = value.into(); Self { - sample_builder: self.sample_builder.payload(payload).encoding(encoding), + kind: ReplyBuilderPut { payload, encoding }, ..self } } } -impl<'a, T> Resolvable for ReplySampleBuilder<'a, T> { +impl Resolvable for ReplyBuilder<'_, '_, T> { type To = ZResult<()>; } -impl SyncResolve for ReplySampleBuilder<'_, T> { +impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> { + fn res_sync(self) -> ::To { + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::put(key_expr, self.kind.payload) + .encoding(self.kind.encoding) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + #[cfg(feature = "unstable")] + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> { fn res_sync(self) -> ::To { - self.query._reply_sample(self.sample_builder.into()) + let key_expr = self.key_expr?.into_owned(); + let sample = SampleBuilder::delete(key_expr) + .timestamp(self.timestamp) + .qos(self.qos.into()); + #[cfg(feature = "unstable")] + let sample = sample.source_info(self.source_info); + #[cfg(feature = "unstable")] + let sample = sample.attachment(self.attachment); + self.query._reply_sample(sample.into()) + } +} + +impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) } } -impl<'a, T> AsyncResolve for ReplySampleBuilder<'a, T> { +impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> { type Future = Ready; fn res_async(self) -> Self::Future { diff --git a/zenoh/src/sample/mod.rs b/zenoh/src/sample/mod.rs index 2b71105d5e..be80f8277e 100644 --- a/zenoh/src/sample/mod.rs +++ b/zenoh/src/sample/mod.rs @@ -590,6 +590,12 @@ impl From for QoSBuilder { } } +impl From for QoSBuilder { + fn from(qos: QoSType) -> Self { + QoSBuilder(QoS { inner: qos }) + } +} + impl From for QoS { fn from(builder: QoSBuilder) -> Self { builder.0