diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 3cab950678..874fe456b7 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -297,6 +297,36 @@ impl<'a> Publisher<'a> { self } + /// Consumes the given `Publisher`, returning a thread-safe reference-counting + /// pointer to it (`Arc`). This is equivalent to `Arc::new(Publisher)`. + /// + /// This is useful to share ownership of the `Publisher` between several threads + /// and tasks. It also alows to create [`MatchingListener`] with static + /// lifetime that can be moved to several threads and tasks. + /// + /// Note: the given zenoh `Publisher` will be undeclared when the last reference to + /// it is dropped. + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.is_matching() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` #[zenoh_macros::unstable] pub fn into_arc(self) -> std::sync::Arc { std::sync::Arc::new(self) @@ -365,7 +395,10 @@ impl<'a> Publisher<'a> { self._write(SampleKind::Delete, Value::empty()) } - /// Return true if there exist subscribers matching this Publisher's key expression. + /// Return the [`MatchingStatus`] of the publisher. + /// + /// [`MatchingStatus::is_matching`] will return true if there exist Subscribers + /// matching the Publisher's key expression and false otherwise. /// /// # Examples /// ``` @@ -374,7 +407,7 @@ impl<'a> Publisher<'a> { /// /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); - /// publisher.matching_status().res().await.unwrap().is_matching(); + /// let is_matching: bool = publisher.matching_status().res().await.unwrap().is_matching(); /// # }) /// ``` #[zenoh_macros::unstable] @@ -398,6 +431,11 @@ impl<'a> Publisher<'a> { zenoh_core::ResolveFuture::new(async move { Ok(MatchingStatus { matching }) }) } + /// Return a [`MatchingListener`] for this Publisher. + /// + /// The [`MatchingListener`] that will send a notification each time the [`MatchingStatus`] of + /// the Publisher changes. + /// /// # Examples /// ```no_run /// # async_std::task::block_on(async { @@ -440,6 +478,34 @@ impl<'a> Publisher<'a> { } } +/// Functions to create zenoh entities with `'static` lifetime. +/// +/// This trait contains functions to create zenoh entities like +/// [`MatchingListener`] with a `'static` lifetime. +/// This is useful to move zenoh entities to several threads and tasks. +/// +/// This trait is implemented for `Arc`. +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); +/// let matching_listener = publisher.matching_listener().res().await.unwrap(); +/// +/// async_std::task::spawn(async move { +/// while let Ok(matching_status) = matching_listener.recv_async().await { +/// if matching_status.is_matching() { +/// println!("Publisher has matching subscribers."); +/// } else { +/// println!("Publisher has NO MORE matching subscribers."); +/// } +/// } +/// }).await; +/// # }) +/// ``` #[zenoh_macros::unstable] pub trait PublisherDeclarations { /// # Examples @@ -848,6 +914,7 @@ impl From for zenoh_protocol::core::Priority { } } +/// A struct that indicates if there exist Subscribers matching the Publisher's key expression. #[zenoh_macros::unstable] #[derive(Copy, Clone, Debug)] pub struct MatchingStatus { @@ -856,11 +923,13 @@ pub struct MatchingStatus { #[zenoh_macros::unstable] impl MatchingStatus { + /// Return true if there exist Subscribers matching the Publisher's key expression. pub fn is_matching(&self) -> bool { self.matching } } +/// A builder for initializing a [`MatchingListener`]. #[zenoh_macros::unstable] #[derive(Debug)] pub struct MatchingListenerBuilder<'a, Handler> { @@ -870,6 +939,29 @@ pub struct MatchingListenerBuilder<'a, Handler> { #[zenoh_macros::unstable] impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { + /// Receive the MatchingStatuses for this listener with a callback. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback(|matching_status| { + /// if matching_status.is_matching() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// }) + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` #[inline] #[zenoh_macros::unstable] pub fn callback(self, callback: Callback) -> MatchingListenerBuilder<'a, Callback> @@ -886,6 +978,24 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { } } + /// Receive the MatchingStatuses for this listener with a mutable callback. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let mut n = 0; + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback_mut(move |_matching_status| { n += 1; }) + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` #[inline] #[zenoh_macros::unstable] pub fn callback_mut( @@ -898,6 +1008,30 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { self.callback(crate::handlers::locked(callback)) } + /// Receive the MatchingStatuses for this listener with a [`Handler`](crate::prelude::IntoCallbackReceiverPair). + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .with(flume::bounded(32)) + /// .res() + /// .await + /// .unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.is_matching() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # }) + /// ``` #[inline] #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler> @@ -988,6 +1122,9 @@ impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListene } } +/// A listener that sends notifications when the [`MatchingStatus`] of a +/// publisher changes. +/// /// # Examples /// ```no_run /// # async_std::task::block_on(async { @@ -1013,6 +1150,22 @@ pub struct MatchingListener<'a, Receiver> { #[zenoh_macros::unstable] impl<'a, Receiver> MatchingListener<'a, Receiver> { + /// Close a [`MatchingListener`]. + /// + /// MatchingListeners are automatically closed when dropped, but you may want to use this function to handle errors or + /// close the MatchingListener asynchronously. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// matching_listener.undeclare().res().await.unwrap(); + /// # }) + /// ``` #[inline] pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { self.subscriber.undeclare()