diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index e60b0abf12..3470540d39 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -22,7 +22,7 @@ use std::any::Any; use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use zenoh_protocol::network::declare::{FinalInterest, InterestId}; +use zenoh_protocol::network::declare::{FinalInterest, Interest, InterestId}; use zenoh_protocol::network::{ext, Declare, DeclareBody}; use zenoh_protocol::zenoh::RequestBody; use zenoh_protocol::{ @@ -41,6 +41,7 @@ pub struct FaceState { #[cfg(feature = "stats")] pub(crate) stats: Option>, pub(crate) primitives: Arc, + pub(crate) local_interests: HashMap>, bool)>, pub(crate) remote_key_interests: HashMap>>, pub(crate) local_mappings: HashMap>, pub(crate) remote_mappings: HashMap>, @@ -70,6 +71,7 @@ impl FaceState { #[cfg(feature = "stats")] stats, primitives, + local_interests: HashMap::new(), remote_key_interests: HashMap::new(), local_mappings: HashMap::new(), remote_mappings: HashMap::new(), @@ -265,7 +267,10 @@ impl Primitives for Face { } } zenoh_protocol::network::DeclareBody::FinalInterest(m) => { - log::warn!("Received unsupported {m:?}") + get_mut_unchecked(&mut self.state.clone()) + .local_interests + .entry(m.id) + .and_modify(|interest| interest.2 = true); } zenoh_protocol::network::DeclareBody::UndeclareInterest(m) => { unregister_expr_interest(&self.tables, &mut self.state.clone(), m.id); diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index a9908f5f58..f18d7497f3 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -42,7 +42,9 @@ use std::{ sync::{atomic::AtomicU32, Arc}, }; use zenoh_config::WhatAmI; -use zenoh_protocol::network::declare::{queryable::ext::QueryableInfo, QueryableId, SubscriberId}; +use zenoh_protocol::network::declare::{ + queryable::ext::QueryableInfo, InterestId, QueryableId, SubscriberId, +}; use zenoh_protocol::network::Oam; use zenoh_result::ZResult; use zenoh_sync::get_mut_unchecked; @@ -282,6 +284,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness + remote_sub_interests: HashMap>>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_qabls: HashMap, (QueryableId, QueryableInfo)>, @@ -292,6 +295,7 @@ impl HatFace { fn new() -> Self { Self { next_id: AtomicU32::new(0), + remote_sub_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index fb932dfed9..cf0f51496e 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -20,12 +20,14 @@ use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; use crate::net::routing::hat::HatPubSubTrait; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; +use crate::KeyExpr; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_protocol::core::key_expr::OwnedKeyExpr; -use zenoh_protocol::network::declare::{InterestId, SubscriberId}; +use zenoh_protocol::network::declare::{Interest, InterestId, SubscriberId}; +use zenoh_protocol::network::{DeclareInterest, UndeclareInterest}; use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ @@ -244,24 +246,96 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { impl HatPubSubTrait for HatCode { fn declare_sub_interest( &self, - _tables: &mut Tables, - _face: &mut Arc, - _id: InterestId, - _res: Option<&mut Arc>, - _current: bool, - _future: bool, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + res: Option<&mut Arc>, + current: bool, + future: bool, _aggregate: bool, ) { - todo!() + face_hat_mut!(face) + .remote_sub_interests + .insert(id, res.as_ref().map(|res| (*res).clone())); + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami != WhatAmI::Client) + { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + let mut interest = Interest::KEYEXPRS + Interest::SUBSCRIBERS; + if current { + interest += Interest::CURRENT; + } + if future { + interest += Interest::FUTURE; + } + get_mut_unchecked(dst_face).local_interests.insert( + id, + (interest, res.as_ref().map(|res| (*res).clone()), !current), + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareInterest(DeclareInterest { + id, + interest, + wire_expr, + }), + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } } fn undeclare_sub_interest( &self, - _tables: &mut Tables, - _face: &mut Arc, - _id: InterestId, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, ) { - todo!() + if let Some(interest) = face_hat_mut!(face).remote_sub_interests.remove(&id) { + if !tables.faces.values().any(|f| { + f.whatami == WhatAmI::Client + && face_hat!(f) + .remote_sub_interests + .values() + .any(|i| *i == interest) + }) { + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami != WhatAmI::Client) + { + for id in dst_face + .local_interests + .keys() + .cloned() + .collect::>() + { + let (int, res, _) = dst_face.local_interests.get(&id).unwrap(); + if int.subscribers() && (*res == interest) { + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareInterest(UndeclareInterest { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + get_mut_unchecked(dst_face).local_interests.remove(&id); + } + } + } + } + } } fn declare_subscription( @@ -323,12 +397,51 @@ impl HatPubSubTrait for HatCode { } }; - if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.insert( - face.id, - (face.clone(), key_expr.to_owned(), NodeId::default()), - ); + for face in tables + .faces + .values() + .filter(|f| f.whatami != WhatAmI::Client) + { + if face + .local_interests + .values() + .any(|(interest, res, finalized)| { + *finalized + && interest.subscribers() + && res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) + }) + { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } } let res = Resource::get_resource(expr.prefix, expr.suffix); @@ -342,15 +455,7 @@ impl HatPubSubTrait for HatCode { let mres = mres.upgrade().unwrap(); for (sid, context) in &mres.session_ctxs { - if context.subs.is_some() - && match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => { - source_type == WhatAmI::Client - || context.face.whatami == WhatAmI::Client - } - } - { + if context.subs.is_some() && context.face.whatami == WhatAmI::Client { route.entry(*sid).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); (context.face.clone(), key_expr.to_owned(), NodeId::default()) diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 392c0bf8c1..9c2086eb90 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -25,6 +25,7 @@ use crate::{ handlers::{Callback, DefaultHandler, IntoHandler}, Id, }; +use std::fmt; use std::future::Ready; use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, SyncResolve}; use zenoh_protocol::network::push::ext; @@ -157,7 +158,7 @@ impl SyncResolve for PutBuilder<'_, '_> { let publisher = Publisher { session, #[cfg(feature = "unstable")] - eid: 0, // This is a one shot Publisher + id: 0, // This is a one shot Publisher key_expr: key_expr?, congestion_control, priority, @@ -193,6 +194,22 @@ use std::pin::Pin; use std::task::{Context, Poll}; use zenoh_result::Error; +pub(crate) struct PublisherState { + pub(crate) id: Id, + pub(crate) remote_id: Id, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, +} + +impl fmt::Debug for PublisherState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Publisher") + .field("id", &self.id) + .field("key_expr", &self.key_expr) + .finish() + } +} + #[zenoh_macros::unstable] #[derive(Clone)] pub enum PublisherRef<'a> { @@ -254,8 +271,7 @@ impl std::fmt::Debug for PublisherRef<'_> { #[derive(Debug, Clone)] pub struct Publisher<'a> { pub(crate) session: SessionRef<'a>, - #[cfg(feature = "unstable")] - pub(crate) eid: EntityId, + pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, @@ -283,7 +299,7 @@ impl<'a> Publisher<'a> { pub fn id(&self) -> EntityGlobalId { EntityGlobalId { zid: self.session.zid(), - eid: self.eid, + eid: self.id, } } @@ -588,11 +604,9 @@ impl Resolvable for PublisherUndeclaration<'_> { impl SyncResolve for PublisherUndeclaration<'_> { fn res_sync(mut self) -> ::To { let Publisher { - session, key_expr, .. + session, id: eid, .. } = &self.publisher; - session - .undeclare_publication_intent(key_expr.clone()) - .res_sync()?; + session.undeclare_publisher_inner(*eid)?; self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into(); Ok(()) } @@ -609,10 +623,7 @@ impl AsyncResolve for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { if !self.key_expr.is_empty() { - let _ = self - .session - .undeclare_publication_intent(self.key_expr.clone()) - .res_sync(); + let _ = self.session.undeclare_publisher_inner(self.id); } } } @@ -841,23 +852,19 @@ impl<'a, 'b> SyncResolve for PublisherBuilder<'a, 'b> { } } } - self.session - .declare_publication_intent(key_expr.clone()) - .res_sync()?; - #[cfg(feature = "unstable")] - let eid = self.session.runtime.next_id(); - let publisher = Publisher { - session: self.session, - #[cfg(feature = "unstable")] - eid, - key_expr, - congestion_control: self.congestion_control, - priority: self.priority, - is_express: self.is_express, - destination: self.destination, - }; - log::trace!("publish({:?})", publisher.key_expr); - Ok(publisher) + let session = self.session; + session + .declare_publisher_inner(key_expr.clone(), self.destination) + .map(|eid| Publisher { + session, + #[cfg(feature = "unstable")] + id: eid, + key_expr, + congestion_control: self.congestion_control, + priority: self.priority, + is_express: self.is_express, + destination: self.destination, + }) } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 496c6879ce..0e801f4522 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -57,10 +57,14 @@ use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; use zenoh_config::unwrap_or_default; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; +use zenoh_protocol::core::EntityId; +use zenoh_protocol::network::declare::Interest; #[cfg(feature = "unstable")] use zenoh_protocol::network::declare::SubscriberId; use zenoh_protocol::network::AtomicRequestId; +use zenoh_protocol::network::DeclareInterest; use zenoh_protocol::network::RequestId; +use zenoh_protocol::network::UndeclareInterest; use zenoh_protocol::zenoh::reply::ReplyBody; use zenoh_protocol::zenoh::Del; use zenoh_protocol::zenoh::Put; @@ -103,7 +107,7 @@ pub(crate) struct SessionState { pub(crate) remote_resources: HashMap, #[cfg(feature = "unstable")] pub(crate) remote_subscribers: HashMap>, - //pub(crate) publications: Vec, + pub(crate) publishers: HashMap, pub(crate) subscribers: HashMap>, pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] @@ -112,13 +116,13 @@ pub(crate) struct SessionState { pub(crate) matching_listeners: HashMap>, pub(crate) queries: HashMap, pub(crate) aggregated_subscribers: Vec, - //pub(crate) aggregated_publishers: Vec, + pub(crate) aggregated_publishers: Vec, } impl SessionState { pub(crate) fn new( aggregated_subscribers: Vec, - _aggregated_publishers: Vec, + aggregated_publishers: Vec, ) -> SessionState { SessionState { primitives: None, @@ -128,7 +132,7 @@ impl SessionState { remote_resources: HashMap::new(), #[cfg(feature = "unstable")] remote_subscribers: HashMap::new(), - //publications: Vec::new(), + publishers: HashMap::new(), subscribers: HashMap::new(), queryables: HashMap::new(), #[cfg(feature = "unstable")] @@ -137,7 +141,7 @@ impl SessionState { matching_listeners: HashMap::new(), queries: HashMap::new(), aggregated_subscribers, - //aggregated_publishers, + aggregated_publishers, } } } @@ -881,84 +885,103 @@ impl Session { }) } - /// Declare a publication for the given key expression. - /// - /// Puts that match the given key expression will only be sent on the network - /// if matching subscribers exist in the system. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression to publish - pub(crate) fn declare_publication_intent<'a>( - &'a self, - _key_expr: KeyExpr<'a>, - ) -> impl Resolve> + 'a { - ResolveClosure::new(move || { - // log::trace!("declare_publication({:?})", key_expr); - // let mut state = zwrite!(self.state); - // if !state.publications.iter().any(|p| **p == **key_expr) { - // let declared_pub = if let Some(join_pub) = state - // .aggregated_publishers - // .iter() - // .find(|s| s.includes(&key_expr)) - // { - // let joined_pub = state.publications.iter().any(|p| join_pub.includes(p)); - // (!joined_pub).then(|| join_pub.clone().into()) - // } else { - // Some(key_expr.clone()) - // }; - // state.publications.push(key_expr.into()); - - // if let Some(res) = declared_pub { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // drop(state); - // primitives.decl_publisher(&res.to_wire(self), None); - // } - // } - Ok(()) - }) + pub(crate) fn declare_publisher_inner( + &self, + key_expr: KeyExpr, + destination: Locality, + ) -> ZResult { + let mut state = zwrite!(self.state); + log::trace!("declare_publisher({:?})", key_expr); + let id = self.runtime.next_id(); + + let mut pub_state = PublisherState { + id, + remote_id: id, + key_expr: key_expr.clone().into_owned(), + destination, + }; + + let declared_pub = (destination != Locality::SessionLocal) + .then(|| { + match state + .aggregated_publishers + .iter() + .find(|s| s.includes(&key_expr)) + { + Some(join_pub) => { + if let Some(joined_pub) = state.publishers.values().find(|p| { + p.destination != Locality::SessionLocal + && join_pub.includes(&p.key_expr) + }) { + pub_state.remote_id = joined_pub.remote_id; + None + } else { + Some(join_pub.clone().into()) + } + } + None => { + if let Some(twin_pub) = state.publishers.values().find(|p| { + p.destination != Locality::SessionLocal && p.key_expr == key_expr + }) { + pub_state.remote_id = twin_pub.remote_id; + None + } else { + Some(key_expr.clone()) + } + } + } + }) + .flatten(); + + state.publishers.insert(id, pub_state); + + if let Some(res) = declared_pub { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareInterest(DeclareInterest { + id, + interest: Interest::CURRENT + + Interest::FUTURE + + Interest::KEYEXPRS + + Interest::SUBSCRIBERS, + wire_expr: Some(res.to_wire(self).to_owned()), + }), + }); + } + Ok(id) } - /// Undeclare a publication previously declared - /// with [`declare_publication`](Session::declare_publication). - /// - /// # Arguments - /// - /// * `key_expr` - The key expression of the publication to undeclarte - pub(crate) fn undeclare_publication_intent<'a>( - &'a self, - _key_expr: KeyExpr<'a>, - ) -> impl Resolve> + 'a { - ResolveClosure::new(move || { - // let mut state = zwrite!(self.state); - // if let Some(idx) = state.publications.iter().position(|p| **p == *key_expr) { - // trace!("undeclare_publication({:?})", key_expr); - // state.publications.remove(idx); - // match state - // .aggregated_publishers - // .iter() - // .find(|s| s.includes(&key_expr)) - // { - // Some(join_pub) => { - // let joined_pub = state.publications.iter().any(|p| join_pub.includes(p)); - // if !joined_pub { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // let key_expr = WireExpr::from(join_pub).to_owned(); - // drop(state); - // primitives.forget_publisher(&key_expr, None); - // } - // } - // None => { - // let primitives = state.primitives.as_ref().unwrap().clone(); - // drop(state); - // primitives.forget_publisher(&key_expr.to_wire(self), None); - // } - // }; - // } else { - // bail!("Unable to find publication") - // } + pub(crate) fn undeclare_publisher_inner(&self, pid: Id) -> ZResult<()> { + let mut state = zwrite!(self.state); + if let Some(pub_state) = state.publishers.remove(&pid) { + trace!("undeclare_publisher({:?})", pub_state); + if pub_state.destination != Locality::SessionLocal { + // Note: there might be several publishers on the same KeyExpr. + // Before calling forget_publishers(key_expr), check if this was the last one. + if !state.publishers.values().any(|p| { + p.destination != Locality::SessionLocal && p.remote_id == pub_state.remote_id + }) { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareInterest(UndeclareInterest { + id: pub_state.remote_id, + ext_wire_expr: WireExprType::null(), + }), + }); + } + } Ok(()) - }) + } else { + Err(zerror!("Unable to find publisher").into()) + } } pub(crate) fn declare_subscriber_inner( @@ -970,7 +993,7 @@ impl Session { info: &SubscriberInfo, ) -> ZResult> { let mut state = zwrite!(self.state); - log::trace!("subscribe({:?})", key_expr); + log::trace!("declare_subscriber({:?})", key_expr); let id = self.runtime.next_id(); let key_expr = match scope { Some(scope) => scope / key_expr, @@ -1090,15 +1113,32 @@ impl Session { let state = zread!(self.state); self.update_status_up(&state, &key_expr) } + } else if key_expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareInterest(DeclareInterest { + id, + wire_expr: Some(key_expr.to_wire(self).to_owned()), + interest: Interest::KEYEXPRS + Interest::SUBSCRIBERS + Interest::FUTURE, + }), + }); } Ok(sub_state) } - pub(crate) fn unsubscribe(&self, sid: Id) -> ZResult<()> { + pub(crate) fn undeclare_subscriber_inner(&self, sid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); if let Some(sub_state) = state.subscribers.remove(&sid) { - trace!("unsubscribe({:?})", sub_state); + trace!("undeclare_subscriber({:?})", sub_state); for res in state .local_resources .values_mut() @@ -1147,6 +1187,23 @@ impl Session { self.update_status_down(&state, &sub_state.key_expr) } } + } else if sub_state + .key_expr + .as_str() + .starts_with(crate::liveliness::PREFIX_LIVELINESS) + { + let primitives = state.primitives.as_ref().unwrap().clone(); + drop(state); + + primitives.send_declare(Declare { + ext_qos: declare::ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: declare::ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareInterest(UndeclareInterest { + id: sub_state.id, + ext_wire_expr: WireExprType::null(), + }), + }); } Ok(()) } else { @@ -1162,7 +1219,7 @@ impl Session { callback: Callback<'static, Query>, ) -> ZResult> { let mut state = zwrite!(self.state); - log::trace!("queryable({:?})", key_expr); + log::trace!("declare_queryable({:?})", key_expr); let id = self.runtime.next_id(); let qable_state = Arc::new(QueryableState { id, @@ -1198,7 +1255,7 @@ impl Session { pub(crate) fn close_queryable(&self, qid: Id) -> ZResult<()> { let mut state = zwrite!(self.state); if let Some(qable_state) = state.queryables.remove(&qid) { - trace!("close_queryable({:?})", qable_state); + trace!("undeclare_queryable({:?})", qable_state); if qable_state.origin != Locality::SessionLocal { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); @@ -2003,7 +2060,7 @@ impl Primitives for Session { }; self.handle_data( false, - &m.ext_wire_expr.wire_expr, + &expr.to_wire(self), Some(data_info), ZBuf::default(), #[cfg(feature = "unstable")] @@ -2021,11 +2078,21 @@ impl Primitives for Session { zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); } - DeclareBody::DeclareToken(_) => todo!(), - DeclareBody::UndeclareToken(_) => todo!(), - DeclareBody::DeclareInterest(_) => todo!(), - DeclareBody::FinalInterest(_) => todo!(), - DeclareBody::UndeclareInterest(_) => todo!(), + DeclareBody::DeclareToken(m) => { + trace!("recv DeclareToken {:?}", m.id); + } + DeclareBody::UndeclareToken(m) => { + trace!("recv UndeclareToken {:?}", m.id); + } + DeclareBody::DeclareInterest(m) => { + trace!("recv DeclareInterest {:?}", m.id); + } + DeclareBody::FinalInterest(m) => { + trace!("recv FinalInterest {:?}", m.id); + } + DeclareBody::UndeclareInterest(m) => { + trace!("recv UndeclareInterest {:?}", m.id); + } } } diff --git a/zenoh/src/subscriber.rs b/zenoh/src/subscriber.rs index 4488140610..9b778e070f 100644 --- a/zenoh/src/subscriber.rs +++ b/zenoh/src/subscriber.rs @@ -144,7 +144,7 @@ impl SyncResolve for SubscriberUndeclaration<'_> { self.subscriber.alive = false; self.subscriber .session - .unsubscribe(self.subscriber.state.id) + .undeclare_subscriber_inner(self.subscriber.state.id) } } @@ -159,7 +159,7 @@ impl AsyncResolve for SubscriberUndeclaration<'_> { impl Drop for SubscriberInner<'_> { fn drop(&mut self) { if self.alive { - let _ = self.session.unsubscribe(self.state.id); + let _ = self.session.undeclare_subscriber_inner(self.state.id); } } }