Skip to content

Commit

Permalink
feat: don't undeclare objects on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Aug 26, 2024
1 parent 8b027e9 commit 1bd2700
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 279 deletions.
2 changes: 0 additions & 2 deletions examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ async fn main() {
println!("Press CTRL-C to undeclare LivelinessToken and quit...");
std::thread::park();

// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
token.undeclare().await.unwrap();
}

Expand Down
4 changes: 1 addition & 3 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
2 changes: 0 additions & 2 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -362,7 +361,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,9 @@ impl<'a> KeyExpr<'a> {
}
}

impl<'a> UndeclarableSealed<&'a Session, KeyExprUndeclaration<'a>> for KeyExpr<'a> {
fn undeclare_inner(self, session: &'a Session) -> KeyExprUndeclaration<'a> {
impl<'a> UndeclarableSealed<&'a Session> for KeyExpr<'a> {
type Res = KeyExprUndeclaration<'a>;
fn undeclare_inner(self, session: &'a Session) -> Self::Res {
KeyExprUndeclaration {
session,
expr: self,
Expand Down
54 changes: 16 additions & 38 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::{
subscriber::{Subscriber, SubscriberInner},
Id,
};
use crate::api::session::WeakSessionRef;

/// A structure with functions to declare a
/// [`LivelinessToken`](LivelinessToken), query
Expand Down Expand Up @@ -254,9 +255,8 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
session
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
session,
session: session.into(),
state: tok_state,
undeclare_on_drop: true,
})
}
}
Expand All @@ -283,13 +283,11 @@ pub(crate) struct LivelinessTokenState {
///
/// A declared liveliness token will be seen as alive by any other Zenoh
/// application in the system that monitors it while the liveliness token
/// is not undeclared or dropped, while the Zenoh application that declared
/// is not undeclared, while the Zenoh application that declared
/// it is alive (didn't stop or crashed) and while the Zenoh application
/// that declared the token has Zenoh connectivity with the Zenoh application
/// that monitors it.
///
/// `LivelinessTokens` are automatically undeclared when dropped.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand All @@ -307,9 +305,8 @@ pub(crate) struct LivelinessTokenState {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) session: WeakSessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
Expand Down Expand Up @@ -344,9 +341,10 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
let Some(session) = self.token.session.upgrade() else {
return Ok(());
};
session.undeclare_liveliness(self.token.state.id)
}
}

Expand All @@ -362,11 +360,7 @@ impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> {

#[zenoh_macros::unstable]
impl<'a> LivelinessToken<'a> {
/// Undeclare a [`LivelinessToken`].
///
/// LivelinessTokens are automatically closed when dropped,
/// but you may want to use this function to handle errors or
/// undeclare the LivelinessToken asynchronously.
/// Undeclare the [`LivelinessToken`].
///
/// # Examples
/// ```
Expand All @@ -388,31 +382,14 @@ impl<'a> LivelinessToken<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
UndeclarableSealed::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> {
fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> {
LivelinessTokenUndeclaration { token: self }
}
}
impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> {
type Res = LivelinessTokenUndeclaration<'a>;

#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
}
fn undeclare_inner(self, _: ()) -> Self::Res {
LivelinessTokenUndeclaration { token: self }
}
}

Expand Down Expand Up @@ -579,10 +556,11 @@ where
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.map(|sub_state| Subscriber {
subscriber: SubscriberInner {
session,
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: session.into(),
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
undeclare_on_drop: true,
},
handler,
})
Expand Down
75 changes: 24 additions & 51 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl fmt::Debug for PublisherState {
#[derive(Clone)]
pub enum PublisherRef<'a> {
Borrow(&'a Publisher<'a>),
Shared(std::sync::Arc<Publisher<'static>>),
Shared(Arc<Publisher<'static>>),
}

#[zenoh_macros::unstable]
Expand All @@ -105,8 +105,6 @@ impl std::fmt::Debug for PublisherRef<'_> {

/// A publisher that allows to send data through a stream.
///
/// Publishers are automatically undeclared when dropped.
///
/// # Examples
/// ```
/// # #[tokio::main]
Expand Down Expand Up @@ -146,7 +144,6 @@ pub struct Publisher<'a> {
pub(crate) destination: Locality,
#[cfg(feature = "unstable")]
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
}

impl<'a> Publisher<'a> {
Expand Down Expand Up @@ -347,7 +344,7 @@ impl<'a> Publisher<'a> {
}
}

/// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
/// Undeclare the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
///
/// # Examples
/// ```
Expand Down Expand Up @@ -462,8 +459,10 @@ impl PublisherDeclarations for std::sync::Arc<Publisher<'static>> {
}
}

