diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index a2731b952..603653f2b 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -151,6 +151,11 @@ /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have /// direct connectivity with each other. multihop: false, + /// Which type of Zenoh instances to send gossip messages to. + /// Accepts a single value (e.g. target: ["router", "peer"]) which applies whatever the configured "mode" is, + /// or different values for router, peer or client mode (e.g. target: { router: ["router", "peer"], peer: ["router"] }). + /// Each value is a list of: "peer", "router" and/or "client". + target: { router: ["router", "peer"], peer: ["router", "peer"]}, /// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip. /// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is, /// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }). diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 195a08a07..eb000923e 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -95,6 +95,15 @@ pub mod scouting { pub mod gossip { pub const enabled: bool = true; pub const multihop: bool = false; + pub mod target { + pub const router: &crate::WhatAmIMatcher = // "router|peer" + &crate::WhatAmIMatcher::empty().router().peer(); + pub const peer: &crate::WhatAmIMatcher = // "router|peer" + &crate::WhatAmIMatcher::empty().router().peer(); + pub const client: &crate::WhatAmIMatcher = // "" + &crate::WhatAmIMatcher::empty(); + mode_accessor!(crate::WhatAmIMatcher); + } pub mod autoconnect { pub const router: &crate::WhatAmIMatcher = // "" &crate::WhatAmIMatcher::empty(); diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 30bd9a7c8..a00755c87 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -315,6 +315,8 @@ validated_struct::validator! { /// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have /// direct connectivity with each other. multihop: Option, + /// Which type of Zenoh instances to send gossip messages to. + target: Option>, /// Which type of Zenoh instances to automatically establish sessions with upon discovery through gossip. autoconnect: Option>, }, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index c07ecbae0..5644f0be0 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -185,6 +185,7 @@ impl HatBaseTrait for HatCode { let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); + let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami)); let autoconnect = if gossip { *unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami)) } else { @@ -205,6 +206,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/network.rs b/zenoh/src/net/routing/hat/linkstate_peer/network.rs index bfa7ccf96..9eca0d100 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/network.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/network.rs @@ -39,7 +39,7 @@ use crate::net::{ runtime::{Runtime, WeakRuntime}, }; -#[derive(Clone)] +#[derive(Clone, Default)] struct Details { zid: bool, locators: bool, @@ -119,6 +119,7 @@ pub(super) struct Network { pub(super) router_peers_failover_brokering: bool, pub(super) gossip: bool, pub(super) gossip_multihop: bool, + pub(super) gossip_target: WhatAmIMatcher, pub(super) autoconnect: WhatAmIMatcher, pub(super) idx: NodeIndex, pub(super) links: VecMap, @@ -138,6 +139,7 @@ impl Network { router_peers_failover_brokering: bool, gossip: bool, gossip_multihop: bool, + gossip_target: WhatAmIMatcher, autoconnect: WhatAmIMatcher, ) -> Self { let mut graph = petgraph::stable_graph::StableGraph::default(); @@ -155,6 +157,7 @@ impl Network { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, idx, links: VecMap::new(), @@ -225,7 +228,7 @@ impl Network { idx } - fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState { + fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState { let links = if details.links { self.graph[idx] .links @@ -268,10 +271,10 @@ impl Network { } } - fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result { + fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result { let mut link_states = vec![]; for (idx, details) in idxs { - link_states.push(self.make_link_state(idx, details)); + link_states.push(self.make_link_state(*idx, details)); } let codec = Zenoh080Routing::new(); let mut buf = ZBuf::empty(); @@ -285,8 +288,11 @@ impl Network { .into()) } - fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { - if let Ok(msg) = self.make_msg(idxs) { + fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, transport); + } + if let Ok(msg) = self.make_msg(&idxs) { tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); if let Err(e) = transport.schedule(msg) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); @@ -296,21 +302,24 @@ impl Network { } } - fn send_on_links

(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P) + fn send_on_links

