Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gossip_target configuration option #1678

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@
/// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
/// direct connectivity with each other.
multihop: false,
/// Which type of Zenoh instances to send gossip messages to.
/// Accepts a single value (e.g. target: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. target: { router: ["router", "peer"], peer: ["router"] }).
/// Each value is a list of: "peer", "router" and/or "client".
target: { router: ["router", "peer"], peer: ["router", "peer"]},
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
Expand Down
9 changes: 9 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ pub mod scouting {
pub mod gossip {
pub const enabled: bool = true;
pub const multihop: bool = false;
pub mod target {
pub const router: &crate::WhatAmIMatcher = // "router|peer"
&crate::WhatAmIMatcher::empty().router().peer();
pub const peer: &crate::WhatAmIMatcher = // "router|peer"
&crate::WhatAmIMatcher::empty().router().peer();
pub const client: &crate::WhatAmIMatcher = // ""
&crate::WhatAmIMatcher::empty();
mode_accessor!(crate::WhatAmIMatcher);
}
pub mod autoconnect {
pub const router: &crate::WhatAmIMatcher = // ""
&crate::WhatAmIMatcher::empty();
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ validated_struct::validator! {
/// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
/// direct connectivity with each other.
multihop: Option<bool>,
/// Which type of Zenoh instances to send gossip messages to.
target: Option<ModeDependentValue<WhatAmIMatcher>>,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery through gossip.
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
},
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ impl HatBaseTrait for HatCode {
let whatami = tables.whatami;
let gossip = unwrap_or_default!(config.scouting().gossip().enabled());
let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop());
let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami));
let autoconnect = if gossip {
*unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami))
} else {
Expand All @@ -205,6 +206,7 @@ impl HatBaseTrait for HatCode {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
));
}
Expand Down
77 changes: 44 additions & 33 deletions zenoh/src/net/routing/hat/linkstate_peer/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::net::{
runtime::{Runtime, WeakRuntime},
};

