Skip to content

Commit

Permalink
support of TryIntoKeyexpr
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Apr 1, 2024
1 parent 2992d45 commit ab349b2
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 45 deletions.
2 changes: 1 addition & 1 deletion zenoh/src/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'a> KeyExpr<'a> {
/// # Safety
/// Key Expressions must follow some rules to be accepted by a Zenoh network.
/// Messages addressed with invalid key expressions will be dropped.
pub unsafe fn from_str_uncheckend(s: &'a str) -> Self {
pub unsafe fn from_str_unchecked(s: &'a str) -> Self {
keyexpr::from_str_unchecked(s).into()
}

Expand Down
144 changes: 100 additions & 44 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use crate::handlers::{locked, DefaultHandler};
use crate::net::primitives::Primitives;
use crate::prelude::*;
use crate::sample::builder::{
op, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait,
ValueBuilderTrait,
QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
};
use crate::sample::SourceInfo;
use crate::sample::{QoSBuilder, SourceInfo};
use crate::Id;
use crate::SessionRef;
use crate::Undeclarable;
Expand Down Expand Up @@ -132,17 +131,23 @@ impl Query {
&self,
key_expr: TryIntoKeyExpr,
payload: IntoPayload,
) -> ReplySampleBuilder<'_, op::Put>
) -> ReplyBuilder<'_, 'b, ReplyBuilderPut>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
IntoPayload: Into<Payload>,
{
let sample_builder =
SampleBuilder::put(key_expr, payload).qos(response::ext::QoSType::RESPONSE.into());
ReplySampleBuilder {
ReplyBuilder {
query: self,
sample_builder,
key_expr: key_expr.try_into().map_err(Into::into),
qos: response::ext::QoSType::RESPONSE.into(),
kind: ReplyBuilderPut {
payload: payload.into(),
encoding: Encoding::default(),
},
timestamp: None,
source_info: SourceInfo::empty(),
attachment: None,
}
}

Expand All @@ -165,19 +170,22 @@ impl Query {
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply_del<IntoKeyExpr>(
pub fn reply_del<'b, TryIntoKeyExpr>(
&self,
key_expr: IntoKeyExpr,
) -> ReplySampleBuilder<'_, op::Delete>
key_expr: TryIntoKeyExpr,
) -> ReplyBuilder<'_, 'b, ReplyBuilderDelete>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
let sample_builder =
SampleBuilder::delete(key_expr).qos(response::ext::QoSType::RESPONSE.into());
ReplySampleBuilder {
ReplyBuilder {
query: self,
sample_builder,
key_expr: key_expr.try_into().map_err(Into::into),
qos: response::ext::QoSType::RESPONSE.into(),
kind: ReplyBuilderDelete,
timestamp: None,
source_info: SourceInfo::empty(),
attachment: None,
}
}

Expand Down Expand Up @@ -243,98 +251,146 @@ impl AsyncResolve for ReplySample<'_> {
}
}

/// A builder returned by [`Query::reply()`](Query::reply)
#[derive(Debug)]
pub struct ReplyBuilderPut {
payload: super::Payload,
encoding: super::Encoding,
}
#[derive(Debug)]
pub struct ReplyBuilderDelete;

/// A builder returned by [`Query::reply()`](Query::reply) and [`Query::reply_del()`](Query::reply_del)
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct ReplySampleBuilder<'a, T> {
pub struct ReplyBuilder<'a, 'b, T> {
query: &'a Query,
sample_builder: SampleBuilder<T>,
key_expr: ZResult<KeyExpr<'b>>,
kind: T,
timestamp: Option<Timestamp>,
qos: QoSBuilder,

#[cfg(feature = "unstable")]
source_info: SourceInfo,

#[cfg(feature = "unstable")]
attachment: Option<Attachment>,
}

impl<T> TimestampBuilderTrait for ReplySampleBuilder<'_, T> {
impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
fn timestamp<U: Into<Option<Timestamp>>>(self, timestamp: U) -> Self {
Self {
sample_builder: self.sample_builder.timestamp(timestamp),
timestamp: timestamp.into(),
..self
}
}
}

