diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs index 74f2ada1f8..882764b8f9 100644 --- a/zenoh-ext/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/z_pub_cache.rs @@ -23,7 +23,7 @@ async fn main() { // Initiate logging env_logger::init(); - let (config, key_expr, value, history, prefix) = parse_args(); + let (config, key_expr, value, history, prefix, complete) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -31,7 +31,8 @@ async fn main() { println!("Declaring PublicationCache on {}", &key_expr); let mut publication_cache_builder = session .declare_publication_cache(&key_expr) - .history(history); + .history(history) + .queryable_complete(complete); if let Some(prefix) = prefix { publication_cache_builder = publication_cache_builder.queryable_prefix(prefix); } @@ -45,7 +46,7 @@ async fn main() { } } -fn parse_args() -> (Config, String, String, usize, Option) { +fn parse_args() -> (Config, String, String, usize, Option, bool) { let args = Command::new("zenoh-ext pub cache example") .arg( arg!(-m --mode [MODE] "The zenoh session mode (peer by default)") @@ -59,11 +60,12 @@ fn parse_args() -> (Config, String, String, usize, Option) { ) .arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!")) .arg( - arg!(-h --history [SIZE] "The number of publications to keep in cache") + arg!(-i --history [SIZE] "The number of publications to keep in cache") .default_value("1"), ) .arg(arg!(-x --prefix [STRING] "An optional queryable prefix")) .arg(arg!(-c --config [FILE] "A configuration file.")) + .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) .get_matches(); @@ -101,6 +103,7 @@ fn parse_args() -> (Config, String, String, usize, Option) { let value = args.get_one::("value").unwrap().to_string(); let history: usize = args.get_one::("history").unwrap().parse().unwrap(); let prefix = args.get_one::("prefix").map(|s| (*s).to_owned()); + let complete = args.get_flag("complete"); - (config, key_expr, value, history, prefix) + (config, key_expr, value, history, prefix, complete) } diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index dca488ba80..7440d80a53 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder}; pub use querying_subscriber::{ FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder, }; -pub use session_ext::{ArcSessionExt, SessionExt}; +pub use session_ext::SessionExt; pub use subscriber_ext::SubscriberBuilderExt; pub use subscriber_ext::SubscriberForward; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 7ae440b02c..cd5ed964ad 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -32,7 +32,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> { session: SessionRef<'a>, pub_key_expr: ZResult>, queryable_prefix: Option>>, - queryable_origin: Locality, + queryable_origin: Option, + complete: Option, history: usize, resources_limit: Option, } @@ -46,7 +47,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { session, pub_key_expr, queryable_prefix: None, - queryable_origin: Locality::default(), + queryable_origin: None, + complete: None, history: 1, resources_limit: None, } @@ -67,7 +69,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { #[zenoh_macros::unstable] #[inline] pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { - self.queryable_origin = origin; + self.queryable_origin = Some(origin); + self + } + + /// Set completeness option for the queryable. + pub fn queryable_complete(mut self, complete: bool) -> Self { + self.complete = Some(complete); self } @@ -137,28 +145,21 @@ impl<'a> PublicationCache<'a> { } // declare the local subscriber that will store the local publications - let (local_sub, queryable) = match conf.session.clone() { - SessionRef::Borrow(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - SessionRef::Shared(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - }; + let local_sub = conf + .session + .declare_subscriber(&key_expr) + .allowed_origin(Locality::SessionLocal) + .res_sync()?; + + // declare the queryable which returns the cached publications + let mut queryable = conf.session.declare_queryable(&queryable_key_expr); + if let Some(origin) = conf.queryable_origin { + queryable = queryable.allowed_origin(origin); + } + if let Some(complete) = conf.complete { + queryable = queryable.complete(complete); + } + let queryable = queryable.res_sync()?; // take local ownership of stuff to be moved into task let sub_recv = local_sub.receiver.clone(); diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 1083c111c4..4a7c4f2ded 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -680,33 +680,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> { // register fetch handler let handler = register_handler(state.clone(), callback.clone()); // declare subscriber - let subscriber = match conf.session.clone() { - SessionRef::Borrow(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, - SessionRef::Shared(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, + let subscriber = match conf.key_space.into() { + crate::KeySpace::User => conf + .session + .declare_subscriber(&key_expr) + .callback(sub_callback) + .reliability(conf.reliability) + .allowed_origin(conf.origin) + .res_sync()?, + crate::KeySpace::Liveliness => conf + .session + .liveliness() + .declare_subscriber(&key_expr) + .callback(sub_callback) + .res_sync()?, }; let fetch_subscriber = FetchingSubscriber { diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 3f9a428293..2a2c1df97b 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -18,67 +18,49 @@ use zenoh::prelude::KeyExpr; use zenoh::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) -pub trait SessionExt { - type PublicationCacheBuilder<'a, 'b, 'c> - where - Self: 'a; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, +pub trait SessionExt<'s, 'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &'s self, pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into; } -impl SessionExt for Session { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, +impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Borrow(self), - pub_key_expr.try_into().map_err(Into::into), - ) + PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into)) } } -impl SessionExt for T { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( +impl<'a> SessionExt<'a, 'a> for Session { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'a self, pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - ArcSessionExt::declare_publication_cache(self, pub_key_expr) + SessionRef::Borrow(self).declare_publication_cache(pub_key_expr) } } -pub trait ArcSessionExt { - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'static, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; -} - -impl ArcSessionExt for Arc { +impl<'s> SessionExt<'s, 'static> for Arc { /// Examples: /// ``` /// # async_std::task::block_on(async { /// use zenoh::prelude::r#async::*; /// use zenoh::config::ModeDependentValue::Unique; - /// use zenoh_ext::ArcSessionExt; + /// use zenoh_ext::SessionExt; /// /// let mut config = config::default(); /// config.timestamping.set_enabled(Some(Unique(true))); @@ -90,16 +72,13 @@ impl ArcSessionExt for Arc { /// # }) /// ``` fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'static, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Shared(self.clone()), - pub_key_expr.try_into().map_err(Into::into), - ) + SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr) } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 8989cc1cb7..d52c446d3d 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -284,6 +284,69 @@ pub enum SessionRef<'a> { Shared(Arc), } +impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + SubscriberBuilder { + session: self.clone(), + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + reliability: Reliability::default(), + mode: PushMode, + origin: Locality::default(), + handler: DefaultHandler, + } + } + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> QueryableBuilder<'a, 'b, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + QueryableBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + complete: false, + origin: Locality::default(), + handler: DefaultHandler, + } + } + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> PublisherBuilder<'a, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + PublisherBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + congestion_control: CongestionControl::default(), + priority: Priority::default(), + destination: Locality::default(), + } + } + #[zenoh_macros::unstable] + fn liveliness(&'s self) -> Liveliness<'a> { + Liveliness { + session: self.clone(), + } + } + fn info(&'s self) -> SessionInfo<'a> { + SessionInfo { + session: self.clone(), + } + } +} + impl Deref for SessionRef<'_> { type Target = Session; @@ -499,43 +562,13 @@ impl Session { pub fn config(&self) -> &Notifier { self.runtime.config() } +} - /// Get informations about the zenoh [`Session`](Session). - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let info = session.info(); - /// # }) - /// ``` - pub fn info(&self) -> SessionInfo { - SessionInfo { - session: SessionRef::Borrow(self), - } +impl<'a> SessionDeclarations<'a, 'a> for Session { + fn info(&self) -> SessionInfo { + SessionRef::Borrow(self).info() } - - /// Create a [`Subscriber`](Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session.declare_subscriber("key/expression").res().await.unwrap(); - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// # }) - /// ``` - pub fn declare_subscriber<'a, 'b, TryIntoKeyExpr>( + fn declare_subscriber<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> @@ -543,39 +576,9 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SubscriberBuilder { - session: SessionRef::Borrow(self), - key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - reliability: Reliability::default(), - mode: PushMode, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_subscriber(key_expr) } - - /// Create a [`Queryable`](Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let queryable = session.declare_queryable("key/expression").res().await.unwrap(); - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply(Ok(Sample::try_from( - /// "key/expression", - /// "value", - /// ).unwrap())).res().await.unwrap(); - /// } - /// # }) - /// ``` - pub fn declare_queryable<'a, 'b, TryIntoKeyExpr>( + fn declare_queryable<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'a, 'b, DefaultHandler> @@ -583,35 +586,9 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - QueryableBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - complete: false, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_queryable(key_expr) } - - /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let publisher = session.declare_publisher("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// publisher.put("value").res().await.unwrap(); - /// # }) - /// ``` - pub fn declare_publisher<'a, 'b, TryIntoKeyExpr>( + fn declare_publisher<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'a, 'b> @@ -619,15 +596,15 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - congestion_control: CongestionControl::default(), - priority: Priority::default(), - destination: Locality::default(), - } + SessionRef::Borrow(self).declare_publisher(key_expr) + } + #[zenoh_macros::unstable] + fn liveliness(&'a self) -> Liveliness { + SessionRef::Borrow(self).liveliness() } +} +impl Session { /// Informs Zenoh that you intend to use `key_expr` multiple times and that it should optimize its transmission. /// /// The returned `KeyExpr`'s internal structure may differ from what you would have obtained through a simple @@ -808,29 +785,6 @@ impl Session { handler: DefaultHandler, } } - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # }) - /// ``` - #[zenoh_macros::unstable] - pub fn liveliness(&self) -> Liveliness { - Liveliness { - session: SessionRef::Borrow(self), - } - } } impl Session { @@ -1970,7 +1924,7 @@ impl Session { } } -impl SessionDeclarations for Arc { +impl<'s> SessionDeclarations<'s, 'static> for Arc { /// Create a [`Subscriber`](Subscriber) for the given key expression. /// /// # Arguments @@ -1995,7 +1949,7 @@ impl SessionDeclarations for Arc { /// # }) /// ``` fn declare_subscriber<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'static, 'b, PushMode, DefaultHandler> where @@ -2040,7 +1994,7 @@ impl SessionDeclarations for Arc { /// # }) /// ``` fn declare_queryable<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'static, 'b, DefaultHandler> where @@ -2076,7 +2030,7 @@ impl SessionDeclarations for Arc { /// # }) /// ``` fn declare_publisher<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'static, 'b> where @@ -2109,11 +2063,17 @@ impl SessionDeclarations for Arc { /// # }) /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static> { + fn liveliness(&'s self) -> Liveliness<'static> { Liveliness { session: SessionRef::Shared(self.clone()), } } + + fn info(&'s self) -> SessionInfo<'static> { + SessionInfo { + session: SessionRef::Shared(self.clone()), + } + } } impl Primitives for Session { @@ -2533,14 +2493,14 @@ impl fmt::Debug for Session { } } -/// Functions to create zenoh entities with `'static` lifetime. +/// Functions to create zenoh entities /// /// This trait contains functions to create zenoh entities like /// [`Subscriber`](crate::subscriber::Subscriber), and -/// [`Queryable`](crate::queryable::Queryable) with a `'static` lifetime. -/// This is useful to move zenoh entities to several threads and tasks. +/// [`Queryable`](crate::queryable::Queryable) /// -/// This trait is implemented for `Arc`. +/// This trait is implemented by [`Session`](crate::session::Session) itself and +/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](crate::session::Arc) /// /// # Examples /// ```no_run @@ -2559,7 +2519,7 @@ impl fmt::Debug for Session { /// }).await; /// # }) /// ``` -pub trait SessionDeclarations { +pub trait SessionDeclarations<'s, 'a> { /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. /// /// # Arguments @@ -2583,13 +2543,13 @@ pub trait SessionDeclarations { /// }).await; /// # }) /// ``` - fn declare_subscriber<'a, TryIntoKeyExpr>( - &self, + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'static, 'a, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. /// @@ -2618,13 +2578,13 @@ pub trait SessionDeclarations { /// }).await; /// # }) /// ``` - fn declare_queryable<'a, TryIntoKeyExpr>( - &self, + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'static, 'a, DefaultHandler> + ) -> QueryableBuilder<'a, 'b, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. /// @@ -2645,13 +2605,13 @@ pub trait SessionDeclarations { /// publisher.put("value").res().await.unwrap(); /// # }) /// ``` - fn declare_publisher<'a, TryIntoKeyExpr>( - &self, + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'static, 'a> + ) -> PublisherBuilder<'a, 'b> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. /// @@ -2670,7 +2630,19 @@ pub trait SessionDeclarations { /// # }) /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static>; + fn liveliness(&'s self) -> Liveliness<'a>; + /// Get informations about the zenoh [`Session`](Session). + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let info = session.info(); + /// # }) + /// ``` + fn info(&'s self) -> SessionInfo<'a>; } impl crate::net::primitives::EPrimitives for Session {