From 0ab9e009a41d910ca6f2b939ecd6af30ad044b27 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Tue, 3 Sep 2024 17:39:20 +0200 Subject: [PATCH] fix: use weak everywhere! --- zenoh/src/api/builders/publisher.rs | 13 +++++++-- zenoh/src/api/info.rs | 2 +- zenoh/src/api/publisher.rs | 43 +++++++++++++++++------------ zenoh/src/api/queryable.rs | 10 +++---- zenoh/src/api/session.rs | 1 + zenoh/src/api/subscriber.rs | 10 +++---- 6 files changed, 47 insertions(+), 32 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 43480e4df1..d45b24f1c9 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -11,7 +11,10 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::future::{IntoFuture, Ready}; +use std::{ + future::{IntoFuture, Ready}, + sync::Arc, +}; use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_protocol::{core::CongestionControl, network::Mapping}; @@ -299,7 +302,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { // internal function for performing the publication fn create_one_shot_publisher(self) -> ZResult> { Ok(Publisher { - session: self.session.clone().0, + #[cfg(feature = "unstable")] + session_id: self.session.0.runtime.zid(), + session: Arc::downgrade(&self.session.0), id: 0, // This is a one shot Publisher key_expr: self.key_expr?, encoding: self.encoding, @@ -356,7 +361,9 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { .0 .declare_publisher_inner(key_expr.clone(), self.destination)?; Ok(Publisher { - session: self.session.0.clone(), + #[cfg(feature = "unstable")] + session_id: self.session.0.runtime.zid(), + session: Arc::downgrade(&self.session.0), id, key_expr, encoding: self.encoding, diff --git a/zenoh/src/api/info.rs b/zenoh/src/api/info.rs index 88f8dd57b7..0993663996 100644 --- a/zenoh/src/api/info.rs +++ b/zenoh/src/api/info.rs @@ -156,7 +156,7 @@ impl<'a> IntoFuture for PeersZenohIdBuilder<'a> { } } -/// Struct returned by [`Session::info()`](crate::session::SessionDeclarations::info) which allows +/// Struct returned by [`Session::info()`](crate::Session::info) which allows /// to access information about the current zenoh [`Session`](crate::Session). /// /// # Examples diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 55c9360348..5500ea3165 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -17,7 +17,7 @@ use std::{ fmt, future::{IntoFuture, Ready}, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, task::{Context, Poll}, }; @@ -38,6 +38,7 @@ use { }, std::{collections::HashSet, sync::Mutex}, zenoh_config::wrappers::EntityGlobalId, + zenoh_config::ZenohId, zenoh_protocol::core::EntityGlobalIdProto, }; @@ -106,7 +107,9 @@ impl fmt::Debug for PublisherState { /// ``` #[derive(Debug, Clone)] pub struct Publisher<'a> { - pub(crate) session: Arc, + #[cfg(feature = "unstable")] + pub(crate) session_id: ZenohId, + pub(crate) session: Weak, pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) encoding: Encoding, @@ -120,6 +123,12 @@ pub struct Publisher<'a> { } impl<'a> Publisher<'a> { + fn session(&self) -> ZResult> { + self.session + .upgrade() + .ok_or_else(|| zerror!("session closed").into()) + } + /// Returns the [`EntityGlobalId`] of this Publisher. /// /// # Examples @@ -138,7 +147,7 @@ impl<'a> Publisher<'a> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.session.runtime.zid().into(), + zid: self.session_id.into(), eid: self.id, } .into() @@ -245,7 +254,7 @@ impl<'a> Publisher<'a> { #[zenoh_macros::unstable] pub fn matching_status(&self) -> impl Resolve> + '_ { zenoh_core::ResolveFuture::new(async move { - self.session + self.session()? .matching_status(self.key_expr(), self.destination) }) } @@ -301,14 +310,17 @@ impl<'a> Publisher<'a> { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.undeclare_on_drop = false; + let Ok(session) = self.session() else { + return Ok(()); + }; #[cfg(feature = "unstable")] { let ids: Vec = zlock!(self.matching_listeners).drain().collect(); for id in ids { - self.session.undeclare_matches_listener_inner(id)? + session.undeclare_matches_listener_inner(id)? } } - self.session.undeclare_publisher_inner(self.id) + session.undeclare_publisher_inner(self.id) } } @@ -415,19 +427,16 @@ impl Publisher<'_> { attachment: Option, ) -> ZResult<()> { tracing::trace!("write({:?}, [...])", &self.key_expr); - let primitives = zread!(self.session.state) - .primitives - .as_ref() - .unwrap() - .clone(); + let session = self.session()?; + let primitives = zread!(session.state).primitives()?; let timestamp = if timestamp.is_none() { - self.session.runtime.new_timestamp() + session.runtime.new_timestamp() } else { timestamp }; if self.destination != Locality::SessionLocal { primitives.send_push(Push { - wire_expr: self.key_expr.to_wire(&self.session).to_owned(), + wire_expr: self.key_expr.to_wire(&session).to_owned(), ext_qos: ext::QoSType::new( self.priority.into(), self.congestion_control, @@ -475,9 +484,9 @@ impl Publisher<'_> { )), }; - self.session.execute_subscriber_callbacks( + session.execute_subscriber_callbacks( true, - &self.key_expr.to_wire(&self.session), + &self.key_expr.to_wire(&session), Some(data_info), payload.into(), SubscriberKind::Subscriber, @@ -759,7 +768,7 @@ where let (callback, receiver) = self.handler.into_handler(); let state = self .publisher - .session + .session()? .declare_matches_listener_inner(self.publisher, callback)?; zlock!(self.publisher.matching_listeners).insert(state.id); Ok(MatchingListener { @@ -921,7 +930,7 @@ impl Wait for MatchingListenerUndeclaration<'_> { zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher - .session + .session()? .undeclare_matches_listener_inner(self.subscriber.state.id) } } diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 37a5b6bc21..590f864218 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -736,7 +736,7 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> { /// A queryable that provides data through a [`Handler`](crate::handlers::IntoHandler). /// /// Queryables can be created from a zenoh [`Session`](crate::Session) -/// with the [`declare_queryable`](crate::session::SessionDeclarations::declare_queryable) function +/// with the [`declare_queryable`](crate::Session::declare_queryable) function /// and the [`with`](QueryableBuilder::with) function /// of the resulting builder. /// @@ -863,10 +863,10 @@ impl Queryable { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.inner.undeclare_on_drop = false; - match self.inner.session.upgrade() { - Some(session) => session.close_queryable(self.inner.state.id), - None => Ok(()), - } + let Some(session) = self.inner.session.upgrade() else { + return Ok(()); + }; + session.close_queryable(self.inner.state.id) } } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 63284d50d2..417100f22a 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -938,6 +938,7 @@ impl SessionInner { Ok(()) }) } + pub(crate) fn declare_prefix<'a>( &'a self, prefix: &'a str, diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 1d15f71757..6d3170c39d 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -484,12 +484,10 @@ impl Subscriber { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.inner.undeclare_on_drop = false; - match self.inner.session.upgrade() { - Some(session) => { - session.undeclare_subscriber_inner(self.inner.state.id, self.inner.kind) - } - None => Ok(()), - } + let Some(session) = self.inner.session.upgrade() else { + return Ok(()); + }; + session.undeclare_subscriber_inner(self.inner.state.id, self.inner.kind) } }