From cb5f0c2ce6ebe782feee85f418051cc3f66aec7c Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 19 Dec 2024 12:25:52 +0100 Subject: [PATCH] Apply gossip_target option to routers and linkstate peers --- .../src/net/routing/hat/linkstate_peer/mod.rs | 2 + .../net/routing/hat/linkstate_peer/network.rs | 77 +++++++++++-------- zenoh/src/net/routing/hat/router/mod.rs | 3 + zenoh/src/net/routing/hat/router/network.rs | 77 +++++++++++-------- 4 files changed, 93 insertions(+), 66 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index c07ecbae0b..5644f0be0b 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -185,6 +185,7 @@ impl HatBaseTrait for HatCode { let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); + let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami)); let autoconnect = if gossip { *unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami)) } else { @@ -205,6 +206,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/network.rs b/zenoh/src/net/routing/hat/linkstate_peer/network.rs index bfa7ccf969..9eca0d100e 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/network.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/network.rs @@ -39,7 +39,7 @@ use crate::net::{ runtime::{Runtime, WeakRuntime}, }; -#[derive(Clone)] +#[derive(Clone, Default)] struct Details { zid: bool, locators: bool, @@ -119,6 +119,7 @@ pub(super) struct Network { pub(super) router_peers_failover_brokering: bool, pub(super) gossip: bool, pub(super) gossip_multihop: bool, + pub(super) gossip_target: WhatAmIMatcher, pub(super) autoconnect: WhatAmIMatcher, pub(super) idx: NodeIndex, pub(super) links: VecMap, @@ -138,6 +139,7 @@ impl Network { router_peers_failover_brokering: bool, gossip: bool, gossip_multihop: bool, + gossip_target: WhatAmIMatcher, autoconnect: WhatAmIMatcher, ) -> Self { let mut graph = petgraph::stable_graph::StableGraph::default(); @@ -155,6 +157,7 @@ impl Network { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, idx, links: VecMap::new(), @@ -225,7 +228,7 @@ impl Network { idx } - fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState { + fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState { let links = if details.links { self.graph[idx] .links @@ -268,10 +271,10 @@ impl Network { } } - fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result { + fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result { let mut link_states = vec![]; for (idx, details) in idxs { - link_states.push(self.make_link_state(idx, details)); + link_states.push(self.make_link_state(*idx, details)); } let codec = Zenoh080Routing::new(); let mut buf = ZBuf::empty(); @@ -285,8 +288,11 @@ impl Network { .into()) } - fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { - if let Ok(msg) = self.make_msg(idxs) { + fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, transport); + } + if let Ok(msg) = self.make_msg(&idxs) { tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); if let Err(e) = transport.schedule(msg) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); @@ -296,21 +302,24 @@ impl Network { } } - fn send_on_links

(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P) + fn send_on_links

(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P) where P: FnMut(&Link) -> bool, { - if let Ok(msg) = self.make_msg(idxs) { - for link in self.links.values() { + for link in self.links.values() { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, &link.transport); + } + if let Ok(msg) = self.make_msg(&idxs) { if parameters(link) { tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg); if let Err(e) = link.transport.schedule(msg.clone()) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); } } + } else { + tracing::error!("Failed to encode Linkstate message"); } - } else { - tracing::error!("Failed to encode Linkstate message"); } } @@ -318,8 +327,10 @@ impl Network { // from the given node. // Returns true if gossip is enabled and if multihop gossip is enabled or // the node is one of self neighbours. - fn propagate_locators(&self, idx: NodeIndex) -> bool { + fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool { + let target_whatami = target.get_whatami().unwrap_or_default(); self.gossip + && self.gossip_target.matches(target_whatami) && (self.gossip_multihop || idx == self.idx || self.links.values().any(|link| { @@ -491,8 +502,8 @@ impl Network { idx, Details { zid: true, - locators: true, links: false, + ..Default::default() }, )], |link| link.zid != zid, @@ -657,20 +668,20 @@ impl Network { Vec<(Vec, NodeIndex, bool)>, Vec<(Vec, NodeIndex, bool)>, ) = link_states.into_iter().partition(|(_, _, new)| *new); - let new_idxs = new_idxs - .into_iter() - .map(|(_, idx1, _new_node)| { - ( - idx1, - Details { - zid: true, - locators: self.propagate_locators(idx1), - links: true, - }, - ) - }) - .collect::>(); for link in self.links.values() { + let new_idxs = new_idxs + .iter() + .map(|(_, idx1, _new_node)| { + ( + *idx1, + Details { + zid: true, + links: true, + ..Default::default() + }, + ) + }) + .collect::>(); if link.zid != src { let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs .clone() @@ -681,8 +692,8 @@ impl Network { idx1, Details { zid: false, - locators: self.propagate_locators(idx1), links: true, + ..Default::default() }, )) } else { @@ -761,16 +772,16 @@ impl Network { idx, Details { zid: true, - locators: false, links: false, + ..Default::default() }, ), ( self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, ), ] @@ -779,8 +790,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, )] }, @@ -806,11 +817,11 @@ impl Network { idx, Details { zid: true, - locators: self.propagate_locators(idx), links: self.full_linkstate || (self.router_peers_failover_brokering && idx == self.idx && whatami == WhatAmI::Router), + ..Default::default() }, ) }) @@ -840,8 +851,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |_| true, @@ -858,8 +869,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |link| { diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 5815ff1df8..b44801919a 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -312,6 +312,7 @@ impl HatBaseTrait for HatCode { let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); + let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami)); let autoconnect = if gossip { *unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami)) } else { @@ -334,6 +335,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } @@ -346,6 +348,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } diff --git a/zenoh/src/net/routing/hat/router/network.rs b/zenoh/src/net/routing/hat/router/network.rs index 3bfdde49d1..103332955b 100644 --- a/zenoh/src/net/routing/hat/router/network.rs +++ b/zenoh/src/net/routing/hat/router/network.rs @@ -39,7 +39,7 @@ use crate::net::{ runtime::Runtime, }; -#[derive(Clone)] +#[derive(Clone, Default)] struct Details { zid: bool, locators: bool, @@ -119,6 +119,7 @@ pub(super) struct Network { pub(super) router_peers_failover_brokering: bool, pub(super) gossip: bool, pub(super) gossip_multihop: bool, + pub(super) gossip_target: WhatAmIMatcher, pub(super) autoconnect: WhatAmIMatcher, pub(super) idx: NodeIndex, pub(super) links: VecMap, @@ -138,6 +139,7 @@ impl Network { router_peers_failover_brokering: bool, gossip: bool, gossip_multihop: bool, + gossip_target: WhatAmIMatcher, autoconnect: WhatAmIMatcher, ) -> Self { let mut graph = petgraph::stable_graph::StableGraph::default(); @@ -155,6 +157,7 @@ impl Network { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, idx, links: VecMap::new(), @@ -230,7 +233,7 @@ impl Network { idx } - fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState { + fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState { let links = if details.links { self.graph[idx] .links @@ -273,10 +276,10 @@ impl Network { } } - fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result { + fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result { let mut link_states = vec![]; for (idx, details) in idxs { - link_states.push(self.make_link_state(idx, details)); + link_states.push(self.make_link_state(*idx, details)); } let codec = Zenoh080Routing::new(); let mut buf = ZBuf::empty(); @@ -290,8 +293,11 @@ impl Network { .into()) } - fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { - if let Ok(msg) = self.make_msg(idxs) { + fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, transport); + } + if let Ok(msg) = self.make_msg(&idxs) { tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); if let Err(e) = transport.schedule(msg) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); @@ -301,21 +307,24 @@ impl Network { } } - fn send_on_links

(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P) + fn send_on_links

(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P) where P: FnMut(&Link) -> bool, { - if let Ok(msg) = self.make_msg(idxs) { - for link in self.links.values() { + for link in self.links.values() { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, &link.transport); + } + if let Ok(msg) = self.make_msg(&idxs) { if parameters(link) { tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg); if let Err(e) = link.transport.schedule(msg.clone()) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); } } + } else { + tracing::error!("Failed to encode Linkstate message"); } - } else { - tracing::error!("Failed to encode Linkstate message"); } } @@ -323,8 +332,10 @@ impl Network { // from the given node. // Returns true if gossip is enabled and if multihop gossip is enabled or // the node is one of self neighbours. - fn propagate_locators(&self, idx: NodeIndex) -> bool { + fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool { + let target_whatami = target.get_whatami().unwrap_or_default(); self.gossip + && self.gossip_target.matches(target_whatami) && (self.gossip_multihop || idx == self.idx || self.links.values().any(|link| { @@ -495,8 +506,8 @@ impl Network { idx, Details { zid: true, - locators: true, links: false, + ..Default::default() }, )], |link| link.zid != zid, @@ -661,20 +672,20 @@ impl Network { Vec<(Vec, NodeIndex, bool)>, Vec<(Vec, NodeIndex, bool)>, ) = link_states.into_iter().partition(|(_, _, new)| *new); - let new_idxs = new_idxs - .into_iter() - .map(|(_, idx1, _new_node)| { - ( - idx1, - Details { - zid: true, - locators: self.propagate_locators(idx1), - links: true, - }, - ) - }) - .collect::>(); for link in self.links.values() { + let new_idxs = new_idxs + .iter() + .map(|(_, idx1, _new_node)| { + ( + *idx1, + Details { + zid: true, + links: true, + ..Default::default() + }, + ) + }) + .collect::>(); if link.zid != src { let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs .clone() @@ -685,8 +696,8 @@ impl Network { idx1, Details { zid: false, - locators: self.propagate_locators(idx1), links: true, + ..Default::default() }, )) } else { @@ -765,16 +776,16 @@ impl Network { idx, Details { zid: true, - locators: false, links: false, + ..Default::default() }, ), ( self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, ), ] @@ -783,8 +794,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, )] }, @@ -810,11 +821,11 @@ impl Network { idx, Details { zid: true, - locators: self.propagate_locators(idx), links: self.full_linkstate || (self.router_peers_failover_brokering && idx == self.idx && whatami == WhatAmI::Router), + ..Default::default() }, ) }) @@ -844,8 +855,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |_| true, @@ -862,8 +873,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |link| {