diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 599c0e13be..58589bfe8f 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -111,7 +111,7 @@ impl Query { #[inline(always)] #[cfg(feature = "unstable")] #[doc(hidden)] - pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_> { + pub fn reply_sample(&self, sample: Sample) -> ReplyBuilder<'_, 'static> { let Sample { key_expr, payload, @@ -126,7 +126,7 @@ impl Query { } = sample; ReplyBuilder { query: self, - key_expr, + key_expr: Ok(key_expr), payload, kind, encoding, @@ -145,18 +145,19 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply( + pub fn reply<'b, TryIntoKeyExpr, IntoPayload>( &self, - key_expr: IntoKeyExpr, + key_expr: TryIntoKeyExpr, payload: IntoPayload, - ) -> ReplyBuilder<'_> + ) -> ReplyBuilder<'_, 'b> where - IntoKeyExpr: Into>, + TryIntoKeyExpr: TryInto>, + >>::Error: Into, IntoPayload: Into, { ReplyBuilder { query: self, - key_expr: key_expr.into(), + key_expr: key_expr.try_into().map_err(Into::into), payload: payload.into(), kind: SampleKind::Put, timestamp: None, @@ -187,13 +188,14 @@ impl Query { /// Unless the query has enabled disjoint replies (you can check this through [`Query::accepts_replies`]), /// replying on a disjoint key expression will result in an error when resolving the reply. #[inline(always)] - pub fn reply_del(&self, key_expr: IntoKeyExpr) -> ReplyBuilder<'_> + pub fn reply_del<'b, TryIntoKeyExpr>(&self, key_expr: TryIntoKeyExpr) -> ReplyBuilder<'_, 'b> where - IntoKeyExpr: Into>, + TryIntoKeyExpr: TryInto>, + >>::Error: Into, { ReplyBuilder { query: self, - key_expr: key_expr.into(), + key_expr: key_expr.try_into().map_err(Into::into), payload: Payload::empty(), kind: SampleKind::Delete, timestamp: None, @@ -248,9 +250,9 @@ impl fmt::Display for Query { /// A builder returned by [`Query::reply()`](Query::reply) or [`Query::reply()`](Query::reply). #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct ReplyBuilder<'a> { +pub struct ReplyBuilder<'a, 'b> { query: &'a Query, - key_expr: KeyExpr<'static>, + key_expr: ZResult>, payload: Payload, kind: SampleKind, encoding: Encoding, @@ -270,7 +272,7 @@ pub struct ReplyErrBuilder<'a> { value: Value, } -impl<'a> ReplyBuilder<'a> { +impl<'a, 'b> ReplyBuilder<'a, 'b> { #[zenoh_macros::unstable] pub fn with_attachment(mut self, attachment: Attachment) -> Self { self.attachment = Some(attachment); @@ -292,16 +294,17 @@ impl<'a> ReplyBuilder<'a> { } } -impl<'a> Resolvable for ReplyBuilder<'a> { +impl<'a, 'b> Resolvable for ReplyBuilder<'a, 'b> { type To = ZResult<()>; } -impl SyncResolve for ReplyBuilder<'_> { +impl<'a, 'b> SyncResolve for ReplyBuilder<'a, 'b> { fn res_sync(self) -> ::To { + let key_expr = self.key_expr?; if !self.query._accepts_any_replies().unwrap_or(false) - && !self.query.key_expr().intersects(&self.key_expr) + && !self.query.key_expr().intersects(&key_expr) { - bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", self.key_expr, self.query.key_expr()) + bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", &key_expr, self.query.key_expr()) } #[allow(unused_mut)] // will be unused if feature = "unstable" is not enabled let mut ext_sinfo = None; @@ -318,7 +321,7 @@ impl SyncResolve for ReplyBuilder<'_> { rid: self.query.inner.qid, wire_expr: WireExpr { scope: 0, - suffix: std::borrow::Cow::Owned(self.key_expr.into()), + suffix: std::borrow::Cow::Owned(key_expr.into()), mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { @@ -360,7 +363,7 @@ impl SyncResolve for ReplyBuilder<'_> { } } -impl<'a> AsyncResolve for ReplyBuilder<'a> { +impl<'a, 'b> AsyncResolve for ReplyBuilder<'a, 'b> { type Future = Ready; fn res_async(self) -> Self::Future { diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index c34d06690a..b90f0f568f 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -137,7 +137,7 @@ impl Task { tokio::select! { _ = token.cancelled() => break, query = queryable.recv_async() => { - query?.reply(KeyExpr::try_from(ke.to_owned())?, payload.clone()).res_async().await?; + query?.reply(ke.to_owned(), payload.clone()).res_async().await?; }, } } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 5e86499bc7..8c2d2e9937 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -164,10 +164,7 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re "ok_del" => { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { - ztimeout!(query - .reply_del(KeyExpr::try_from(key_expr).unwrap()) - .res_async()) - .unwrap() + ztimeout!(query.reply_del(key_expr).res_async()).unwrap() }) }); }