#[derive(Clone)]
#[derive(Clone, Default)]
struct Details {
zid: bool,
locators: bool,
Expand Down Expand Up @@ -119,6 +119,7 @@ pub(super) struct Network {
pub(super) router_peers_failover_brokering: bool,
pub(super) gossip: bool,
pub(super) gossip_multihop: bool,
pub(super) gossip_target: WhatAmIMatcher,
pub(super) autoconnect: WhatAmIMatcher,
pub(super) idx: NodeIndex,
pub(super) links: VecMap<Link>,
Expand All @@ -138,6 +139,7 @@ impl Network {
router_peers_failover_brokering: bool,
gossip: bool,
gossip_multihop: bool,
gossip_target: WhatAmIMatcher,
autoconnect: WhatAmIMatcher,
) -> Self {
let mut graph = petgraph::stable_graph::StableGraph::default();
Expand All @@ -155,6 +157,7 @@ impl Network {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
idx,
links: VecMap::new(),
Expand Down Expand Up @@ -225,7 +228,7 @@ impl Network {
idx
}

fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState {
fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState {
let links = if details.links {
self.graph[idx]
.links
Expand Down Expand Up @@ -268,10 +271,10 @@ impl Network {
}
}

fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result<NetworkMessage, DidntWrite> {
fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result<NetworkMessage, DidntWrite> {
let mut link_states = vec![];
for (idx, details) in idxs {
link_states.push(self.make_link_state(idx, details));
link_states.push(self.make_link_state(*idx, details));
}
let codec = Zenoh080Routing::new();
let mut buf = ZBuf::empty();
Expand All @@ -285,8 +288,11 @@ impl Network {
.into())
}

fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
if let Ok(msg) = self.make_msg(idxs) {
fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
for idx in &mut idxs {
idx.1.locators = self.propagate_locators(idx.0, transport);
}
if let Ok(msg) = self.make_msg(&idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
Expand All @@ -296,30 +302,35 @@ impl Network {
}
}

fn send_on_links<P>(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P)
fn send_on_links<P>(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P)
where
P: FnMut(&Link) -> bool,
{
if let Ok(msg) = self.make_msg(idxs) {
for link in self.links.values() {
for link in self.links.values() {
for idx in &mut idxs {
idx.1.locators = self.propagate_locators(idx.0, &link.transport);
}
if let Ok(msg) = self.make_msg(&idxs) {
if parameters(link) {
tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg);
if let Err(e) = link.transport.schedule(msg.clone()) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
}
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
}

// Indicates if locators should be included when propagating Linkstate message
// from the given node.
// Returns true if gossip is enabled and if multihop gossip is enabled or
// the node is one of self neighbours.
fn propagate_locators(&self, idx: NodeIndex) -> bool {
fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool {
let target_whatami = target.get_whatami().unwrap_or_default();
self.gossip
&& self.gossip_target.matches(target_whatami)
&& (self.gossip_multihop
|| idx == self.idx
|| self.links.values().any(|link| {
Expand Down Expand Up @@ -491,8 +502,8 @@ impl Network {
idx,
Details {
zid: true,
locators: true,
links: false,
..Default::default()
},
)],
|link| link.zid != zid,
Expand Down Expand Up @@ -657,20 +668,20 @@ impl Network {
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
) = link_states.into_iter().partition(|(_, _, new)| *new);
let new_idxs = new_idxs
.into_iter()
.map(|(_, idx1, _new_node)| {
(
idx1,
Details {
zid: true,
locators: self.propagate_locators(idx1),
links: true,
},
)
})
.collect::<Vec<(NodeIndex, Details)>>();
for link in self.links.values() {
let new_idxs = new_idxs
.iter()
.map(|(_, idx1, _new_node)| {
(
*idx1,
Details {
zid: true,
links: true,
..Default::default()
},
)
})
.collect::<Vec<(NodeIndex, Details)>>();
if link.zid != src {
let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs
.clone()
Expand All @@ -681,8 +692,8 @@ impl Network {
idx1,
Details {
zid: false,
locators: self.propagate_locators(idx1),
links: true,
..Default::default()
},
))
} else {
Expand Down Expand Up @@ -761,16 +772,16 @@ impl Network {
idx,
Details {
zid: true,
locators: false,
links: false,
..Default::default()
},
),
(
self.idx,
Details {
zid: false,
locators: self.propagate_locators(idx),
links: true,
..Default::default()
},
),
]
Expand All @@ -779,8 +790,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.propagate_locators(idx),
links: true,
..Default::default()
},
)]
},
Expand All @@ -806,11 +817,11 @@ impl Network {
idx,
Details {
zid: true,
locators: self.propagate_locators(idx),
links: self.full_linkstate
|| (self.router_peers_failover_brokering
&& idx == self.idx
&& whatami == WhatAmI::Router),
..Default::default()
},
)
})
Expand Down Expand Up @@ -840,8 +851,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.gossip,
links: true,
..Default::default()
},
)],
|_| true,
Expand All @@ -858,8 +869,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.gossip,
links: true,
..Default::default()
},
)],
|link| {
Expand Down
27 changes: 20 additions & 7 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(super) struct Network {
pub(super) router_peers_failover_brokering: bool,
pub(super) gossip: bool,
pub(super) gossip_multihop: bool,
pub(super) gossip_target: WhatAmIMatcher,
pub(super) autoconnect: WhatAmIMatcher,
pub(super) wait_declares: bool,
pub(super) idx: NodeIndex,
Expand All @@ -113,6 +114,7 @@ impl Network {
router_peers_failover_brokering: bool,
gossip: bool,
gossip_multihop: bool,
gossip_target: WhatAmIMatcher,
autoconnect: WhatAmIMatcher,
wait_declares: bool,
) -> Self {
Expand All @@ -130,6 +132,7 @@ impl Network {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
wait_declares,
idx,
Expand Down Expand Up @@ -231,13 +234,18 @@ impl Network {
}

fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
if let Ok(msg) = self.make_msg(idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
if transport
.get_whatami()
.is_ok_and(|w| self.gossip_target.matches(w))
{
if let Ok(msg) = self.make_msg(idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
}

Expand All @@ -247,7 +255,12 @@ impl Network {
{
if let Ok(msg) = self.make_msg(idxs) {
for link in self.links.values() {
if parameters(link) {
if link
.transport
.get_whatami()
.is_ok_and(|w| self.gossip_target.matches(w))
&& parameters(link)
{
tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg);
if let Err(e) = link.transport.schedule(msg.clone()) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ impl HatBaseTrait for HatCode {
let whatami = tables.whatami;
let gossip = unwrap_or_default!(config.scouting().gossip().enabled());
let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop());
let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami));
let autoconnect = if gossip {
*unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami))
} else {
Expand All @@ -133,6 +134,7 @@ impl HatBaseTrait for HatCode {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
wait_declares,
));
Expand Down
3 changes: 3 additions & 0 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ impl HatBaseTrait for HatCode {
let whatami = tables.whatami;
let gossip = unwrap_or_default!(config.scouting().gossip().enabled());
let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop());
let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami));
let autoconnect = if gossip {
*unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami))
} else {
Expand All @@ -334,6 +335,7 @@ impl HatBaseTrait for HatCode {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
));
}
Expand All @@ -346,6 +348,7 @@ impl HatBaseTrait for HatCode {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
));
}
Expand Down
Loading
Loading