Skip to content

Commit

Permalink
refactor: Handle incoming liveliness token declaration/undeclaration
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Mar 26, 2024
1 parent 92b8df6 commit 5b2ebbb
Showing 1 changed file with 83 additions and 2 deletions.
85 changes: 83 additions & 2 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use zenoh_protocol::core::EntityId;
use zenoh_protocol::network::declare::Interest;
#[cfg(feature = "unstable")]
use zenoh_protocol::network::declare::SubscriberId;
use zenoh_protocol::network::declare::TokenId;
use zenoh_protocol::network::AtomicRequestId;
use zenoh_protocol::network::DeclareInterest;
use zenoh_protocol::network::RequestId;
Expand Down Expand Up @@ -108,6 +109,9 @@ pub(crate) struct SessionState {
#[cfg(feature = "unstable")]
pub(crate) remote_subscribers: HashMap<SubscriberId, KeyExpr<'static>>,
pub(crate) publishers: HashMap<Id, PublisherState>,
#[cfg(feature = "unstable")]
pub(crate) remote_tokens: HashMap<TokenId, KeyExpr<'static>>,
//pub(crate) publications: Vec<OwnedKeyExpr>,
pub(crate) subscribers: HashMap<Id, Arc<SubscriberState>>,
pub(crate) queryables: HashMap<Id, Arc<QueryableState>>,
#[cfg(feature = "unstable")]
Expand All @@ -133,6 +137,9 @@ impl SessionState {
#[cfg(feature = "unstable")]
remote_subscribers: HashMap::new(),
publishers: HashMap::new(),
#[cfg(feature = "unstable")]
remote_tokens: HashMap::new(),
//publications: Vec::new(),
subscribers: HashMap::new(),
queryables: HashMap::new(),
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -2078,11 +2085,85 @@ impl Primitives for Session {
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
trace!("recv UndeclareQueryable {:?}", m.id);
}
DeclareBody::DeclareToken(m) => {

zenoh_protocol::network::DeclareBody::DeclareToken(m) => {
trace!("recv DeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
match state
.wireexpr_to_keyexpr(&m.wire_expr, false)
.map(|e| e.into_owned())
{
Ok(expr) => {
state.remote_tokens.insert(m.id, expr.clone());

// NOTE(fuzzypixelz): I didn't put
// self.update_status_up() here because it doesn't
// make sense. An application which declares a
// liveliness token is not a subscriber and thus
// doens't need to to be visible to publishers
// through .matching_status().

if expr
.as_str()
.starts_with(crate::liveliness::PREFIX_LIVELINESS)
{
drop(state);

self.handle_data(
false,
&m.wire_expr,
None,
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
}
}
Err(err) => {
log::error!("Received DeclareToken for unkown wire_expr: {}", err)
}
}
}
}
DeclareBody::UndeclareToken(m) => {
zenoh_protocol::network::DeclareBody::UndeclareToken(m) => {
trace!("recv UndeclareToken {:?}", m.id);
#[cfg(feature = "unstable")]
{
let mut state = zwrite!(self.state);
if let Some(expr) = state.remote_tokens.remove(&m.id) {
// NOTE(fuzzypixelz): I didn't put
// self.update_status_down() here because it doesn't
// make sense. An application which declares a
// liveliness token is not a subscriber and thus
// doens't need to to be visible to publishers
// through .matching_status().

if expr
.as_str()
.starts_with(crate::liveliness::PREFIX_LIVELINESS)
{
drop(state);

let data_info = DataInfo {
kind: SampleKind::Delete,
..Default::default()
};

self.handle_data(
false,
&m.ext_wire_expr.wire_expr,
Some(data_info),
ZBuf::default(),
#[cfg(feature = "unstable")]
None,
);
}
} else {
log::error!("Received UndeclareToken for unkown id: {}", m.id);
}
}
}
DeclareBody::DeclareInterest(m) => {
trace!("recv DeclareInterest {:?}", m.id);
Expand Down

0 comments on commit 5b2ebbb

Please sign in to comment.