diff --git a/Cargo.lock b/Cargo.lock index 6c5b5402c..e47d08dd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5072,7 +5072,6 @@ dependencies = [ "phf", "rand 0.8.5", "rustc_version 0.4.1", - "scopeguard", "serde", "serde_json", "socket2 0.5.7", diff --git a/Cargo.toml b/Cargo.toml index de1ca0468..de23d2eea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,6 @@ rustls-pemfile = "2.1.3" rustls-webpki = "0.102.8" rustls-pki-types = "1.8.0" schemars = { version = "0.8.21", features = ["either"] } -scopeguard = "1.2.0" secrecy = { version = "0.8.0", features = ["serde", "alloc"] } serde = { version = "1.0.210", default-features = false, features = [ "derive", diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 30dae6057..6a998b296 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -84,7 +84,6 @@ paste = { workspace = true } petgraph = { workspace = true } phf = { workspace = true } rand = { workspace = true, features = ["default"] } -scopeguard = { workspace = true } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } socket2 = { workspace = true } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 1264cf39d..e4a879235 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -446,7 +446,8 @@ impl Wait for PublisherBuilder<'_, '_> { .declare_publisher_inner(key_expr.clone(), self.destination)?; Ok(Publisher { session: self.session.downgrade(), - cache: AtomicU64::new(0), + // TODO use constants here + cache: AtomicU64::new(0b11), id, key_expr, encoding: self.encoding, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 65a40feb4..f0a2b1b29 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2153,8 +2153,8 @@ impl SessionInner { #[cfg(feature = "unstable")] source_info: SourceInfo, attachment: Option, ) -> ZResult<()> { - const NO_REMOTE_FLAG: u64 = 0b01; - const NO_LOCAL_FLAG: u64 = 0b10; + const REMOTE_TAG: u64 = 0b01; + const LOCAL_TAG: u64 = 0b10; const VERSION_SHIFT: u64 = 2; trace!("write({:?}, [...])", key_expr); let state = zread!(self.state); @@ -2163,26 +2163,21 @@ impl SessionInner { .as_ref() .cloned() .ok_or(SessionClosedError)?; - let mut cached = 0; - let mut update_cache = None; + let mut cached = REMOTE_TAG | LOCAL_TAG; + let mut to_cache = REMOTE_TAG | LOCAL_TAG; if let Some(cache) = cache { - let c = cache.load(Ordering::Relaxed); - let version = c >> VERSION_SHIFT; + cached = cache.load(Ordering::Relaxed); + let version = cached >> VERSION_SHIFT; if version == state.subscription_version { - cached = c; + to_cache = cached; } else { - cached = (state.subscription_version << VERSION_SHIFT); + to_cache = (state.subscription_version << VERSION_SHIFT) | REMOTE_TAG | LOCAL_TAG; } - update_cache = Some(scopeguard::guard((), |_| { - if cached != c { - let _ = cache.compare_exchange(c, cached, Ordering::Relaxed, Ordering::Relaxed); - } - })); } drop(state); let timestamp = timestamp.or_else(|| self.runtime.new_timestamp()); let wire_expr = key_expr.to_wire(self); - if (cached & NO_REMOTE_FLAG) == 0 && destination != Locality::SessionLocal { + if (to_cache & REMOTE_TAG) != 0 && destination != Locality::SessionLocal { let remote = primitives.route_data( Push { wire_expr: wire_expr.to_owned(), @@ -2224,10 +2219,10 @@ impl SessionInner { Reliability::DEFAULT, ); if !remote { - cached |= NO_REMOTE_FLAG + to_cache &= !REMOTE_TAG; } } - if (cached & NO_LOCAL_FLAG) == 0 && destination != Locality::Remote { + if (to_cache & LOCAL_TAG) != 0 && destination != Locality::Remote { let data_info = DataInfo { kind, encoding: Some(encoding), @@ -2252,9 +2247,12 @@ impl SessionInner { attachment, ); if !local { - cached |= NO_LOCAL_FLAG; + to_cache &= !LOCAL_TAG; } } + if let Some(cache) = cache.filter(|_| to_cache != cached) { + let _ = cache.compare_exchange(cached, to_cache, Ordering::Relaxed, Ordering::Relaxed); + } Ok(()) } @@ -2565,10 +2563,10 @@ impl Primitives for WeakSession { } zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => { trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr); - let mut state = zwrite!(self.state); - state.subscription_version += 1; #[cfg(feature = "unstable")] { + let mut state = zwrite!(self.state); + state.subscription_version += 1; if state.primitives.is_none() { return; // Session closing or closed }