impl<'a> UndeclarableSealed<(), PublisherUndeclaration<'a>> for Publisher<'a> {
fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> {
impl<'a> UndeclarableSealed<()> for Publisher<'a> {
type Res = PublisherUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
PublisherUndeclaration { publisher: self }
}
}
Expand Down Expand Up @@ -491,9 +490,7 @@ impl Resolvable for PublisherUndeclaration<'_> {
}

impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.publisher.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
#[cfg(feature = "unstable")]
self.publisher.undeclare_matching_listeners()?;
self.publisher
Expand All @@ -513,11 +510,8 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
}
}

Expand Down Expand Up @@ -922,7 +916,6 @@ where
listener: MatchingListenerInner {
publisher: self.publisher,
state,
undeclare_on_drop: true,
},
receiver,
})
Expand All @@ -947,7 +940,7 @@ where
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerState {
pub(crate) id: Id,
pub(crate) current: std::sync::Mutex<bool>,
pub(crate) current: Mutex<bool>,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
pub(crate) callback: Callback<'static, MatchingStatus>,
Expand All @@ -966,8 +959,7 @@ impl std::fmt::Debug for MatchingListenerState {
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
undeclare_on_drop: bool,
pub(crate) state: Arc<MatchingListenerState>,
}

#[zenoh_macros::unstable]
Expand All @@ -979,15 +971,20 @@ impl<'a> MatchingListenerInner<'a> {
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
impl<'a> UndeclarableSealed<()> for MatchingListenerInner<'a> {
type Res = MatchingListenerUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
MatchingListenerUndeclaration { subscriber: self }
}
}

/// A listener that sends notifications when the [`MatchingStatus`] of a
/// publisher changes.
///
/// Matching litsteners run in background until the publisher is undeclared.
/// They can be manually undeclared, but will not be undeclared on drop.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand All @@ -1014,10 +1011,7 @@ pub struct MatchingListener<'a, Receiver> {

#[zenoh_macros::unstable]
impl<'a, Receiver> MatchingListener<'a, Receiver> {
/// Close a [`MatchingListener`].
///
/// MatchingListeners are automatically closed when dropped, but you may want to use this function to handle errors or
/// close the MatchingListener asynchronously.
/// Undeclare the [`MatchingListener`].
///
/// # Examples
/// ```
Expand All @@ -1035,19 +1029,13 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
}

/// Make the matching listener run in background, until the publisher is undeclared.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// The matching listener will be undeclared as part of publisher undeclaration.
self.listener.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
impl<'a, T> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListener<'a, T> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
impl<'a, T> UndeclarableSealed<()> for MatchingListener<'a, T> {
type Res = MatchingListenerUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
UndeclarableSealed::undeclare_inner(self.listener, ())
}
}
Expand Down Expand Up @@ -1079,9 +1067,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.subscriber.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
Expand All @@ -1100,19 +1086,6 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
}
}

#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
.session
.undeclare_matches_listener_inner(self.state.id);
}
}
}

#[cfg(test)]
mod tests {
use zenoh_config::Config;
Expand Down
Loading

0 comments on commit 1bd2700

Please sign in to comment.