Skip to content

Commit

Permalink
feat: use pseudo-weak session with the same perf than arc
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 4, 2024
1 parent 5dfd6f3 commit f46c50e
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 159 deletions.
2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::time::Duration;

use zenoh::{config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{config::Config, key_expr::KeyExpr};

#[tokio::main]
async fn main() {
Expand Down
1 change: 0 additions & 1 deletion ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration};
use zenoh::{
config::Config,
key_expr::KeyExpr,
prelude::*,
query::{QueryTarget, Selector},
};

Expand Down
46 changes: 15 additions & 31 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
sync::{Arc, Weak},
sync::Arc,
};

use zenoh_core::{Result as ZResult, Wait};
Expand All @@ -34,7 +34,7 @@ use super::{
sample::{DataInfo, Locality, SampleKind},
subscriber::SubscriberKind,
};
use crate::api::session::SessionInner;
use crate::api::session::WeakSession;

lazy_static::lazy_static!(
static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
Expand All @@ -44,29 +44,25 @@ lazy_static::lazy_static!(
static ref KE_LINK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("link") };
);

pub(crate) fn init(session: &Arc<SessionInner>) {
pub(crate) fn init(session: WeakSession) {
if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) {
let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR)
.to_wire(session)
.to_wire(&session)
.to_owned();

let _admin_qabl = session.declare_queryable_inner(
&admin_key,
true,
Locality::SessionLocal,
Arc::new({
let session = Arc::downgrade(session);
move |q| {
if let Some(session) = Weak::upgrade(&session) {
on_admin_query(&session, q)
}
}
let session = session.clone();
move |q| on_admin_query(&session, q)
}),
);
}
}

