From 3c5d4debc7d10f7fc2800de405183e305dc880c2 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 22 May 2024 17:00:40 +0200 Subject: [PATCH 1/6] Implement interest protocol between peers and routers --- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 237 +++++++++--------- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 10 + zenoh/src/net/routing/hat/router/pubsub.rs | 188 +++++++------- zenoh/src/net/routing/hat/router/queries.rs | 114 +++++---- 4 files changed, 291 insertions(+), 258 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index e46ff3ff16..fe78675975 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -24,7 +24,8 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode}, + interest::{InterestId, InterestMode, InterestOptions}, + Interest, }, }; use zenoh_sync::get_mut_unchecked; @@ -34,11 +35,11 @@ use crate::{ key_expr::KeyExpr, net::routing::{ dispatcher::{ - face::FaceState, + face::{FaceState, InterestState}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, Sources}, + hat::{HatPubSubTrait, Sources}, router::{update_data_routes_from, RoutesIndexes}, RoutingContext, PREFIX_LIVELINESS, }, @@ -374,133 +375,91 @@ impl HatPubSubTrait for HatCode { mode: InterestMode, aggregate: bool, ) { - if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - if let Some(res) = res.as_ref() { - if aggregate { - if tables.faces.values().any(|src_face| { - src_face.id != face.id - && face_hat!(src_face) - .remote_subs - .values() - .any(|sub| sub.context.is_some() && sub.matches(res)) - }) { - let id = if mode.future() { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert((*res).clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(res, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }), - }, - res.expr(), - )); - } - } else { - for src_face in tables - .faces + face_hat_mut!(face) + .remote_sub_interests + .insert(id, (res.as_ref().map(|res| (*res).clone()), aggregate)); + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami == WhatAmI::Router) + { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(dst_face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: mode == InterestMode::Future, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } + + fn undeclare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + if let Some(interest) = face_hat_mut!(face).remote_sub_interests.remove(&id) { + if !tables.faces.values().any(|f| { + f.whatami == WhatAmI::Client + && face_hat!(f) + .remote_sub_interests .values() - .cloned() - .collect::>>() - { - if src_face.id != face.id { - for sub in face_hat!(src_face).remote_subs.values() { - if sub.context.is_some() && sub.matches(res) { - let id = if mode.future() { - let id = - face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber( - DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }, - ), - }, - sub.expr(), - )); - } - } - } - } - } - } else { - for src_face in tables + .any(|i| *i == interest) + }) { + for dst_face in tables .faces - .values() - .cloned() - .collect::>>() + .values_mut() + .filter(|f| f.whatami == WhatAmI::Router) { - if src_face.id != face.id { - for sub in face_hat!(src_face).remote_subs.values() { - let id = if mode.future() { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, + for id in dst_face + .local_interests + .keys() + .cloned() + .collect::>() + { + let local_interest = dst_face.local_interests.get(&id).unwrap(); + if local_interest.options.subscribers() + && (local_interest.res == interest.0) + { + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }), }, - sub.expr(), + local_interest + .res + .as_ref() + .map(|res| res.expr()) + .unwrap_or_default(), )); + get_mut_unchecked(dst_face).local_interests.remove(&id); } } } } } - if mode.future() { - face_hat_mut!(face) - .remote_sub_interests - .insert(id, (res.cloned(), aggregate)); - } - } - - fn undeclare_sub_interest( - &self, - _tables: &mut Tables, - face: &mut Arc, - id: InterestId, - ) { - face_hat_mut!(face).remote_sub_interests.remove(&id); } fn declare_subscription( @@ -570,6 +529,50 @@ impl HatPubSubTrait for HatCode { } }; + for face in tables + .faces + .values() + .filter(|f| f.whatami == WhatAmI::Router) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) + }) { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } + for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer && !f diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index caa5f79694..b909190184 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -554,6 +554,16 @@ impl HatQueriesTrait for HatCode { } }; + // TODO: BNestMatching: What if there is a local compete ? + if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: f64::MAX, + }); + } + for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer && !f diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 2af567d989..3b16270c84 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -111,57 +111,37 @@ fn propagate_simple_subscription_to( || hat!(tables).failover_brokering(src_face.zid, dst_face.zid)) } { - if dst_face.whatami != WhatAmI::Client { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); - } else { - let matching_interests = face_hat!(dst_face) - .remote_sub_interests - .values() - .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) - .cloned() - .collect::>, bool)>>(); + let matching_interests = face_hat!(dst_face) + .remote_sub_interests + .values() + .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) + .cloned() + .collect::>, bool)>>(); - for (int_res, aggregate) in matching_interests { - let res = if aggregate { - int_res.as_ref().unwrap_or(res) - } else { - res - }; - if !face_hat!(dst_face).local_subs.contains_key(res) { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); - } + for (int_res, aggregate) in matching_interests { + let res = if aggregate { + int_res.as_ref().unwrap_or(res) + } else { + res + }; + if !face_hat!(dst_face).local_subs.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); } } } @@ -704,42 +684,42 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - - if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - for sub in &hat!(tables).router_subs { - if sub.context.is_some() - && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) - || sub.session_ctxs.values().any(|s| { - s.subs.is_some() - && (s.face.whatami == WhatAmI::Client - || (s.face.whatami == WhatAmI::Peer - && hat!(tables).failover_brokering(s.face.zid, face.zid))) - })) - { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); - } - } - } +pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { + // let sub_info = SubscriberInfo { + // reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers + // }; + + // if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { + // for sub in &hat!(tables).router_subs { + // if sub.context.is_some() + // && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) + // || sub.session_ctxs.values().any(|s| { + // s.subs.is_some() + // && (s.face.whatami == WhatAmI::Client + // || (s.face.whatami == WhatAmI::Peer + // && hat!(tables).failover_brokering(s.face.zid, face.zid))) + // })) + // { + // let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + // face_hat_mut!(face).local_subs.insert(sub.clone(), id); + // let key_expr = Resource::decl_key(sub, face); + // face.primitives.send_declare(RoutingContext::with_expr( + // Declare { + // interest_id: None, + // ext_qos: ext::QoSType::DECLARE, + // ext_tstamp: None, + // ext_nodeid: ext::NodeIdType::DEFAULT, + // body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + // id, + // wire_expr: key_expr, + // ext_info: sub_info, + // }), + // }, + // sub.expr(), + // )); + // } + // } + // } } pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { @@ -919,9 +899,16 @@ impl HatPubSubTrait for HatCode { id: InterestId, res: Option<&mut Arc>, mode: InterestMode, - aggregate: bool, + mut aggregate: bool, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if aggregate && face.whatami == WhatAmI::Peer { + tracing::warn!( + "Received Interest with aggregate=true from peer {}. Not supported!", + face.zid + ); + aggregate = true; + } + if mode.current() { let interest_id = (!mode.future()).then_some(id); let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers @@ -962,9 +949,17 @@ impl HatPubSubTrait for HatCode { for sub in &hat!(tables).router_subs { if sub.context.is_some() && sub.matches(res) - && (remote_client_subs(sub, face) - || remote_peer_subs(tables, sub) - || remote_router_subs(tables, sub)) + && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) + || res_hat!(sub).peer_subs.iter().any(|r| *r != tables.zid) + || sub.session_ctxs.values().any(|s| { + s.face.id != face.id + && s.subs.is_some() + && (s.face.whatami == WhatAmI::Client + || face.whatami == WhatAmI::Client + || (s.face.whatami == WhatAmI::Peer + && hat!(tables) + .failover_brokering(s.face.zid, face.zid))) + })) { let id = if mode.future() { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); @@ -994,9 +989,14 @@ impl HatPubSubTrait for HatCode { } else { for sub in &hat!(tables).router_subs { if sub.context.is_some() - && (remote_client_subs(sub, face) - || remote_peer_subs(tables, sub) - || remote_router_subs(tables, sub)) + && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) + || res_hat!(sub).peer_subs.iter().any(|r| *r != tables.zid) + || sub.session_ctxs.values().any(|s| { + s.subs.is_some() + && (s.face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables).failover_brokering(s.face.zid, face.zid)) + })) { let id = if mode.future() { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 9a2beeb001..03d3e5f8bd 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -241,11 +241,10 @@ fn propagate_simple_queryable( let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) && (current.is_none() || current.unwrap().1 != info) - && (dst_face.whatami != WhatAmI::Client - || face_hat!(dst_face) - .remote_qabl_interests - .values() - .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true))) + && face_hat!(dst_face) + .remote_qabl_interests + .values() + .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true)) && if full_peers_net { dst_face.whatami == WhatAmI::Client } else { @@ -811,41 +810,41 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { - if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - for qabl in hat!(tables).router_qabls.iter() { - if qabl.context.is_some() - && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) - || qabl.session_ctxs.values().any(|s| { - s.qabl.is_some() - && (s.face.whatami == WhatAmI::Client - || (s.face.whatami == WhatAmI::Peer - && hat!(tables).failover_brokering(s.face.zid, face.zid))) - })) - { - let info = local_qabl_info(tables, qabl, face); - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face) - .local_qabls - .insert(qabl.clone(), (id, info)); - let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); - } - } - } +pub(super) fn queries_new_face(_tables: &mut Tables, _face: &mut Arc) { + // if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { + // for qabl in hat!(tables).router_qabls.iter() { + // if qabl.context.is_some() + // && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) + // || qabl.session_ctxs.values().any(|s| { + // s.qabl.is_some() + // && (s.face.whatami == WhatAmI::Client + // || (s.face.whatami == WhatAmI::Peer + // && hat!(tables).failover_brokering(s.face.zid, face.zid))) + // })) + // { + // let info = local_qabl_info(tables, qabl, face); + // let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + // face_hat_mut!(face) + // .local_qabls + // .insert(qabl.clone(), (id, info)); + // let key_expr = Resource::decl_key(qabl, face); + // face.primitives.send_declare(RoutingContext::with_expr( + // Declare { + // interest_id: None, + // ext_qos: ext::QoSType::DECLARE, + // ext_tstamp: None, + // ext_nodeid: ext::NodeIdType::DEFAULT, + // body: DeclareBody::DeclareQueryable(DeclareQueryable { + // id, + // wire_expr: key_expr, + // ext_info: info, + // }), + // }, + // qabl.expr(), + // )); + // } + // } + // } } pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { @@ -1075,8 +1074,15 @@ impl HatQueriesTrait for HatCode { id: InterestId, res: Option<&mut Arc>, mode: InterestMode, - aggregate: bool, + mut aggregate: bool, ) { + if aggregate && face.whatami == WhatAmI::Peer { + tracing::warn!( + "Received Interest with aggregate=true from peer {}. Not supported!", + face.zid + ); + aggregate = true; + } if mode.current() && face.whatami == WhatAmI::Client { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { @@ -1084,9 +1090,17 @@ impl HatQueriesTrait for HatCode { if hat!(tables).router_qabls.iter().any(|qabl| { qabl.context.is_some() && qabl.matches(res) - && (remote_client_qabls(qabl, face) - || remote_peer_qabls(tables, qabl) - || remote_router_qabls(tables, qabl)) + && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) + || res_hat!(qabl).peer_qabls.keys().any(|r| *r != tables.zid) + || qabl.session_ctxs.values().any(|s| { + s.face.id != face.id + && s.qabl.is_some() + && (s.face.whatami == WhatAmI::Client + || face.whatami == WhatAmI::Client + || (s.face.whatami == WhatAmI::Peer + && hat!(tables) + .failover_brokering(s.face.zid, face.zid))) + })) }) { let info = local_qabl_info(tables, res, face); let id = if mode.future() { @@ -1118,9 +1132,15 @@ impl HatQueriesTrait for HatCode { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() && qabl.matches(res) - && (remote_client_qabls(qabl, face) - || remote_peer_qabls(tables, qabl) - || remote_router_qabls(tables, qabl)) + && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) + || res_hat!(qabl).peer_qabls.keys().any(|r| *r != tables.zid) + || qabl.session_ctxs.values().any(|s| { + s.qabl.is_some() + && (s.face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables) + .failover_brokering(s.face.zid, face.zid)) + })) { let info = local_qabl_info(tables, qabl, face); let id = if mode.future() { From d7bb636d3f7c4e3aa7563c2ef85a6740e9182012 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 22 May 2024 17:03:04 +0200 Subject: [PATCH 2/6] Add logger init in matching test --- zenoh/tests/matching.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 1473d7f6fc..1e5448cfb5 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -43,6 +43,7 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_any() -> ZResult<()> { + zenoh_util::try_init_log_from_env(); let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await; let publisher1 = ztimeout!(session1 @@ -95,6 +96,7 @@ async fn zenoh_matching_status_any() -> ZResult<()> { #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_remote() -> ZResult<()> { + zenoh_util::try_init_log_from_env(); let session1 = ztimeout!(zenoh::open(peer())).unwrap(); let session2 = ztimeout!(zenoh::open(peer())).unwrap(); @@ -150,6 +152,7 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_local() -> ZResult<()> { + zenoh_util::try_init_log_from_env(); let session1 = ztimeout!(zenoh::open(config::peer())).unwrap(); let session2 = ztimeout!(zenoh::open(config::peer())).unwrap(); From 555f012ab0b70f99253d24b558dd170a3a6b0ee0 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 24 May 2024 10:13:41 +0200 Subject: [PATCH 3/6] Peers send subscribers interests to newly connected routers --- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 29 +++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index fe78675975..ef092d286a 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -342,7 +342,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers }; - for src_face in tables + for mut src_face in tables .faces .values() .cloned() @@ -357,6 +357,33 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { &mut src_face.clone(), ); } + if face.whatami == WhatAmI::Router { + for (res, _) in face_hat_mut!(&mut src_face).remote_sub_interests.values() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: false, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face)); + face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::CurrentFuture, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } } } // recompute routes From 8a7987c2705998ff6ef6ed970a0ad034a7e5b0aa Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 24 May 2024 15:53:35 +0200 Subject: [PATCH 4/6] Remove commented code --- zenoh/src/net/routing/hat/router/pubsub.rs | 36 +--------------------- 1 file changed, 1 insertion(+), 35 deletions(-) diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 3b16270c84..01d53eb14b 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -685,41 +685,7 @@ fn forget_client_subscription( } pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { - // let sub_info = SubscriberInfo { - // reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - // }; - - // if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - // for sub in &hat!(tables).router_subs { - // if sub.context.is_some() - // && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) - // || sub.session_ctxs.values().any(|s| { - // s.subs.is_some() - // && (s.face.whatami == WhatAmI::Client - // || (s.face.whatami == WhatAmI::Peer - // && hat!(tables).failover_brokering(s.face.zid, face.zid))) - // })) - // { - // let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - // face_hat_mut!(face).local_subs.insert(sub.clone(), id); - // let key_expr = Resource::decl_key(sub, face); - // face.primitives.send_declare(RoutingContext::with_expr( - // Declare { - // interest_id: None, - // ext_qos: ext::QoSType::DECLARE, - // ext_tstamp: None, - // ext_nodeid: ext::NodeIdType::DEFAULT, - // body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - // id, - // wire_expr: key_expr, - // ext_info: sub_info, - // }), - // }, - // sub.expr(), - // )); - // } - // } - // } + // Nothing to do } pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { From f7bf8be322d846fa89a9502f5b030015fbf6cd10 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 30 May 2024 13:44:28 +0200 Subject: [PATCH 5/6] Remove empty functions --- .../src/net/routing/hat/linkstate_peer/mod.rs | 13 +++---- .../net/routing/hat/linkstate_peer/pubsub.rs | 4 -- .../net/routing/hat/linkstate_peer/queries.rs | 4 -- zenoh/src/net/routing/hat/router/mod.rs | 17 +++------ zenoh/src/net/routing/hat/router/pubsub.rs | 4 -- zenoh/src/net/routing/hat/router/queries.rs | 37 ------------------- 6 files changed, 10 insertions(+), 69 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 2b0c8e4ca7..82c2a96166 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -41,8 +41,8 @@ use zenoh_transport::unicast::TransportUnicast; use self::{ network::Network, - pubsub::{pubsub_new_face, pubsub_remove_node, undeclare_client_subscription}, - queries::{queries_new_face, queries_remove_node, undeclare_client_queryable}, + pubsub::{pubsub_remove_node, undeclare_client_subscription}, + queries::{queries_remove_node, undeclare_client_queryable}, }; use super::{ super::dispatcher::{ @@ -212,12 +212,11 @@ impl HatBaseTrait for HatCode { fn new_local_face( &self, - tables: &mut Tables, + _tables: &mut Tables, _tables_ref: &Arc, - face: &mut Face, + _face: &mut Face, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + // Nothing to do Ok(()) } @@ -239,8 +238,6 @@ impl HatBaseTrait for HatCode { }; face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); if face.state.whatami != WhatAmI::Client { hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 67b04661c6..0bd9f62f98 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -554,10 +554,6 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { - // Nothing to do -} - pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId) { for mut res in hat!(tables) .peer_subs diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 9c3d502e5f..b75233409d 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -584,10 +584,6 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(_tables: &mut Tables, _face: &mut Arc) { - // Nothing to do -} - pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId) { let mut qabls = vec![]; for res in hat!(tables).peer_qabls.iter() { diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 910e527bfe..dd7c6e11c7 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -42,12 +42,8 @@ use zenoh_transport::unicast::TransportUnicast; use self::{ network::{shared_nodes, Network}, - pubsub::{ - pubsub_linkstate_change, pubsub_new_face, pubsub_remove_node, undeclare_client_subscription, - }, - queries::{ - queries_linkstate_change, queries_new_face, queries_remove_node, undeclare_client_queryable, - }, + pubsub::{pubsub_linkstate_change, pubsub_remove_node, undeclare_client_subscription}, + queries::{queries_linkstate_change, queries_remove_node, undeclare_client_queryable}, }; use super::{ super::dispatcher::{ @@ -364,12 +360,11 @@ impl HatBaseTrait for HatCode { fn new_local_face( &self, - tables: &mut Tables, + _tables: &mut Tables, _tables_ref: &Arc, - face: &mut Face, + _face: &mut Face, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + // Nothing to do Ok(()) } @@ -404,8 +399,6 @@ impl HatBaseTrait for HatCode { } face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); match face.state.whatami { WhatAmI::Router => { diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 01d53eb14b..233e0b8cdf 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -684,10 +684,6 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { - // Nothing to do -} - pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { match net_type { WhatAmI::Router => { diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 03d3e5f8bd..3ab0ac507d 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -810,43 +810,6 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(_tables: &mut Tables, _face: &mut Arc) { - // if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - // for qabl in hat!(tables).router_qabls.iter() { - // if qabl.context.is_some() - // && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) - // || qabl.session_ctxs.values().any(|s| { - // s.qabl.is_some() - // && (s.face.whatami == WhatAmI::Client - // || (s.face.whatami == WhatAmI::Peer - // && hat!(tables).failover_brokering(s.face.zid, face.zid))) - // })) - // { - // let info = local_qabl_info(tables, qabl, face); - // let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - // face_hat_mut!(face) - // .local_qabls - // .insert(qabl.clone(), (id, info)); - // let key_expr = Resource::decl_key(qabl, face); - // face.primitives.send_declare(RoutingContext::with_expr( - // Declare { - // interest_id: None, - // ext_qos: ext::QoSType::DECLARE, - // ext_tstamp: None, - // ext_nodeid: ext::NodeIdType::DEFAULT, - // body: DeclareBody::DeclareQueryable(DeclareQueryable { - // id, - // wire_expr: key_expr, - // ext_info: info, - // }), - // }, - // qabl.expr(), - // )); - // } - // } - // } -} - pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { match net_type { WhatAmI::Router => { From 3fa2c56604f61a1db077de97c45daa7213417406 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 30 May 2024 14:30:44 +0200 Subject: [PATCH 6/6] Fix code fmt --- zenoh/tests/matching.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 294246a089..db10241cc4 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -147,7 +147,7 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_local() -> ZResult<()> { zenoh_util::try_init_log_from_env(); - + let session1 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); let session2 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap();