diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 15b7fb4d7..afe2107e5 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -17,10 +17,11 @@ use std::{ collections::HashMap, convert::TryInto, fmt, + mem::ManuallyDrop, ops::Deref, sync::{ - atomic::{AtomicU16, Ordering}, - Arc, Mutex, RwLock, + atomic::{AtomicU16, AtomicUsize, Ordering}, + Arc, RwLock, }, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -527,7 +528,7 @@ impl Undeclarable for T where T: UndeclarableSealed {} pub(crate) struct SessionInner { /// See [`WeakSession`] doc - weak_counter: Mutex, + strong_counter: AtomicUsize, pub(crate) runtime: Runtime, pub(crate) state: RwLock, pub(crate) id: u16, @@ -573,16 +574,14 @@ impl fmt::Debug for Session { impl Clone for Session { fn clone(&self) -> Self { - let _weak = self.0.weak_counter.lock().unwrap(); + self.0.strong_counter.fetch_add(1, Ordering::Relaxed); Self(self.0.clone()) } } impl Drop for Session { fn drop(&mut self) { - let weak = self.0.weak_counter.lock().unwrap(); - if Arc::strong_count(&self.0) == *weak + /* the `Arc` currently dropped */ 1 { - drop(weak); + if self.0.strong_counter.fetch_sub(1, Ordering::Relaxed) == 1 { if let Err(error) = self.close().wait() { tracing::error!(error) } @@ -596,32 +595,17 @@ impl Drop for Session { /// When all `Session` instance are dropped, [`Session::close`] is be called and cleans /// the reference cycles, allowing the underlying `Arc` to be properly reclaimed. /// -/// The pseudo-weak algorithm relies on a counter wrapped in a mutex. It was indeed the simplest -/// to implement it, because atomic manipulations to achieve this semantic would not have been -/// trivial at all — what could happen if a pseudo-weak is cloned while the last session instance -/// is dropped? With a mutex, it's simple, and it works perfectly fine, as we don't care about the -/// performance penalty when it comes to session entities cloning/dropping. -/// /// (Although it was planed to be used initially, `Weak` was in fact causing errors in the session /// closing, because the primitive implementation seemed to be used in the closing operation.) +#[derive(Clone)] pub(crate) struct WeakSession(Arc); impl WeakSession { fn new(session: &Arc) -> Self { - let mut weak = session.weak_counter.lock().unwrap(); - *weak += 1; Self(session.clone()) } } -impl Clone for WeakSession { - fn clone(&self) -> Self { - let mut weak = self.0.weak_counter.lock().unwrap(); - *weak += 1; - Self(self.0.clone()) - } -} - impl fmt::Debug for WeakSession { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { self.0.fmt(f) @@ -636,13 +620,6 @@ impl Deref for WeakSession { } } -impl Drop for WeakSession { - fn drop(&mut self) { - let mut weak = self.0.weak_counter.lock().unwrap(); - *weak -= 1; - } -} - /// Error indicating the operation cannot proceed because the session is closed. /// /// It may be returned by operations like [`Session::get`] or [`Publisher::put`](crate::api::publisher::Publisher::put) when @@ -677,7 +654,7 @@ impl Session { publisher_qos.into(), )); let session = Session(Arc::new(SessionInner { - weak_counter: Mutex::new(0), + strong_counter: AtomicUsize::new(1), runtime: runtime.clone(), state, id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst),