diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 0e801f4522..7a072fd91f 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -61,6 +61,7 @@ use zenoh_protocol::core::EntityId; use zenoh_protocol::network::declare::Interest; #[cfg(feature = "unstable")] use zenoh_protocol::network::declare::SubscriberId; +use zenoh_protocol::network::declare::TokenId; use zenoh_protocol::network::AtomicRequestId; use zenoh_protocol::network::DeclareInterest; use zenoh_protocol::network::RequestId; @@ -108,6 +109,9 @@ pub(crate) struct SessionState { #[cfg(feature = "unstable")] pub(crate) remote_subscribers: HashMap>, pub(crate) publishers: HashMap, + #[cfg(feature = "unstable")] + pub(crate) remote_tokens: HashMap>, + //pub(crate) publications: Vec, pub(crate) subscribers: HashMap>, pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] @@ -133,6 +137,9 @@ impl SessionState { #[cfg(feature = "unstable")] remote_subscribers: HashMap::new(), publishers: HashMap::new(), + #[cfg(feature = "unstable")] + remote_tokens: HashMap::new(), + //publications: Vec::new(), subscribers: HashMap::new(), queryables: HashMap::new(), #[cfg(feature = "unstable")] @@ -2078,11 +2085,85 @@ impl Primitives for Session { zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); } - DeclareBody::DeclareToken(m) => { + + zenoh_protocol::network::DeclareBody::DeclareToken(m) => { trace!("recv DeclareToken {:?}", m.id); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + match state + .wireexpr_to_keyexpr(&m.wire_expr, false) + .map(|e| e.into_owned()) + { + Ok(expr) => { + state.remote_tokens.insert(m.id, expr.clone()); + + // NOTE(fuzzypixelz): I didn't put + // self.update_status_up() here because it doesn't + // make sense. An application which declares a + // liveliness token is not a subscriber and thus + // doens't need to to be visible to publishers + // through .matching_status(). + + if expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + drop(state); + + self.handle_data( + false, + &m.wire_expr, + None, + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); + } + } + Err(err) => { + log::error!("Received DeclareToken for unkown wire_expr: {}", err) + } + } + } } - DeclareBody::UndeclareToken(m) => { + zenoh_protocol::network::DeclareBody::UndeclareToken(m) => { trace!("recv UndeclareToken {:?}", m.id); + #[cfg(feature = "unstable")] + { + let mut state = zwrite!(self.state); + if let Some(expr) = state.remote_tokens.remove(&m.id) { + // NOTE(fuzzypixelz): I didn't put + // self.update_status_down() here because it doesn't + // make sense. An application which declares a + // liveliness token is not a subscriber and thus + // doens't need to to be visible to publishers + // through .matching_status(). + + if expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + drop(state); + + let data_info = DataInfo { + kind: SampleKind::Delete, + ..Default::default() + }; + + self.handle_data( + false, + &m.ext_wire_expr.wire_expr, + Some(data_info), + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); + } + } else { + log::error!("Received UndeclareToken for unkown id: {}", m.id); + } + } } DeclareBody::DeclareInterest(m) => { trace!("recv DeclareInterest {:?}", m.id);