impl<T> SampleBuilderTrait for ReplySampleBuilder<'_, T> {
impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
fn source_info(self, source_info: SourceInfo) -> Self {
Self {
sample_builder: self.sample_builder.source_info(source_info),
source_info,
..self
}
}

#[cfg(feature = "unstable")]
fn attachment<U: Into<Option<Attachment>>>(self, attachment: U) -> Self {
Self {
sample_builder: self.sample_builder.attachment(attachment),
attachment: attachment.into(),
..self
}
}
}

impl<T> QoSBuilderTrait for ReplySampleBuilder<'_, T> {
impl<T> QoSBuilderTrait for ReplyBuilder<'_, '_, T> {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Self {
sample_builder: self.sample_builder.congestion_control(congestion_control),
..self
}
let qos = self.qos.congestion_control(congestion_control);
Self { qos, ..self }
}

fn priority(self, priority: Priority) -> Self {
Self {
sample_builder: self.sample_builder.priority(priority),
..self
}
let qos = self.qos.priority(priority);
Self { qos, ..self }
}

fn express(self, is_express: bool) -> Self {
Self {
sample_builder: self.sample_builder.express(is_express),
..self
}
let qos = self.qos.express(is_express);
Self { qos, ..self }
}
}

impl ValueBuilderTrait for ReplySampleBuilder<'_, op::Put> {
impl ValueBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
sample_builder: self.sample_builder.encoding(encoding),
kind: ReplyBuilderPut {
encoding: encoding.into(),
..self.kind
},
..self
}
}

fn payload<T: Into<Payload>>(self, payload: T) -> Self {
Self {
sample_builder: self.sample_builder.payload(payload),
kind: ReplyBuilderPut {
payload: payload.into(),
..self.kind
},
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self {
sample_builder: self.sample_builder.payload(payload).encoding(encoding),
kind: ReplyBuilderPut { payload, encoding },
..self
}
}
}

impl<'a, T> Resolvable for ReplySampleBuilder<'a, T> {
impl<T> Resolvable for ReplyBuilder<'_, '_, T> {
type To = ZResult<()>;
}

impl<T> SyncResolve for ReplySampleBuilder<'_, T> {
impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> {
fn res_sync(self) -> <Self as Resolvable>::To {
let key_expr = self.key_expr?.into_owned();
let sample = SampleBuilder::put(key_expr, self.kind.payload)
.encoding(self.kind.encoding)
.timestamp(self.timestamp)
.qos(self.qos.into());
#[cfg(feature = "unstable")]
let sample = sample.source_info(self.source_info);
#[cfg(feature = "unstable")]
let sample = sample.attachment(self.attachment);
self.query._reply_sample(sample.into())
}
}

impl SyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
fn res_sync(self) -> <Self as Resolvable>::To {
self.query._reply_sample(self.sample_builder.into())
let key_expr = self.key_expr?.into_owned();
let sample = SampleBuilder::delete(key_expr)
.timestamp(self.timestamp)
.qos(self.qos.into());
#[cfg(feature = "unstable")]
let sample = sample.source_info(self.source_info);
#[cfg(feature = "unstable")]
let sample = sample.attachment(self.attachment);
self.query._reply_sample(sample.into())
}
}

impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderPut> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

impl<'a, T> AsyncResolve for ReplySampleBuilder<'a, T> {
impl AsyncResolve for ReplyBuilder<'_, '_, ReplyBuilderDelete> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/sample/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,12 @@ impl From<QoS> for QoSBuilder {
}
}

impl From<QoSType> for QoSBuilder {
fn from(qos: QoSType) -> Self {
QoSBuilder(QoS { inner: qos })
}
}

impl From<QoSBuilder> for QoS {
fn from(builder: QoSBuilder) -> Self {
builder.0
Expand Down

0 comments on commit ab349b2

Please sign in to comment.