pub(crate) fn on_admin_query(session: &SessionInner, query: Query) {
pub(crate) fn on_admin_query(session: &WeakSession, query: Query) {
fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) {
let zid = peer.zid.to_string();
if let Ok(zid) = keyexpr::new(&zid) {
Expand Down Expand Up @@ -128,11 +124,11 @@ pub(crate) fn on_admin_query(session: &SessionInner, query: Query) {

#[derive(Clone)]
pub(crate) struct Handler {
pub(crate) session: Weak<SessionInner>,
pub(crate) session: WeakSession,
}

impl Handler {
pub(crate) fn new(session: Weak<SessionInner>) -> Self {
pub(crate) fn new(session: WeakSession) -> Self {
Self { session }
}
}
Expand All @@ -159,10 +155,7 @@ impl TransportMulticastEventHandler for Handler {
&self,
peer: zenoh_transport::TransportPeer,
) -> ZResult<Arc<dyn TransportPeerEventHandler>> {
let Some(session) = Weak::upgrade(&self.session) else {
bail!("session closed");
};
if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) {
if let Ok(own_zid) = keyexpr::new(&self.session.runtime.zid().to_string()) {
if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) {
let expr = WireExpr::from(
&(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid),
Expand All @@ -172,7 +165,7 @@ impl TransportMulticastEventHandler for Handler {
encoding: Some(Encoding::APPLICATION_JSON),
..Default::default()
};
session.execute_subscriber_callbacks(
self.session.execute_subscriber_callbacks(
true,
&expr,
Some(info),
Expand Down Expand Up @@ -205,7 +198,7 @@ impl TransportMulticastEventHandler for Handler {

pub(crate) struct PeerHandler {
pub(crate) expr: WireExpr<'static>,
pub(crate) session: Weak<SessionInner>,
pub(crate) session: WeakSession,
}

impl TransportPeerEventHandler for PeerHandler {
Expand All @@ -214,16 +207,13 @@ impl TransportPeerEventHandler for PeerHandler {
}

fn new_link(&self, link: zenoh_link::Link) {
let Some(session) = Weak::upgrade(&self.session) else {
return;
};
let mut s = DefaultHasher::new();
link.hash(&mut s);
let info = DataInfo {
encoding: Some(Encoding::APPLICATION_JSON),
..Default::default()
};
session.execute_subscriber_callbacks(
self.session.execute_subscriber_callbacks(
true,
&self
.expr
Expand All @@ -239,16 +229,13 @@ impl TransportPeerEventHandler for PeerHandler {
}

fn del_link(&self, link: zenoh_link::Link) {
let Some(session) = Weak::upgrade(&self.session) else {
return;
};
let mut s = DefaultHasher::new();
link.hash(&mut s);
let info = DataInfo {
kind: SampleKind::Delete,
..Default::default()
};
session.execute_subscriber_callbacks(
self.session.execute_subscriber_callbacks(
true,
&self
.expr
Expand All @@ -266,14 +253,11 @@ impl TransportPeerEventHandler for PeerHandler {
fn closing(&self) {}

fn closed(&self) {
let Some(session) = Weak::upgrade(&self.session) else {
return;
};
let info = DataInfo {
kind: SampleKind::Delete,
..Default::default()
};
session.execute_subscriber_callbacks(
self.session.execute_subscriber_callbacks(
true,
&self.expr,
Some(info),
Expand Down
12 changes: 5 additions & 7 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{
future::{IntoFuture, Ready},
sync::Arc,
};
use std::future::{IntoFuture, Ready};

use zenoh_core::{Resolvable, Result as ZResult, Wait};
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -333,7 +330,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
Ok(Publisher {
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: Arc::downgrade(&self.session.0),
session: self.session.downgrade(),
id: 0, // This is a one shot Publisher
key_expr: self.key_expr?,
encoding: self.encoding,
Expand Down Expand Up @@ -394,7 +391,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
Ok(Publisher {
#[cfg(feature = "unstable")]
session_id: self.session.0.runtime.zid(),
session: Arc::downgrade(&self.session.0),
session: self.session.downgrade(),
id,
key_expr,
encoding: self.encoding,
Expand All @@ -404,7 +401,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
reliability: self.reliability,
#[cfg(feature = "unstable")]matching_listeners: Default::default(),
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
Expand Down
15 changes: 6 additions & 9 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
convert::TryInto,
future::{IntoFuture, Ready},
mem::size_of,
sync::{Arc, Weak},
sync::Arc,
time::Duration,
};

Expand All @@ -33,7 +33,7 @@ use super::{
subscriber::{Subscriber, SubscriberInner},
Id,
};
use crate::api::session::SessionInner;
use crate::api::session::WeakSession;

/// A structure with functions to declare a
/// [`LivelinessToken`](LivelinessToken), query
Expand Down Expand Up @@ -258,7 +258,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
.0
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
session: Arc::downgrade(&self.session.0),
session: self.session.downgrade(),
state: tok_state,
undeclare_on_drop: true,
})
Expand Down Expand Up @@ -311,7 +311,7 @@ pub(crate) struct LivelinessTokenState {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LivelinessToken {
session: Weak<SessionInner>,
session: WeakSession,
state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}
Expand Down Expand Up @@ -388,10 +388,7 @@ impl LivelinessToken {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
match self.session.upgrade() {
Some(session) => session.undeclare_liveliness(self.state.id),
None => Ok(()),
}
self.session.undeclare_liveliness(self.state.id)
}
}

Expand Down Expand Up @@ -581,7 +578,7 @@ where
inner: SubscriberInner {
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: Arc::downgrade(&self.session.0),
session: self.session.downgrade(),
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
// `size_of::<Handler::Handler>() == 0` means callback-only subscriber
Expand Down
37 changes: 13 additions & 24 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::{
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::{Arc, Weak},
task::{Context, Poll},
};

Expand All @@ -36,7 +35,7 @@ use {
handlers::{Callback, DefaultHandler, IntoHandler},
sample::SourceInfo,
},
std::{collections::HashSet, sync::Mutex},
std::{collections::HashSet, sync::Arc, sync::Mutex},
zenoh_config::wrappers::EntityGlobalId,
zenoh_config::ZenohId,
zenoh_protocol::core::EntityGlobalIdProto,
Expand All @@ -54,7 +53,7 @@ use super::{
session::UndeclarableSealed,
};
use crate::{
api::{session::SessionInner, subscriber::SubscriberKind, Id},
api::{session::WeakSession, subscriber::SubscriberKind, Id},
net::primitives::Primitives,
};

Expand Down Expand Up @@ -109,7 +108,7 @@ impl fmt::Debug for PublisherState {
pub struct Publisher<'a> {
#[cfg(feature = "unstable")]
pub(crate) session_id: ZenohId,
pub(crate) session: Weak<SessionInner>,
pub(crate) session: WeakSession,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) encoding: Encoding,
Expand All @@ -125,12 +124,6 @@ pub struct Publisher<'a> {
}

impl<'a> Publisher<'a> {
fn session(&self) -> ZResult<Arc<SessionInner>> {
self.session
.upgrade()
.ok_or_else(|| zerror!("session closed").into())
}

/// Returns the [`EntityGlobalId`] of this Publisher.
///
/// # Examples
Expand Down Expand Up @@ -256,7 +249,7 @@ impl<'a> Publisher<'a> {
#[zenoh_macros::unstable]
pub fn matching_status(&self) -> impl Resolve<ZResult<MatchingStatus>> + '_ {
zenoh_core::ResolveFuture::new(async move {
self.session()?
self.session
.matching_status(self.key_expr(), self.destination)
})
}
Expand Down Expand Up @@ -312,17 +305,14 @@ impl<'a> Publisher<'a> {
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
let Ok(session) = self.session() else {
return Ok(());
};
#[cfg(feature = "unstable")]
{
let ids: Vec<Id> = zlock!(self.matching_listeners).drain().collect();
for id in ids {
session.undeclare_matches_listener_inner(id)?
self.session.undeclare_matches_listener_inner(id)?
}
}
session.undeclare_publisher_inner(self.id)
self.session.undeclare_publisher_inner(self.id)
}
}

Expand Down Expand Up @@ -430,17 +420,16 @@ impl Publisher<'_> {
attachment: Option<ZBytes>,
) -> ZResult<()> {
tracing::trace!("write({:?}, [...])", &self.key_expr);
let session = self.session()?;
let primitives = zread!(session.state).primitives()?;
let primitives = zread!(self.session.state).primitives()?;
let timestamp = if timestamp.is_none() {
session.runtime.new_timestamp()
self.session.runtime.new_timestamp()
} else {
timestamp
};
if self.destination != Locality::SessionLocal {
primitives.send_push(
Push {
wire_expr: self.key_expr.to_wire(&session).to_owned(),
wire_expr: self.key_expr.to_wire(&self.session).to_owned(),
ext_qos: ext::QoSType::new(
self.priority.into(),
self.congestion_control,
Expand Down Expand Up @@ -493,9 +482,9 @@ impl Publisher<'_> {
)),
};

session.execute_subscriber_callbacks(
self.session.execute_subscriber_callbacks(
true,
&self.key_expr.to_wire(&session),
&self.key_expr.to_wire(&self.session),
Some(data_info),
payload.into(),
SubscriberKind::Subscriber,
Expand Down Expand Up @@ -779,7 +768,7 @@ where
let (callback, receiver) = self.handler.into_handler();
let state = self
.publisher
.session()?
.session
.declare_matches_listener_inner(self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
Ok(MatchingListener {
Expand Down Expand Up @@ -941,7 +930,7 @@ impl Wait for MatchingListenerUndeclaration<'_> {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
.session()?
.session
.undeclare_matches_listener_inner(self.subscriber.state.id)
}
}
Expand Down
Loading

0 comments on commit f46c50e

Please sign in to comment.