(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P) where P: FnMut(&Link) -> bool, { - if let Ok(msg) = self.make_msg(idxs) { - for link in self.links.values() { + for link in self.links.values() { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, &link.transport); + } + if let Ok(msg) = self.make_msg(&idxs) { if parameters(link) { tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg); if let Err(e) = link.transport.schedule(msg.clone()) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); } } + } else { + tracing::error!("Failed to encode Linkstate message"); } - } else { - tracing::error!("Failed to encode Linkstate message"); } } @@ -318,8 +327,10 @@ impl Network { // from the given node. // Returns true if gossip is enabled and if multihop gossip is enabled or // the node is one of self neighbours. - fn propagate_locators(&self, idx: NodeIndex) -> bool { + fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool { + let target_whatami = target.get_whatami().unwrap_or_default(); self.gossip + && self.gossip_target.matches(target_whatami) && (self.gossip_multihop || idx == self.idx || self.links.values().any(|link| { @@ -491,8 +502,8 @@ impl Network { idx, Details { zid: true, - locators: true, links: false, + ..Default::default() }, )], |link| link.zid != zid, @@ -657,20 +668,20 @@ impl Network { Vec<(Vec, NodeIndex, bool)>, Vec<(Vec, NodeIndex, bool)>, ) = link_states.into_iter().partition(|(_, _, new)| *new); - let new_idxs = new_idxs - .into_iter() - .map(|(_, idx1, _new_node)| { - ( - idx1, - Details { - zid: true, - locators: self.propagate_locators(idx1), - links: true, - }, - ) - }) - .collect::>(); for link in self.links.values() { + let new_idxs = new_idxs + .iter() + .map(|(_, idx1, _new_node)| { + ( + *idx1, + Details { + zid: true, + links: true, + ..Default::default() + }, + ) + }) + .collect::>(); if link.zid != src { let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs .clone() @@ -681,8 +692,8 @@ impl Network { idx1, Details { zid: false, - locators: self.propagate_locators(idx1), links: true, + ..Default::default() }, )) } else { @@ -761,16 +772,16 @@ impl Network { idx, Details { zid: true, - locators: false, links: false, + ..Default::default() }, ), ( self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, ), ] @@ -779,8 +790,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, )] }, @@ -806,11 +817,11 @@ impl Network { idx, Details { zid: true, - locators: self.propagate_locators(idx), links: self.full_linkstate || (self.router_peers_failover_brokering && idx == self.idx && whatami == WhatAmI::Router), + ..Default::default() }, ) }) @@ -840,8 +851,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |_| true, @@ -858,8 +869,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |link| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index b9155b44b..b0b382eef 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -96,6 +96,7 @@ pub(super) struct Network { pub(super) router_peers_failover_brokering: bool, pub(super) gossip: bool, pub(super) gossip_multihop: bool, + pub(super) gossip_target: WhatAmIMatcher, pub(super) autoconnect: WhatAmIMatcher, pub(super) wait_declares: bool, pub(super) idx: NodeIndex, @@ -113,6 +114,7 @@ impl Network { router_peers_failover_brokering: bool, gossip: bool, gossip_multihop: bool, + gossip_target: WhatAmIMatcher, autoconnect: WhatAmIMatcher, wait_declares: bool, ) -> Self { @@ -130,6 +132,7 @@ impl Network { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, wait_declares, idx, @@ -231,13 +234,18 @@ impl Network { } fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { - if let Ok(msg) = self.make_msg(idxs) { - tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); - if let Err(e) = transport.schedule(msg) { - tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); + if transport + .get_whatami() + .is_ok_and(|w| self.gossip_target.matches(w)) + { + if let Ok(msg) = self.make_msg(idxs) { + tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); + if let Err(e) = transport.schedule(msg) { + tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); + } + } else { + tracing::error!("Failed to encode Linkstate message"); } - } else { - tracing::error!("Failed to encode Linkstate message"); } } @@ -247,7 +255,12 @@ impl Network { { if let Ok(msg) = self.make_msg(idxs) { for link in self.links.values() { - if parameters(link) { + if link + .transport + .get_whatami() + .is_ok_and(|w| self.gossip_target.matches(w)) + && parameters(link) + { tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg); if let Err(e) = link.transport.schedule(msg.clone()) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index f50e57ba9..429231f3c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -115,6 +115,7 @@ impl HatBaseTrait for HatCode { let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); + let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami)); let autoconnect = if gossip { *unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami)) } else { @@ -133,6 +134,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, wait_declares, )); diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 5815ff1df..b44801919 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -312,6 +312,7 @@ impl HatBaseTrait for HatCode { let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); + let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami)); let autoconnect = if gossip { *unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami)) } else { @@ -334,6 +335,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } @@ -346,6 +348,7 @@ impl HatBaseTrait for HatCode { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, )); } diff --git a/zenoh/src/net/routing/hat/router/network.rs b/zenoh/src/net/routing/hat/router/network.rs index 3bfdde49d..103332955 100644 --- a/zenoh/src/net/routing/hat/router/network.rs +++ b/zenoh/src/net/routing/hat/router/network.rs @@ -39,7 +39,7 @@ use crate::net::{ runtime::Runtime, }; -#[derive(Clone)] +#[derive(Clone, Default)] struct Details { zid: bool, locators: bool, @@ -119,6 +119,7 @@ pub(super) struct Network { pub(super) router_peers_failover_brokering: bool, pub(super) gossip: bool, pub(super) gossip_multihop: bool, + pub(super) gossip_target: WhatAmIMatcher, pub(super) autoconnect: WhatAmIMatcher, pub(super) idx: NodeIndex, pub(super) links: VecMap, @@ -138,6 +139,7 @@ impl Network { router_peers_failover_brokering: bool, gossip: bool, gossip_multihop: bool, + gossip_target: WhatAmIMatcher, autoconnect: WhatAmIMatcher, ) -> Self { let mut graph = petgraph::stable_graph::StableGraph::default(); @@ -155,6 +157,7 @@ impl Network { router_peers_failover_brokering, gossip, gossip_multihop, + gossip_target, autoconnect, idx, links: VecMap::new(), @@ -230,7 +233,7 @@ impl Network { idx } - fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState { + fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState { let links = if details.links { self.graph[idx] .links @@ -273,10 +276,10 @@ impl Network { } } - fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result { + fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result { let mut link_states = vec![]; for (idx, details) in idxs { - link_states.push(self.make_link_state(idx, details)); + link_states.push(self.make_link_state(*idx, details)); } let codec = Zenoh080Routing::new(); let mut buf = ZBuf::empty(); @@ -290,8 +293,11 @@ impl Network { .into()) } - fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { - if let Ok(msg) = self.make_msg(idxs) { + fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, transport); + } + if let Ok(msg) = self.make_msg(&idxs) { tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg); if let Err(e) = transport.schedule(msg) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); @@ -301,21 +307,24 @@ impl Network { } } - fn send_on_links

(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P) + fn send_on_links

(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P) where P: FnMut(&Link) -> bool, { - if let Ok(msg) = self.make_msg(idxs) { - for link in self.links.values() { + for link in self.links.values() { + for idx in &mut idxs { + idx.1.locators = self.propagate_locators(idx.0, &link.transport); + } + if let Ok(msg) = self.make_msg(&idxs) { if parameters(link) { tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg); if let Err(e) = link.transport.schedule(msg.clone()) { tracing::debug!("{} Error sending LinkStateList: {}", self.name, e); } } + } else { + tracing::error!("Failed to encode Linkstate message"); } - } else { - tracing::error!("Failed to encode Linkstate message"); } } @@ -323,8 +332,10 @@ impl Network { // from the given node. // Returns true if gossip is enabled and if multihop gossip is enabled or // the node is one of self neighbours. - fn propagate_locators(&self, idx: NodeIndex) -> bool { + fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool { + let target_whatami = target.get_whatami().unwrap_or_default(); self.gossip + && self.gossip_target.matches(target_whatami) && (self.gossip_multihop || idx == self.idx || self.links.values().any(|link| { @@ -495,8 +506,8 @@ impl Network { idx, Details { zid: true, - locators: true, links: false, + ..Default::default() }, )], |link| link.zid != zid, @@ -661,20 +672,20 @@ impl Network { Vec<(Vec, NodeIndex, bool)>, Vec<(Vec, NodeIndex, bool)>, ) = link_states.into_iter().partition(|(_, _, new)| *new); - let new_idxs = new_idxs - .into_iter() - .map(|(_, idx1, _new_node)| { - ( - idx1, - Details { - zid: true, - locators: self.propagate_locators(idx1), - links: true, - }, - ) - }) - .collect::>(); for link in self.links.values() { + let new_idxs = new_idxs + .iter() + .map(|(_, idx1, _new_node)| { + ( + *idx1, + Details { + zid: true, + links: true, + ..Default::default() + }, + ) + }) + .collect::>(); if link.zid != src { let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs .clone() @@ -685,8 +696,8 @@ impl Network { idx1, Details { zid: false, - locators: self.propagate_locators(idx1), links: true, + ..Default::default() }, )) } else { @@ -765,16 +776,16 @@ impl Network { idx, Details { zid: true, - locators: false, links: false, + ..Default::default() }, ), ( self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, ), ] @@ -783,8 +794,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.propagate_locators(idx), links: true, + ..Default::default() }, )] }, @@ -810,11 +821,11 @@ impl Network { idx, Details { zid: true, - locators: self.propagate_locators(idx), links: self.full_linkstate || (self.router_peers_failover_brokering && idx == self.idx && whatami == WhatAmI::Router), + ..Default::default() }, ) }) @@ -844,8 +855,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |_| true, @@ -862,8 +873,8 @@ impl Network { self.idx, Details { zid: false, - locators: self.gossip, links: true, + ..Default::default() }, )], |link| {