Skip to content

Commit

Permalink
fix: use weak everywhere!
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 3, 2024
1 parent 2efff1f commit 0ab9e00
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 32 deletions.
13 changes: 10 additions & 3 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::future::{IntoFuture, Ready};
use std::{
future::{IntoFuture, Ready},
sync::Arc,
};

use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::{core::CongestionControl, network::Mapping};
Expand Down Expand Up @@ -299,7 +302,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
// internal function for performing the publication
fn create_one_shot_publisher(self) -> ZResult<Publisher<'b>> {
Ok(Publisher {
session: self.session.clone().0,
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: Arc::downgrade(&self.session.0),
id: 0, // This is a one shot Publisher
key_expr: self.key_expr?,
encoding: self.encoding,
Expand Down Expand Up @@ -356,7 +361,9 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
.0
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.0.clone(),
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: Arc::downgrade(&self.session.0),
id,
key_expr,
encoding: self.encoding,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/api/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl<'a> IntoFuture for PeersZenohIdBuilder<'a> {
}
}

/// Struct returned by [`Session::info()`](crate::session::SessionDeclarations::info) which allows
/// Struct returned by [`Session::info()`](crate::Session::info) which allows
/// to access information about the current zenoh [`Session`](crate::Session).
///
/// # Examples
Expand Down
43 changes: 26 additions & 17 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::Arc,
sync::{Arc, Weak},
task::{Context, Poll},
};

Expand All @@ -38,6 +38,7 @@ use {
},
std::{collections::HashSet, sync::Mutex},
zenoh_config::wrappers::EntityGlobalId,
zenoh_config::ZenohId,
zenoh_protocol::core::EntityGlobalIdProto,
};

Expand Down Expand Up @@ -106,7 +107,9 @@ impl fmt::Debug for PublisherState {
/// ```
#[derive(Debug, Clone)]
pub struct Publisher<'a> {
pub(crate) session: Arc<SessionInner>,
#[cfg(feature = "unstable")]
pub(crate) session_id: ZenohId,
pub(crate) session: Weak<SessionInner>,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) encoding: Encoding,
Expand All @@ -120,6 +123,12 @@ pub struct Publisher<'a> {
}

impl<'a> Publisher<'a> {
fn session(&self) -> ZResult<Arc<SessionInner>> {
self.session
.upgrade()
.ok_or_else(|| zerror!("session closed").into())
}

/// Returns the [`EntityGlobalId`] of this Publisher.
///
/// # Examples
Expand All @@ -138,7 +147,7 @@ impl<'a> Publisher<'a> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.session.runtime.zid().into(),
zid: self.session_id.into(),
eid: self.id,
}
.into()
Expand Down Expand Up @@ -245,7 +254,7 @@ impl<'a> Publisher<'a> {
#[zenoh_macros::unstable]
pub fn matching_status(&self) -> impl Resolve<ZResult<MatchingStatus>> + '_ {
zenoh_core::ResolveFuture::new(async move {
self.session
self.session()?
.matching_status(self.key_expr(), self.destination)
})
}
Expand Down Expand Up @@ -301,14 +310,17 @@ impl<'a> Publisher<'a> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
let Ok(session) = self.session() else {
return Ok(());
};
#[cfg(feature = "unstable")]
{
let ids: Vec<Id> = zlock!(self.matching_listeners).drain().collect();
for id in ids {
self.session.undeclare_matches_listener_inner(id)?
session.undeclare_matches_listener_inner(id)?
}
}
self.session.undeclare_publisher_inner(self.id)
session.undeclare_publisher_inner(self.id)
}
}

Expand Down Expand Up @@ -415,19 +427,16 @@ impl Publisher<'_> {
attachment: Option<ZBytes>,
) -> ZResult<()> {
tracing::trace!("write({:?}, [...])", &self.key_expr);
let primitives = zread!(self.session.state)
.primitives
.as_ref()
.unwrap()
.clone();
let session = self.session()?;
let primitives = zread!(session.state).primitives()?;
let timestamp = if timestamp.is_none() {
self.session.runtime.new_timestamp()
session.runtime.new_timestamp()
} else {
timestamp
};
if self.destination != Locality::SessionLocal {
primitives.send_push(Push {
wire_expr: self.key_expr.to_wire(&self.session).to_owned(),
wire_expr: self.key_expr.to_wire(&session).to_owned(),
ext_qos: ext::QoSType::new(
self.priority.into(),
self.congestion_control,
Expand Down Expand Up @@ -475,9 +484,9 @@ impl Publisher<'_> {
)),
};

self.session.execute_subscriber_callbacks(
session.execute_subscriber_callbacks(
true,
&self.key_expr.to_wire(&self.session),
&self.key_expr.to_wire(&session),
Some(data_info),
payload.into(),
SubscriberKind::Subscriber,
Expand Down Expand Up @@ -759,7 +768,7 @@ where
let (callback, receiver) = self.handler.into_handler();
let state = self
.publisher
.session
.session()?
.declare_matches_listener_inner(self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
Ok(MatchingListener {
Expand Down Expand Up @@ -921,7 +930,7 @@ impl Wait for MatchingListenerUndeclaration<'_> {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
.session
.session()?
.undeclare_matches_listener_inner(self.subscriber.state.id)
}
}
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> {
/// A queryable that provides data through a [`Handler`](crate::handlers::IntoHandler).
///
/// Queryables can be created from a zenoh [`Session`](crate::Session)
/// with the [`declare_queryable`](crate::session::SessionDeclarations::declare_queryable) function
/// with the [`declare_queryable`](crate::Session::declare_queryable) function
/// and the [`with`](QueryableBuilder::with) function
/// of the resulting builder.
///
Expand Down Expand Up @@ -863,10 +863,10 @@ impl<Handler> Queryable<Handler> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.inner.undeclare_on_drop = false;
match self.inner.session.upgrade() {
Some(session) => session.close_queryable(self.inner.state.id),
None => Ok(()),
}
let Some(session) = self.inner.session.upgrade() else {
return Ok(());
};
session.close_queryable(self.inner.state.id)
}
}

Expand Down
1 change: 1 addition & 0 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,7 @@ impl SessionInner {
Ok(())
})
}

pub(crate) fn declare_prefix<'a>(
&'a self,
prefix: &'a str,
Expand Down
10 changes: 4 additions & 6 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,10 @@ impl<Handler> Subscriber<Handler> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.inner.undeclare_on_drop = false;
match self.inner.session.upgrade() {
Some(session) => {
session.undeclare_subscriber_inner(self.inner.state.id, self.inner.kind)
}
None => Ok(()),
}
let Some(session) = self.inner.session.upgrade() else {
return Ok(());
};
session.undeclare_subscriber_inner(self.inner.state.id, self.inner.kind)
}
}

Expand Down

0 comments on commit 0ab9e00

Please sign in to comment.