From 83bc4edb56a76f98d4f692db12c458a13bfcaf16 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Wed, 11 Dec 2024 13:38:44 +0100 Subject: [PATCH] feat: add background to advanced subscriber (#1651) * Expose and use ke macro * Fix SourceInfo publication * Add AdvancedPublisher AdvancedSubscriber and AdvancedSubscriber * Fix doctests * Fix doc warnings * Remove debug trace * Add history test * Fix periodic queries * Remove debug trace * Lower test debug level * Add retransmission tests * Liveliness sub callback shoud increase pending queries counter * Liveliness sub callback shoud spawn periodic queries when enbaled * Add late_joiner test * Only treat pending samples when there are no more pending queries * Apply proper sequencing for history * Improve AdvancedSubscriber * Code reorg * Code reorg * Fix deduplication * Subscribe to liveliness tokens with history * Update builders * Add examples * Fix rustdoc * Move stuff in State * Code reorg * Add smaple_miss_callback * Add sample miss test * Update z_advanced_sub example * Explicit use in examples * Update API * Fix rustdoc * Allow sample miss detection when recovery disabled * Add miss_sample_callback to DataSubscriberBuilderExt * Add sample_miss_detection to PublisherBuilderExt * Add test_advanced_sample_miss test * Deliver sample even when no miss callback * Replace sample_miss_callback with sample_miss_listener * Fix clippy warnings * Fix tests * Add HistoryConf max_samples option * Add HistoryConf max_age option * Use BTreeMap * Add meta_keyexpr option * Add late_joiner_detection and meta_keyexpr options on Subcriber side * Renaming * Fix compilation issues * Remove AdvancedCache from public API * Update Session admin to match AdvancedSub * Gather constants * Fix doc build * Renaming * Mark PublicationCache and QueryingSubscriber as deprecated and remove related examples * Remove z_pub_cache and z_query_sub entries from zenoh-ext examples README * Add z_advanced_pub and z_advanced_sub to zenoh-ext examples Cargo.toml * Add CacheConfig replies_qos option * Call cache directly from publisher * Update doc * Add missing unstable tags * Add missing unstable tags * Add missing unstable tags * Add unstable tag everywhere * Add missing AdvancedSubscriber methods * Fix WeakSession::Session internal function * Expose missing SampleMissListener and related structs * Add AdvancedPublisherBuilderExt::advanced function * Add missing AdvancedPublisherBuilder functions * Fix doctests * Expose Miss struct * impl QoSBuilderTrait for AdvancedPublisherBuilder * Propagate PublisherBuilder values to AdvancedPublisherBuilder * Rename AdvancedSubscriber::close() * Add unstable tags * Add AdvancedSubscriber::detect_publishers function * Remove debug println * Renaming * Add unstable tags * Use std Range * Spawn Timer in a tokio runtime * Fix panic when last_delivered is None * Release lock before calling get * Update key mapping * Improve doc * fix: fix callback API (#1647) * Update doc * feat: add background to advanced subscriber * Fix ke_liveliness * fix: add missing mut * Fix doc * Fix doc * Fix zenoh-ext Cargo.toml --------- Co-authored-by: OlivierHecart --- zenoh-ext/src/advanced_subscriber.rs | 82 ++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 16 deletions(-) diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index e0d9ed7e3a..e922c4ecb2 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -189,7 +189,7 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> { { AdvancedSubscriberBuilder { session: self.session, - key_expr: self.key_expr.map(|s| s.into_owned()), + key_expr: self.key_expr, origin: self.origin, retransmission: self.retransmission, query_target: self.query_target, @@ -203,7 +203,30 @@ impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, DefaultHandler> { } #[zenoh_macros::unstable] -impl<'a, 'c, Handler> AdvancedSubscriberBuilder<'a, '_, 'c, Handler> { +impl<'a, 'b, 'c> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback> { + /// Register the subscriber callback to be run in background until the session is closed. + /// + /// Background builder doesn't return a `AdvancedSubscriber` object anymore. + pub fn background(self) -> AdvancedSubscriberBuilder<'a, 'b, 'c, Callback, true> { + AdvancedSubscriberBuilder { + session: self.session, + key_expr: self.key_expr, + origin: self.origin, + retransmission: self.retransmission, + query_target: self.query_target, + query_timeout: self.query_timeout, + history: self.history, + liveliness: self.liveliness, + meta_key_expr: self.meta_key_expr, + handler: self.handler, + } + } +} + +#[zenoh_macros::unstable] +impl<'a, 'c, Handler, const BACKGROUND: bool> + AdvancedSubscriberBuilder<'a, '_, 'c, Handler, BACKGROUND> +{ /// Restrict the matching publications that will be receive by this [`Subscriber`] /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -323,6 +346,35 @@ where } } +#[zenoh_macros::unstable] +impl Resolvable for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + type To = ZResult<()>; +} + +#[zenoh_macros::unstable] +impl Wait for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + #[zenoh_macros::unstable] + fn wait(self) -> ::To { + let mut sub = AdvancedSubscriber::new(self.with_static_keys())?; + sub.subscriber.set_background(true); + if let Some(mut liveliness_sub) = sub.liveliness_subscriber.take() { + liveliness_sub.set_background(true); + } + Ok(()) + } +} + +#[zenoh_macros::unstable] +impl IntoFuture for AdvancedSubscriberBuilder<'_, '_, '_, Callback, true> { + type Output = ::To; + type IntoFuture = Ready<::To>; + + #[zenoh_macros::unstable] + fn into_future(self) -> Self::IntoFuture { + std::future::ready(self.wait()) + } +} + #[zenoh_macros::unstable] struct Period { timer: Timer, @@ -343,6 +395,7 @@ struct State { query_timeout: Duration, callback: Callback, miss_handlers: HashMap>, + token: Option, } #[zenoh_macros::unstable] @@ -387,8 +440,7 @@ pub struct AdvancedSubscriber { statesref: Arc>, subscriber: Subscriber<()>, receiver: Receiver, - _liveliness_subscriber: Option>, - _token: Option, + liveliness_subscriber: Option>, } #[zenoh_macros::unstable] @@ -573,6 +625,7 @@ impl AdvancedSubscriber { query_timeout: conf.query_timeout, callback: callback.clone(), miss_handlers: HashMap::new(), + token: None, })); let sub_callback = { @@ -868,7 +921,7 @@ impl AdvancedSubscriber { None }; - let token = if conf.liveliness { + if conf.liveliness { let prefix = KE_ADV_PREFIX / KE_SUB / &subscriber.id().zid().into_keyexpr() @@ -878,22 +931,19 @@ impl AdvancedSubscriber { // We need this empty chunk because af a routing matching bug _ => prefix / KE_EMPTY / KE_AT, }; - Some( - conf.session - .liveliness() - .declare_token(prefix / &key_expr) - .wait()?, - ) - } else { - None - }; + let token = conf + .session + .liveliness() + .declare_token(prefix / &key_expr) + .wait()?; + zlock!(statesref).token = Some(token) + } let reliable_subscriber = AdvancedSubscriber { statesref, subscriber, receiver, - _liveliness_subscriber: liveliness_subscriber, - _token: token, + liveliness_subscriber, }; Ok(reliable_subscriber)