From 93f93d2d67f25886a25e83922a534694c2135669 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 17 Jun 2024 14:23:12 +0200 Subject: [PATCH] Release tables locks before propagating subscribers and queryables declarations to void dead locks (#1150) * Send simple sub and qabl declarations using a given function * Send simple sub and qabl declarations after releasing tables lock * Send simple sub and qabl declarations after releasing tables lock (missing places) --- zenoh/src/net/primitives/demux.rs | 24 +- zenoh/src/net/routing/dispatcher/face.rs | 25 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 21 +- zenoh/src/net/routing/dispatcher/queries.rs | 15 +- zenoh/src/net/routing/dispatcher/tables.rs | 8 +- zenoh/src/net/routing/hat/client/mod.rs | 25 +- zenoh/src/net/routing/hat/client/pubsub.rs | 132 ++++--- zenoh/src/net/routing/hat/client/queries.rs | 117 +++--- .../src/net/routing/hat/linkstate_peer/mod.rs | 33 +- .../net/routing/hat/linkstate_peer/pubsub.rs | 185 ++++++---- .../net/routing/hat/linkstate_peer/queries.rs | 177 +++++---- zenoh/src/net/routing/hat/mod.rs | 21 +- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 25 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 132 ++++--- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 117 +++--- zenoh/src/net/routing/hat/router/mod.rs | 71 +++- zenoh/src/net/routing/hat/router/pubsub.rs | 335 ++++++++++------- zenoh/src/net/routing/hat/router/queries.rs | 344 +++++++++++------- zenoh/src/net/routing/router.rs | 22 +- zenoh/src/net/tests/tables.rs | 12 + 20 files changed, 1194 insertions(+), 647 deletions(-) diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index d62e410c81..fe096a9dfe 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -72,9 +72,21 @@ impl TransportPeerEventHandler for DeMux { NetworkBody::ResponseFinal(m) => self.face.send_response_final(m), NetworkBody::OAM(m) => { if let Some(transport) = self.transport.as_ref() { + let mut declares = vec![]; let ctrl_lock = zlock!(self.face.tables.ctrl_lock); let mut tables = zwrite!(self.face.tables.tables); - ctrl_lock.handle_oam(&mut tables, &self.face.tables, m, transport)? + ctrl_lock.handle_oam( + &mut tables, + &self.face.tables, + m, + transport, + &mut |p, m| declares.push((p.clone(), m)), + )?; + drop(tables); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } } } @@ -89,9 +101,17 @@ impl TransportPeerEventHandler for DeMux { fn closing(&self) { self.face.send_close(); if let Some(transport) = self.transport.as_ref() { + let mut declares = vec![]; let ctrl_lock = zlock!(self.face.tables.ctrl_lock); let mut tables = zwrite!(self.face.tables.tables); - let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport); + let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| { + declares.push((p.clone(), m)) + }); + drop(tables); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } } diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index f2def1d20a..4df9b7054c 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -195,6 +195,7 @@ impl Primitives for Face { unregister_expr(&self.tables, &mut self.state.clone(), m.id); } zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => { + let mut declares = vec![]; declare_subscription( ctrl_lock.as_ref(), &self.tables, @@ -202,18 +203,30 @@ impl Primitives for Face { &m.wire_expr, &m.ext_info, msg.ext_nodeid.node_id, + &mut |p, m| declares.push((p.clone(), m)), ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } zenoh_protocol::network::DeclareBody::UndeclareSubscriber(m) => { + let mut declares = vec![]; undeclare_subscription( ctrl_lock.as_ref(), &self.tables, &mut self.state.clone(), &m.ext_wire_expr.wire_expr, msg.ext_nodeid.node_id, + &mut |p, m| declares.push((p.clone(), m)), ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } zenoh_protocol::network::DeclareBody::DeclareQueryable(m) => { + let mut declares = vec![]; declare_queryable( ctrl_lock.as_ref(), &self.tables, @@ -221,16 +234,27 @@ impl Primitives for Face { &m.wire_expr, &m.ext_info, msg.ext_nodeid.node_id, + &mut |p, m| declares.push((p.clone(), m)), ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { + let mut declares = vec![]; undeclare_queryable( ctrl_lock.as_ref(), &self.tables, &mut self.state.clone(), &m.ext_wire_expr.wire_expr, msg.ext_nodeid.node_id, + &mut |p, m| declares.push((p.clone(), m)), ); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(), zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(), @@ -238,7 +262,6 @@ impl Primitives for Face { zenoh_protocol::network::DeclareBody::FinalInterest(_m) => todo!(), zenoh_protocol::network::DeclareBody::UndeclareInterest(_m) => todo!(), } - drop(ctrl_lock); } #[inline] diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 6ec4bbf735..5ac1f60627 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -14,7 +14,7 @@ use super::face::FaceState; use super::resource::{DataRoutes, Direction, PullCaches, Resource}; use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock}; -use crate::net::routing::hat::HatTrait; +use crate::net::routing::hat::{HatTrait, SendDeclare}; use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; @@ -37,6 +37,7 @@ pub(crate) fn declare_subscription( expr: &WireExpr, sub_info: &SubscriberInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { tracing::debug!("Declare subscription {}", face); let rtables = zread!(tables.tables); @@ -66,7 +67,14 @@ pub(crate) fn declare_subscription( (res, wtables) }; - hat_code.declare_subscription(&mut wtables, face, &mut res, sub_info, node_id); + hat_code.declare_subscription( + &mut wtables, + face, + &mut res, + sub_info, + node_id, + send_declare, + ); disable_matches_data_routes(&mut wtables, &mut res); drop(wtables); @@ -96,6 +104,7 @@ pub(crate) fn undeclare_subscription( face: &mut Arc, expr: &WireExpr, node_id: NodeId, + send_declare: &mut SendDeclare, ) { tracing::debug!("Undeclare subscription {}", face); let rtables = zread!(tables.tables); @@ -105,7 +114,13 @@ pub(crate) fn undeclare_subscription( drop(rtables); let mut wtables = zwrite!(tables.tables); - hat_code.undeclare_subscription(&mut wtables, face, &mut res, node_id); + hat_code.undeclare_subscription( + &mut wtables, + face, + &mut res, + node_id, + send_declare, + ); disable_matches_data_routes(&mut wtables, &mut res); drop(wtables); diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 719a3834d6..9de841949c 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -15,7 +15,7 @@ use super::face::FaceState; use super::resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource}; use super::tables::NodeId; use super::tables::{RoutingExpr, Tables, TablesLock}; -use crate::net::routing::hat::HatTrait; +use crate::net::routing::hat::{HatTrait, SendDeclare}; use crate::net::routing::RoutingContext; use async_trait::async_trait; use std::collections::HashMap; @@ -56,6 +56,7 @@ pub(crate) fn declare_queryable( expr: &WireExpr, qabl_info: &QueryableInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { tracing::debug!("Register queryable {}", face); let rtables = zread!(tables.tables); @@ -85,7 +86,14 @@ pub(crate) fn declare_queryable( (res, wtables) }; - hat_code.declare_queryable(&mut wtables, face, &mut res, qabl_info, node_id); + hat_code.declare_queryable( + &mut wtables, + face, + &mut res, + qabl_info, + node_id, + send_declare, + ); disable_matches_query_routes(&mut wtables, &mut res); drop(wtables); @@ -112,6 +120,7 @@ pub(crate) fn undeclare_queryable( face: &mut Arc, expr: &WireExpr, node_id: NodeId, + send_declare: &mut SendDeclare, ) { let rtables = zread!(tables.tables); match rtables.get_mapping(face, &expr.scope, expr.mapping) { @@ -120,7 +129,7 @@ pub(crate) fn undeclare_queryable( drop(rtables); let mut wtables = zwrite!(tables.tables); - hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id); + hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id, send_declare); disable_matches_query_routes(&mut wtables, &mut res); drop(wtables); diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 2d5eb436e7..9e71eee853 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -174,7 +174,13 @@ pub fn close_face(tables: &TablesLock, face: &Weak) { tracing::debug!("Close {}", face); face.task_controller.terminate_all(Duration::from_secs(10)); finalize_pending_queries(tables, &mut face); - zlock!(tables.ctrl_lock).close_face(tables, &mut face); + let mut declares = vec![]; + let ctrl_lock = zlock!(tables.ctrl_lock); + ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m))); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } } None => tracing::error!("Face already closed!"), } diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index c19faf39f8..8e8d8d4cb6 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -36,7 +36,7 @@ use super::{ face::FaceState, tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock}, }, - HatBaseTrait, HatTrait, + HatBaseTrait, HatTrait, SendDeclare, }; use std::{ any::Any, @@ -97,9 +97,10 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, _tables_ref: &Arc, face: &mut Face, + send_declare: &mut SendDeclare, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } @@ -109,13 +110,19 @@ impl HatBaseTrait for HatCode { _tables_ref: &Arc, face: &mut Face, _transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } - fn close_face(&self, tables: &TablesLock, face: &mut Arc) { + fn close_face( + &self, + tables: &TablesLock, + face: &mut Arc, + send_declare: &mut SendDeclare, + ) { let mut wtables = zwrite!(tables.tables); let mut face_clone = face.clone(); let face = get_mut_unchecked(face); @@ -139,7 +146,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); + undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -167,7 +174,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); + undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -229,6 +236,7 @@ impl HatBaseTrait for HatCode { _tables_ref: &Arc, _oam: Oam, _transport: &TransportUnicast, + _send_declare: &mut SendDeclare, ) -> ZResult<()> { Ok(()) } @@ -248,6 +256,7 @@ impl HatBaseTrait for HatCode { _tables: &mut Tables, _tables_ref: &Arc, _transport: &TransportUnicast, + _send_declare: &mut SendDeclare, ) -> ZResult<()> { Ok(()) } diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index fb92ae614d..3f194e4e56 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -17,7 +17,7 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::{HatPubSubTrait, Sources}; +use crate::net::routing::hat::{HatPubSubTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use std::borrow::Cow; @@ -40,6 +40,7 @@ fn propagate_simple_subscription_to( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || (dst_face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS))) @@ -48,19 +49,22 @@ fn propagate_simple_subscription_to( { face_hat_mut!(dst_face).local_subs.insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + ), + ); } } @@ -69,6 +73,7 @@ fn propagate_simple_subscription( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { for mut dst_face in tables .faces @@ -76,7 +81,14 @@ fn propagate_simple_subscription( .cloned() .collect::>>() { - propagate_simple_subscription_to(tables, &mut dst_face, res, sub_info, src_face); + propagate_simple_subscription_to( + tables, + &mut dst_face, + res, + sub_info, + src_face, + send_declare, + ); } } @@ -126,12 +138,13 @@ fn declare_client_subscription( face: &mut Arc, res: &mut Arc, sub_info: &SubscriberInfo, + send_declare: &mut SendDeclare, ) { register_client_subscription(tables, face, res, sub_info); let mut propa_sub_info = *sub_info; propa_sub_info.mode = Mode::Push; - propagate_simple_subscription(tables, res, &propa_sub_info, face); + propagate_simple_subscription(tables, res, &propa_sub_info, face, send_declare); // This introduced a buffer overflow on windows // @TODO: Let's deactivate this on windows until Fixed #[cfg(not(windows))] @@ -168,22 +181,29 @@ fn client_subs(res: &Arc) -> Vec> { .collect() } -fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { +fn propagate_forget_simple_subscription( + tables: &mut Tables, + res: &Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_subs.contains(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } } @@ -193,6 +213,7 @@ pub(super) fn undeclare_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client subscription {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -202,7 +223,7 @@ pub(super) fn undeclare_client_subscription( let mut client_subs = client_subs(res); if client_subs.is_empty() { - propagate_forget_simple_subscription(tables, res); + propagate_forget_simple_subscription(tables, res, send_declare); } if client_subs.len() == 1 { let face = &mut client_subs[0]; @@ -210,18 +231,21 @@ pub(super) fn undeclare_client_subscription( && !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } @@ -231,11 +255,16 @@ fn forget_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_subscription(tables, face, res); + undeclare_client_subscription(tables, face, res, send_declare); } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn pubsub_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers mode: Mode::Push, @@ -247,7 +276,14 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { .collect::>>() { for sub in &face_hat!(src_face).remote_subs { - propagate_simple_subscription_to(tables, face, sub, &sub_info, &mut src_face.clone()); + propagate_simple_subscription_to( + tables, + face, + sub, + &sub_info, + &mut src_face.clone(), + send_declare, + ); } } } @@ -260,8 +296,9 @@ impl HatPubSubTrait for HatCode { res: &mut Arc, sub_info: &SubscriberInfo, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - declare_client_subscription(tables, face, res, sub_info); + declare_client_subscription(tables, face, res, sub_info, send_declare); } fn undeclare_subscription( @@ -270,8 +307,9 @@ impl HatPubSubTrait for HatCode { face: &mut Arc, res: &mut Arc, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - forget_client_subscription(tables, face, res); + forget_client_subscription(tables, face, res, send_declare); } fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 3576148aaf..445f618845 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -17,7 +17,7 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::{HatQueriesTrait, Sources}; +use crate::net::routing::hat::{HatQueriesTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -79,6 +79,7 @@ fn propagate_simple_queryable( tables: &mut Tables, res: &Arc, src_face: Option<&mut Arc>, + send_declare: &mut SendDeclare, ) { let faces = tables.faces.values().cloned(); for mut dst_face in faces { @@ -94,19 +95,22 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, &mut dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } @@ -143,9 +147,10 @@ fn declare_client_queryable( face: &mut Arc, res: &mut Arc, qabl_info: &QueryableInfo, + send_declare: &mut SendDeclare, ) { register_client_queryable(tables, face, res, qabl_info); - propagate_simple_queryable(tables, res, Some(face)); + propagate_simple_queryable(tables, res, Some(face), send_declare); } #[inline] @@ -162,22 +167,29 @@ fn client_qabls(res: &Arc) -> Vec> { .collect() } -fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { +fn propagate_forget_simple_queryable( + tables: &mut Tables, + res: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -188,6 +200,7 @@ pub(super) fn undeclare_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client queryable {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -199,26 +212,29 @@ pub(super) fn undeclare_client_queryable( let mut client_qabls = client_qabls(res); if client_qabls.is_empty() { - propagate_forget_simple_queryable(tables, res); + propagate_forget_simple_queryable(tables, res, send_declare); } else { - propagate_simple_queryable(tables, res, None); + propagate_simple_queryable(tables, res, None, send_declare); } if client_qabls.len() == 1 { let face = &mut client_qabls[0]; if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -229,11 +245,16 @@ fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_queryable(tables, face, res); + undeclare_client_queryable(tables, face, res, send_declare); } -pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) { +pub(super) fn queries_new_face( + tables: &mut Tables, + _face: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables .faces .values() @@ -241,7 +262,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) .collect::>>() { for qabl in face_hat!(face).remote_qabls.iter() { - propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); + propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare); } } } @@ -258,8 +279,9 @@ impl HatQueriesTrait for HatCode { res: &mut Arc, qabl_info: &QueryableInfo, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, res, qabl_info, send_declare); } fn undeclare_queryable( @@ -268,8 +290,9 @@ impl HatQueriesTrait for HatCode { face: &mut Arc, res: &mut Arc, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - forget_client_queryable(tables, face, res); + forget_client_queryable(tables, face, res, send_declare); } fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 808acef23f..ad4e1667f0 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -27,7 +27,7 @@ use super::{ face::FaceState, tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock}, }, - HatBaseTrait, HatTrait, + HatBaseTrait, HatTrait, SendDeclare, }; use crate::{ net::{ @@ -213,9 +213,10 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, _tables_ref: &Arc, face: &mut Face, + send_declare: &mut SendDeclare, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } @@ -225,6 +226,7 @@ impl HatBaseTrait for HatCode { tables_ref: &Arc, face: &mut Face, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { let link_id = if face.state.whatami != WhatAmI::Client { if let Some(net) = hat_mut!(tables).peers_net.as_mut() { @@ -237,8 +239,8 @@ impl HatBaseTrait for HatCode { }; face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); if face.state.whatami != WhatAmI::Client { hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); @@ -246,7 +248,12 @@ impl HatBaseTrait for HatCode { Ok(()) } - fn close_face(&self, tables: &TablesLock, face: &mut Arc) { + fn close_face( + &self, + tables: &TablesLock, + face: &mut Arc, + send_declare: &mut SendDeclare, + ) { let mut wtables = zwrite!(tables.tables); let mut face_clone = face.clone(); let face = get_mut_unchecked(face); @@ -270,7 +277,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); + undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -298,7 +305,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); + undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -360,6 +367,7 @@ impl HatBaseTrait for HatCode { tables_ref: &Arc, oam: Oam, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { if oam.id == OAM_LINKSTATE { if let ZExtBody::ZBuf(buf) = oam.body { @@ -376,8 +384,8 @@ impl HatBaseTrait for HatCode { let changes = net.link_states(list.link_states, zid); for (_, removed_node) in changes.removed_nodes { - pubsub_remove_node(tables, &removed_node.zid); - queries_remove_node(tables, &removed_node.zid); + pubsub_remove_node(tables, &removed_node.zid, send_declare); + queries_remove_node(tables, &removed_node.zid, send_declare); } hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); @@ -409,6 +417,7 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, tables_ref: &Arc, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { match (transport.get_zid(), transport.get_whatami()) { (Ok(zid), Ok(whatami)) => { @@ -419,8 +428,8 @@ impl HatBaseTrait for HatCode { .unwrap() .remove_link(&zid) { - pubsub_remove_node(tables, &removed_node.zid); - queries_remove_node(tables, &removed_node.zid); + pubsub_remove_node(tables, &removed_node.zid, send_declare); + queries_remove_node(tables, &removed_node.zid, send_declare); } hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 232e241670..80a8eff95d 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::{HatPubSubTrait, Sources}; +use crate::net::routing::hat::{HatPubSubTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use petgraph::graph::NodeIndex; @@ -85,6 +85,7 @@ fn propagate_simple_subscription_to( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || res.expr().starts_with(PREFIX_LIVELINESS)) && !face_hat!(dst_face).local_subs.contains(res) @@ -92,19 +93,22 @@ fn propagate_simple_subscription_to( { face_hat_mut!(dst_face).local_subs.insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // TODO - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // TODO + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + ), + ); } } @@ -113,6 +117,7 @@ fn propagate_simple_subscription( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { for mut dst_face in tables .faces @@ -120,7 +125,14 @@ fn propagate_simple_subscription( .cloned() .collect::>>() { - propagate_simple_subscription_to(tables, &mut dst_face, res, sub_info, src_face); + propagate_simple_subscription_to( + tables, + &mut dst_face, + res, + sub_info, + src_face, + send_declare, + ); } } @@ -167,6 +179,7 @@ fn register_peer_subscription( res: &mut Arc, sub_info: &SubscriberInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { if !res_hat!(res).peer_subs.contains(&peer) { // Register peer subscription @@ -182,7 +195,7 @@ fn register_peer_subscription( if tables.whatami == WhatAmI::Peer { // Propagate subscription to clients - propagate_simple_subscription(tables, res, sub_info, face); + propagate_simple_subscription(tables, res, sub_info, face, send_declare); } } @@ -192,8 +205,9 @@ fn declare_peer_subscription( res: &mut Arc, sub_info: &SubscriberInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { - register_peer_subscription(tables, face, res, sub_info, peer); + register_peer_subscription(tables, face, res, sub_info, peer, send_declare); } fn register_client_subscription( @@ -242,12 +256,13 @@ fn declare_client_subscription( face: &mut Arc, res: &mut Arc, sub_info: &SubscriberInfo, + send_declare: &mut SendDeclare, ) { register_client_subscription(tables, face, res, sub_info); let mut propa_sub_info = *sub_info; propa_sub_info.mode = Mode::Push; let zid = tables.zid; - register_peer_subscription(tables, face, res, &propa_sub_info, zid); + register_peer_subscription(tables, face, res, &propa_sub_info, zid, send_declare); } #[inline] @@ -313,22 +328,29 @@ fn send_forget_sourced_subscription_to_net_children( } } -fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { +fn propagate_forget_simple_subscription( + tables: &mut Tables, + res: &Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_subs.contains(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // TODO - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // TODO + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } } @@ -369,7 +391,12 @@ fn propagate_forget_sourced_subscription( } } -fn unregister_peer_subscription(tables: &mut Tables, res: &mut Arc, peer: &ZenohId) { +fn unregister_peer_subscription( + tables: &mut Tables, + res: &mut Arc, + peer: &ZenohId, + send_declare: &mut SendDeclare, +) { tracing::debug!( "Unregister peer subscription {} (peer: {})", res.expr(), @@ -383,7 +410,7 @@ fn unregister_peer_subscription(tables: &mut Tables, res: &mut Arc, pe .retain(|sub| !Arc::ptr_eq(sub, res)); if tables.whatami == WhatAmI::Peer { - propagate_forget_simple_subscription(tables, res); + propagate_forget_simple_subscription(tables, res, send_declare); } } } @@ -393,9 +420,10 @@ fn undeclare_peer_subscription( face: Option<&Arc>, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { if res_hat!(res).peer_subs.contains(peer) { - unregister_peer_subscription(tables, res, peer); + unregister_peer_subscription(tables, res, peer, send_declare); propagate_forget_sourced_subscription(tables, res, face, peer); } } @@ -405,14 +433,16 @@ fn forget_peer_subscription( face: &mut Arc, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { - undeclare_peer_subscription(tables, Some(face), res, peer); + undeclare_peer_subscription(tables, Some(face), res, peer, send_declare); } pub(super) fn undeclare_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client subscription {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -423,7 +453,7 @@ pub(super) fn undeclare_client_subscription( let mut client_subs = client_subs(res); let peer_subs = remote_peer_subs(tables, res); if client_subs.is_empty() { - undeclare_peer_subscription(tables, None, res, &tables.zid.clone()); + undeclare_peer_subscription(tables, None, res, &tables.zid.clone(), send_declare); } if client_subs.len() == 1 && !peer_subs { let face = &mut client_subs[0]; @@ -431,18 +461,21 @@ pub(super) fn undeclare_client_subscription( && !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // TODO - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // TODO + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } @@ -453,11 +486,16 @@ fn forget_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_subscription(tables, face, res); + undeclare_client_subscription(tables, face, res, send_declare); } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn pubsub_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO mode: Mode::Push, @@ -467,24 +505,31 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { for sub in &hat!(tables).peer_subs { face_hat_mut!(face).local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // TODO - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // TODO + wire_expr: key_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + ), + ); } } } -pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId) { +pub(super) fn pubsub_remove_node( + tables: &mut Tables, + node: &ZenohId, + send_declare: &mut SendDeclare, +) { for mut res in hat!(tables) .peer_subs .iter() @@ -492,7 +537,7 @@ pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId) { .cloned() .collect::>>() { - unregister_peer_subscription(tables, &mut res, node); + unregister_peer_subscription(tables, &mut res, node, send_declare); update_matches_data_routes(tables, &mut res); Resource::clean(&mut res) @@ -579,13 +624,14 @@ impl HatPubSubTrait for HatCode { res: &mut Arc, sub_info: &SubscriberInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { if let Some(peer) = get_peer(tables, face, node_id) { - declare_peer_subscription(tables, face, res, sub_info, peer) + declare_peer_subscription(tables, face, res, sub_info, peer, send_declare) } } else { - declare_client_subscription(tables, face, res, sub_info) + declare_client_subscription(tables, face, res, sub_info, send_declare) } } @@ -595,13 +641,14 @@ impl HatPubSubTrait for HatCode { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_subscription(tables, face, res, &peer); + forget_peer_subscription(tables, face, res, &peer, send_declare); } } else { - forget_client_subscription(tables, face, res); + forget_client_subscription(tables, face, res, send_declare); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index a1dd01d903..dfe729e4a3 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::{HatQueriesTrait, Sources}; +use crate::net::routing::hat::{HatQueriesTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -162,6 +162,7 @@ fn propagate_simple_queryable( tables: &mut Tables, res: &Arc, src_face: Option<&mut Arc>, + send_declare: &mut SendDeclare, ) { let faces = tables.faces.values().cloned(); for mut dst_face in faces { @@ -175,19 +176,22 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, &mut dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } @@ -235,6 +239,7 @@ fn register_peer_queryable( res: &mut Arc, qabl_info: &QueryableInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { let current_info = res_hat!(res).peer_qabls.get(&peer); if current_info.is_none() || current_info.unwrap() != qabl_info { @@ -251,7 +256,7 @@ fn register_peer_queryable( if tables.whatami == WhatAmI::Peer { // Propagate queryable to clients - propagate_simple_queryable(tables, res, face); + propagate_simple_queryable(tables, res, face, send_declare); } } @@ -261,9 +266,10 @@ fn declare_peer_queryable( res: &mut Arc, qabl_info: &QueryableInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { let face = Some(face); - register_peer_queryable(tables, face, res, qabl_info, peer); + register_peer_queryable(tables, face, res, qabl_info, peer, send_declare); } fn register_client_queryable( @@ -298,12 +304,13 @@ fn declare_client_queryable( face: &mut Arc, res: &mut Arc, qabl_info: &QueryableInfo, + send_declare: &mut SendDeclare, ) { register_client_queryable(tables, face, res, qabl_info); let local_details = local_peer_qabl_info(tables, res); let zid = tables.zid; - register_peer_queryable(tables, Some(face), res, &local_details, zid); + register_peer_queryable(tables, Some(face), res, &local_details, zid, send_declare); } #[inline] @@ -369,22 +376,29 @@ fn send_forget_sourced_queryable_to_net_children( } } -fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { +fn propagate_forget_simple_queryable( + tables: &mut Tables, + res: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -426,7 +440,12 @@ fn propagate_forget_sourced_queryable( } } -fn unregister_peer_queryable(tables: &mut Tables, res: &mut Arc, peer: &ZenohId) { +fn unregister_peer_queryable( + tables: &mut Tables, + res: &mut Arc, + peer: &ZenohId, + send_declare: &mut SendDeclare, +) { tracing::debug!("Unregister peer queryable {} (peer: {})", res.expr(), peer,); res_hat_mut!(res).peer_qabls.remove(peer); @@ -436,7 +455,7 @@ fn unregister_peer_queryable(tables: &mut Tables, res: &mut Arc, peer: .retain(|qabl| !Arc::ptr_eq(qabl, res)); if tables.whatami == WhatAmI::Peer { - propagate_forget_simple_queryable(tables, res); + propagate_forget_simple_queryable(tables, res, send_declare); } } } @@ -446,9 +465,10 @@ fn undeclare_peer_queryable( face: Option<&Arc>, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { if res_hat!(res).peer_qabls.contains_key(peer) { - unregister_peer_queryable(tables, res, peer); + unregister_peer_queryable(tables, res, peer, send_declare); propagate_forget_sourced_queryable(tables, res, face, peer); } } @@ -458,14 +478,16 @@ fn forget_peer_queryable( face: &mut Arc, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { - undeclare_peer_queryable(tables, Some(face), res, peer); + undeclare_peer_queryable(tables, Some(face), res, peer, send_declare); } pub(super) fn undeclare_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client queryable {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -479,28 +501,31 @@ pub(super) fn undeclare_client_queryable( let peer_qabls = remote_peer_qabls(tables, res); if client_qabls.is_empty() { - undeclare_peer_queryable(tables, None, res, &tables.zid.clone()); + undeclare_peer_queryable(tables, None, res, &tables.zid.clone(), send_declare); } else { let local_info = local_peer_qabl_info(tables, res); - register_peer_queryable(tables, None, res, &local_info, tables.zid); + register_peer_queryable(tables, None, res, &local_info, tables.zid, send_declare); } if client_qabls.len() == 1 && !peer_qabls { let face = &mut client_qabls[0]; if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -511,36 +536,48 @@ fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_queryable(tables, face, res); + undeclare_client_queryable(tables, face, res, send_declare); } -pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn queries_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { if face.whatami == WhatAmI::Client { for qabl in &hat!(tables).peer_qabls { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + ), + ); } } } } -pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId) { +pub(super) fn queries_remove_node( + tables: &mut Tables, + node: &ZenohId, + send_declare: &mut SendDeclare, +) { let mut qabls = vec![]; for res in hat!(tables).peer_qabls.iter() { for qabl in res_hat!(res).peer_qabls.keys() { @@ -550,7 +587,7 @@ pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId) { } } for mut res in qabls { - unregister_peer_queryable(tables, &mut res, node); + unregister_peer_queryable(tables, &mut res, node, send_declare); update_matches_query_routes(tables, &res); Resource::clean(&mut res) @@ -644,13 +681,14 @@ impl HatQueriesTrait for HatCode { res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { if let Some(peer) = get_peer(tables, face, node_id) { - declare_peer_queryable(tables, face, res, qabl_info, peer); + declare_peer_queryable(tables, face, res, qabl_info, peer, send_declare); } } else { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, res, qabl_info, send_declare); } } @@ -660,13 +698,14 @@ impl HatQueriesTrait for HatCode { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_queryable(tables, face, res, &peer); + forget_peer_queryable(tables, face, res, &peer, send_declare); } } else { - forget_client_queryable(tables, face, res); + forget_client_queryable(tables, face, res, send_declare); } } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 82f2a6746e..99e2f175b6 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -23,6 +23,7 @@ use super::{ tables::{NodeId, QueryTargetQablSet, Resource, Route, RoutingExpr, Tables, TablesLock}, }, router::RoutesIndexes, + RoutingContext, }; use crate::runtime::Runtime; use std::{any::Any, sync::Arc}; @@ -32,7 +33,7 @@ use zenoh_protocol::{ core::WireExpr, network::{ declare::{queryable::ext::QueryableInfo, subscriber::ext::SubscriberInfo}, - Oam, + Declare, Oam, }, }; use zenoh_result::ZResult; @@ -64,6 +65,9 @@ impl Sources { } } +pub(crate) type SendDeclare<'a> = dyn FnMut(&Arc, RoutingContext) + + 'a; + pub(crate) trait HatTrait: HatBaseTrait + HatPubSubTrait + HatQueriesTrait {} pub(crate) trait HatBaseTrait { @@ -80,6 +84,7 @@ pub(crate) trait HatBaseTrait { tables: &mut Tables, tables_ref: &Arc, face: &mut Face, + send_declare: &mut SendDeclare, ) -> ZResult<()>; fn new_transport_unicast_face( @@ -88,6 +93,7 @@ pub(crate) trait HatBaseTrait { tables_ref: &Arc, face: &mut Face, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()>; fn handle_oam( @@ -96,6 +102,7 @@ pub(crate) trait HatBaseTrait { tables_ref: &Arc, oam: Oam, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()>; fn map_routing_context( @@ -122,9 +129,15 @@ pub(crate) trait HatBaseTrait { tables: &mut Tables, tables_ref: &Arc, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()>; - fn close_face(&self, tables: &TablesLock, face: &mut Arc); + fn close_face( + &self, + tables: &TablesLock, + face: &mut Arc, + send_declare: &mut SendDeclare, + ); } pub(crate) trait HatPubSubTrait { @@ -135,6 +148,7 @@ pub(crate) trait HatPubSubTrait { res: &mut Arc, sub_info: &SubscriberInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ); fn undeclare_subscription( &self, @@ -142,6 +156,7 @@ pub(crate) trait HatPubSubTrait { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ); fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)>; @@ -165,6 +180,7 @@ pub(crate) trait HatQueriesTrait { res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ); fn undeclare_queryable( &self, @@ -172,6 +188,7 @@ pub(crate) trait HatQueriesTrait { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ); fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)>; diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 294932fe24..89270ffe2c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -41,7 +41,7 @@ use super::{ face::FaceState, tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock}, }, - HatBaseTrait, HatTrait, + HatBaseTrait, HatTrait, SendDeclare, }; use std::{ any::Any, @@ -138,9 +138,10 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, _tables_ref: &Arc, face: &mut Face, + send_declare: &mut SendDeclare, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } @@ -150,18 +151,24 @@ impl HatBaseTrait for HatCode { _tables_ref: &Arc, face: &mut Face, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { if face.state.whatami != WhatAmI::Client { if let Some(net) = hat_mut!(tables).gossip.as_mut() { net.add_link(transport.clone()); } } - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } - fn close_face(&self, tables: &TablesLock, face: &mut Arc) { + fn close_face( + &self, + tables: &TablesLock, + face: &mut Arc, + send_declare: &mut SendDeclare, + ) { let mut wtables = zwrite!(tables.tables); let mut face_clone = face.clone(); let face = get_mut_unchecked(face); @@ -185,7 +192,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); + undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -213,7 +220,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); + undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -275,6 +282,7 @@ impl HatBaseTrait for HatCode { _tables_ref: &Arc, oam: Oam, transport: &TransportUnicast, + _send_declare: &mut SendDeclare, ) -> ZResult<()> { if oam.id == OAM_LINKSTATE { if let ZExtBody::ZBuf(buf) = oam.body { @@ -313,6 +321,7 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, _tables_ref: &Arc, transport: &TransportUnicast, + _send_declare: &mut SendDeclare, ) -> ZResult<()> { match (transport.get_zid(), transport.get_whatami()) { (Ok(zid), Ok(whatami)) => { diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index bbaf0f5bac..175ee8f0ca 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -17,7 +17,7 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::{HatPubSubTrait, Sources}; +use crate::net::routing::hat::{HatPubSubTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use std::borrow::Cow; @@ -40,6 +40,7 @@ fn propagate_simple_subscription_to( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || (dst_face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS))) @@ -48,19 +49,22 @@ fn propagate_simple_subscription_to( { face_hat_mut!(dst_face).local_subs.insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + ), + ); } } @@ -69,6 +73,7 @@ fn propagate_simple_subscription( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { for mut dst_face in tables .faces @@ -76,7 +81,14 @@ fn propagate_simple_subscription( .cloned() .collect::>>() { - propagate_simple_subscription_to(tables, &mut dst_face, res, sub_info, src_face); + propagate_simple_subscription_to( + tables, + &mut dst_face, + res, + sub_info, + src_face, + send_declare, + ); } } @@ -126,12 +138,13 @@ fn declare_client_subscription( face: &mut Arc, res: &mut Arc, sub_info: &SubscriberInfo, + send_declare: &mut SendDeclare, ) { register_client_subscription(tables, face, res, sub_info); let mut propa_sub_info = *sub_info; propa_sub_info.mode = Mode::Push; - propagate_simple_subscription(tables, res, &propa_sub_info, face); + propagate_simple_subscription(tables, res, &propa_sub_info, face, send_declare); // This introduced a buffer overflow on windows // TODO: Let's deactivate this on windows until Fixed #[cfg(not(windows))] @@ -168,22 +181,29 @@ fn client_subs(res: &Arc) -> Vec> { .collect() } -fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { +fn propagate_forget_simple_subscription( + tables: &mut Tables, + res: &Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_subs.contains(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } } @@ -193,6 +213,7 @@ pub(super) fn undeclare_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client subscription {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -202,7 +223,7 @@ pub(super) fn undeclare_client_subscription( let mut client_subs = client_subs(res); if client_subs.is_empty() { - propagate_forget_simple_subscription(tables, res); + propagate_forget_simple_subscription(tables, res, send_declare); } if client_subs.len() == 1 { let face = &mut client_subs[0]; @@ -210,18 +231,21 @@ pub(super) fn undeclare_client_subscription( && !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } @@ -232,11 +256,16 @@ fn forget_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_subscription(tables, face, res); + undeclare_client_subscription(tables, face, res, send_declare); } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn pubsub_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers mode: Mode::Push, @@ -248,7 +277,14 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { .collect::>>() { for sub in &face_hat!(src_face).remote_subs { - propagate_simple_subscription_to(tables, face, sub, &sub_info, &mut src_face.clone()); + propagate_simple_subscription_to( + tables, + face, + sub, + &sub_info, + &mut src_face.clone(), + send_declare, + ); } } } @@ -261,8 +297,9 @@ impl HatPubSubTrait for HatCode { res: &mut Arc, sub_info: &SubscriberInfo, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - declare_client_subscription(tables, face, res, sub_info); + declare_client_subscription(tables, face, res, sub_info, send_declare); } fn undeclare_subscription( @@ -271,8 +308,9 @@ impl HatPubSubTrait for HatCode { face: &mut Arc, res: &mut Arc, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - forget_client_subscription(tables, face, res); + forget_client_subscription(tables, face, res, send_declare); } fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)> { diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index aeaee21409..6084164a80 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -17,7 +17,7 @@ use crate::net::routing::dispatcher::face::FaceState; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::{HatQueriesTrait, Sources}; +use crate::net::routing::hat::{HatQueriesTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -79,6 +79,7 @@ fn propagate_simple_queryable( tables: &mut Tables, res: &Arc, src_face: Option<&mut Arc>, + send_declare: &mut SendDeclare, ) { let faces = tables.faces.values().cloned(); for mut dst_face in faces { @@ -94,19 +95,22 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, &mut dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } @@ -143,9 +147,10 @@ fn declare_client_queryable( face: &mut Arc, res: &mut Arc, qabl_info: &QueryableInfo, + send_declare: &mut SendDeclare, ) { register_client_queryable(tables, face, res, qabl_info); - propagate_simple_queryable(tables, res, Some(face)); + propagate_simple_queryable(tables, res, Some(face), send_declare); } #[inline] @@ -162,22 +167,29 @@ fn client_qabls(res: &Arc) -> Vec> { .collect() } -fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { +fn propagate_forget_simple_queryable( + tables: &mut Tables, + res: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -188,6 +200,7 @@ pub(super) fn undeclare_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client queryable {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -199,26 +212,29 @@ pub(super) fn undeclare_client_queryable( let mut client_qabls = client_qabls(res); if client_qabls.is_empty() { - propagate_forget_simple_queryable(tables, res); + propagate_forget_simple_queryable(tables, res, send_declare); } else { - propagate_simple_queryable(tables, res, None); + propagate_simple_queryable(tables, res, None, send_declare); } if client_qabls.len() == 1 { let face = &mut client_qabls[0]; if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -229,11 +245,16 @@ fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_queryable(tables, face, res); + undeclare_client_queryable(tables, face, res, send_declare); } -pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) { +pub(super) fn queries_new_face( + tables: &mut Tables, + _face: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables .faces .values() @@ -241,7 +262,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) .collect::>>() { for qabl in face_hat!(face).remote_qabls.iter() { - propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); + propagate_simple_queryable(tables, qabl, Some(&mut face.clone()), send_declare); } } } @@ -258,8 +279,9 @@ impl HatQueriesTrait for HatCode { res: &mut Arc, qabl_info: &QueryableInfo, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, res, qabl_info, send_declare); } fn undeclare_queryable( @@ -268,8 +290,9 @@ impl HatQueriesTrait for HatCode { face: &mut Arc, res: &mut Arc, _node_id: NodeId, + send_declare: &mut SendDeclare, ) { - forget_client_queryable(tables, face, res); + forget_client_queryable(tables, face, res, send_declare); } fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)> { diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 571c21bfed..3be278aa02 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -31,7 +31,7 @@ use super::{ face::FaceState, tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock}, }, - HatBaseTrait, HatTrait, + HatBaseTrait, HatTrait, SendDeclare, }; use crate::{ net::{ @@ -366,9 +366,10 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, _tables_ref: &Arc, face: &mut Face, + send_declare: &mut SendDeclare, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); Ok(()) } @@ -378,6 +379,7 @@ impl HatBaseTrait for HatCode { tables_ref: &Arc, face: &mut Face, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { let link_id = match face.state.whatami { WhatAmI::Router => hat_mut!(tables) @@ -403,8 +405,8 @@ impl HatBaseTrait for HatCode { } face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + pubsub_new_face(tables, &mut face.state, send_declare); + queries_new_face(tables, &mut face.state, send_declare); match face.state.whatami { WhatAmI::Router => { @@ -420,7 +422,12 @@ impl HatBaseTrait for HatCode { Ok(()) } - fn close_face(&self, tables: &TablesLock, face: &mut Arc) { + fn close_face( + &self, + tables: &TablesLock, + face: &mut Arc, + send_declare: &mut SendDeclare, + ) { let mut wtables = zwrite!(tables.tables); let mut face_clone = face.clone(); let face = get_mut_unchecked(face); @@ -444,7 +451,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); + undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -472,7 +479,7 @@ impl HatBaseTrait for HatCode { .drain() { get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); + undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { for match_ in &res.context().matches { @@ -534,6 +541,7 @@ impl HatBaseTrait for HatCode { tables_ref: &Arc, oam: Oam, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { if oam.id == OAM_LINKSTATE { if let ZExtBody::ZBuf(buf) = oam.body { @@ -554,8 +562,18 @@ impl HatBaseTrait for HatCode { .link_states(list.link_states, zid) .removed_nodes { - pubsub_remove_node(tables, &removed_node.zid, WhatAmI::Router); - queries_remove_node(tables, &removed_node.zid, WhatAmI::Router); + pubsub_remove_node( + tables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + queries_remove_node( + tables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); } if hat!(tables).full_net(WhatAmI::Peer) { @@ -577,11 +595,13 @@ impl HatBaseTrait for HatCode { tables, &removed_node.zid, WhatAmI::Peer, + send_declare, ); queries_remove_node( tables, &removed_node.zid, WhatAmI::Peer, + send_declare, ); } @@ -598,11 +618,13 @@ impl HatBaseTrait for HatCode { tables, &updated_node.zid, &updated_node.links, + send_declare, ); queries_linkstate_change( tables, &updated_node.zid, &updated_node.links, + send_declare, ); } } @@ -650,6 +672,7 @@ impl HatBaseTrait for HatCode { tables: &mut Tables, tables_ref: &Arc, transport: &TransportUnicast, + send_declare: &mut SendDeclare, ) -> ZResult<()> { match (transport.get_zid(), transport.get_whatami()) { (Ok(zid), Ok(whatami)) => { @@ -661,8 +684,18 @@ impl HatBaseTrait for HatCode { .unwrap() .remove_link(&zid) { - pubsub_remove_node(tables, &removed_node.zid, WhatAmI::Router); - queries_remove_node(tables, &removed_node.zid, WhatAmI::Router); + pubsub_remove_node( + tables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); + queries_remove_node( + tables, + &removed_node.zid, + WhatAmI::Router, + send_declare, + ); } if hat!(tables).full_net(WhatAmI::Peer) { @@ -683,8 +716,18 @@ impl HatBaseTrait for HatCode { .unwrap() .remove_link(&zid) { - pubsub_remove_node(tables, &removed_node.zid, WhatAmI::Peer); - queries_remove_node(tables, &removed_node.zid, WhatAmI::Peer); + pubsub_remove_node( + tables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); + queries_remove_node( + tables, + &removed_node.zid, + WhatAmI::Peer, + send_declare, + ); } hat_mut!(tables).shared_nodes = shared_nodes( diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index e8c6cb4e6a..d6f1f4fbc1 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::pubsub::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; -use crate::net::routing::hat::{HatPubSubTrait, Sources}; +use crate::net::routing::hat::{HatPubSubTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use petgraph::graph::NodeIndex; @@ -86,6 +86,7 @@ fn propagate_simple_subscription_to( sub_info: &SubscriberInfo, src_face: &mut Arc, full_peer_net: bool, + send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || (dst_face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS))) @@ -101,19 +102,22 @@ fn propagate_simple_subscription_to( { face_hat_mut!(dst_face).local_subs.insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + ), + ); } } @@ -122,6 +126,7 @@ fn propagate_simple_subscription( res: &Arc, sub_info: &SubscriberInfo, src_face: &mut Arc, + send_declare: &mut SendDeclare, ) { let full_peer_net = hat!(tables).full_net(WhatAmI::Peer); for mut dst_face in tables @@ -137,6 +142,7 @@ fn propagate_simple_subscription( sub_info, src_face, full_peer_net, + send_declare, ); } } @@ -185,6 +191,7 @@ fn register_router_subscription( res: &mut Arc, sub_info: &SubscriberInfo, router: ZenohId, + send_declare: &mut SendDeclare, ) { if !res_hat!(res).router_subs.contains(&router) { // Register router subscription @@ -207,7 +214,7 @@ fn register_router_subscription( } // Propagate subscription to clients - propagate_simple_subscription(tables, res, sub_info, face); + propagate_simple_subscription(tables, res, sub_info, face, send_declare); } fn declare_router_subscription( @@ -216,8 +223,9 @@ fn declare_router_subscription( res: &mut Arc, sub_info: &SubscriberInfo, router: ZenohId, + send_declare: &mut SendDeclare, ) { - register_router_subscription(tables, face, res, sub_info, router); + register_router_subscription(tables, face, res, sub_info, router, send_declare); } fn register_peer_subscription( @@ -246,12 +254,13 @@ fn declare_peer_subscription( res: &mut Arc, sub_info: &SubscriberInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { register_peer_subscription(tables, face, res, sub_info, peer); let mut propa_sub_info = *sub_info; propa_sub_info.mode = Mode::Push; let zid = tables.zid; - register_router_subscription(tables, face, res, &propa_sub_info, zid); + register_router_subscription(tables, face, res, &propa_sub_info, zid, send_declare); } fn register_client_subscription( @@ -300,12 +309,13 @@ fn declare_client_subscription( face: &mut Arc, res: &mut Arc, sub_info: &SubscriberInfo, + send_declare: &mut SendDeclare, ) { register_client_subscription(tables, face, res, sub_info); let mut propa_sub_info = *sub_info; propa_sub_info.mode = Mode::Push; let zid = tables.zid; - register_router_subscription(tables, face, res, &propa_sub_info, zid); + register_router_subscription(tables, face, res, &propa_sub_info, zid, send_declare); } #[inline] @@ -380,28 +390,39 @@ fn send_forget_sourced_subscription_to_net_children( } } -fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { +fn propagate_forget_simple_subscription( + tables: &mut Tables, + res: &Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_subs.contains(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } } } -fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc) { +fn propagate_forget_simple_subscription_to_peers( + tables: &mut Tables, + res: &Arc, + send_declare: &mut SendDeclare, +) { if !hat!(tables).full_net(WhatAmI::Peer) && res_hat!(res).router_subs.len() == 1 && res_hat!(res).router_subs.contains(&tables.zid) @@ -423,18 +444,21 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< }) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(&mut face).local_subs.remove(res); } @@ -478,7 +502,12 @@ fn propagate_forget_sourced_subscription( } } -fn unregister_router_subscription(tables: &mut Tables, res: &mut Arc, router: &ZenohId) { +fn unregister_router_subscription( + tables: &mut Tables, + res: &mut Arc, + router: &ZenohId, + send_declare: &mut SendDeclare, +) { tracing::debug!( "Unregister router subscription {} (router: {})", res.expr(), @@ -494,10 +523,10 @@ fn unregister_router_subscription(tables: &mut Tables, res: &mut Arc, if hat_mut!(tables).full_net(WhatAmI::Peer) { undeclare_peer_subscription(tables, None, res, &tables.zid.clone()); } - propagate_forget_simple_subscription(tables, res); + propagate_forget_simple_subscription(tables, res, send_declare); } - propagate_forget_simple_subscription_to_peers(tables, res); + propagate_forget_simple_subscription_to_peers(tables, res, send_declare); } fn undeclare_router_subscription( @@ -505,9 +534,10 @@ fn undeclare_router_subscription( face: Option<&Arc>, res: &mut Arc, router: &ZenohId, + send_declare: &mut SendDeclare, ) { if res_hat!(res).router_subs.contains(router) { - unregister_router_subscription(tables, res, router); + unregister_router_subscription(tables, res, router, send_declare); propagate_forget_sourced_subscription(tables, res, face, router, WhatAmI::Router); } } @@ -517,8 +547,9 @@ fn forget_router_subscription( face: &mut Arc, res: &mut Arc, router: &ZenohId, + send_declare: &mut SendDeclare, ) { - undeclare_router_subscription(tables, Some(face), res, router); + undeclare_router_subscription(tables, Some(face), res, router, send_declare); } fn unregister_peer_subscription(tables: &mut Tables, res: &mut Arc, peer: &ZenohId) { @@ -553,13 +584,14 @@ fn forget_peer_subscription( face: &mut Arc, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { undeclare_peer_subscription(tables, Some(face), res, peer); let client_subs = res.session_ctxs.values().any(|ctx| ctx.subs.is_some()); let peer_subs = remote_peer_subs(tables, res); let zid = tables.zid; if !client_subs && !peer_subs { - undeclare_router_subscription(tables, None, res, &zid); + undeclare_router_subscription(tables, None, res, &zid, send_declare); } } @@ -567,6 +599,7 @@ pub(super) fn undeclare_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client subscription {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -578,9 +611,9 @@ pub(super) fn undeclare_client_subscription( let router_subs = remote_router_subs(tables, res); let peer_subs = remote_peer_subs(tables, res); if client_subs.is_empty() && !peer_subs { - undeclare_router_subscription(tables, None, res, &tables.zid.clone()); + undeclare_router_subscription(tables, None, res, &tables.zid.clone(), send_declare); } else { - propagate_forget_simple_subscription_to_peers(tables, res); + propagate_forget_simple_subscription_to_peers(tables, res, send_declare); } if client_subs.len() == 1 && !router_subs && !peer_subs { let face = &mut client_subs[0]; @@ -588,18 +621,21 @@ pub(super) fn undeclare_client_subscription( && !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_subs.remove(res); } @@ -610,11 +646,16 @@ fn forget_client_subscription( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_subscription(tables, face, res); + undeclare_client_subscription(tables, face, res, send_declare); } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn pubsub_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers mode: Mode::Push, @@ -624,19 +665,22 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { for sub in &hat!(tables).router_subs { face_hat_mut!(face).local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + ), + ); } } else if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { for sub in &hat!(tables).router_subs { @@ -651,25 +695,33 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { { face_hat_mut!(face).local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: sub_info, + }), + }, + sub.expr(), + ), + ); } } } } -pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { +pub(super) fn pubsub_remove_node( + tables: &mut Tables, + node: &ZenohId, + net_type: WhatAmI, + send_declare: &mut SendDeclare, +) { match net_type { WhatAmI::Router => { for mut res in hat!(tables) @@ -679,7 +731,7 @@ pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: .cloned() .collect::>>() { - unregister_router_subscription(tables, &mut res, node); + unregister_router_subscription(tables, &mut res, node, send_declare); update_matches_data_routes(tables, &mut res); Resource::clean(&mut res) @@ -697,7 +749,13 @@ pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: let client_subs = res.session_ctxs.values().any(|ctx| ctx.subs.is_some()); let peer_subs = remote_peer_subs(tables, &res); if !client_subs && !peer_subs { - undeclare_router_subscription(tables, None, &mut res, &tables.zid.clone()); + undeclare_router_subscription( + tables, + None, + &mut res, + &tables.zid.clone(), + send_declare, + ); } update_matches_data_routes(tables, &mut res); @@ -757,7 +815,12 @@ pub(super) fn pubsub_tree_change( update_data_routes_from(tables, &mut tables.root_res.clone()); } -pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: &[ZenohId]) { +pub(super) fn pubsub_linkstate_change( + tables: &mut Tables, + zid: &ZenohId, + links: &[ZenohId], + send_declare: &mut SendDeclare, +) { if let Some(src_face) = tables.get_face(zid).cloned() { if hat!(tables).router_peers_failover_brokering && src_face.whatami == WhatAmI::Peer { for res in &face_hat!(src_face).remote_subs { @@ -791,20 +854,23 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; if forget { let wire_expr = Resource::get_best_key(res, "", dst_face.id); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareSubscriber( - UndeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }, - ), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareSubscriber( + UndeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }, + ), + }, + res.expr(), + ), + ); face_hat_mut!(dst_face).local_subs.remove(res); } @@ -816,19 +882,24 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers mode: Mode::Push, }; - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id: 0, // @TODO use proper SubscriberId (#703) - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareSubscriber( + DeclareSubscriber { + id: 0, // @TODO use proper SubscriberId (#703) + wire_expr: key_expr, + ext_info: sub_info, + }, + ), + }, + res.expr(), + ), + ); } } } @@ -879,23 +950,24 @@ impl HatPubSubTrait for HatCode { res: &mut Arc, sub_info: &SubscriberInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { match face.whatami { WhatAmI::Router => { if let Some(router) = get_router(tables, face, node_id) { - declare_router_subscription(tables, face, res, sub_info, router) + declare_router_subscription(tables, face, res, sub_info, router, send_declare) } } WhatAmI::Peer => { if hat!(tables).full_net(WhatAmI::Peer) { if let Some(peer) = get_peer(tables, face, node_id) { - declare_peer_subscription(tables, face, res, sub_info, peer) + declare_peer_subscription(tables, face, res, sub_info, peer, send_declare) } } else { - declare_client_subscription(tables, face, res, sub_info) + declare_client_subscription(tables, face, res, sub_info, send_declare) } } - _ => declare_client_subscription(tables, face, res, sub_info), + _ => declare_client_subscription(tables, face, res, sub_info, send_declare), } } @@ -905,23 +977,24 @@ impl HatPubSubTrait for HatCode { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ) { match face.whatami { WhatAmI::Router => { if let Some(router) = get_router(tables, face, node_id) { - forget_router_subscription(tables, face, res, &router) + forget_router_subscription(tables, face, res, &router, send_declare) } } WhatAmI::Peer => { if hat!(tables).full_net(WhatAmI::Peer) { if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_subscription(tables, face, res, &peer) + forget_peer_subscription(tables, face, res, &peer, send_declare) } } else { - forget_client_subscription(tables, face, res) + forget_client_subscription(tables, face, res, send_declare) } } - _ => forget_client_subscription(tables, face, res), + _ => forget_client_subscription(tables, face, res, send_declare), } } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 76ddba7235..ac7840fbe8 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -19,7 +19,7 @@ use crate::net::routing::dispatcher::queries::*; use crate::net::routing::dispatcher::resource::{NodeId, Resource, SessionContext}; use crate::net::routing::dispatcher::tables::Tables; use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; -use crate::net::routing::hat::{HatQueriesTrait, Sources}; +use crate::net::routing::hat::{HatQueriesTrait, SendDeclare, Sources}; use crate::net::routing::router::RoutesIndexes; use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; @@ -233,6 +233,7 @@ fn propagate_simple_queryable( tables: &mut Tables, res: &Arc, src_face: Option<&mut Arc>, + send_declare: &mut SendDeclare, ) { let full_peers_net = hat!(tables).full_net(WhatAmI::Peer); let faces = tables.faces.values().cloned(); @@ -256,19 +257,22 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, &mut dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } @@ -317,6 +321,7 @@ fn register_router_queryable( res: &mut Arc, qabl_info: &QueryableInfo, router: ZenohId, + send_declare: &mut SendDeclare, ) { let current_info = res_hat!(res).router_qabls.get(&router); if current_info.is_none() || current_info.unwrap() != qabl_info { @@ -351,7 +356,7 @@ fn register_router_queryable( } // Propagate queryable to clients - propagate_simple_queryable(tables, res, face); + propagate_simple_queryable(tables, res, face, send_declare); } fn declare_router_queryable( @@ -360,8 +365,9 @@ fn declare_router_queryable( res: &mut Arc, qabl_info: &QueryableInfo, router: ZenohId, + send_declare: &mut SendDeclare, ) { - register_router_queryable(tables, Some(face), res, qabl_info, router); + register_router_queryable(tables, Some(face), res, qabl_info, router, send_declare); } fn register_peer_queryable( @@ -391,12 +397,13 @@ fn declare_peer_queryable( res: &mut Arc, qabl_info: &QueryableInfo, peer: ZenohId, + send_declare: &mut SendDeclare, ) { let mut face = Some(face); register_peer_queryable(tables, face.as_deref_mut(), res, qabl_info, peer); let local_info = local_router_qabl_info(tables, res); let zid = tables.zid; - register_router_queryable(tables, face, res, &local_info, zid); + register_router_queryable(tables, face, res, &local_info, zid, send_declare); } fn register_client_queryable( @@ -431,11 +438,12 @@ fn declare_client_queryable( face: &mut Arc, res: &mut Arc, qabl_info: &QueryableInfo, + send_declare: &mut SendDeclare, ) { register_client_queryable(tables, face, res, qabl_info); let local_details = local_router_qabl_info(tables, res); let zid = tables.zid; - register_router_queryable(tables, Some(face), res, &local_details, zid); + register_router_queryable(tables, Some(face), res, &local_details, zid, send_declare); } #[inline] @@ -510,29 +518,40 @@ fn send_forget_sourced_queryable_to_net_children( } } -fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { +fn propagate_forget_simple_queryable( + tables: &mut Tables, + res: &mut Arc, + send_declare: &mut SendDeclare, +) { for face in tables.faces.values_mut() { if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } } } -fn propagate_forget_simple_queryable_to_peers(tables: &mut Tables, res: &mut Arc) { +fn propagate_forget_simple_queryable_to_peers( + tables: &mut Tables, + res: &mut Arc, + send_declare: &mut SendDeclare, +) { if !hat!(tables).full_net(WhatAmI::Peer) && res_hat!(res).router_qabls.len() == 1 && res_hat!(res).router_qabls.contains_key(&tables.zid) @@ -554,18 +573,21 @@ fn propagate_forget_simple_queryable_to_peers(tables: &mut Tables, res: &mut Arc }) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(&mut face).local_qabls.remove(res); } @@ -609,7 +631,12 @@ fn propagate_forget_sourced_queryable( } } -fn unregister_router_queryable(tables: &mut Tables, res: &mut Arc, router: &ZenohId) { +fn unregister_router_queryable( + tables: &mut Tables, + res: &mut Arc, + router: &ZenohId, + send_declare: &mut SendDeclare, +) { tracing::debug!( "Unregister router queryable {} (router: {})", res.expr(), @@ -625,10 +652,10 @@ fn unregister_router_queryable(tables: &mut Tables, res: &mut Arc, rou if hat!(tables).full_net(WhatAmI::Peer) { undeclare_peer_queryable(tables, None, res, &tables.zid.clone()); } - propagate_forget_simple_queryable(tables, res); + propagate_forget_simple_queryable(tables, res, send_declare); } - propagate_forget_simple_queryable_to_peers(tables, res); + propagate_forget_simple_queryable_to_peers(tables, res, send_declare); } fn undeclare_router_queryable( @@ -636,9 +663,10 @@ fn undeclare_router_queryable( face: Option<&Arc>, res: &mut Arc, router: &ZenohId, + send_declare: &mut SendDeclare, ) { if res_hat!(res).router_qabls.contains_key(router) { - unregister_router_queryable(tables, res, router); + unregister_router_queryable(tables, res, router, send_declare); propagate_forget_sourced_queryable(tables, res, face, router, WhatAmI::Router); } } @@ -648,8 +676,9 @@ fn forget_router_queryable( face: &mut Arc, res: &mut Arc, router: &ZenohId, + send_declare: &mut SendDeclare, ) { - undeclare_router_queryable(tables, Some(face), res, router); + undeclare_router_queryable(tables, Some(face), res, router, send_declare); } fn unregister_peer_queryable(tables: &mut Tables, res: &mut Arc, peer: &ZenohId) { @@ -680,6 +709,7 @@ fn forget_peer_queryable( face: &mut Arc, res: &mut Arc, peer: &ZenohId, + send_declare: &mut SendDeclare, ) { undeclare_peer_queryable(tables, Some(face), res, peer); @@ -687,10 +717,10 @@ fn forget_peer_queryable( let peer_qabls = remote_peer_qabls(tables, res); let zid = tables.zid; if !client_qabls && !peer_qabls { - undeclare_router_queryable(tables, None, res, &zid); + undeclare_router_queryable(tables, None, res, &zid, send_declare); } else { let local_info = local_router_qabl_info(tables, res); - register_router_queryable(tables, None, res, &local_info, zid); + register_router_queryable(tables, None, res, &local_info, zid, send_declare); } } @@ -698,6 +728,7 @@ pub(super) fn undeclare_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { tracing::debug!("Unregister client queryable {} for {}", res.expr(), face); if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { @@ -712,29 +743,32 @@ pub(super) fn undeclare_client_queryable( let peer_qabls = remote_peer_qabls(tables, res); if client_qabls.is_empty() && !peer_qabls { - undeclare_router_queryable(tables, None, res, &tables.zid.clone()); + undeclare_router_queryable(tables, None, res, &tables.zid.clone(), send_declare); } else { let local_info = local_router_qabl_info(tables, res); - register_router_queryable(tables, None, res, &local_info, tables.zid); - propagate_forget_simple_queryable_to_peers(tables, res); + register_router_queryable(tables, None, res, &local_info, tables.zid, send_declare); + propagate_forget_simple_queryable_to_peers(tables, res, send_declare); } if client_qabls.len() == 1 && !router_qabls && !peer_qabls { let face = &mut client_qabls[0]; if face_hat!(face).local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }), + }, + res.expr(), + ), + ); face_hat_mut!(face).local_qabls.remove(res); } @@ -745,30 +779,38 @@ fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, res: &mut Arc, + send_declare: &mut SendDeclare, ) { - undeclare_client_queryable(tables, face, res); + undeclare_client_queryable(tables, face, res, send_declare); } -pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { +pub(super) fn queries_new_face( + tables: &mut Tables, + face: &mut Arc, + send_declare: &mut SendDeclare, +) { if face.whatami == WhatAmI::Client { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + ), + ); } } } else if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { @@ -785,25 +827,33 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let info = local_qabl_info(tables, qabl, face); face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); + send_declare( + &face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + qabl.expr(), + ), + ); } } } } -pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { +pub(super) fn queries_remove_node( + tables: &mut Tables, + node: &ZenohId, + net_type: WhatAmI, + send_declare: &mut SendDeclare, +) { match net_type { WhatAmI::Router => { let mut qabls = vec![]; @@ -815,7 +865,7 @@ pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: } } for mut res in qabls { - unregister_router_queryable(tables, &mut res, node); + unregister_router_queryable(tables, &mut res, node, send_declare); update_matches_query_routes(tables, &res); Resource::clean(&mut res); @@ -836,10 +886,23 @@ pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: let client_qabls = res.session_ctxs.values().any(|ctx| ctx.qabl.is_some()); let peer_qabls = remote_peer_qabls(tables, &res); if !client_qabls && !peer_qabls { - undeclare_router_queryable(tables, None, &mut res, &tables.zid.clone()); + undeclare_router_queryable( + tables, + None, + &mut res, + &tables.zid.clone(), + send_declare, + ); } else { let local_info = local_router_qabl_info(tables, &res); - register_router_queryable(tables, None, &mut res, &local_info, tables.zid); + register_router_queryable( + tables, + None, + &mut res, + &local_info, + tables.zid, + send_declare, + ); } update_matches_query_routes(tables, &res); @@ -850,7 +913,12 @@ pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: } } -pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: &[ZenohId]) { +pub(super) fn queries_linkstate_change( + tables: &mut Tables, + zid: &ZenohId, + links: &[ZenohId], + send_declare: &mut SendDeclare, +) { if let Some(src_face) = tables.get_face(zid) { if hat!(tables).router_peers_failover_brokering && src_face.whatami == WhatAmI::Peer { for res in &face_hat!(src_face).remote_qabls { @@ -884,20 +952,23 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links }; if forget { let wire_expr = Resource::get_best_key(res, "", dst_face.id); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable( - UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }, - ), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable( + UndeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + ext_wire_expr: WireExprType { wire_expr }, + }, + ), + }, + res.expr(), + ), + ); face_hat_mut!(dst_face).local_qabls.remove(res); } @@ -908,19 +979,22 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - wire_expr: key_expr, - ext_info: info, - }), - }, - res.expr(), - )); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareQueryable(DeclareQueryable { + id: 0, // @TODO use proper QueryableId (#703) + wire_expr: key_expr, + ext_info: info, + }), + }, + res.expr(), + ), + ); } } } @@ -1027,23 +1101,24 @@ impl HatQueriesTrait for HatCode { res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, + send_declare: &mut SendDeclare, ) { match face.whatami { WhatAmI::Router => { if let Some(router) = get_router(tables, face, node_id) { - declare_router_queryable(tables, face, res, qabl_info, router) + declare_router_queryable(tables, face, res, qabl_info, router, send_declare) } } WhatAmI::Peer => { if hat!(tables).full_net(WhatAmI::Peer) { if let Some(peer) = get_peer(tables, face, node_id) { - declare_peer_queryable(tables, face, res, qabl_info, peer) + declare_peer_queryable(tables, face, res, qabl_info, peer, send_declare) } } else { - declare_client_queryable(tables, face, res, qabl_info) + declare_client_queryable(tables, face, res, qabl_info, send_declare) } } - _ => declare_client_queryable(tables, face, res, qabl_info), + _ => declare_client_queryable(tables, face, res, qabl_info, send_declare), } } @@ -1053,23 +1128,24 @@ impl HatQueriesTrait for HatCode { face: &mut Arc, res: &mut Arc, node_id: NodeId, + send_declare: &mut SendDeclare, ) { match face.whatami { WhatAmI::Router => { if let Some(router) = get_router(tables, face, node_id) { - forget_router_queryable(tables, face, res, &router) + forget_router_queryable(tables, face, res, &router, send_declare) } } WhatAmI::Peer => { if hat!(tables).full_net(WhatAmI::Peer) { if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_queryable(tables, face, res, &peer) + forget_peer_queryable(tables, face, res, &peer, send_declare) } } else { - forget_client_queryable(tables, face, res) + forget_client_queryable(tables, face, res, send_declare) } } - _ => forget_client_queryable(tables, face, res), + _ => forget_client_queryable(tables, face, res, send_declare), } } diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 87766f021b..d8a5ee4526 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -101,11 +101,17 @@ impl Router { tables: self.tables.clone(), state: newface, }; + let mut declares = vec![]; ctrl_lock - .new_local_face(&mut tables, &self.tables, &mut face) + .new_local_face(&mut tables, &self.tables, &mut face, &mut |p, m| { + declares.push((p.clone(), m)) + }) .unwrap(); drop(tables); drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } Arc::new(face) } @@ -157,7 +163,19 @@ impl Router { let _ = mux.face.set(Face::downgrade(&face)); - ctrl_lock.new_transport_unicast_face(&mut tables, &self.tables, &mut face, &transport)?; + let mut declares = vec![]; + ctrl_lock.new_transport_unicast_face( + &mut tables, + &self.tables, + &mut face, + &transport, + &mut |p, m| declares.push((p.clone(), m)), + )?; + drop(tables); + drop(ctrl_lock); + for (p, m) in declares { + p.send_declare(m); + } Ok(Arc::new(DeMux::new(face, Some(transport), ingress))) } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index bc889d720e..b6da5a2391 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -69,6 +69,7 @@ fn base_test() { &WireExpr::from(1).with_suffix("four/five"), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); Tables::print(&zread!(tables.tables)); @@ -244,6 +245,7 @@ async fn clean_test() { &"todrop1/todrop11".into(), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); let optres2 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1/todrop11") .map(|res| Arc::downgrade(&res)); @@ -258,6 +260,7 @@ async fn clean_test() { &WireExpr::from(1).with_suffix("/todrop12"), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); let optres3 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop1/todrop12") .map(|res| Arc::downgrade(&res)); @@ -272,6 +275,7 @@ async fn clean_test() { &mut face0.upgrade().unwrap(), &WireExpr::from(1).with_suffix("/todrop12"), NodeId::default(), + &mut |p, m| p.send_declare(m), ); println!("COUNT2: {}", res3.strong_count()); @@ -286,6 +290,7 @@ async fn clean_test() { &mut face0.upgrade().unwrap(), &"todrop1/todrop11".into(), NodeId::default(), + &mut |p, m| p.send_declare(m), ); assert!(res1.upgrade().is_some()); assert!(res2.upgrade().is_none()); @@ -305,6 +310,7 @@ async fn clean_test() { &"todrop3".into(), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop3") .map(|res| Arc::downgrade(&res)); @@ -318,6 +324,7 @@ async fn clean_test() { &mut face0.upgrade().unwrap(), &"todrop3".into(), NodeId::default(), + &mut |p, m| p.send_declare(m), ); assert!(res1.upgrade().is_some()); @@ -334,6 +341,7 @@ async fn clean_test() { &"todrop5".into(), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -342,6 +350,7 @@ async fn clean_test() { &"todrop6".into(), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); let optres1 = Resource::get_resource(zread!(tables.tables)._get_root(), "todrop4") @@ -519,6 +528,7 @@ fn client_test() { &WireExpr::from(11).with_suffix("/**"), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); register_expr( &tables, @@ -566,6 +576,7 @@ fn client_test() { &WireExpr::from(21).with_suffix("/**"), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); register_expr( &tables, @@ -613,6 +624,7 @@ fn client_test() { &WireExpr::from(31).with_suffix("/**"), &sub_info, NodeId::default(), + &mut |p, m| p.send_declare(m), ); primitives0.clear_data();