From 2efff1f91cf5dd8f23f8dc541f812c73175315ac Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 3 Sep 2024 05:52:25 +0200 Subject: [PATCH] feat!: make session an arc-like object The refactoring is quite deep, so this is the first (dirty) iteration which passes the tests. --- examples/examples/z_forward.rs | 2 +- examples/examples/z_get_liveliness.rs | 2 +- examples/examples/z_info.rs | 2 +- examples/examples/z_liveliness.rs | 2 +- examples/examples/z_pong.rs | 2 +- examples/examples/z_pull.rs | 2 +- examples/examples/z_queryable.rs | 2 +- examples/examples/z_queryable_shm.rs | 1 - examples/examples/z_storage.rs | 1 - examples/examples/z_sub.rs | 2 +- examples/examples/z_sub_liveliness.rs | 2 +- examples/examples/z_sub_shm.rs | 2 +- plugins/zenoh-plugin-example/src/lib.rs | 1 - .../zenoh-plugin-rest/examples/z_serve_sse.rs | 1 - plugins/zenoh-plugin-rest/src/lib.rs | 2 +- .../src/replica/mod.rs | 2 +- .../src/replica/storage.rs | 2 +- zenoh-ext/examples/examples/z_query_sub.rs | 2 +- zenoh-ext/src/publication_cache.rs | 25 +- zenoh-ext/src/querying_subscriber.rs | 24 +- zenoh-ext/src/session_ext.rs | 59 +- zenoh-ext/src/subscriber_ext.rs | 2 +- zenoh-ext/tests/liveliness.rs | 4 +- zenoh/src/api/admin.rs | 52 +- zenoh/src/api/builders/publisher.rs | 70 +- zenoh/src/api/info.rs | 29 +- zenoh/src/api/key_expr.rs | 14 +- zenoh/src/api/liveliness.rs | 81 +- zenoh/src/api/publisher.rs | 215 +--- zenoh/src/api/query.rs | 1 + zenoh/src/api/queryable.rs | 73 +- zenoh/src/api/session.rs | 1060 ++++++----------- zenoh/src/api/subscriber.rs | 85 +- zenoh/src/lib.rs | 5 +- zenoh/src/prelude.rs | 4 +- zenoh/tests/events.rs | 1 - zenoh/tests/liveliness.rs | 12 +- zenoh/tests/routing.rs | 6 +- 38 files changed, 672 insertions(+), 1182 deletions(-) diff --git a/examples/examples/z_forward.rs b/examples/examples/z_forward.rs index be9df7e2b0..1f6969766f 100644 --- a/examples/examples/z_forward.rs +++ b/examples/examples/z_forward.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; use zenoh_ext::SubscriberForward; diff --git a/examples/examples/z_get_liveliness.rs b/examples/examples/z_get_liveliness.rs index 53f7abc92a..d0040fcea4 100644 --- a/examples/examples/z_get_liveliness.rs +++ b/examples/examples/z_get_liveliness.rs @@ -14,7 +14,7 @@ use std::time::Duration; use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_info.rs b/examples/examples/z_info.rs index aa40ef62d4..606cdcbd16 100644 --- a/examples/examples/z_info.rs +++ b/examples/examples/z_info.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{prelude::*, session::ZenohId}; +use zenoh::session::ZenohId; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_liveliness.rs b/examples/examples/z_liveliness.rs index bf8890a267..1c51c2fce6 100644 --- a/examples/examples/z_liveliness.rs +++ b/examples/examples/z_liveliness.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_pong.rs b/examples/examples/z_pong.rs index 86b31d41f3..8794d6bc7a 100644 --- a/examples/examples/z_pong.rs +++ b/examples/examples/z_pong.rs @@ -21,7 +21,7 @@ fn main() { let (config, express) = parse_args(); - let session = zenoh::open(config).wait().unwrap().into_arc(); + let session = zenoh::open(config).wait().unwrap(); // The key expression to read the data from let key_expr_ping = keyexpr::new("test/ping").unwrap(); diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index 6716ef8cc5..1239f7347f 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -14,7 +14,7 @@ use std::time::Duration; use clap::Parser; -use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index 4b950a0a33..905f8d50c9 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs index e92efbdc38..6801457d5a 100644 --- a/examples/examples/z_queryable_shm.rs +++ b/examples/examples/z_queryable_shm.rs @@ -15,7 +15,6 @@ use clap::Parser; use zenoh::{ bytes::ZBytes, key_expr::KeyExpr, - prelude::*, shm::{ zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder, POSIX_PROTOCOL_ID, diff --git a/examples/examples/z_storage.rs b/examples/examples/z_storage.rs index f812c78094..360e00f9d1 100644 --- a/examples/examples/z_storage.rs +++ b/examples/examples/z_storage.rs @@ -19,7 +19,6 @@ use clap::Parser; use futures::select; use zenoh::{ key_expr::{keyexpr, KeyExpr}, - prelude::*, sample::{Sample, SampleKind}, Config, }; diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 7f3a93c5fb..eca53a7849 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, Config}; +use zenoh::{key_expr::KeyExpr, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_sub_liveliness.rs b/examples/examples/z_sub_liveliness.rs index bb91c9f491..0b70d20786 100644 --- a/examples/examples/z_sub_liveliness.rs +++ b/examples/examples/z_sub_liveliness.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config}; +use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index f45dab099d..1f35a2636e 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -14,7 +14,7 @@ use clap::Parser; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh::shm::zshm; -use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*}; +use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr}; use zenoh_examples::CommonArgs; #[tokio::main] diff --git a/plugins/zenoh-plugin-example/src/lib.rs b/plugins/zenoh-plugin-example/src/lib.rs index b7c494946d..086e23aaa8 100644 --- a/plugins/zenoh-plugin-example/src/lib.rs +++ b/plugins/zenoh-plugin-example/src/lib.rs @@ -36,7 +36,6 @@ use zenoh::{ key_expr::{keyexpr, KeyExpr}, prelude::ZResult, sample::Sample, - session::SessionDeclarations, }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs index aefdfd4f86..fbd0269498 100644 --- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs +++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs @@ -18,7 +18,6 @@ use zenoh::{ config::Config, key_expr::keyexpr, qos::{CongestionControl, QoSBuilderTrait}, - session::SessionDeclarations, }; const HTML: &str = r#" diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index eb65a991d6..289fc9e055 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -47,7 +47,7 @@ use zenoh::{ prelude::*, query::{Parameters, QueryConsolidation, Reply, Selector, ZenohParameters}, sample::{Sample, SampleKind}, - session::{Session, SessionDeclarations}, + session::Session, }; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index ecb8815153..4918c17987 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -24,7 +24,7 @@ use std::{ use flume::{Receiver, Sender}; use futures::{pin_mut, select, FutureExt}; use tokio::{sync::RwLock, time::interval}; -use zenoh::{key_expr::keyexpr, prelude::*}; +use zenoh::key_expr::keyexpr; use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig}; use crate::{backends_mgt::StoreIntercept, storages_mgt::StorageMessage}; diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index d3e34f064c..62fa72ef92 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -37,7 +37,7 @@ use zenoh::{ }, query::{ConsolidationMode, QueryTarget}, sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait}, - session::{Session, SessionDeclarations}, + session::Session, time::{Timestamp, NTP64}, Result as ZResult, }; diff --git a/zenoh-ext/examples/examples/z_query_sub.rs b/zenoh-ext/examples/examples/z_query_sub.rs index c819a2a831..1c1a3eab27 100644 --- a/zenoh-ext/examples/examples/z_query_sub.rs +++ b/zenoh-ext/examples/examples/z_query_sub.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use clap::{arg, Parser}; -use zenoh::{config::Config, prelude::*, query::ReplyKeyExpr}; +use zenoh::{config::Config, query::ReplyKeyExpr}; use zenoh_ext::*; use zenoh_ext_examples::CommonArgs; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 9c1536c2a1..af548a3b6c 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -25,14 +25,13 @@ use zenoh::{ pubsub::FlumeSubscriber, query::{Query, Queryable, ZenohParameters}, sample::{Locality, Sample}, - session::{SessionDeclarations, SessionRef}, - Error, Resolvable, Resolve, Result as ZResult, + Error, Resolvable, Resolve, Result as ZResult, Session, }; /// The builder of PublicationCache, allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct PublicationCacheBuilder<'a, 'b, 'c> { - session: SessionRef<'a>, + session: &'a Session, pub_key_expr: ZResult>, queryable_prefix: Option>>, queryable_origin: Option, @@ -43,7 +42,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> { impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { pub(crate) fn new( - session: SessionRef<'a>, + session: &'a Session, pub_key_expr: ZResult>, ) -> PublicationCacheBuilder<'a, 'b, 'c> { PublicationCacheBuilder { @@ -95,8 +94,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { } } -impl<'a> Resolvable for PublicationCacheBuilder<'a, '_, '_> { - type To = ZResult>; +impl Resolvable for PublicationCacheBuilder<'_, '_, '_> { + type To = ZResult; } impl Wait for PublicationCacheBuilder<'_, '_, '_> { @@ -105,7 +104,7 @@ impl Wait for PublicationCacheBuilder<'_, '_, '_> { } } -impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> { +impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> { type Output = ::To; type IntoFuture = Ready<::To>; @@ -114,14 +113,14 @@ impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> { } } -pub struct PublicationCache<'a> { - local_sub: FlumeSubscriber<'a>, - _queryable: Queryable<'a, flume::Receiver>, +pub struct PublicationCache { + local_sub: FlumeSubscriber, + _queryable: Queryable>, task: TerminatableTask, } -impl<'a> PublicationCache<'a> { - fn new(conf: PublicationCacheBuilder<'a, '_, '_>) -> ZResult> { +impl PublicationCache { + fn new(conf: PublicationCacheBuilder<'_, '_, '_>) -> ZResult { let key_expr = conf.pub_key_expr?; // the queryable_prefix (optional), and the key_expr for PublicationCache's queryable ("[]/") let (queryable_prefix, queryable_key_expr): (Option, KeyExpr) = @@ -258,7 +257,7 @@ impl<'a> PublicationCache<'a> { /// Undeclare this [`PublicationCache`]`. #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a { + pub fn undeclare(self) -> impl Resolve> { ResolveFuture::new(async move { let PublicationCache { _queryable, diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 224abfde87..893cbcbad0 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -28,9 +28,8 @@ use zenoh::{ pubsub::{Reliability, Subscriber}, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector}, sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait}, - session::{SessionDeclarations, SessionRef}, time::Timestamp, - Error, Resolvable, Resolve, Result as ZResult, + Error, Resolvable, Resolve, Result as ZResult, Session, }; use crate::ExtractSample; @@ -38,7 +37,7 @@ use crate::ExtractSample; /// The builder of [`FetchingSubscriber`], allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, pub(crate) reliability: Reliability, @@ -224,7 +223,7 @@ where Handler: IntoHandler<'static, Sample>, Handler::Handler: Send, { - type To = ZResult>; + type To = ZResult>; } impl Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> @@ -362,7 +361,7 @@ pub struct FetchingSubscriberBuilder< > where TryIntoSample: ExtractSample, { - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, pub(crate) reliability: Reliability, @@ -548,7 +547,7 @@ where Handler::Handler: Send, TryIntoSample: ExtractSample, { - type To = ZResult>; + type To = ZResult>; } impl< @@ -620,28 +619,29 @@ where /// } /// # } /// ``` -pub struct FetchingSubscriber<'a, Handler> { - subscriber: Subscriber<'a, ()>, +pub struct FetchingSubscriber { + subscriber: Subscriber<()>, callback: Arc, state: Arc>, handler: Handler, } -impl std::ops::Deref for FetchingSubscriber<'_, Handler> { +impl std::ops::Deref for FetchingSubscriber { type Target = Handler; fn deref(&self) -> &Self::Target { &self.handler } } -impl std::ops::DerefMut for FetchingSubscriber<'_, Handler> { +impl std::ops::DerefMut for FetchingSubscriber { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.handler } } -impl<'a, Handler> FetchingSubscriber<'a, Handler> { +impl FetchingSubscriber { fn new< + 'a, KeySpace, InputHandler, Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, @@ -724,7 +724,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { /// Undeclare this [`FetchingSubscriber`]`. #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a { + pub fn undeclare(self) -> impl Resolve> { self.subscriber.undeclare() } diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 606f00743b..21f2fc5c6e 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -11,54 +11,13 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{convert::TryInto, sync::Arc}; -use zenoh::{ - key_expr::KeyExpr, - session::{Session, SessionRef}, - Error, -}; +use zenoh::{key_expr::KeyExpr, session::Session, Error}; use super::PublicationCacheBuilder; /// Some extensions to the [`zenoh::Session`](zenoh::Session) pub trait SessionExt<'s, 'a> { - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &'s self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'a, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; -} - -impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> { - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &'s self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'a, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into)) - } -} - -impl<'a> SessionExt<'a, 'a> for Session { - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &'a self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'a, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - SessionRef::Borrow(self).declare_publication_cache(pub_key_expr) - } -} - -impl<'s> SessionExt<'s, 'static> for Arc { /// Examples: /// ``` /// # #[tokio::main] @@ -69,7 +28,7 @@ impl<'s> SessionExt<'s, 'static> for Arc { /// /// let mut config = zenoh::config::default(); /// config.timestamping.set_enabled(Some(Unique(true))); - /// let session = zenoh::open(config).await.unwrap().into_arc(); + /// let session = zenoh::open(config).await.unwrap(); /// let publication_cache = session.declare_publication_cache("key/expression").await.unwrap(); /// tokio::task::spawn(async move { /// publication_cache.key_expr(); @@ -79,11 +38,21 @@ impl<'s> SessionExt<'s, 'static> for Arc { fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'s self, pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'static, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; +} + +impl<'a> SessionExt<'a, 'a> for Session { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &'a self, + pub_key_expr: TryIntoKeyExpr, + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr) + PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into)) } } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a7356f86dc..434ec15234 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -32,7 +32,7 @@ pub trait SubscriberForward<'a, S> { type Output; fn forward(&'a mut self, sink: S) -> Self::Output; } -impl<'a, S> SubscriberForward<'a, S> for Subscriber<'_, flume::Receiver> +impl<'a, S> SubscriberForward<'a, S> for Subscriber> where S: futures::sink::Sink, { diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index 637d07ba57..51c8a79cd3 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -21,7 +21,7 @@ use zenoh::{ async fn test_liveliness_querying_subscriber_clique() { use std::time::Duration; - use zenoh::{internal::ztimeout, prelude::*}; + use zenoh::internal::ztimeout; use zenoh_ext::SubscriberBuilderExt; const TIMEOUT: Duration = Duration::from_secs(60); @@ -99,7 +99,7 @@ async fn test_liveliness_querying_subscriber_clique() { async fn test_liveliness_querying_subscriber_brokered() { use std::time::Duration; - use zenoh::{internal::ztimeout, prelude::*}; + use zenoh::internal::ztimeout; use zenoh_ext::SubscriberBuilderExt; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index e794c87db5..1973f79b9b 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -14,7 +14,7 @@ use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, - sync::Arc, + sync::{Arc, Weak}, }; use zenoh_core::{Result as ZResult, Wait}; @@ -30,9 +30,9 @@ use super::{ key_expr::KeyExpr, queryable::Query, sample::{DataInfo, Locality, SampleKind}, - session::Session, subscriber::SubscriberKind, }; +use crate::api::session::SessionInner; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; @@ -42,8 +42,8 @@ lazy_static::lazy_static!( static ref KE_LINK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("link") }; ); -pub(crate) fn init(session: &Session) { - if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) { +pub(crate) fn init(session: &Arc) { + if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) { let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR) .to_wire(session) .to_owned(); @@ -53,14 +53,18 @@ pub(crate) fn init(session: &Session) { true, Locality::SessionLocal, Arc::new({ - let session = session.clone(); - move |q| super::admin::on_admin_query(&session, q) + let session = Arc::downgrade(session); + move |q| { + if let Some(session) = Weak::upgrade(&session) { + on_admin_query(&session, q) + } + } }), ); } } -pub(crate) fn on_admin_query(session: &Session, query: Query) { +pub(crate) fn on_admin_query(session: &SessionInner, query: Query) { fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) { let zid = peer.zid.to_string(); if let Ok(zid) = keyexpr::new(&zid) { @@ -102,7 +106,7 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) { } } - if let Ok(own_zid) = keyexpr::new(&session.zid().to_string()) { + if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) { for transport in zenoh_runtime::ZRuntime::Net .block_in_place(session.runtime.manager().get_transports_unicast()) { @@ -122,14 +126,12 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) { #[derive(Clone)] pub(crate) struct Handler { - pub(crate) session: Arc, + pub(crate) session: Weak, } impl Handler { - pub(crate) fn new(session: Session) -> Self { - Self { - session: Arc::new(session), - } + pub(crate) fn new(session: Weak) -> Self { + Self { session } } } @@ -155,7 +157,10 @@ impl TransportMulticastEventHandler for Handler { &self, peer: zenoh_transport::TransportPeer, ) -> ZResult> { - if let Ok(own_zid) = keyexpr::new(&self.session.zid().to_string()) { + let Some(session) = Weak::upgrade(&self.session) else { + bail!("session closed"); + }; + if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) { if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) { let expr = WireExpr::from( &(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid), @@ -165,7 +170,7 @@ impl TransportMulticastEventHandler for Handler { encoding: Some(Encoding::APPLICATION_JSON), ..Default::default() }; - self.session.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( true, &expr, Some(info), @@ -196,7 +201,7 @@ impl TransportMulticastEventHandler for Handler { pub(crate) struct PeerHandler { pub(crate) expr: WireExpr<'static>, - pub(crate) session: Arc, + pub(crate) session: Weak, } impl TransportPeerEventHandler for PeerHandler { @@ -205,13 +210,16 @@ impl TransportPeerEventHandler for PeerHandler { } fn new_link(&self, link: zenoh_link::Link) { + let Some(session) = Weak::upgrade(&self.session) else { + return; + }; let mut s = DefaultHasher::new(); link.hash(&mut s); let info = DataInfo { encoding: Some(Encoding::APPLICATION_JSON), ..Default::default() }; - self.session.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( true, &self .expr @@ -225,13 +233,16 @@ impl TransportPeerEventHandler for PeerHandler { } fn del_link(&self, link: zenoh_link::Link) { + let Some(session) = Weak::upgrade(&self.session) else { + return; + }; let mut s = DefaultHasher::new(); link.hash(&mut s); let info = DataInfo { kind: SampleKind::Delete, ..Default::default() }; - self.session.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( true, &self .expr @@ -247,11 +258,14 @@ impl TransportPeerEventHandler for PeerHandler { fn closing(&self) {} fn closed(&self) { + let Some(session) = Weak::upgrade(&self.session) else { + return; + }; let info = DataInfo { kind: SampleKind::Delete, ..Default::default() }; - self.session.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( true, &self.expr, Some(info), diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 666b4378e0..43480e4df1 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -18,16 +18,18 @@ use zenoh_protocol::{core::CongestionControl, network::Mapping}; #[cfg(feature = "unstable")] use crate::api::sample::SourceInfo; -use crate::api::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, +use crate::{ + api::{ + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, + }, + bytes::{OptionZBytes, ZBytes}, + encoding::Encoding, + key_expr::KeyExpr, + publisher::{Priority, Publisher}, + sample::{Locality, SampleKind}, }, - bytes::{OptionZBytes, ZBytes}, - encoding::Encoding, - key_expr::KeyExpr, - publisher::{Priority, Publisher}, - sample::{Locality, SampleKind}, - session::SessionRef, + Session, }; pub type SessionPutBuilder<'a, 'b> = @@ -232,8 +234,8 @@ impl IntoFuture for PublicationBuilder, PublicationBuil /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] -pub struct PublisherBuilder<'a, 'b: 'a> { - pub(crate) session: SessionRef<'a>, +pub struct PublisherBuilder<'a, 'b> { + pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) encoding: Encoding, pub(crate) congestion_control: CongestionControl, @@ -245,7 +247,7 @@ pub struct PublisherBuilder<'a, 'b: 'a> { impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { fn clone(&self) -> Self { Self { - session: self.session.clone(), + session: self.session, key_expr: match &self.key_expr { Ok(k) => Ok(k.clone()), Err(e) => Err(zerror!("Cloned KE Error: {}", e).into()), @@ -295,9 +297,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } // internal function for performing the publication - fn create_one_shot_publisher(self) -> ZResult> { + fn create_one_shot_publisher(self) -> ZResult> { Ok(Publisher { - session: self.session, + session: self.session.clone().0, id: 0, // This is a one shot Publisher key_expr: self.key_expr?, encoding: self.encoding, @@ -313,15 +315,15 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> { - type To = ZResult>; + type To = ZResult>; } impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { fn wait(self) -> ::To { let mut key_expr = self.key_expr?; - if !key_expr.is_fully_optimized(&self.session) { - let session_id = self.session.id; - let expr_id = self.session.declare_prefix(key_expr.as_str()).wait(); + if !key_expr.is_fully_optimized(&self.session.0) { + let session_id = self.session.0.id; + let expr_id = self.session.0.declare_prefix(key_expr.as_str()).wait()?; let prefix_len = key_expr .len() .try_into() @@ -349,21 +351,23 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { } } } - self.session - .declare_publisher_inner(key_expr.clone(), self.destination) - .map(|id| Publisher { - session: self.session, - id, - key_expr, - encoding: self.encoding, - congestion_control: self.congestion_control, - priority: self.priority, - is_express: self.is_express, - destination: self.destination, - #[cfg(feature = "unstable")] - matching_listeners: Default::default(), - undeclare_on_drop: true, - }) + let id = self + .session + .0 + .declare_publisher_inner(key_expr.clone(), self.destination)?; + Ok(Publisher { + session: self.session.0.clone(), + id, + key_expr, + encoding: self.encoding, + congestion_control: self.congestion_control, + priority: self.priority, + is_express: self.is_express, + destination: self.destination, + #[cfg(feature = "unstable")] + matching_listeners: Default::default(), + undeclare_on_drop: true, + }) } } diff --git a/zenoh/src/api/info.rs b/zenoh/src/api/info.rs index 32bed0eb53..88f8dd57b7 100644 --- a/zenoh/src/api/info.rs +++ b/zenoh/src/api/info.rs @@ -19,7 +19,7 @@ use zenoh_config::wrappers::ZenohId; use zenoh_core::{Resolvable, Wait}; use zenoh_protocol::core::WhatAmI; -use super::session::SessionRef; +use crate::net::runtime::Runtime; /// A builder retuned by [`SessionInfo::zid()`](SessionInfo::zid) that allows /// to access the [`ZenohId`] of the current zenoh [`Session`](crate::Session). @@ -35,9 +35,8 @@ use super::session::SessionRef; /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] pub struct ZenohIdBuilder<'a> { - pub(crate) session: SessionRef<'a>, + runtime: &'a Runtime, } impl<'a> Resolvable for ZenohIdBuilder<'a> { @@ -46,7 +45,7 @@ impl<'a> Resolvable for ZenohIdBuilder<'a> { impl<'a> Wait for ZenohIdBuilder<'a> { fn wait(self) -> Self::To { - self.session.runtime.zid() + self.runtime.zid() } } @@ -75,9 +74,8 @@ impl<'a> IntoFuture for ZenohIdBuilder<'a> { /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] pub struct RoutersZenohIdBuilder<'a> { - pub(crate) session: SessionRef<'a>, + runtime: &'a Runtime, } impl<'a> Resolvable for RoutersZenohIdBuilder<'a> { @@ -88,7 +86,7 @@ impl<'a> Wait for RoutersZenohIdBuilder<'a> { fn wait(self) -> Self::To { Box::new( zenoh_runtime::ZRuntime::Application - .block_in_place(self.session.runtime.manager().get_transports_unicast()) + .block_in_place(self.runtime.manager().get_transports_unicast()) .into_iter() .filter_map(|s| { s.get_whatami() @@ -125,9 +123,8 @@ impl<'a> IntoFuture for RoutersZenohIdBuilder<'a> { /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[derive(Debug)] pub struct PeersZenohIdBuilder<'a> { - pub(crate) session: SessionRef<'a>, + runtime: &'a Runtime, } impl<'a> Resolvable for PeersZenohIdBuilder<'a> { @@ -138,7 +135,7 @@ impl<'a> Wait for PeersZenohIdBuilder<'a> { fn wait(self) -> ::To { Box::new( zenoh_runtime::ZRuntime::Application - .block_in_place(self.session.runtime.manager().get_transports_unicast()) + .block_in_place(self.runtime.manager().get_transports_unicast()) .into_iter() .filter_map(|s| { s.get_whatami() @@ -173,11 +170,11 @@ impl<'a> IntoFuture for PeersZenohIdBuilder<'a> { /// let zid = info.zid().await; /// # } /// ``` -pub struct SessionInfo<'a> { - pub(crate) session: SessionRef<'a>, +pub struct SessionInfo { + pub(crate) runtime: Runtime, } -impl SessionInfo<'_> { +impl SessionInfo { /// Return the [`ZenohId`] of the current zenoh [`Session`](crate::Session). /// /// # Examples @@ -192,7 +189,7 @@ impl SessionInfo<'_> { /// ``` pub fn zid(&self) -> ZenohIdBuilder<'_> { ZenohIdBuilder { - session: self.session.clone(), + runtime: &self.runtime, } } @@ -212,7 +209,7 @@ impl SessionInfo<'_> { /// ``` pub fn routers_zid(&self) -> RoutersZenohIdBuilder<'_> { RoutersZenohIdBuilder { - session: self.session.clone(), + runtime: &self.runtime, } } @@ -231,7 +228,7 @@ impl SessionInfo<'_> { /// ``` pub fn peers_zid(&self) -> PeersZenohIdBuilder<'_> { PeersZenohIdBuilder { - session: self.session.clone(), + runtime: &self.runtime, } } } diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 0f0d13a69c..6ed0cbcea3 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -25,7 +25,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use super::session::{Session, UndeclarableSealed}; +use super::session::{Session, SessionInner, UndeclarableSealed}; use crate::net::primitives::Primitives; #[derive(Clone, Debug)] @@ -492,7 +492,7 @@ impl<'a> KeyExpr<'a> { //pub(crate) fn is_optimized(&self, session: &Session) -> bool { // matches!(&self.0, KeyExprInner::Wire { expr_id, session_id, .. } | KeyExprInner::BorrowedWire { expr_id, session_id, .. } if *expr_id != 0 && session.id == *session_id) //} - pub(crate) fn is_fully_optimized(&self, session: &Session) -> bool { + pub(crate) fn is_fully_optimized(&self, session: &SessionInner) -> bool { match &self.0 { KeyExprInner::Wire { key_expr, @@ -509,7 +509,7 @@ impl<'a> KeyExpr<'a> { _ => false, } } - pub(crate) fn to_wire(&'a self, session: &Session) -> WireExpr<'a> { + pub(crate) fn to_wire(&'a self, session: &SessionInner) -> WireExpr<'a> { match &self.0 { KeyExprInner::Wire { key_expr, @@ -594,7 +594,7 @@ impl Wait for KeyExprUndeclaration<'_> { session_id, .. } if *prefix_len as usize == key_expr.len() => { - if *session_id == session.id { + if *session_id == session.0.id { *expr_id } else { return Err(zerror!("Failed to undeclare {}, as it was declared by an other Session", expr).into()) @@ -607,7 +607,7 @@ impl Wait for KeyExprUndeclaration<'_> { session_id, .. } if *prefix_len as usize == key_expr.len() => { - if *session_id == session.id { + if *session_id == session.0.id { *expr_id } else { return Err(zerror!("Failed to undeclare {}, as it was declared by an other Session", expr).into()) @@ -616,10 +616,10 @@ impl Wait for KeyExprUndeclaration<'_> { _ => return Err(zerror!("Failed to undeclare {}, make sure you use the result of `Session::declare_keyexpr` to call `Session::undeclare`", expr).into()), }; tracing::trace!("undeclare_keyexpr({:?})", expr_id); - let mut state = zwrite!(session.state); + let mut state = zwrite!(session.0.state); state.local_resources.remove(&expr_id); - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_declare(zenoh_protocol::network::Declare { interest_id: None, diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index f22de66b02..50bcf9da67 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -16,7 +16,7 @@ use std::{ convert::TryInto, future::{IntoFuture, Ready}, mem::size_of, - sync::Arc, + sync::{Arc, Weak}, time::Duration, }; @@ -29,11 +29,11 @@ use super::{ key_expr::KeyExpr, query::Reply, sample::{Locality, Sample}, - session::{Session, SessionRef, UndeclarableSealed}, + session::{Session, UndeclarableSealed}, subscriber::{Subscriber, SubscriberInner}, Id, }; -use crate::api::session::MaybeWeakSessionRef; +use crate::api::session::SessionInner; /// A structure with functions to declare a /// [`LivelinessToken`](LivelinessToken), query @@ -97,7 +97,7 @@ use crate::api::session::MaybeWeakSessionRef; /// ``` #[zenoh_macros::unstable] pub struct Liveliness<'a> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, } #[zenoh_macros::unstable] @@ -132,7 +132,7 @@ impl<'a> Liveliness<'a> { >>::Error: Into, { LivelinessTokenBuilder { - session: self.session.clone(), + session: self.session, key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), } } @@ -169,7 +169,7 @@ impl<'a> Liveliness<'a> { >>::Error: Into, { LivelinessSubscriberBuilder { - session: self.session.clone(), + session: self.session, key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), handler: DefaultHandler::default(), } @@ -207,11 +207,11 @@ impl<'a> Liveliness<'a> { { let key_expr = key_expr.try_into().map_err(Into::into); let timeout = { - let conf = self.session.runtime.config().lock(); + let conf = self.session.0.runtime.config().lock(); Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) }; LivelinessGetBuilder { - session: &self.session, + session: self.session, key_expr, timeout, handler: DefaultHandler::default(), @@ -239,13 +239,13 @@ impl<'a> Liveliness<'a> { #[zenoh_macros::unstable] #[derive(Debug)] pub struct LivelinessTokenBuilder<'a, 'b> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, } #[zenoh_macros::unstable] -impl<'a> Resolvable for LivelinessTokenBuilder<'a, '_> { - type To = ZResult>; +impl Resolvable for LivelinessTokenBuilder<'_, '_> { + type To = ZResult; } #[zenoh_macros::unstable] @@ -254,13 +254,13 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { fn wait(self) -> ::To { let session = self.session; let key_expr = self.key_expr?.into_owned(); - let undeclare_on_drop = true; session + .0 .declare_liveliness_inner(&key_expr) .map(|tok_state| LivelinessToken { - session: MaybeWeakSessionRef::new(session, !undeclare_on_drop), + session: Arc::downgrade(&self.session.0), state: tok_state, - undeclare_on_drop, + undeclare_on_drop: true, }) } } @@ -310,8 +310,8 @@ pub(crate) struct LivelinessTokenState { /// ``` #[zenoh_macros::unstable] #[derive(Debug)] -pub struct LivelinessToken<'a> { - session: MaybeWeakSessionRef<'a>, +pub struct LivelinessToken { + session: Weak, state: Arc, undeclare_on_drop: bool, } @@ -336,22 +336,22 @@ pub struct LivelinessToken<'a> { /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[zenoh_macros::unstable] -pub struct LivelinessTokenUndeclaration<'a>(LivelinessToken<'a>); +pub struct LivelinessTokenUndeclaration(LivelinessToken); #[zenoh_macros::unstable] -impl Resolvable for LivelinessTokenUndeclaration<'_> { +impl Resolvable for LivelinessTokenUndeclaration { type To = ZResult<()>; } #[zenoh_macros::unstable] -impl Wait for LivelinessTokenUndeclaration<'_> { +impl Wait for LivelinessTokenUndeclaration { fn wait(mut self) -> ::To { self.0.undeclare_impl() } } #[zenoh_macros::unstable] -impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> { +impl IntoFuture for LivelinessTokenUndeclaration { type Output = ::To; type IntoFuture = Ready<::To>; @@ -361,7 +361,7 @@ impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> { } #[zenoh_macros::unstable] -impl<'a> LivelinessToken<'a> { +impl LivelinessToken { /// Undeclare the [`LivelinessToken`]. /// /// # Examples @@ -381,7 +381,7 @@ impl<'a> LivelinessToken<'a> { /// # } /// ``` #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a { + pub fn undeclare(self) -> impl Resolve> { UndeclarableSealed::undeclare_inner(self, ()) } @@ -396,8 +396,8 @@ impl<'a> LivelinessToken<'a> { } #[zenoh_macros::unstable] -impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> { - type Undeclaration = LivelinessTokenUndeclaration<'a>; +impl UndeclarableSealed<()> for LivelinessToken { + type Undeclaration = LivelinessTokenUndeclaration; fn undeclare_inner(self, _: ()) -> Self::Undeclaration { LivelinessTokenUndeclaration(self) @@ -405,7 +405,7 @@ impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> { } #[zenoh_macros::unstable] -impl Drop for LivelinessToken<'_> { +impl Drop for LivelinessToken { fn drop(&mut self) { if self.undeclare_on_drop { if let Err(error) = self.undeclare_impl() { @@ -435,7 +435,7 @@ impl Drop for LivelinessToken<'_> { #[zenoh_macros::unstable] #[derive(Debug)] pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> { - pub session: SessionRef<'a>, + pub session: &'a Session, pub key_expr: ZResult>, pub handler: Handler, } @@ -558,7 +558,7 @@ where Handler: IntoHandler<'static, Sample> + Send, Handler::Handler: Send, { - type To = ZResult>; + type To = ZResult>; } #[zenoh_macros::unstable] @@ -574,22 +574,20 @@ where let key_expr = self.key_expr?; let session = self.session; let (callback, handler) = self.handler.into_handler(); - let undeclare_on_drop = size_of::() > 0; session + .0 .declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback) - .map(|sub_state| { - Subscriber { - inner: SubscriberInner { - #[cfg(feature = "unstable")] - session_id: session.zid(), - session: MaybeWeakSessionRef::new(session, !undeclare_on_drop), - state: sub_state, - kind: SubscriberKind::LivelinessSubscriber, - // `size_of::() == 0` means callback-only subscriber - undeclare_on_drop, - }, - handler, - } + .map(|sub_state| Subscriber { + inner: SubscriberInner { + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: Arc::downgrade(&self.session.0), + state: sub_state, + kind: SubscriberKind::LivelinessSubscriber, + // `size_of::() == 0` means callback-only subscriber + undeclare_on_drop: size_of::() > 0, + }, + handler, }) } } @@ -775,6 +773,7 @@ where fn wait(self) -> ::To { let (callback, receiver) = self.handler.into_handler(); self.session + .0 .liveliness_query(&self.key_expr?, self.timeout, callback) .map(|_| receiver) } diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 50f811da3e..55c9360348 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -17,6 +17,7 @@ use std::{ fmt, future::{IntoFuture, Ready}, pin::Pin, + sync::Arc, task::{Context, Poll}, }; @@ -35,10 +36,7 @@ use { handlers::{Callback, DefaultHandler, IntoHandler}, sample::SourceInfo, }, - std::{ - collections::HashSet, - sync::{Arc, Mutex}, - }, + std::{collections::HashSet, sync::Mutex}, zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto, }; @@ -52,10 +50,10 @@ use super::{ encoding::Encoding, key_expr::KeyExpr, sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind}, - session::{SessionRef, UndeclarableSealed}, + session::UndeclarableSealed, }; use crate::{ - api::{subscriber::SubscriberKind, Id}, + api::{session::SessionInner, subscriber::SubscriberKind, Id}, net::primitives::Primitives, }; @@ -75,35 +73,6 @@ impl fmt::Debug for PublisherState { } } -#[zenoh_macros::unstable] -#[derive(Clone)] -pub enum PublisherRef<'a> { - Borrow(&'a Publisher<'a>), - Shared(Arc>), -} - -#[zenoh_macros::unstable] -impl<'a> std::ops::Deref for PublisherRef<'a> { - type Target = Publisher<'a>; - - fn deref(&self) -> &Self::Target { - match self { - PublisherRef::Borrow(b) => b, - PublisherRef::Shared(s) => s, - } - } -} - -#[zenoh_macros::unstable] -impl std::fmt::Debug for PublisherRef<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - PublisherRef::Borrow(b) => Publisher::fmt(b, f), - PublisherRef::Shared(s) => Publisher::fmt(s, f), - } - } -} - /// A publisher that allows to send data through a stream. /// /// Publishers are automatically undeclared when dropped. @@ -114,7 +83,7 @@ impl std::fmt::Debug for PublisherRef<'_> { /// # async fn main() { /// use zenoh::prelude::*; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); +/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.put("value").await.unwrap(); /// # } @@ -129,7 +98,7 @@ impl std::fmt::Debug for PublisherRef<'_> { /// use futures::StreamExt; /// use zenoh::prelude::*; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); +/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let mut subscriber = session.declare_subscriber("key/expression").await.unwrap(); /// let publisher = session.declare_publisher("another/key/expression").await.unwrap(); /// subscriber.stream().map(Ok).forward(publisher).await.unwrap(); @@ -137,7 +106,7 @@ impl std::fmt::Debug for PublisherRef<'_> { /// ``` #[derive(Debug, Clone)] pub struct Publisher<'a> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: Arc, pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) encoding: Encoding, @@ -169,7 +138,7 @@ impl<'a> Publisher<'a> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.session.zid().into(), + zid: self.session.runtime.zid().into(), eid: self.id, } .into() @@ -198,42 +167,6 @@ impl<'a> Publisher<'a> { self.priority } - /// Consumes the given `Publisher`, returning a thread-safe reference-counting - /// pointer to it (`Arc`). This is equivalent to `Arc::new(Publisher)`. - /// - /// This is useful to share ownership of the `Publisher` between several threads - /// and tasks. It also allows to create [`MatchingListener`] with static - /// lifetime that can be moved to several threads and tasks. - /// - /// Note: the given zenoh `Publisher` will be undeclared when the last reference to - /// it is dropped. - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); - /// let matching_listener = publisher.matching_listener().await.unwrap(); - /// - /// tokio::task::spawn(async move { - /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// } - /// }).await; - /// # } - /// ``` - #[zenoh_macros::unstable] - pub fn into_arc(self) -> std::sync::Arc { - std::sync::Arc::new(self) - } - /// Put data. /// /// # Examples @@ -242,7 +175,7 @@ impl<'a> Publisher<'a> { /// # async fn main() { /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.put("value").await.unwrap(); /// # } @@ -273,7 +206,7 @@ impl<'a> Publisher<'a> { /// # async fn main() { /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.delete().await.unwrap(); /// # } @@ -300,7 +233,7 @@ impl<'a> Publisher<'a> { /// # async fn main() { /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_subscribers: bool = publisher /// .matching_status() @@ -341,9 +274,9 @@ impl<'a> Publisher<'a> { /// # } /// ``` #[zenoh_macros::unstable] - pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { + pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, 'a, DefaultHandler> { MatchingListenerBuilder { - publisher: PublisherRef::Borrow(self), + publisher: self, handler: DefaultHandler::default(), } } @@ -379,94 +312,6 @@ impl<'a> Publisher<'a> { } } -/// Functions to create zenoh entities with `'static` lifetime. -/// -/// This trait contains functions to create zenoh entities like -/// [`MatchingListener`] with a `'static` lifetime. -/// This is useful to move zenoh entities to several threads and tasks. -/// -/// This trait is implemented for `Arc`. -/// -/// # Examples -/// ```no_run -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::prelude::*; -/// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); -/// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); -/// let matching_listener = publisher.matching_listener().await.unwrap(); -/// -/// tokio::task::spawn(async move { -/// while let Ok(matching_status) = matching_listener.recv_async().await { -/// if matching_status.matching_subscribers() { -/// println!("Publisher has matching subscribers."); -/// } else { -/// println!("Publisher has NO MORE matching subscribers."); -/// } -/// } -/// }).await; -/// # } -/// ``` -#[zenoh_macros::unstable] -pub trait PublisherDeclarations { - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); - /// let matching_listener = publisher.matching_listener().await.unwrap(); - /// - /// tokio::task::spawn(async move { - /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// } - /// }).await; - /// # } - /// ``` - #[zenoh_macros::unstable] - fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler>; -} - -#[zenoh_macros::unstable] -impl PublisherDeclarations for std::sync::Arc> { - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression").await.unwrap().into_arc(); - /// let matching_listener = publisher.matching_listener().await.unwrap(); - /// - /// tokio::task::spawn(async move { - /// while let Ok(matching_status) = matching_listener.recv_async().await { - /// if matching_status.matching_subscribers() { - /// println!("Publisher has matching subscribers."); - /// } else { - /// println!("Publisher has NO MORE matching subscribers."); - /// } - /// } - /// }).await; - /// # } - /// ``` - #[zenoh_macros::unstable] - fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> { - MatchingListenerBuilder { - publisher: PublisherRef::Shared(self.clone()), - handler: DefaultHandler::default(), - } - } -} - impl<'a> UndeclarableSealed<()> for Publisher<'a> { type Undeclaration = PublisherUndeclaration<'a>; @@ -742,7 +587,7 @@ impl TryFrom for Priority { /// # async fn main() { /// use zenoh::prelude::*; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); +/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_status = publisher.matching_status().await.unwrap(); /// # } @@ -763,7 +608,7 @@ impl MatchingStatus { /// # async fn main() { /// use zenoh::prelude::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_subscribers: bool = publisher /// .matching_status() @@ -780,13 +625,13 @@ impl MatchingStatus { /// A builder for initializing a [`MatchingListener`]. #[zenoh_macros::unstable] #[derive(Debug)] -pub struct MatchingListenerBuilder<'a, Handler> { - pub(crate) publisher: PublisherRef<'a>, +pub struct MatchingListenerBuilder<'a, 'b, Handler> { + pub(crate) publisher: &'a Publisher<'b>, pub handler: Handler, } #[zenoh_macros::unstable] -impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { +impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// Receive the MatchingStatuses for this listener with a callback. /// /// # Examples @@ -812,7 +657,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback(self, callback: Callback) -> MatchingListenerBuilder<'a, Callback> + pub fn callback(self, callback: Callback) -> MatchingListenerBuilder<'a, 'b, Callback> where Callback: Fn(MatchingStatus) + Send + Sync + 'static, { @@ -849,7 +694,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { pub fn callback_mut( self, callback: CallbackMut, - ) -> MatchingListenerBuilder<'a, impl Fn(MatchingStatus) + Send + Sync + 'static> + ) -> MatchingListenerBuilder<'a, 'b, impl Fn(MatchingStatus) + Send + Sync + 'static> where CallbackMut: FnMut(MatchingStatus) + Send + Sync + 'static, { @@ -882,7 +727,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler> + pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, 'b, Handler> where Handler: IntoHandler<'static, MatchingStatus>, { @@ -895,7 +740,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { } #[zenoh_macros::unstable] -impl<'a, Handler> Resolvable for MatchingListenerBuilder<'a, Handler> +impl<'a, 'b, Handler> Resolvable for MatchingListenerBuilder<'a, 'b, Handler> where Handler: IntoHandler<'static, MatchingStatus> + Send, Handler::Handler: Send, @@ -904,7 +749,7 @@ where } #[zenoh_macros::unstable] -impl<'a, Handler> Wait for MatchingListenerBuilder<'a, Handler> +impl<'a, 'b, Handler> Wait for MatchingListenerBuilder<'a, 'b, Handler> where Handler: IntoHandler<'static, MatchingStatus> + Send, Handler::Handler: Send, @@ -915,11 +760,11 @@ where let state = self .publisher .session - .declare_matches_listener_inner(&self.publisher, callback)?; + .declare_matches_listener_inner(self.publisher, callback)?; zlock!(self.publisher.matching_listeners).insert(state.id); Ok(MatchingListener { listener: MatchingListenerInner { - publisher: self.publisher, + publisher: self.publisher.clone(), state, }, receiver, @@ -928,7 +773,7 @@ where } #[zenoh_macros::unstable] -impl<'a, Handler> IntoFuture for MatchingListenerBuilder<'a, Handler> +impl<'a, 'b, Handler> IntoFuture for MatchingListenerBuilder<'a, 'b, Handler> where Handler: IntoHandler<'static, MatchingStatus> + Send, Handler::Handler: Send, @@ -952,8 +797,8 @@ pub(crate) struct MatchingListenerState { } #[zenoh_macros::unstable] -impl std::fmt::Debug for MatchingListenerState { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { +impl fmt::Debug for MatchingListenerState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("MatchingListener") .field("id", &self.id) .field("key_expr", &self.key_expr) @@ -963,7 +808,7 @@ impl std::fmt::Debug for MatchingListenerState { #[zenoh_macros::unstable] pub(crate) struct MatchingListenerInner<'a> { - pub(crate) publisher: PublisherRef<'a>, + pub(crate) publisher: Publisher<'a>, pub(crate) state: Arc, } @@ -1096,7 +941,7 @@ mod tests { use zenoh_config::Config; use zenoh_core::Wait; - use crate::api::{sample::SampleKind, session::SessionDeclarations}; + use crate::api::sample::SampleKind; #[cfg(feature = "internal")] #[test] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 2a1016db5f..bea028ff97 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -489,6 +489,7 @@ where parameters, } = self.selector?; self.session + .0 .query( &key_expr, ¶meters, diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 56b26fa36e..37a5b6bc21 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -16,7 +16,7 @@ use std::{ future::{IntoFuture, Ready}, mem::size_of, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Weak}, }; use tracing::error; @@ -50,11 +50,12 @@ use crate::{ publisher::Priority, sample::{Locality, QoSBuilder, Sample, SampleKind}, selector::Selector, - session::{MaybeWeakSessionRef, SessionRef, UndeclarableSealed}, + session::{SessionInner, UndeclarableSealed}, value::Value, Id, }, net::primitives::Primitives, + Session, }; pub(crate) struct QueryInner { @@ -539,10 +540,10 @@ impl fmt::Debug for QueryableState { } #[derive(Debug)] -pub(crate) struct QueryableInner<'a> { +pub(crate) struct QueryableInner { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, - pub(crate) session: MaybeWeakSessionRef<'a>, + pub(crate) session: Weak, pub(crate) state: Arc, // Queryable is undeclared on drop unless its handler is a ZST, i.e. it is callback-only pub(crate) undeclare_on_drop: bool, @@ -562,19 +563,19 @@ pub(crate) struct QueryableInner<'a> { /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -pub struct QueryableUndeclaration<'a, Handler>(Queryable<'a, Handler>); +pub struct QueryableUndeclaration(Queryable); -impl Resolvable for QueryableUndeclaration<'_, Handler> { +impl Resolvable for QueryableUndeclaration { type To = ZResult<()>; } -impl Wait for QueryableUndeclaration<'_, Handler> { +impl Wait for QueryableUndeclaration { fn wait(mut self) -> ::To { self.0.undeclare_impl() } } -impl IntoFuture for QueryableUndeclaration<'_, Handler> { +impl IntoFuture for QueryableUndeclaration { type Output = ::To; type IntoFuture = Ready<::To>; @@ -598,7 +599,7 @@ impl IntoFuture for QueryableUndeclaration<'_, Handler> { #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[derive(Debug)] pub struct QueryableBuilder<'a, 'b, Handler> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, pub(crate) key_expr: ZResult>, pub(crate) complete: bool, pub(crate) origin: Locality, @@ -792,12 +793,12 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> { /// ``` #[non_exhaustive] #[derive(Debug)] -pub struct Queryable<'a, Handler> { - pub(crate) inner: QueryableInner<'a>, +pub struct Queryable { + pub(crate) inner: QueryableInner, pub(crate) handler: Handler, } -impl<'a, Handler> Queryable<'a, Handler> { +impl Queryable { /// Returns the [`EntityGlobalId`] of this Queryable. /// /// # Examples @@ -852,9 +853,9 @@ impl<'a, Handler> Queryable<'a, Handler> { /// # } /// ``` #[inline] - pub fn undeclare(self) -> impl Resolve> + 'a + pub fn undeclare(self) -> impl Resolve> where - Handler: Send + 'a, + Handler: Send, { UndeclarableSealed::undeclare_inner(self, ()) } @@ -869,7 +870,7 @@ impl<'a, Handler> Queryable<'a, Handler> { } } -impl Drop for Queryable<'_, Handler> { +impl Drop for Queryable { fn drop(&mut self) { if self.inner.undeclare_on_drop { if let Err(error) = self.undeclare_impl() { @@ -879,15 +880,15 @@ impl Drop for Queryable<'_, Handler> { } } -impl<'a, Handler: Send + 'a> UndeclarableSealed<()> for Queryable<'a, Handler> { - type Undeclaration = QueryableUndeclaration<'a, Handler>; +impl UndeclarableSealed<()> for Queryable { + type Undeclaration = QueryableUndeclaration; fn undeclare_inner(self, _: ()) -> Self::Undeclaration { QueryableUndeclaration(self) } } -impl Deref for Queryable<'_, Handler> { +impl Deref for Queryable { type Target = Handler; fn deref(&self) -> &Self::Target { @@ -895,21 +896,21 @@ impl Deref for Queryable<'_, Handler> { } } -impl DerefMut for Queryable<'_, Handler> { +impl DerefMut for Queryable { fn deref_mut(&mut self) -> &mut Self::Target { self.handler_mut() } } -impl<'a, Handler> Resolvable for QueryableBuilder<'a, '_, Handler> +impl Resolvable for QueryableBuilder<'_, '_, Handler> where Handler: IntoHandler<'static, Query> + Send, Handler::Handler: Send, { - type To = ZResult>; + type To = ZResult>; } -impl<'a, Handler> Wait for QueryableBuilder<'a, '_, Handler> +impl Wait for QueryableBuilder<'_, '_, Handler> where Handler: IntoHandler<'static, Query> + Send, Handler::Handler: Send, @@ -917,31 +918,29 @@ where fn wait(self) -> ::To { let session = self.session; let (callback, receiver) = self.handler.into_handler(); - let undeclare_on_drop = size_of::() > 0; session + .0 .declare_queryable_inner( - &self.key_expr?.to_wire(&session), + &self.key_expr?.to_wire(&session.0), self.complete, self.origin, callback, ) - .map(|qable_state| { - Queryable { - inner: QueryableInner { - #[cfg(feature = "unstable")] - session_id: session.zid(), - session: MaybeWeakSessionRef::new(session, !undeclare_on_drop), - state: qable_state, - // `size_of::() == 0` means callback-only queryable - undeclare_on_drop, - }, - handler: receiver, - } + .map(|qable_state| Queryable { + inner: QueryableInner { + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: Arc::downgrade(&self.session.0), + state: qable_state, + // `size_of::() == 0` means callback-only queryable + undeclare_on_drop: size_of::() > 0, + }, + handler: receiver, }) } } -impl<'a, Handler> IntoFuture for QueryableBuilder<'a, '_, Handler> +impl IntoFuture for QueryableBuilder<'_, '_, Handler> where Handler: IntoHandler<'static, Query> + Send, Handler::Handler: Send, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 1c9e7357f2..63284d50d2 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -16,7 +16,6 @@ use std::{ convert::TryInto, fmt, future::{IntoFuture, Ready}, - ops::Deref, sync::{ atomic::{AtomicU16, Ordering}, Arc, RwLock, Weak, @@ -175,6 +174,14 @@ impl SessionState { } impl SessionState { + #[inline] + pub(crate) fn primitives(&self) -> ZResult> { + self.primitives + .as_ref() + .cloned() + .ok_or_else(|| zerror!("session closed").into()) + } + #[inline] fn get_local_res(&self, id: &ExprId) -> Option<&Resource> { self.local_resources.get(id) @@ -360,122 +367,6 @@ impl Resource { } } -#[derive(Clone)] -pub enum SessionRef<'a> { - Borrow(&'a Session), - Shared(Arc), -} - -impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { - fn declare_subscriber<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - SubscriberBuilder { - session: self.clone(), - key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, - origin: Locality::default(), - handler: DefaultHandler::default(), - } - } - fn declare_queryable<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'a, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - QueryableBuilder { - session: self.clone(), - key_expr: key_expr.try_into().map_err(Into::into), - complete: false, - origin: Locality::default(), - handler: DefaultHandler::default(), - } - } - fn declare_publisher<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'a, 'b> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - PublisherBuilder { - session: self.clone(), - key_expr: key_expr.try_into().map_err(Into::into), - encoding: Encoding::default(), - congestion_control: CongestionControl::DEFAULT, - priority: Priority::DEFAULT, - is_express: false, - destination: Locality::default(), - } - } - #[zenoh_macros::unstable] - fn liveliness(&'s self) -> Liveliness<'a> { - Liveliness { - session: self.clone(), - } - } - fn info(&'s self) -> SessionInfo<'a> { - SessionInfo { - session: self.clone(), - } - } -} - -impl Deref for SessionRef<'_> { - type Target = Session; - - fn deref(&self) -> &Self::Target { - match self { - SessionRef::Borrow(b) => b, - SessionRef::Shared(s) => s, - } - } -} - -impl fmt::Debug for SessionRef<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SessionRef::Borrow(b) => Session::fmt(b, f), - SessionRef::Shared(s) => Session::fmt(s, f), - } - } -} - -#[derive(Debug, Clone)] -pub(crate) enum MaybeWeakSessionRef<'a> { - Borrow(&'a Session), - Arc(Arc), - Weak(Weak), -} - -impl<'a> MaybeWeakSessionRef<'a> { - pub(crate) fn new(session: SessionRef<'a>, downgrade: bool) -> Self { - match session { - SessionRef::Borrow(s) => Self::Borrow(s), - SessionRef::Shared(s) if downgrade => Self::Weak(Arc::downgrade(&s)), - SessionRef::Shared(s) => Self::Arc(s), - } - } - - pub(crate) fn upgrade(&self) -> Option> { - match self { - Self::Borrow(s) => Some(SessionRef::Borrow(s)), - Self::Arc(s) => Some(SessionRef::Shared(s.clone())), - Self::Weak(s) => s.upgrade().map(SessionRef::Shared), - } - } -} - /// A trait implemented by types that can be undeclared. pub trait UndeclarableSealed { type Undeclaration: Resolve> + Send; @@ -501,113 +392,60 @@ pub trait Undeclarable: UndeclarableSealed {} impl Undeclarable for T where T: UndeclarableSealed {} -/// A zenoh session. -/// -pub struct Session { +pub(crate) struct SessionInner { pub(crate) runtime: Runtime, - pub(crate) state: Arc>, + pub(crate) state: RwLock, pub(crate) id: u16, - close_on_drop: bool, owns_runtime: bool, task_controller: TaskController, } +impl fmt::Debug for SessionInner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Session") + .field("id", &self.runtime.zid()) + .finish() + } +} + +/// A zenoh session. +/// +#[derive(Clone)] +pub struct Session(pub(crate) Arc); + static SESSION_ID_COUNTER: AtomicU16 = AtomicU16::new(0); impl Session { pub(crate) fn init( runtime: Runtime, aggregated_subscribers: Vec, aggregated_publishers: Vec, + owns_runtime: bool, ) -> impl Resolve { ResolveClosure::new(move || { let router = runtime.router(); - let state = Arc::new(RwLock::new(SessionState::new( + let state = RwLock::new(SessionState::new( aggregated_subscribers, aggregated_publishers, - ))); - let session = Session { + )); + let session = Arc::new(SessionInner { runtime: runtime.clone(), - state: state.clone(), + state, id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst), - close_on_drop: true, - owns_runtime: false, + owns_runtime, task_controller: TaskController::default(), - }; + }); - runtime.new_handler(Arc::new(admin::Handler::new(session.clone()))); + runtime.new_handler(Arc::new(admin::Handler::new(Arc::downgrade(&session)))); - let primitives = Some(router.new_primitives(Arc::new(session.clone()))); - zwrite!(state).primitives = primitives; + let primitives = Some(router.new_primitives(Arc::new(Arc::downgrade(&session)))); + zwrite!(session.state).primitives = primitives; admin::init(&session); - session + Session(session) }) } - /// Consumes the given `Session`, returning a thread-safe reference-counting - /// pointer to it (`Arc`). This is equivalent to `Arc::new(session)`. - /// - /// This is useful to share ownership of the `Session` between several threads - /// and tasks. It also allows to create [`Subscriber`](crate::pubsub::Subscriber) and - /// [`Queryable`](crate::query::Queryable) with static lifetime that can be moved to several - /// threads and tasks - /// - /// Note: the given zenoh `Session` will be closed when the last reference to - /// it is dropped. - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .await - /// .unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # } - /// ``` - pub fn into_arc(self) -> Arc { - Arc::new(self) - } - - /// Consumes and leaks the given `Session`, returning a `'static` mutable - /// reference to it. The given `Session` will live for the remainder of - /// the program's life. Dropping the returned reference will cause a memory - /// leak. - /// - /// This is useful to move entities (like [`Subscriber`](crate::pubsub::Subscriber)) which - /// lifetimes are bound to the session lifetime in several threads or tasks. - /// - /// Note: the given zenoh `Session` cannot be closed any more. At process - /// termination the zenoh session will terminate abruptly. If possible prefer - /// using [`Session::into_arc()`](Session::into_arc). - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::Session::leak(zenoh::open(zenoh::config::peer()).await.unwrap()); - /// let subscriber = session.declare_subscriber("key/expression").await.unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # } - /// ``` - pub fn leak(s: Self) -> &'static mut Self { - Box::leak(Box::new(s)) - } - /// Returns the identifier of the current session. `zid()` is a convenient shortcut. /// See [`Session::info()`](`Session::info()`) and [`SessionInfo::zid()`](`SessionInfo::zid()`) for more details. pub fn zid(&self) -> ZenohId { @@ -615,7 +453,7 @@ impl Session { } pub fn hlc(&self) -> Option<&HLC> { - self.runtime.hlc() + self.0.runtime.hlc() } /// Close the zenoh [`Session`](Session). @@ -633,23 +471,8 @@ impl Session { /// session.close().await.unwrap(); /// # } /// ``` - pub fn close(mut self) -> impl Resolve> { - ResolveFuture::new(async move { - trace!("close()"); - // set the flag first to avoid double panic if this function panic - self.close_on_drop = false; - self.task_controller.terminate_all(Duration::from_secs(10)); - if self.owns_runtime { - self.runtime.close().await?; - } - let mut state = zwrite!(self.state); - // clean up to break cyclic references from self.state to itself - let primitives = state.primitives.take(); - state.queryables.clear(); - drop(state); - primitives.as_ref().unwrap().send_close(); - Ok(()) - }) + pub fn close(&self) -> impl Resolve> + '_ { + self.0.close() } pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve> + 'a @@ -689,7 +512,7 @@ impl Session { /// # } /// ``` pub fn config(&self) -> &Notifier { - self.runtime.config() + self.0.runtime.config() } /// Get a new Timestamp from a Zenoh session [`Session`](Session). @@ -714,49 +537,173 @@ impl Session { // Called in the case that the runtime is not initialized with an hlc // UNIX_EPOCH is Returns a Timespec::zero(), Unwrap Should be permissable here let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().into(); - Timestamp::new(now, self.runtime.zid().into()) + Timestamp::new(now, self.0.runtime.zid().into()) } } } } -impl<'a> SessionDeclarations<'a, 'a> for Session { - fn info(&self) -> SessionInfo { - SessionRef::Borrow(self).info() +impl Session { + /// Get information about the zenoh [`Session`](Session). + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let info = session.info(); + /// # } + /// ``` + pub fn info(&self) -> SessionInfo { + SessionInfo { + runtime: self.0.runtime.clone(), + } } - fn declare_subscriber<'b, TryIntoKeyExpr>( - &'a self, + + /// Create a [`Subscriber`](crate::pubsub::Subscriber) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The resourkey expression to subscribe to + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let subscriber = session.declare_subscriber("key/expression") + /// .await + /// .unwrap(); + /// tokio::task::spawn(async move { + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {:?}", sample); + /// } + /// }).await; + /// # } + /// ``` + pub fn declare_subscriber<'b, TryIntoKeyExpr>( + &self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, DefaultHandler> + ) -> SubscriberBuilder<'_, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SessionRef::Borrow(self).declare_subscriber(key_expr) + SubscriberBuilder { + session: self, + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + origin: Locality::default(), + handler: DefaultHandler::default(), + } } - fn declare_queryable<'b, TryIntoKeyExpr>( - &'a self, + + /// Create a [`Queryable`](crate::query::Queryable) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching the queries the + /// [`Queryable`](crate::query::Queryable) will reply to + /// + /// # Examples + /// ```no_run + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let queryable = session.declare_queryable("key/expression") + /// .await + /// .unwrap(); + /// tokio::task::spawn(async move { + /// while let Ok(query) = queryable.recv_async().await { + /// query.reply( + /// "key/expression", + /// "value", + /// ).await.unwrap(); + /// } + /// }).await; + /// # } + /// ``` + pub fn declare_queryable<'b, TryIntoKeyExpr>( + &self, key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'a, 'b, DefaultHandler> + ) -> QueryableBuilder<'_, 'b, DefaultHandler> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SessionRef::Borrow(self).declare_queryable(key_expr) + QueryableBuilder { + session: self, + key_expr: key_expr.try_into().map_err(Into::into), + complete: false, + origin: Locality::default(), + handler: DefaultHandler::default(), + } } - fn declare_publisher<'b, TryIntoKeyExpr>( - &'a self, + + /// Create a [`Publisher`](crate::pubsub::Publisher) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching resources to write + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let publisher = session.declare_publisher("key/expression") + /// .await + /// .unwrap(); + /// publisher.put("value").await.unwrap(); + /// # } + /// ``` + pub fn declare_publisher<'b, TryIntoKeyExpr>( + &self, key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'a, 'b> + ) -> PublisherBuilder<'_, 'b> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SessionRef::Borrow(self).declare_publisher(key_expr) + PublisherBuilder { + session: self, + key_expr: key_expr.try_into().map_err(Into::into), + encoding: Encoding::default(), + congestion_control: CongestionControl::DEFAULT, + priority: Priority::DEFAULT, + is_express: false, + destination: Locality::default(), + } } + + /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let liveliness = session + /// .liveliness() + /// .declare_token("key/expression") + /// .await + /// .unwrap(); + /// # } + /// ``` #[zenoh_macros::unstable] - fn liveliness(&'a self) -> Liveliness { - SessionRef::Borrow(self).liveliness() + pub fn liveliness(&self) -> Liveliness<'_> { + Liveliness { session: self } } } @@ -785,11 +732,11 @@ impl Session { >>::Error: Into, { let key_expr: ZResult = key_expr.try_into().map_err(Into::into); - let sid = self.id; + let sid = self.0.id; ResolveClosure::new(move || { let key_expr: KeyExpr = key_expr?; let prefix_len = key_expr.len() as u32; - let expr_id = self.declare_prefix(key_expr.as_str()).wait(); + let expr_id = self.0.declare_prefix(key_expr.as_str()).wait()?; let key_expr = match key_expr.0 { KeyExprInner::Borrowed(key_expr) | KeyExprInner::BorrowedWire { key_expr, .. } => { KeyExpr(KeyExprInner::BorrowedWire { @@ -925,7 +872,7 @@ impl Session { { let selector = selector.try_into().map_err(Into::into); let timeout = { - let conf = self.runtime.config().lock(); + let conf = self.0.runtime.config().lock(); Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())) }; let qos: QoS = request::ext::QoSType::REQUEST.into(); @@ -947,17 +894,6 @@ impl Session { } impl Session { - pub(crate) fn clone(&self) -> Self { - Self { - runtime: self.runtime.clone(), - state: self.state.clone(), - id: self.id, - close_on_drop: false, - owns_runtime: self.owns_runtime, - task_controller: self.task_controller.clone(), - } - } - #[allow(clippy::new_ret_no_self)] pub(super) fn new( config: Config, @@ -975,28 +911,47 @@ impl Session { } let mut runtime = runtime.build().await?; - let mut session = Self::init( + let session = Self::init( runtime.clone(), aggregated_subscribers, aggregated_publishers, + true, ) .await; - session.owns_runtime = true; runtime.start().await?; Ok(session) }) } - - pub(crate) fn declare_prefix<'a>(&'a self, prefix: &'a str) -> impl Resolve + 'a { +} +impl SessionInner { + fn close(&self) -> impl Resolve> + '_ { + ResolveFuture::new(async move { + let Some(primitives) = zwrite!(self.state).primitives.take() else { + return Ok(()); + }; + trace!("close()"); + self.task_controller.terminate_all(Duration::from_secs(10)); + if self.owns_runtime { + self.runtime.close().await?; + } + primitives.send_close(); + Ok(()) + }) + } + pub(crate) fn declare_prefix<'a>( + &'a self, + prefix: &'a str, + ) -> impl Resolve> + 'a { ResolveClosure::new(move || { trace!("declare_prefix({:?})", prefix); let mut state = zwrite!(self.state); + let primitives = state.primitives()?; match state .local_resources .iter() .find(|(_expr_id, res)| res.name() == prefix) { - Some((expr_id, _res)) => *expr_id, + Some((expr_id, _res)) => Ok(*expr_id), None => { let expr_id = state.expr_id_counter.fetch_add(1, Ordering::SeqCst); let mut res = Resource::new(Box::from(prefix)); @@ -1013,7 +968,6 @@ impl Session { } } state.local_resources.insert(expr_id, res); - let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { interest_id: None, @@ -1029,7 +983,7 @@ impl Session { }, }), }); - expr_id + Ok(expr_id) } } }) @@ -1086,7 +1040,7 @@ impl Session { state.publishers.insert(id, pub_state); if let Some(res) = declared_pub { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_interest(Interest { id, @@ -1111,7 +1065,7 @@ impl Session { if !state.publishers.values().any(|p| { p.destination != Locality::SessionLocal && p.remote_id == pub_state.remote_id }) { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_interest(Interest { id: pub_state.remote_id, @@ -1131,7 +1085,7 @@ impl Session { } pub(crate) fn declare_subscriber_inner( - &self, + self: &Arc, key_expr: &KeyExpr, origin: Locality, callback: Callback<'static, Sample>, @@ -1215,7 +1169,7 @@ impl Session { } if let Some(key_expr) = declared_sub { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); // If key_expr is a pure Expr, remap it to optimal Rid or RidWithSuffix // let key_expr = if !key_expr.is_optimized(self) { @@ -1262,7 +1216,11 @@ impl Session { Ok(sub_state) } - pub(crate) fn undeclare_subscriber_inner(&self, sid: Id, kind: SubscriberKind) -> ZResult<()> { + pub(crate) fn undeclare_subscriber_inner( + self: &Arc, + sid: Id, + kind: SubscriberKind, + ) -> ZResult<()> { let mut state = zwrite!(self.state); if let Some(sub_state) = state.subscribers_mut(kind).remove(&sid) { trace!("undeclare_subscriber({:?})", sub_state); @@ -1289,7 +1247,7 @@ impl Session { if !state.subscribers(kind).values().any(|s| { s.origin != Locality::SessionLocal && s.remote_id == sub_state.remote_id }) { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_declare(Declare { interest_id: None, @@ -1312,7 +1270,7 @@ impl Session { } else { #[cfg(feature = "unstable")] if kind == SubscriberKind::LivelinessSubscriber { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_interest(Interest { @@ -1354,7 +1312,7 @@ impl Session { state.queryables.insert(id, qable_state.clone()); if origin != Locality::SessionLocal { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); let qabl_info = QueryableInfoType { complete, @@ -1380,7 +1338,7 @@ impl Session { if let Some(qable_state) = state.queryables.remove(&qid) { trace!("undeclare_queryable({:?})", qable_state); if qable_state.origin != Locality::SessionLocal { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_declare(Declare { interest_id: None, @@ -1415,7 +1373,7 @@ impl Session { }); state.tokens.insert(tok_state.id, tok_state.clone()); - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_declare(Declare { interest_id: None, @@ -1477,7 +1435,7 @@ impl Session { } } - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_interest(Interest { @@ -1502,7 +1460,7 @@ impl Session { let key_expr = &tok_state.key_expr; let twin_tok = state.tokens.values().any(|s| s.key_expr == *key_expr); if !twin_tok { - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); primitives.send_declare(Declare { interest_id: None, @@ -1593,7 +1551,7 @@ impl Session { } #[zenoh_macros::unstable] - pub(crate) fn update_status_up(&self, state: &SessionState, key_expr: &KeyExpr) { + pub(crate) fn update_status_up(self: &Arc, state: &SessionState, key_expr: &KeyExpr) { for msub in state.matching_listeners.values() { if key_expr.intersects(&msub.key_expr) { // Cannot hold session lock when calling tables (matching_status()) @@ -1631,7 +1589,7 @@ impl Session { } #[zenoh_macros::unstable] - pub(crate) fn update_status_down(&self, state: &SessionState, key_expr: &KeyExpr) { + pub(crate) fn update_status_down(self: &Arc, state: &SessionState, key_expr: &KeyExpr) { for msub in state.matching_listeners.values() { if key_expr.intersects(&msub.key_expr) { // Cannot hold session lock when calling tables (matching_status()) @@ -1747,7 +1705,7 @@ impl Session { #[allow(clippy::too_many_arguments)] pub(crate) fn query( - &self, + self: &Arc, key_expr: &KeyExpr<'_>, parameters: &Parameters<'_>, target: QueryTarget, @@ -1782,13 +1740,13 @@ impl Session { let token = self.task_controller.get_cancellation_token(); self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let state = self.state.clone(); + let session = self.clone(); #[cfg(feature = "unstable")] let zid = self.runtime.zid(); async move { tokio::select! { _ = tokio::time::sleep(timeout) => { - let mut state = zwrite!(state); + let mut state = zwrite!(session.state); if let Some(query) = state.queries.remove(&qid) { std::mem::drop(state); tracing::debug!("Timeout on query {}! Send error and close.", qid); @@ -1823,7 +1781,7 @@ impl Session { }, ); - let primitives = state.primitives.as_ref().unwrap().clone(); + let primitives = state.primitives()?; drop(state); if destination != Locality::SessionLocal { @@ -1877,7 +1835,7 @@ impl Session { #[cfg(feature = "unstable")] pub(crate) fn liveliness_query( - &self, + self: &Arc, key_expr: &KeyExpr<'_>, timeout: Duration, callback: Callback<'static, Reply>, @@ -1888,12 +1846,12 @@ impl Session { let token = self.task_controller.get_cancellation_token(); self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let state = self.state.clone(); + let session = self.clone(); let zid = self.runtime.zid(); async move { tokio::select! { _ = tokio::time::sleep(timeout) => { - let mut state = zwrite!(state); + let mut state = zwrite!(session.state); if let Some(query) = state.liveliness_queries.remove(&id) { std::mem::drop(state); tracing::debug!("Timeout on liveliness query {}! Send error and close.", id); @@ -1915,264 +1873,115 @@ impl Session { .liveliness_queries .insert(id, LivelinessQueryState { callback }); - let primitives = state.primitives.as_ref().unwrap().clone(); - drop(state); - - primitives.send_interest(Interest { - id, - mode: InterestMode::Current, - options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS, - wire_expr: Some(wexpr.clone()), - ext_qos: request::ext::QoSType::DEFAULT, - ext_tstamp: None, - ext_nodeid: request::ext::NodeIdType::DEFAULT, - }); - - Ok(()) - } - - #[allow(clippy::too_many_arguments)] - pub(crate) fn handle_query( - &self, - local: bool, - key_expr: &WireExpr, - parameters: &str, - qid: RequestId, - _target: TargetType, - _consolidation: Consolidation, - body: Option, - attachment: Option, - ) { - let (primitives, key_expr, queryables) = { - let state = zread!(self.state); - match state.wireexpr_to_keyexpr(key_expr, local) { - Ok(key_expr) => { - let queryables = state - .queryables - .iter() - .filter( - |(_, queryable)| - (queryable.origin == Locality::Any - || (local == (queryable.origin == Locality::SessionLocal))) - && - match state.local_wireexpr_to_expr(&queryable.key_expr) { - Ok(qablname) => { - qablname.intersects(&key_expr) - } - Err(err) => { - error!( - "{}. Internal error (queryable key_expr to key_expr failed).", - err - ); - false - } - } - ) - .map(|(id, qable)| (*id, qable.callback.clone())) - .collect::)>>(); - ( - state.primitives.as_ref().unwrap().clone(), - key_expr.into_owned(), - queryables, - ) - } - Err(err) => { - error!("Received Query for unknown key_expr: {}", err); - return; - } - } - }; - - let zid = self.runtime.zid(); - - let query_inner = Arc::new(QueryInner { - key_expr, - parameters: parameters.to_owned().into(), - qid, - zid: zid.into(), - primitives: if local { - Arc::new(self.clone()) - } else { - primitives - }, - }); - for (eid, callback) in queryables { - callback(Query { - inner: query_inner.clone(), - eid, - value: body.as_ref().map(|b| Value { - payload: b.payload.clone().into(), - encoding: b.encoding.clone().into(), - }), - attachment: attachment.clone(), - }); - } - } -} - -impl<'s> SessionDeclarations<'s, 'static> for Arc { - /// Create a [`Subscriber`](crate::pubsub::Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The resourkey expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .await - /// .unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # } - /// ``` - fn declare_subscriber<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'static, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - SubscriberBuilder { - session: SessionRef::Shared(self.clone()), - key_expr: key_expr.try_into().map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, - origin: Locality::default(), - handler: DefaultHandler::default(), - } - } - - /// Create a [`Queryable`](crate::query::Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](crate::query::Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let queryable = session.declare_queryable("key/expression") - /// .await - /// .unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply( - /// "key/expression", - /// "value", - /// ).await.unwrap(); - /// } - /// }).await; - /// # } - /// ``` - fn declare_queryable<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'static, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - QueryableBuilder { - session: SessionRef::Shared(self.clone()), - key_expr: key_expr.try_into().map_err(Into::into), - complete: false, - origin: Locality::default(), - handler: DefaultHandler::default(), - } - } + let primitives = state.primitives()?; + drop(state); - /// Create a [`Publisher`](crate::pubsub::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression") - /// .await - /// .unwrap(); - /// publisher.put("value").await.unwrap(); - /// # } - /// ``` - fn declare_publisher<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'static, 'b> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - PublisherBuilder { - session: SessionRef::Shared(self.clone()), - key_expr: key_expr.try_into().map_err(Into::into), - encoding: Encoding::default(), - congestion_control: CongestionControl::DEFAULT, - priority: Priority::DEFAULT, - is_express: false, - destination: Locality::default(), - } - } + primitives.send_interest(Interest { + id, + mode: InterestMode::Current, + options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS, + wire_expr: Some(wexpr.clone()), + ext_qos: request::ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: request::ext::NodeIdType::DEFAULT, + }); - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .await - /// .unwrap(); - /// # } - /// ``` - #[zenoh_macros::unstable] - fn liveliness(&'s self) -> Liveliness<'static> { - Liveliness { - session: SessionRef::Shared(self.clone()), - } + Ok(()) } - fn info(&'s self) -> SessionInfo<'static> { - SessionInfo { - session: SessionRef::Shared(self.clone()), + #[allow(clippy::too_many_arguments)] + pub(crate) fn handle_query( + self: &Arc, + local: bool, + key_expr: &WireExpr, + parameters: &str, + qid: RequestId, + _target: TargetType, + _consolidation: Consolidation, + body: Option, + attachment: Option, + ) { + let (primitives, key_expr, queryables) = { + let state = zread!(self.state); + let Ok(primitives) = state.primitives() else { + return; + }; + match state.wireexpr_to_keyexpr(key_expr, local) { + Ok(key_expr) => { + let queryables = state + .queryables + .iter() + .filter( + |(_, queryable)| + (queryable.origin == Locality::Any + || (local == (queryable.origin == Locality::SessionLocal))) + && + match state.local_wireexpr_to_expr(&queryable.key_expr) { + Ok(qablname) => { + qablname.intersects(&key_expr) + } + Err(err) => { + error!( + "{}. Internal error (queryable key_expr to key_expr failed).", + err + ); + false + } + } + ) + .map(|(id, qable)| (*id, qable.callback.clone())) + .collect::)>>(); + (primitives, key_expr.into_owned(), queryables) + } + Err(err) => { + error!("Received Query for unknown key_expr: {}", err); + return; + } + } + }; + + let zid = self.runtime.zid(); + + let query_inner = Arc::new(QueryInner { + key_expr, + parameters: parameters.to_owned().into(), + qid, + zid: zid.into(), + primitives: if local { + Arc::new(Arc::downgrade(self)) + } else { + primitives + }, + }); + for (eid, callback) in queryables { + callback(Query { + inner: query_inner.clone(), + eid, + value: body.as_ref().map(|b| Value { + payload: b.payload.clone().into(), + encoding: b.encoding.clone().into(), + }), + attachment: attachment.clone(), + }); } } } -impl Primitives for Session { +impl Primitives for Weak { fn send_interest(&self, msg: zenoh_protocol::network::Interest) { + if self.upgrade().is_none() { + return; + } trace!("recv Interest {} {:?}", msg.id, msg.wire_expr); } fn send_declare(&self, msg: zenoh_protocol::network::Declare) { + let Some(session) = self.upgrade() else { + return; + }; match msg.body { zenoh_protocol::network::DeclareBody::DeclareKeyExpr(m) => { trace!("recv DeclareKeyExpr {} {:?}", m.id, m.wire_expr); - let state = &mut zwrite!(self.state); + let state = &mut zwrite!(session.state); match state.remote_key_to_expr(&m.wire_expr) { Ok(key_expr) => { let mut res_node = ResourceNode::new(key_expr.clone().into()); @@ -2204,14 +2013,14 @@ impl Primitives for Session { trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr); #[cfg(feature = "unstable")] { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); match state .wireexpr_to_keyexpr(&m.wire_expr, false) .map(|e| e.into_owned()) { Ok(expr) => { state.remote_subscribers.insert(m.id, expr.clone()); - self.update_status_up(&state, &expr); + session.update_status_up(&state, &expr); } Err(err) => { tracing::error!( @@ -2226,9 +2035,9 @@ impl Primitives for Session { trace!("recv UndeclareSubscriber {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); if let Some(expr) = state.remote_subscribers.remove(&m.id) { - self.update_status_down(&state, &expr); + session.update_status_down(&state, &expr); } else { tracing::error!("Received Undeclare Subscriber for unknown id: {}", m.id); } @@ -2244,7 +2053,7 @@ impl Primitives for Session { trace!("recv DeclareToken {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); match state .wireexpr_to_keyexpr(&m.wire_expr, false) .map(|e| e.into_owned()) @@ -2276,7 +2085,7 @@ impl Primitives for Session { drop(state); - self.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( false, &m.wire_expr, None, @@ -2297,7 +2106,7 @@ impl Primitives for Session { trace!("recv UndeclareToken {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); if let Some(key_expr) = state.remote_tokens.remove(&m.id) { drop(state); @@ -2306,9 +2115,9 @@ impl Primitives for Session { ..Default::default() }; - self.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( false, - &key_expr.to_wire(self), + &key_expr.to_wire(&session), Some(data_info), ZBuf::default(), SubscriberKind::LivelinessSubscriber, @@ -2328,9 +2137,9 @@ impl Primitives for Session { ..Default::default() }; - self.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( false, - &key_expr.to_wire(self), + &key_expr.to_wire(&session), Some(data_info), ZBuf::default(), SubscriberKind::LivelinessSubscriber, @@ -2353,7 +2162,7 @@ impl Primitives for Session { #[cfg(feature = "unstable")] if let Some(interest_id) = msg.interest_id { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); let _ = state.liveliness_queries.remove(&interest_id); } } @@ -2361,6 +2170,9 @@ impl Primitives for Session { } fn send_push(&self, msg: Push) { + let Some(session) = self.upgrade() else { + return; + }; trace!("recv Push {:?}", msg); match msg.payload { PushBody::Put(m) => { @@ -2372,7 +2184,7 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( false, &msg.wire_expr, Some(info), @@ -2390,7 +2202,7 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( false, &msg.wire_expr, Some(info), @@ -2403,9 +2215,12 @@ impl Primitives for Session { } fn send_request(&self, msg: Request) { + let Some(session) = self.upgrade() else { + return; + }; trace!("recv Request {:?}", msg); match msg.payload { - RequestBody::Query(m) => self.handle_query( + RequestBody::Query(m) => session.handle_query( false, &msg.wire_expr, &m.parameters, @@ -2419,10 +2234,13 @@ impl Primitives for Session { } fn send_response(&self, msg: Response) { + let Some(session) = self.upgrade() else { + return; + }; trace!("recv Response {:?}", msg); match msg.payload { ResponseBody::Err(e) => { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); match state.queries.get_mut(&msg.rid) { Some(query) => { let callback = query.callback.clone(); @@ -2444,7 +2262,7 @@ impl Primitives for Session { } } ResponseBody::Reply(m) => { - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); let key_expr = match state.remote_key_to_expr(&msg.wire_expr) { Ok(key) => key.into_owned(), Err(e) => { @@ -2612,8 +2430,11 @@ impl Primitives for Session { } fn send_response_final(&self, msg: ResponseFinal) { + let Some(session) = self.upgrade() else { + return; + }; trace!("recv ResponseFinal {:?}", msg); - let mut state = zwrite!(self.state); + let mut state = zwrite!(session.state); match state.queries.get_mut(&msg.rid) { Some(query) => { query.nb_final -= 1; @@ -2635,178 +2456,28 @@ impl Primitives for Session { } fn send_close(&self) { + if self.upgrade().is_none() { + return; + } trace!("recv Close"); } } -impl Drop for Session { +impl Drop for SessionInner { fn drop(&mut self) { - if self.close_on_drop { - let _ = self.clone().close().wait(); + if let Err(error) = self.close().wait() { + tracing::error!(error); } } } impl fmt::Debug for Session { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Session").field("id", &self.zid()).finish() + self.0.fmt(f) } } -/// Functions to create zenoh entities -/// -/// This trait contains functions to create zenoh entities like -/// [`Subscriber`](crate::pubsub::Subscriber), and -/// [`Queryable`](crate::query::Queryable) -/// -/// This trait is implemented by [`Session`](crate::session::Session) itself and -/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](std::sync::Arc) -/// -/// # Examples -/// ```no_run -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::prelude::*; -/// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); -/// let subscriber = session.declare_subscriber("key/expression") -/// .await -/// .unwrap(); -/// tokio::task::spawn(async move { -/// while let Ok(sample) = subscriber.recv_async().await { -/// println!("Received: {:?}", sample); -/// } -/// }).await; -/// # } -/// ``` -pub trait SessionDeclarations<'s, 'a> { - /// Create a [`Subscriber`](crate::pubsub::Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The resourkey expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .await - /// .unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # } - /// ``` - fn declare_subscriber<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Queryable`](crate::query::Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](crate::query::Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let queryable = session.declare_queryable("key/expression") - /// .await - /// .unwrap(); - /// tokio::task::spawn(async move { - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply( - /// "key/expression", - /// "value", - /// ).await.unwrap(); - /// } - /// }).await; - /// # } - /// ``` - fn declare_queryable<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'a, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Publisher`](crate::pubsub::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression") - /// .await - /// .unwrap(); - /// publisher.put("value").await.unwrap(); - /// # } - /// ``` - fn declare_publisher<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'a, 'b> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap().into_arc(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .await - /// .unwrap(); - /// # } - /// ``` - #[zenoh_macros::unstable] - fn liveliness(&'s self) -> Liveliness<'a>; - /// Get information about the zenoh [`Session`](Session). - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::prelude::*; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); - /// let info = session.info(); - /// # } - /// ``` - fn info(&'s self) -> SessionInfo<'a>; -} - -impl crate::net::primitives::EPrimitives for Session { +impl crate::net::primitives::EPrimitives for Weak { #[inline] fn send_interest(&self, ctx: crate::net::routing::RoutingContext) { (self as &dyn Primitives).send_interest(ctx.msg) @@ -3006,6 +2677,7 @@ impl Wait for InitBuilder { self.runtime, self.aggregated_subscribers, self.aggregated_publishers, + false, ) .wait()) } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 5bca8abcfa..1d15f71757 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -17,7 +17,7 @@ use std::{ future::{IntoFuture, Ready}, mem::size_of, ops::{Deref, DerefMut}, - sync::Arc, + sync::{Arc, Weak}, }; use tracing::error; @@ -30,15 +30,18 @@ use { zenoh_protocol::core::EntityGlobalIdProto, }; -use crate::api::{ - handlers::{locked, Callback, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - sample::{Locality, Sample}, - session::{MaybeWeakSessionRef, SessionRef, UndeclarableSealed}, - Id, -}; #[cfg(feature = "unstable")] use crate::pubsub::Reliability; +use crate::{ + api::{ + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::{Locality, Sample}, + session::{SessionInner, UndeclarableSealed}, + Id, + }, + Session, +}; pub(crate) struct SubscriberState { pub(crate) id: Id, @@ -58,10 +61,10 @@ impl fmt::Debug for SubscriberState { } #[derive(Debug)] -pub(crate) struct SubscriberInner<'a> { +pub(crate) struct SubscriberInner { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, - pub(crate) session: MaybeWeakSessionRef<'a>, + pub(crate) session: Weak, pub(crate) state: Arc, pub(crate) kind: SubscriberKind, // Subscriber is undeclared on drop unless its handler is a ZST, i.e. it is callback-only @@ -85,19 +88,19 @@ pub(crate) struct SubscriberInner<'a> { /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -pub struct SubscriberUndeclaration<'a, Handler>(Subscriber<'a, Handler>); +pub struct SubscriberUndeclaration(Subscriber); -impl Resolvable for SubscriberUndeclaration<'_, Handler> { +impl Resolvable for SubscriberUndeclaration { type To = ZResult<()>; } -impl Wait for SubscriberUndeclaration<'_, Handler> { +impl Wait for SubscriberUndeclaration { fn wait(mut self) -> ::To { self.0.undeclare_impl() } } -impl IntoFuture for SubscriberUndeclaration<'_, Handler> { +impl IntoFuture for SubscriberUndeclaration { type Output = ::To; type IntoFuture = Ready<::To>; @@ -126,9 +129,9 @@ impl IntoFuture for SubscriberUndeclaration<'_, Handler> { #[derive(Debug)] pub struct SubscriberBuilder<'a, 'b, Handler> { #[cfg(feature = "unstable")] - pub session: SessionRef<'a>, + pub session: &'a Session, #[cfg(not(feature = "unstable"))] - pub(crate) session: SessionRef<'a>, + pub(crate) session: &'a Session, #[cfg(feature = "unstable")] pub key_expr: ZResult>, @@ -304,7 +307,7 @@ where Handler: IntoHandler<'static, Sample> + Send, Handler::Handler: Send, { - type To = ZResult>; + type To = ZResult>; } impl<'a, Handler> Wait for SubscriberBuilder<'a, '_, Handler> @@ -316,8 +319,8 @@ where let key_expr = self.key_expr?; let session = self.session; let (callback, receiver) = self.handler.into_handler(); - let undeclare_on_drop = size_of::() > 0; session + .0 .declare_subscriber_inner( &key_expr, self.origin, @@ -329,19 +332,17 @@ where #[cfg(not(feature = "unstable"))] &SubscriberInfo::default(), ) - .map(|sub_state| { - Subscriber { - inner: SubscriberInner { - #[cfg(feature = "unstable")] - session_id: session.zid(), - session: MaybeWeakSessionRef::new(session, !undeclare_on_drop), - state: sub_state, - kind: SubscriberKind::Subscriber, - // `size_of::() == 0` means callback-only subscriber - undeclare_on_drop, - }, - handler: receiver, - } + .map(|sub_state| Subscriber { + inner: SubscriberInner { + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: Arc::downgrade(&session.0), + state: sub_state, + kind: SubscriberKind::Subscriber, + // `size_of::() == 0` means callback-only subscriber + undeclare_on_drop: size_of::() > 0, + }, + handler: receiver, }) } } @@ -408,12 +409,12 @@ where /// ``` #[non_exhaustive] #[derive(Debug)] -pub struct Subscriber<'a, Handler> { - pub(crate) inner: SubscriberInner<'a>, +pub struct Subscriber { + pub(crate) inner: SubscriberInner, pub(crate) handler: Handler, } -impl<'a, Handler> Subscriber<'a, Handler> { +impl Subscriber { /// Returns the [`EntityGlobalId`] of this Subscriber. /// /// # Examples @@ -473,9 +474,9 @@ impl<'a, Handler> Subscriber<'a, Handler> { /// # } /// ``` #[inline] - pub fn undeclare(self) -> SubscriberUndeclaration<'a, Handler> + pub fn undeclare(self) -> SubscriberUndeclaration where - Handler: Send + 'a, + Handler: Send, { self.undeclare_inner(()) } @@ -492,7 +493,7 @@ impl<'a, Handler> Subscriber<'a, Handler> { } } -impl Drop for Subscriber<'_, Handler> { +impl Drop for Subscriber { fn drop(&mut self) { if self.inner.undeclare_on_drop { if let Err(error) = self.undeclare_impl() { @@ -502,29 +503,29 @@ impl Drop for Subscriber<'_, Handler> { } } -impl<'a, Handler: Send + 'a> UndeclarableSealed<()> for Subscriber<'a, Handler> { - type Undeclaration = SubscriberUndeclaration<'a, Handler>; +impl<'a, Handler: Send + 'a> UndeclarableSealed<()> for Subscriber { + type Undeclaration = SubscriberUndeclaration; fn undeclare_inner(self, _: ()) -> Self::Undeclaration { SubscriberUndeclaration(self) } } -impl Deref for Subscriber<'_, Handler> { +impl Deref for Subscriber { type Target = Handler; fn deref(&self) -> &Self::Target { self.handler() } } -impl DerefMut for Subscriber<'_, Handler> { +impl DerefMut for Subscriber { fn deref_mut(&mut self) -> &mut Self::Target { self.handler_mut() } } /// A [`Subscriber`] that provides data through a `flume` channel. -pub type FlumeSubscriber<'a> = Subscriber<'a, flume::Receiver>; +pub type FlumeSubscriber = Subscriber>; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SubscriberKind { diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 1b563bb4e4..aacc52eefa 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -191,15 +191,13 @@ pub mod session { pub use zenoh_config::wrappers::{EntityGlobalId, ZenohId}; pub use zenoh_protocol::core::EntityId; - #[zenoh_macros::unstable] - pub use crate::api::session::SessionRef; #[zenoh_macros::internal] pub use crate::api::session::{init, InitBuilder}; pub use crate::api::{ builders::publisher::{SessionDeleteBuilder, SessionPutBuilder}, info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, SessionInfo, ZenohIdBuilder}, query::SessionGetBuilder, - session::{open, OpenBuilder, Session, SessionDeclarations, Undeclarable}, + session::{open, OpenBuilder, Session, Undeclarable}, }; } @@ -238,7 +236,6 @@ pub mod pubsub { #[zenoh_macros::unstable] pub use crate::api::publisher::{ MatchingListener, MatchingListenerBuilder, MatchingListenerUndeclaration, MatchingStatus, - PublisherDeclarations, PublisherRef, }; pub use crate::api::{ builders::publisher::{ diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 373d56c65a..022a2d63cb 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -25,8 +25,6 @@ //! ``` mod _prelude { - #[zenoh_macros::unstable] - pub use crate::api::publisher::PublisherDeclarations; #[zenoh_macros::unstable] pub use crate::api::selector::ZenohParameters; pub use crate::{ @@ -34,7 +32,7 @@ mod _prelude { builders::sample::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, }, - session::{SessionDeclarations, Undeclarable}, + session::Undeclarable, }, config::ValidatedMap, Error as ZError, Resolvable, Resolve, Result as ZResult, diff --git a/zenoh/tests/events.rs b/zenoh/tests/events.rs index 11a6e18b53..e3a4d61656 100644 --- a/zenoh/tests/events.rs +++ b/zenoh/tests/events.rs @@ -52,7 +52,6 @@ async fn close_session(session: Session) { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_events() { - use zenoh::prelude::SessionDeclarations; let session = open_session(&["tcp/127.0.0.1:18447"], &[]).await; let zid = session.zid(); let sub1 = diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 4d964cc1cf..c67a1deb6d 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -19,7 +19,7 @@ use zenoh_core::ztimeout; async fn test_liveliness_subscriber_clique() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); @@ -83,7 +83,7 @@ async fn test_liveliness_subscriber_clique() { async fn test_liveliness_query_clique() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); @@ -140,7 +140,7 @@ async fn test_liveliness_query_clique() { async fn test_liveliness_subscriber_brokered() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -219,7 +219,7 @@ async fn test_liveliness_subscriber_brokered() { async fn test_liveliness_query_brokered() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); @@ -290,7 +290,7 @@ async fn test_liveliness_query_brokered() { async fn test_liveliness_subscriber_local() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -333,7 +333,7 @@ async fn test_liveliness_subscriber_local() { async fn test_liveliness_query_local() { use std::time::Duration; - use zenoh::{config, prelude::*, sample::SampleKind}; + use zenoh::{config, sample::SampleKind}; use zenoh_config::WhatAmI; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 07971b7853..1023584c70 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -56,7 +56,7 @@ enum Task { impl Task { async fn run( &self, - session: Arc, + session: Session, remaining_checkpoints: Arc, token: CancellationToken, ) -> Result<()> { @@ -386,7 +386,7 @@ impl Recipe { // In case of client can't connect to some peers/routers loop { if let Ok(session) = ztimeout!(zenoh::open(config.clone())) { - break session.into_arc(); + break session; } else { tokio::time::sleep(Duration::from_secs(1)).await; } @@ -421,7 +421,7 @@ impl Recipe { // node_task_tracker.wait().await; // Close the session once all the task associated with the node are done. - ztimeout!(Arc::try_unwrap(session).unwrap().close())?; + ztimeout!(session.close())?; println!("Node: {} is closed.", &node.name); Result::Ok(())