Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query.reply and reply_del, now accept TryIntoKeyExpr instead of IntoKeyExpr #878

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl Query {
#[inline(always)]
#[cfg(feature = "unstable")]
#[doc(hidden)]
pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_> {
pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_, 'static> {
let Sample {
key_expr,
payload,
Expand All @@ -126,7 +126,7 @@ impl Query {
} = sample;
ReplyBuilder {
query: self,
key_expr,
key_expr: Ok(key_expr),
payload,
kind,
encoding,
Expand All @@ -145,18 +145,19 @@ impl Query {
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply<IntoKeyExpr, IntoPayload>(
pub fn reply<'b, TryIntoKeyExpr, IntoPayload>(
&self,
key_expr: IntoKeyExpr,
key_expr: TryIntoKeyExpr,
payload: IntoPayload,
) -> ReplyBuilder<'_>
) -> ReplyBuilder<'_, 'b>
where
IntoKeyExpr: Into<KeyExpr<'static>>,
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
IntoPayload: Into<Payload>,
{
ReplyBuilder {
query: self,
key_expr: key_expr.into(),
key_expr: key_expr.try_into().map_err(Into::into),
payload: payload.into(),
kind: SampleKind::Put,
timestamp: None,
Expand Down Expand Up @@ -187,13 +188,14 @@ impl Query {
/// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]),
/// replying on a disjoint key expression will result in an error when resolving the reply.
#[inline(always)]
pub fn reply_del<IntoKeyExpr>(&self, key_expr: IntoKeyExpr) -> ReplyBuilder<'_>
pub fn reply_del<'b, TryIntoKeyExpr>(&self, key_expr: TryIntoKeyExpr) -> ReplyBuilder<'_, 'b>
where
IntoKeyExpr: Into<KeyExpr<'static>>,
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ReplyBuilder {
query: self,
key_expr: key_expr.into(),
key_expr: key_expr.try_into().map_err(Into::into),
payload: Payload::empty(),
kind: SampleKind::Delete,
timestamp: None,
Expand Down Expand Up @@ -248,9 +250,9 @@ impl fmt::Display for Query {
/// A builder returned by [`Query::reply()`](Query::reply) or [`Query::reply()`](Query::reply).
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct ReplyBuilder<'a> {
pub struct ReplyBuilder<'a, 'b> {
query: &'a Query,
key_expr: KeyExpr<'static>,
key_expr: ZResult<KeyExpr<'b>>,
payload: Payload,
kind: SampleKind,
encoding: Encoding,
Expand All @@ -270,7 +272,7 @@ pub struct ReplyErrBuilder<'a> {
value: Value,
}

impl<'a> ReplyBuilder<'a> {
impl<'a, 'b> ReplyBuilder<'a, 'b> {
#[zenoh_macros::unstable]
pub fn with_attachment(mut self, attachment: Attachment) -> Self {
self.attachment = Some(attachment);
Expand All @@ -292,16 +294,17 @@ impl<'a> ReplyBuilder<'a> {
}
}

impl<'a> Resolvable for ReplyBuilder<'a> {
impl<'a, 'b> Resolvable for ReplyBuilder<'a, 'b> {
type To = ZResult<()>;
}

impl SyncResolve for ReplyBuilder<'_> {
impl<'a, 'b> SyncResolve for ReplyBuilder<'a, 'b> {
fn res_sync(self) -> <Self as Resolvable>::To {
let key_expr = self.key_expr?;
if !self.query._accepts_any_replies().unwrap_or(false)
&& !self.query.key_expr().intersects(&self.key_expr)
&& !self.query.key_expr().intersects(&key_expr)
{
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.key_expr, self.query.key_expr())
bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", &key_expr, self.query.key_expr())
}
#[allow(unused_mut)] // will be unused if feature = "unstable" is not enabled
let mut ext_sinfo = None;
Expand All @@ -318,7 +321,7 @@ impl SyncResolve for ReplyBuilder<'_> {
rid: self.query.inner.qid,
wire_expr: WireExpr {
scope: 0,
suffix: std::borrow::Cow::Owned(self.key_expr.into()),
suffix: std::borrow::Cow::Owned(key_expr.into()),
mapping: Mapping::Sender,
},
payload: ResponseBody::Reply(zenoh::Reply {
Expand Down Expand Up @@ -360,7 +363,7 @@ impl SyncResolve for ReplyBuilder<'_> {
}
}

impl<'a> AsyncResolve for ReplyBuilder<'a> {
impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Task {
tokio::select! {
_ = token.cancelled() => break,
query = queryable.recv_async() => {
query?.reply(KeyExpr::try_from(ke.to_owned())?, payload.clone()).res_async().await?;
query?.reply(ke.to_owned(), payload.clone()).res_async().await?;
},
}
}
Expand Down
5 changes: 1 addition & 4 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re
"ok_del" => {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
ztimeout!(query
.reply_del(KeyExpr::try_from(key_expr).unwrap())
.res_async())
.unwrap()
ztimeout!(query.reply_del(key_expr).res_async()).unwrap()
})
});
}
Expand Down