Skip to content

Commit

Permalink
Revert "refactor: refactor"
Browse files Browse the repository at this point in the history
This reverts commit 1753654.
  • Loading branch information
wyfo committed Dec 11, 2024
1 parent 6bf1b01 commit 704fc19
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 23 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ rustls-pemfile = "2.1.3"
rustls-webpki = "0.102.8"
rustls-pki-types = "1.8.0"
schemars = { version = "0.8.21", features = ["either"] }
scopeguard = "1.2.0"
secrecy = { version = "0.8.0", features = ["serde", "alloc"] }
serde = { version = "1.0.210", default-features = false, features = [
"derive",
Expand Down
1 change: 0 additions & 1 deletion zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ paste = { workspace = true }
petgraph = { workspace = true }
phf = { workspace = true }
rand = { workspace = true, features = ["default"] }
scopeguard = { workspace = true }
serde = { workspace = true, features = ["default"] }
serde_json = { workspace = true }
socket2 = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ impl Wait for PublisherBuilder<'_, '_> {
.declare_publisher_inner(key_expr.clone(), self.destination)?;
Ok(Publisher {
session: self.session.downgrade(),
cache: AtomicU64::new(0),
// TODO use constants here
cache: AtomicU64::new(0b11),
id,
key_expr,
encoding: self.encoding,
Expand Down
36 changes: 17 additions & 19 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2153,8 +2153,8 @@ impl SessionInner {
#[cfg(feature = "unstable")] source_info: SourceInfo,
attachment: Option<ZBytes>,
) -> ZResult<()> {
const NO_REMOTE_FLAG: u64 = 0b01;
const NO_LOCAL_FLAG: u64 = 0b10;
const REMOTE_TAG: u64 = 0b01;
const LOCAL_TAG: u64 = 0b10;
const VERSION_SHIFT: u64 = 2;
trace!("write({:?}, [...])", key_expr);
let state = zread!(self.state);
Expand All @@ -2163,26 +2163,21 @@ impl SessionInner {
.as_ref()
.cloned()
.ok_or(SessionClosedError)?;
let mut cached = 0;
let mut update_cache = None;
let mut cached = REMOTE_TAG | LOCAL_TAG;
let mut to_cache = REMOTE_TAG | LOCAL_TAG;
if let Some(cache) = cache {
let c = cache.load(Ordering::Relaxed);
let version = c >> VERSION_SHIFT;
cached = cache.load(Ordering::Relaxed);
let version = cached >> VERSION_SHIFT;
if version == state.subscription_version {
cached = c;
to_cache = cached;
} else {
cached = (state.subscription_version << VERSION_SHIFT);
to_cache = (state.subscription_version << VERSION_SHIFT) | REMOTE_TAG | LOCAL_TAG;
}
update_cache = Some(scopeguard::guard((), |_| {
if cached != c {
let _ = cache.compare_exchange(c, cached, Ordering::Relaxed, Ordering::Relaxed);
}
}));
}
drop(state);
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if (cached & NO_REMOTE_FLAG) == 0 && destination != Locality::SessionLocal {
if (to_cache & REMOTE_TAG) != 0 && destination != Locality::SessionLocal {
let remote = primitives.route_data(
Push {
wire_expr: wire_expr.to_owned(),
Expand Down Expand Up @@ -2224,10 +2219,10 @@ impl SessionInner {
Reliability::DEFAULT,
);
if !remote {
cached |= NO_REMOTE_FLAG
to_cache &= !REMOTE_TAG;
}
}
if (cached & NO_LOCAL_FLAG) == 0 && destination != Locality::Remote {
if (to_cache & LOCAL_TAG) != 0 && destination != Locality::Remote {
let data_info = DataInfo {
kind,
encoding: Some(encoding),
Expand All @@ -2252,9 +2247,12 @@ impl SessionInner {
attachment,
);
if !local {
cached |= NO_LOCAL_FLAG;
to_cache &= !LOCAL_TAG;
}
}
if let Some(cache) = cache.filter(|_| to_cache != cached) {
let _ = cache.compare_exchange(cached, to_cache, Ordering::Relaxed, Ordering::Relaxed);
}
Ok(())
}

Expand Down Expand Up @@ -2565,10 +2563,10 @@ impl Primitives for WeakSession {
}
zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => {
trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr);
let mut state = zwrite!(self.state);
state.subscription_version += 1;
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
state.subscription_version += 1;
if state.primitives.is_none() {
return; // Session closing or closed
}
Expand Down

0 comments on commit 704fc19

Please sign in to comment.