diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 2b0c8e4ca7..82c2a96166 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -41,8 +41,8 @@ use zenoh_transport::unicast::TransportUnicast; use self::{ network::Network, - pubsub::{pubsub_new_face, pubsub_remove_node, undeclare_client_subscription}, - queries::{queries_new_face, queries_remove_node, undeclare_client_queryable}, + pubsub::{pubsub_remove_node, undeclare_client_subscription}, + queries::{queries_remove_node, undeclare_client_queryable}, }; use super::{ super::dispatcher::{ @@ -212,12 +212,11 @@ impl HatBaseTrait for HatCode { fn new_local_face( &self, - tables: &mut Tables, + _tables: &mut Tables, _tables_ref: &Arc, - face: &mut Face, + _face: &mut Face, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + // Nothing to do Ok(()) } @@ -239,8 +238,6 @@ impl HatBaseTrait for HatCode { }; face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); if face.state.whatami != WhatAmI::Client { hat_mut!(tables).schedule_compute_trees(tables_ref.clone()); diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 67b04661c6..0bd9f62f98 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -554,10 +554,6 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(_tables: &mut Tables, _face: &mut Arc) { - // Nothing to do -} - pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId) { for mut res in hat!(tables) .peer_subs diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 9c3d502e5f..b75233409d 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -584,10 +584,6 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(_tables: &mut Tables, _face: &mut Arc) { - // Nothing to do -} - pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId) { let mut qabls = vec![]; for res in hat!(tables).peer_qabls.iter() { diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index e46ff3ff16..ef092d286a 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -24,7 +24,8 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode}, + interest::{InterestId, InterestMode, InterestOptions}, + Interest, }, }; use zenoh_sync::get_mut_unchecked; @@ -34,11 +35,11 @@ use crate::{ key_expr::KeyExpr, net::routing::{ dispatcher::{ - face::FaceState, + face::{FaceState, InterestState}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, Sources}, + hat::{HatPubSubTrait, Sources}, router::{update_data_routes_from, RoutesIndexes}, RoutingContext, PREFIX_LIVELINESS, }, @@ -341,7 +342,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers }; - for src_face in tables + for mut src_face in tables .faces .values() .cloned() @@ -356,6 +357,33 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { &mut src_face.clone(), ); } + if face.whatami == WhatAmI::Router { + for (res, _) in face_hat_mut!(&mut src_face).remote_sub_interests.values() { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: false, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face)); + face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::CurrentFuture, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } } } // recompute routes @@ -374,133 +402,91 @@ impl HatPubSubTrait for HatCode { mode: InterestMode, aggregate: bool, ) { - if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - if let Some(res) = res.as_ref() { - if aggregate { - if tables.faces.values().any(|src_face| { - src_face.id != face.id - && face_hat!(src_face) - .remote_subs - .values() - .any(|sub| sub.context.is_some() && sub.matches(res)) - }) { - let id = if mode.future() { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert((*res).clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(res, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }), - }, - res.expr(), - )); - } - } else { - for src_face in tables - .faces + face_hat_mut!(face) + .remote_sub_interests + .insert(id, (res.as_ref().map(|res| (*res).clone()), aggregate)); + for dst_face in tables + .faces + .values_mut() + .filter(|f| f.whatami == WhatAmI::Router) + { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + let options = InterestOptions::KEYEXPRS + InterestOptions::SUBSCRIBERS; + get_mut_unchecked(dst_face).local_interests.insert( + id, + InterestState { + options, + res: res.as_ref().map(|res| (*res).clone()), + finalized: mode == InterestMode::Future, + }, + ); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode, + options, + wire_expr, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + }, + res.as_ref().map(|res| res.expr()).unwrap_or_default(), + )); + } + } + + fn undeclare_sub_interest( + &self, + tables: &mut Tables, + face: &mut Arc, + id: InterestId, + ) { + if let Some(interest) = face_hat_mut!(face).remote_sub_interests.remove(&id) { + if !tables.faces.values().any(|f| { + f.whatami == WhatAmI::Client + && face_hat!(f) + .remote_sub_interests .values() - .cloned() - .collect::>>() - { - if src_face.id != face.id { - for sub in face_hat!(src_face).remote_subs.values() { - if sub.context.is_some() && sub.matches(res) { - let id = if mode.future() { - let id = - face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber( - DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }, - ), - }, - sub.expr(), - )); - } - } - } - } - } - } else { - for src_face in tables + .any(|i| *i == interest) + }) { + for dst_face in tables .faces - .values() - .cloned() - .collect::>>() + .values_mut() + .filter(|f| f.whatami == WhatAmI::Router) { - if src_face.id != face.id { - for sub in face_hat!(src_face).remote_subs.values() { - let id = if mode.future() { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - id - } else { - 0 - }; - let wire_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id, + for id in dst_face + .local_interests + .keys() + .cloned() + .collect::>() + { + let local_interest = dst_face.local_interests.get(&id).unwrap(); + if local_interest.options.subscribers() + && (local_interest.res == interest.0) + { + dst_face.primitives.send_interest(RoutingContext::with_expr( + Interest { + id, + mode: InterestMode::Final, + options: InterestOptions::empty(), + wire_expr: None, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }), }, - sub.expr(), + local_interest + .res + .as_ref() + .map(|res| res.expr()) + .unwrap_or_default(), )); + get_mut_unchecked(dst_face).local_interests.remove(&id); } } } } } - if mode.future() { - face_hat_mut!(face) - .remote_sub_interests - .insert(id, (res.cloned(), aggregate)); - } - } - - fn undeclare_sub_interest( - &self, - _tables: &mut Tables, - face: &mut Arc, - id: InterestId, - ) { - face_hat_mut!(face).remote_sub_interests.remove(&id); } fn declare_subscription( @@ -570,6 +556,50 @@ impl HatPubSubTrait for HatCode { } }; + for face in tables + .faces + .values() + .filter(|f| f.whatami == WhatAmI::Router) + { + if face.local_interests.values().any(|interest| { + interest.finalized + && interest.options.subscribers() + && interest + .res + .as_ref() + .map(|res| { + KeyExpr::try_from(res.expr()) + .and_then(|intres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| intres.includes(&putres)) + }) + .unwrap_or(false) + }) + .unwrap_or(true) + }) { + if face_hat!(face).remote_subs.values().any(|sub| { + KeyExpr::try_from(sub.expr()) + .and_then(|subres| { + KeyExpr::try_from(expr.full_expr()) + .map(|putres| subres.intersects(&putres)) + }) + .unwrap_or(false) + }) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } else { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.insert( + face.id, + (face.clone(), key_expr.to_owned(), NodeId::default()), + ); + } + } + for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer && !f diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index caa5f79694..b909190184 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -554,6 +554,16 @@ impl HatQueriesTrait for HatCode { } }; + // TODO: BNestMatching: What if there is a local compete ? + if let Some(face) = tables.faces.values().find(|f| f.whatami == WhatAmI::Router) { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: (face.clone(), key_expr.to_owned(), NodeId::default()), + complete: 0, + distance: f64::MAX, + }); + } + for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer && !f diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 910e527bfe..dd7c6e11c7 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -42,12 +42,8 @@ use zenoh_transport::unicast::TransportUnicast; use self::{ network::{shared_nodes, Network}, - pubsub::{ - pubsub_linkstate_change, pubsub_new_face, pubsub_remove_node, undeclare_client_subscription, - }, - queries::{ - queries_linkstate_change, queries_new_face, queries_remove_node, undeclare_client_queryable, - }, + pubsub::{pubsub_linkstate_change, pubsub_remove_node, undeclare_client_subscription}, + queries::{queries_linkstate_change, queries_remove_node, undeclare_client_queryable}, }; use super::{ super::dispatcher::{ @@ -364,12 +360,11 @@ impl HatBaseTrait for HatCode { fn new_local_face( &self, - tables: &mut Tables, + _tables: &mut Tables, _tables_ref: &Arc, - face: &mut Face, + _face: &mut Face, ) -> ZResult<()> { - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); + // Nothing to do Ok(()) } @@ -404,8 +399,6 @@ impl HatBaseTrait for HatCode { } face_hat_mut!(&mut face.state).link_id = link_id; - pubsub_new_face(tables, &mut face.state); - queries_new_face(tables, &mut face.state); match face.state.whatami { WhatAmI::Router => { diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 2af567d989..233e0b8cdf 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -111,57 +111,37 @@ fn propagate_simple_subscription_to( || hat!(tables).failover_brokering(src_face.zid, dst_face.zid)) } { - if dst_face.whatami != WhatAmI::Client { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); - } else { - let matching_interests = face_hat!(dst_face) - .remote_sub_interests - .values() - .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) - .cloned() - .collect::>, bool)>>(); + let matching_interests = face_hat!(dst_face) + .remote_sub_interests + .values() + .filter(|si| si.0.as_ref().map(|si| si.matches(res)).unwrap_or(true)) + .cloned() + .collect::>, bool)>>(); - for (int_res, aggregate) in matching_interests { - let res = if aggregate { - int_res.as_ref().unwrap_or(res) - } else { - res - }; - if !face_hat!(dst_face).local_subs.contains_key(res) { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); - dst_face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: *sub_info, - }), - }, - res.expr(), - )); - } + for (int_res, aggregate) in matching_interests { + let res = if aggregate { + int_res.as_ref().unwrap_or(res) + } else { + res + }; + if !face_hat!(dst_face).local_subs.contains_key(res) { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); + let key_expr = Resource::decl_key(res, dst_face); + dst_face.primitives.send_declare(RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareSubscriber(DeclareSubscriber { + id, + wire_expr: key_expr, + ext_info: *sub_info, + }), + }, + res.expr(), + )); } } } @@ -704,44 +684,6 @@ fn forget_client_subscription( } } -pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; - - if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - for sub in &hat!(tables).router_subs { - if sub.context.is_some() - && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) - || sub.session_ctxs.values().any(|s| { - s.subs.is_some() - && (s.face.whatami == WhatAmI::Client - || (s.face.whatami == WhatAmI::Peer - && hat!(tables).failover_brokering(s.face.zid, face.zid))) - })) - { - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face).local_subs.insert(sub.clone(), id); - let key_expr = Resource::decl_key(sub, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareSubscriber(DeclareSubscriber { - id, - wire_expr: key_expr, - ext_info: sub_info, - }), - }, - sub.expr(), - )); - } - } - } -} - pub(super) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { match net_type { WhatAmI::Router => { @@ -919,9 +861,16 @@ impl HatPubSubTrait for HatCode { id: InterestId, res: Option<&mut Arc>, mode: InterestMode, - aggregate: bool, + mut aggregate: bool, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if aggregate && face.whatami == WhatAmI::Peer { + tracing::warn!( + "Received Interest with aggregate=true from peer {}. Not supported!", + face.zid + ); + aggregate = true; + } + if mode.current() { let interest_id = (!mode.future()).then_some(id); let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers @@ -962,9 +911,17 @@ impl HatPubSubTrait for HatCode { for sub in &hat!(tables).router_subs { if sub.context.is_some() && sub.matches(res) - && (remote_client_subs(sub, face) - || remote_peer_subs(tables, sub) - || remote_router_subs(tables, sub)) + && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) + || res_hat!(sub).peer_subs.iter().any(|r| *r != tables.zid) + || sub.session_ctxs.values().any(|s| { + s.face.id != face.id + && s.subs.is_some() + && (s.face.whatami == WhatAmI::Client + || face.whatami == WhatAmI::Client + || (s.face.whatami == WhatAmI::Peer + && hat!(tables) + .failover_brokering(s.face.zid, face.zid))) + })) { let id = if mode.future() { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); @@ -994,9 +951,14 @@ impl HatPubSubTrait for HatCode { } else { for sub in &hat!(tables).router_subs { if sub.context.is_some() - && (remote_client_subs(sub, face) - || remote_peer_subs(tables, sub) - || remote_router_subs(tables, sub)) + && (res_hat!(sub).router_subs.iter().any(|r| *r != tables.zid) + || res_hat!(sub).peer_subs.iter().any(|r| *r != tables.zid) + || sub.session_ctxs.values().any(|s| { + s.subs.is_some() + && (s.face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables).failover_brokering(s.face.zid, face.zid)) + })) { let id = if mode.future() { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 9a2beeb001..3ab0ac507d 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -241,11 +241,10 @@ fn propagate_simple_queryable( let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) && (current.is_none() || current.unwrap().1 != info) - && (dst_face.whatami != WhatAmI::Client - || face_hat!(dst_face) - .remote_qabl_interests - .values() - .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true))) + && face_hat!(dst_face) + .remote_qabl_interests + .values() + .any(|si| si.as_ref().map(|si| si.matches(res)).unwrap_or(true)) && if full_peers_net { dst_face.whatami == WhatAmI::Client } else { @@ -811,43 +810,6 @@ fn forget_client_queryable( } } -pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { - if face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer) { - for qabl in hat!(tables).router_qabls.iter() { - if qabl.context.is_some() - && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) - || qabl.session_ctxs.values().any(|s| { - s.qabl.is_some() - && (s.face.whatami == WhatAmI::Client - || (s.face.whatami == WhatAmI::Peer - && hat!(tables).failover_brokering(s.face.zid, face.zid))) - })) - { - let info = local_qabl_info(tables, qabl, face); - let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(face) - .local_qabls - .insert(qabl.clone(), (id, info)); - let key_expr = Resource::decl_key(qabl, face); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareQueryable(DeclareQueryable { - id, - wire_expr: key_expr, - ext_info: info, - }), - }, - qabl.expr(), - )); - } - } - } -} - pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI) { match net_type { WhatAmI::Router => { @@ -1075,8 +1037,15 @@ impl HatQueriesTrait for HatCode { id: InterestId, res: Option<&mut Arc>, mode: InterestMode, - aggregate: bool, + mut aggregate: bool, ) { + if aggregate && face.whatami == WhatAmI::Peer { + tracing::warn!( + "Received Interest with aggregate=true from peer {}. Not supported!", + face.zid + ); + aggregate = true; + } if mode.current() && face.whatami == WhatAmI::Client { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { @@ -1084,9 +1053,17 @@ impl HatQueriesTrait for HatCode { if hat!(tables).router_qabls.iter().any(|qabl| { qabl.context.is_some() && qabl.matches(res) - && (remote_client_qabls(qabl, face) - || remote_peer_qabls(tables, qabl) - || remote_router_qabls(tables, qabl)) + && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) + || res_hat!(qabl).peer_qabls.keys().any(|r| *r != tables.zid) + || qabl.session_ctxs.values().any(|s| { + s.face.id != face.id + && s.qabl.is_some() + && (s.face.whatami == WhatAmI::Client + || face.whatami == WhatAmI::Client + || (s.face.whatami == WhatAmI::Peer + && hat!(tables) + .failover_brokering(s.face.zid, face.zid))) + })) }) { let info = local_qabl_info(tables, res, face); let id = if mode.future() { @@ -1118,9 +1095,15 @@ impl HatQueriesTrait for HatCode { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() && qabl.matches(res) - && (remote_client_qabls(qabl, face) - || remote_peer_qabls(tables, qabl) - || remote_router_qabls(tables, qabl)) + && (res_hat!(qabl).router_qabls.keys().any(|r| *r != tables.zid) + || res_hat!(qabl).peer_qabls.keys().any(|r| *r != tables.zid) + || qabl.session_ctxs.values().any(|s| { + s.qabl.is_some() + && (s.face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables) + .failover_brokering(s.face.zid, face.zid)) + })) { let info = local_qabl_info(tables, qabl, face); let id = if mode.future() { diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 339bc196b1..db10241cc4 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -39,6 +39,7 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_any() -> ZResult<()> { + zenoh_util::try_init_log_from_env(); let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await; let publisher1 = ztimeout!(session1 @@ -90,8 +91,9 @@ async fn zenoh_matching_status_any() -> ZResult<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_remote() -> ZResult<()> { - let session1 = ztimeout!(zenoh::open(config::peer())).unwrap(); + zenoh_util::try_init_log_from_env(); + let session1 = ztimeout!(zenoh::open(config::peer())).unwrap(); let session2 = ztimeout!(zenoh::open(config::peer())).unwrap(); let publisher1 = ztimeout!(session1 @@ -144,8 +146,9 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_local() -> ZResult<()> { - let session1 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); + zenoh_util::try_init_log_from_env(); + let session1 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); let session2 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); let publisher1 = ztimeout!(session1