diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 9796109510..625494a757 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -23,7 +23,7 @@ use std::{ #[cfg(feature = "unstable")] use zenoh::pubsub::Reliability; use zenoh::{ - handlers::{locked, DefaultHandler, IntoHandler}, + handlers::{locked, Callback, DefaultHandler, IntoHandler}, internal::zlock, key_expr::KeyExpr, prelude::Wait, @@ -97,7 +97,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle handler: Handler, ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let QueryingSubscriberBuilder { session, @@ -230,7 +230,7 @@ impl QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> { impl Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, { type To = ZResult>; @@ -239,7 +239,7 @@ where impl Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -288,7 +288,7 @@ where impl IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -469,7 +469,7 @@ where handler: Handler, ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let FetchingSubscriberBuilder { session, @@ -574,7 +574,7 @@ impl< TryIntoSample, > Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, TryIntoSample: ExtractSample, { @@ -589,7 +589,7 @@ impl< > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -606,7 +606,7 @@ impl< > IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -651,7 +651,7 @@ where /// ``` pub struct FetchingSubscriber { subscriber: Subscriber<()>, - callback: Arc, + callback: Callback, state: Arc>, handler: Handler, } @@ -681,7 +681,7 @@ impl FetchingSubscriber { ) -> ZResult where KeySpace: Into, - InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send, + InputHandler: IntoHandler + Send, TryIntoSample: ExtractSample + Send + Sync, { let session_id = conf.session.zid(); @@ -698,7 +698,7 @@ impl FetchingSubscriber { move |s| { let state = &mut zlock!(state); if state.pending_fetches == 0 { - callback(s); + callback.call(s); } else { tracing::trace!( "Sample received while fetch in progress: push it to merge_queue" @@ -823,7 +823,7 @@ impl FetchingSubscriber { struct RepliesHandler { state: Arc>, - callback: Arc, + callback: Callback, } impl Drop for RepliesHandler { @@ -840,7 +840,7 @@ impl Drop for RepliesHandler { state.merge_queue.len() ); for s in state.merge_queue.drain() { - (self.callback)(s); + self.callback.call(s); } } } @@ -888,7 +888,7 @@ pub struct FetchBuilder< fetch: Fetch, phantom: std::marker::PhantomData, state: Arc>, - callback: Arc, + callback: Callback, } impl) -> ZResult<()>, TryIntoSample> @@ -923,10 +923,7 @@ where } } -fn register_handler( - state: Arc>, - callback: Arc, -) -> RepliesHandler { +fn register_handler(state: Arc>, callback: Callback) -> RepliesHandler { zlock!(state).pending_fetches += 1; // pending fetches will be decremented in RepliesHandler drop() RepliesHandler { state, callback } diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index f061779142..b96fc75dd2 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -34,7 +34,7 @@ use super::{ sample::{DataInfo, Locality, SampleKind}, subscriber::SubscriberKind, }; -use crate::api::session::WeakSession; +use crate::{api::session::WeakSession, handlers::Callback}; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; @@ -54,10 +54,10 @@ pub(crate) fn init(session: WeakSession) { &admin_key, true, Locality::SessionLocal, - Arc::new({ + Callback::new(Arc::new({ let session = session.clone(); move |q| on_admin_query(&session, q) - }), + })), ); } } diff --git a/zenoh/src/api/handlers/callback.rs b/zenoh/src/api/handlers/callback.rs index 7e609949ef..23c0454de5 100644 --- a/zenoh/src/api/handlers/callback.rs +++ b/zenoh/src/api/handlers/callback.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{Dyn, IntoHandler}; + +use std::sync::Arc; + +use crate::api::handlers::IntoHandler; /// A function that can transform a [`FnMut`]`(T)` to /// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). @@ -22,50 +25,76 @@ pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { move |x| zlock!(lock)(x) } -/// An immutable callback function. -pub type Callback<'a, T> = Dyn; +/// Callback type used by zenoh entities. +pub struct Callback(Arc); + +impl Clone for Callback { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Callback { + /// Instantiate a `Callback` from a callback function. + pub fn new(cb: Arc) -> Self { + Self(cb) + } + + /// Call the inner callback. + #[inline] + pub fn call(&self, arg: T) { + self.0(arg) + } +} + +impl IntoHandler for Callback { + type Handler = (); + fn into_handler(self) -> (Callback, Self::Handler) { + (self, ()) + } +} -impl<'a, T, F> IntoHandler<'a, T> for F +impl IntoHandler for F where - F: Fn(T) + Send + Sync + 'a, + F: Fn(T) + Send + Sync + 'static, { type Handler = (); - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (Callback::new(Arc::new(self)), ()) } } -impl<'a, T, F, H> IntoHandler<'a, T> for (F, H) +impl IntoHandler for (F, H) where - F: Fn(T) + Send + Sync + 'a, + F: Fn(T) + Send + Sync + 'static, { type Handler = H; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self.0), self.1) + fn into_handler(self) -> (Callback, Self::Handler) { + (self.0.into_handler().0, self.1) } } -impl<'a, T, H> IntoHandler<'a, T> for (Callback<'static, T>, H) { +impl IntoHandler for (Callback, H) { type Handler = H; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { self } } -impl IntoHandler<'static, T> for (flume::Sender, flume::Receiver) { +impl IntoHandler for (flume::Sender, flume::Receiver) { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; ( - Dyn::new(move |t| { + Callback::new(Arc::new(move |t| { if let Err(e) = sender.send(t) { tracing::error!("{}", e) } - }), + })), receiver, ) } @@ -97,14 +126,14 @@ where } } -impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop +impl IntoHandler for CallbackDrop where - OnEvent: Fn(Event) + Send + Sync + 'a, + OnEvent: Fn(Event) + Send + Sync + 'static, DropFn: FnMut() + Send + Sync + 'static, { type Handler = (); - fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) { - (Dyn::from(move |evt| (self.callback)(evt)), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (move |evt| (self.callback)(evt)).into_handler() } } diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index f0ae1a5257..db73aae0ef 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; + +use std::sync::Arc; + +use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; /// The default handler in Zenoh is a FIFO queue. @@ -34,27 +37,27 @@ impl Default for FifoChannel { } } -impl IntoHandler<'static, T> for FifoChannel { +impl IntoHandler for FifoChannel { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { flume::bounded(self.capacity).into_handler() } } -impl IntoHandler<'static, T> +impl IntoHandler for (std::sync::mpsc::SyncSender, std::sync::mpsc::Receiver) { type Handler = std::sync::mpsc::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - tracing::error!("{}", e) + Callback::new(Arc::new(move |t| { + if let Err(error) = sender.send(t.clone()) { + tracing::error!(%error) } - }), + })), receiver, ) } diff --git a/zenoh/src/api/handlers/mod.rs b/zenoh/src/api/handlers/mod.rs index 60ec658a4d..e4a706f172 100644 --- a/zenoh/src/api/handlers/mod.rs +++ b/zenoh/src/api/handlers/mod.rs @@ -23,19 +23,16 @@ pub use ring::*; use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; -/// An alias for `Arc`. -pub type Dyn = std::sync::Arc; - /// A type that can be converted into a [`Callback`]-Handler pair. /// /// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that, /// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`]. /// /// Any closure that accepts `T` can be converted into a pair of itself and `()`. -pub trait IntoHandler<'a, T> { +pub trait IntoHandler { type Handler; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler); + fn into_handler(self) -> (Callback, Self::Handler); } /// The default handler in Zenoh is a FIFO queue. @@ -43,10 +40,10 @@ pub trait IntoHandler<'a, T> { #[derive(Default)] pub struct DefaultHandler(FifoChannel); -impl IntoHandler<'static, T> for DefaultHandler { - type Handler = >::Handler; +impl IntoHandler for DefaultHandler { + type Handler = >::Handler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { self.0.into_handler() } } diff --git a/zenoh/src/api/handlers/ring.rs b/zenoh/src/api/handlers/ring.rs index 7b058d1905..87bb860213 100644 --- a/zenoh/src/api/handlers/ring.rs +++ b/zenoh/src/api/handlers/ring.rs @@ -21,8 +21,10 @@ use std::{ use zenoh_collections::RingBuffer; use zenoh_result::ZResult; -use super::{callback::Callback, Dyn, IntoHandler}; -use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; +use crate::api::{ + handlers::{callback::Callback, IntoHandler}, + session::API_DATA_RECEPTION_CHANNEL_SIZE, +}; /// A synchronous ring channel with a limited size that allows users to keep the last N data. pub struct RingChannel { @@ -140,10 +142,10 @@ impl RingChannelHandler { } } -impl IntoHandler<'static, T> for RingChannel { +impl IntoHandler for RingChannel { type Handler = RingChannelHandler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = flume::bounded(1); let inner = Arc::new(RingChannelInner { ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)), @@ -153,7 +155,7 @@ impl IntoHandler<'static, T> for RingChannel { ring: Arc::downgrade(&inner), }; ( - Dyn::new(move |t| match inner.ring.lock() { + Callback::new(Arc::new(move |t| match inner.ring.lock() { Ok(mut g) => { // Eventually drop the oldest element. g.push_force(t); @@ -161,7 +163,7 @@ impl IntoHandler<'static, T> for RingChannel { let _ = sender.try_send(()); } Err(e) => tracing::error!("{}", e), - }), + })), receiver, ) } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 02fd2bb482..c8a0b6f194 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -525,7 +525,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler> where - Handler: crate::handlers::IntoHandler<'static, Sample>, + Handler: crate::handlers::IntoHandler, { let LivelinessSubscriberBuilder { session, @@ -568,7 +568,7 @@ impl LivelinessSubscriberBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] impl<'a, Handler> Resolvable for LivelinessSubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -577,7 +577,7 @@ where #[zenoh_macros::unstable] impl Wait for LivelinessSubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -611,7 +611,7 @@ where #[zenoh_macros::unstable] impl IntoFuture for LivelinessSubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -732,7 +732,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let LivelinessGetBuilder { session, @@ -760,7 +760,7 @@ impl LivelinessGetBuilder<'_, '_, Handler> { impl Resolvable for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -768,7 +768,7 @@ where impl Wait for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -782,7 +782,7 @@ where impl IntoFuture for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 3f5a6ca912..dfaf2deb2e 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -723,7 +723,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, MatchingStatus>, + Handler: IntoHandler, { let MatchingListenerBuilder { publisher, @@ -756,7 +756,7 @@ impl MatchingListenerBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] impl Resolvable for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -765,7 +765,7 @@ where #[zenoh_macros::unstable] impl Wait for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -791,7 +791,7 @@ where #[zenoh_macros::unstable] impl IntoFuture for MatchingListenerBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -809,7 +809,7 @@ pub(crate) struct MatchingListenerState { pub(crate) current: Mutex, pub(crate) key_expr: KeyExpr<'static>, pub(crate) destination: Locality, - pub(crate) callback: Callback<'static, MatchingStatus>, + pub(crate) callback: Callback, } #[zenoh_macros::unstable] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index ae55d5ab8d..7cb8d5eb84 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -154,7 +154,7 @@ impl From for Result { #[cfg(feature = "unstable")] pub(crate) struct LivelinessQueryState { - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } pub(crate) struct QueryState { @@ -163,7 +163,7 @@ pub(crate) struct QueryState { pub(crate) parameters: Parameters<'static>, pub(crate) reception_mode: ConsolidationMode, pub(crate) replies: Option>, - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } impl QueryState { @@ -333,7 +333,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let SessionGetBuilder { session, @@ -444,7 +444,7 @@ pub enum ReplyKeyExpr { impl Resolvable for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -452,7 +452,7 @@ where impl Wait for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -483,7 +483,7 @@ where impl IntoFuture for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 46cbb8ccd7..97675336b7 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -53,6 +53,7 @@ use crate::{ value::Value, Id, }, + handlers::Callback, net::primitives::Primitives, Session, }; @@ -530,7 +531,7 @@ pub(crate) struct QueryableState { pub(crate) key_expr: WireExpr<'static>, pub(crate) complete: bool, pub(crate) origin: Locality, - pub(crate) callback: Arc, + pub(crate) callback: Callback, } impl fmt::Debug for QueryableState { @@ -690,7 +691,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Query>, + Handler: IntoHandler, { let QueryableBuilder { session, @@ -903,7 +904,7 @@ impl DerefMut for Queryable { impl Resolvable for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -911,7 +912,7 @@ where impl Wait for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -938,7 +939,7 @@ where impl IntoFuture for QueryableBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 0a41294548..73c0afcbdf 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -127,7 +127,7 @@ impl ScoutBuilder { #[inline] pub fn with(self, handler: Handler) -> ScoutBuilder where - Handler: IntoHandler<'static, Hello>, + Handler: IntoHandler, { let ScoutBuilder { what, @@ -144,7 +144,7 @@ impl ScoutBuilder { impl Resolvable for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -152,7 +152,7 @@ where impl Wait for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -163,7 +163,7 @@ where impl IntoFuture for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -286,7 +286,7 @@ impl Scout { fn _scout( what: WhatAmIMatcher, config: zenoh_config::Config, - callback: Callback<'static, Hello>, + callback: Callback, ) -> ZResult { tracing::trace!("scout({}, {})", what, &config); let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address); @@ -316,7 +316,7 @@ fn _scout( let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); async move { - callback(hello.into()); + callback.call(hello.into()); Loop::Continue } }); diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 6a939e9fa3..4dbad4e6b2 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1241,7 +1241,7 @@ impl SessionInner { self: &Arc, key_expr: &KeyExpr, origin: Locality, - callback: Callback<'static, Sample>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_subscriber({:?})", key_expr); @@ -1449,7 +1449,7 @@ impl SessionInner { key_expr: &WireExpr, complete: bool, origin: Locality, - callback: Callback<'static, Query>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_queryable({:?})", key_expr); @@ -1549,7 +1549,7 @@ impl SessionInner { key_expr: &KeyExpr, origin: Locality, history: bool, - callback: Callback<'static, Sample>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); trace!("declare_liveliness_subscriber({:?})", key_expr); @@ -1645,7 +1645,7 @@ impl SessionInner { pub(crate) fn declare_matches_listener_inner( &self, publisher: &Publisher, - callback: Callback<'static, MatchingStatus>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); let id = self.runtime.next_id(); @@ -1667,7 +1667,9 @@ impl SessionInner { .unwrap_or(true) { *current = true; - (listener_state.callback)(MatchingStatus { matching: true }); + listener_state + .callback + .call(MatchingStatus { matching: true }); } } Err(e) => tracing::error!("Error trying to acquire MathginListener lock: {}", e), @@ -1732,7 +1734,7 @@ impl SessionInner { if status.matching_subscribers() { *current = true; let callback = msub.callback.clone(); - (callback)(status) + callback.call(status) } } } @@ -1770,7 +1772,7 @@ impl SessionInner { if !status.matching_subscribers() { *current = false; let callback = msub.callback.clone(); - (callback)(status) + callback.call(status) } } } @@ -1857,26 +1859,22 @@ impl SessionInner { } }; drop(state); + let mut sample = info.clone().into_sample( + // SAFETY: the keyexpr is valid + unsafe { KeyExpr::from_str_unchecked("dummy") }, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - let sample = info.clone().into_sample( - key_expr, - payload.clone(), - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call(sample.clone()); } if let Some((cb, key_expr)) = last { - let sample = info.into_sample( - key_expr, - payload, - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call(sample); } } @@ -1893,7 +1891,7 @@ impl SessionInner { value: Option, attachment: Option, #[cfg(feature = "unstable")] source: SourceInfo, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!( "get({}, {:?}, {:?})", @@ -1929,10 +1927,10 @@ impl SessionInner { tracing::debug!("Timeout on query {}! Send error and close.", qid); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call(reply); } } - (query.callback)(Reply { + query.callback.call(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -2015,7 +2013,7 @@ impl SessionInner { self: &Arc, key_expr: &KeyExpr<'_>, timeout: Duration, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!("liveliness.get({}, {:?})", key_expr, timeout); let mut state = zwrite!(self.state); @@ -2032,7 +2030,7 @@ impl SessionInner { if let Some(query) = state.liveliness_queries.remove(&id) { std::mem::drop(state); tracing::debug!("Timeout on liveliness query {}! Send error and close.", id); - (query.callback)(Reply { + query.callback.call(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -2107,7 +2105,7 @@ impl SessionInner { } ) .map(|(id, qable)| (*id, qable.callback.clone())) - .collect::)>>(); + .collect::)>>(); (primitives, key_expr.into_owned(), queryables) } Err(err) => { @@ -2130,16 +2128,18 @@ impl SessionInner { primitives }, }); - for (eid, callback) in queryables { - callback(Query { - inner: query_inner.clone(), - eid, - value: body.as_ref().map(|b| Value { - payload: b.payload.clone().into(), - encoding: b.encoding.clone().into(), - }), - attachment: attachment.clone(), - }); + let mut query = Query { + inner: query_inner, + eid: 0, + value: body.map(|b| Value { + payload: b.payload.into(), + encoding: b.encoding.into(), + }), + attachment, + }; + for (eid, cb) in queryables { + query.eid = eid; + cb.call(query.clone()); } } } @@ -2251,7 +2251,7 @@ impl Primitives for WeakSession { replier_id: None, }; - (query.callback)(reply); + query.callback.call(reply); } } else { state.remote_tokens.insert(m.id, key_expr.clone()); @@ -2428,7 +2428,7 @@ impl Primitives for WeakSession { #[cfg(feature = "unstable")] replier_id: e.ext_sinfo.map(|info| info.id.zid), }; - callback(new_reply); + callback.call(new_reply); } None => { tracing::warn!("Received ReplyData for unknown Query: {}", msg.rid); @@ -2598,7 +2598,7 @@ impl Primitives for WeakSession { }; std::mem::drop(state); if let Some((callback, new_reply)) = callback { - callback(new_reply); + callback.call(new_reply); } } None => { @@ -2620,7 +2620,7 @@ impl Primitives for WeakSession { std::mem::drop(state); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call(reply); } } trace!("Close query {}", msg.rid); diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index a2a3024cf7..3cbe16044e 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -41,7 +41,7 @@ pub(crate) struct SubscriberState { pub(crate) remote_id: Id, pub(crate) key_expr: KeyExpr<'static>, pub(crate) origin: Locality, - pub(crate) callback: Callback<'static, Sample>, + pub(crate) callback: Callback, } impl fmt::Debug for SubscriberState { @@ -226,7 +226,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let SubscriberBuilder { session, @@ -304,7 +304,7 @@ impl SubscriberBuilder<'_, '_, Handler> { // Push mode impl Resolvable for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -312,7 +312,7 @@ where impl Wait for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -337,7 +337,7 @@ where impl IntoFuture for SubscriberBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To;