Skip to content

Commit

Permalink
Merge pull request #1615 from eclipse-zenoh/fix/multicast
Browse files Browse the repository at this point in the history
Fix message storm when using multiple peers with multicast
  • Loading branch information
Mallets authored Nov 29, 2024
2 parents 7e044ad + f2e3aa8 commit dee257f
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 61 deletions.
30 changes: 1 addition & 29 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub fn route_data(
reliability,
)
}
} else if tables.whatami == WhatAmI::Router {
} else {
let route = route
.values()
.filter(|(outface, _key_expr, _context)| {
Expand Down Expand Up @@ -481,34 +481,6 @@ pub fn route_data(
reliability,
)
}
} else {
drop(tables);
for (outface, key_expr, context) in route.values() {
if face.id != outface.id
&& match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
{
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, tx, user, msg.payload)
} else {
inc_stats!(face, tx, admin, msg.payload)
}

outface.primitives.send_push(
Push {
wire_expr: key_expr.into(),
ext_qos: msg.ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: msg.payload.clone(),
},
reliability,
)
}
}
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,8 @@ impl HatBaseTrait for HatCode {
_expr: &mut RoutingExpr,
) -> bool {
src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
&& out_face.mcast_group.is_none()
&& src_face.mcast_group.is_none()
}

fn info(&self, _tables: &Tables, _kind: WhatAmI) -> String {
Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,7 @@ impl HatBaseTrait for HatCode {
_expr: &mut RoutingExpr,
) -> bool {
src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
&& (out_face.mcast_group.is_none() || src_face.mcast_group.is_none())
}

fn info(&self, tables: &Tables, kind: WhatAmI) -> String {
Expand Down
6 changes: 2 additions & 4 deletions zenoh/src/net/routing/hat/p2p_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,8 @@ impl HatBaseTrait for HatCode {
_expr: &mut RoutingExpr,
) -> bool {
src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
&& (out_face.mcast_group.is_none()
|| (src_face.whatami == WhatAmI::Client && src_face.mcast_group.is_none()))
}

fn info(&self, _tables: &Tables, _kind: WhatAmI) -> String {
Expand Down
36 changes: 20 additions & 16 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,26 @@ fn declare_simple_subscription(
// This introduced a buffer overflow on windows
// TODO: Let's deactivate this on windows until Fixed
#[cfg(not(windows))]
for mcast_group in &tables.mcast_groups {
mcast_group
.primitives
.send_declare(RoutingContext::with_expr(
Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
id: 0, // @TODO use proper SubscriberId
wire_expr: res.expr().into(),
}),
},
res.expr(),
))
if face.whatami == WhatAmI::Client {
for mcast_group in &tables.mcast_groups {
if mcast_group.mcast_group != face.mcast_group {
mcast_group
.primitives
.send_declare(RoutingContext::with_expr(
Declare {
interest_id: None,
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareSubscriber(DeclareSubscriber {
id: 0, // @TODO use proper SubscriberId
wire_expr: res.expr().into(),
}),
},
res.expr(),
))
}
}
}
}

Expand Down
5 changes: 1 addition & 4 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,10 +789,7 @@ impl HatBaseTrait for HatCode {
expr: &mut RoutingExpr,
) -> bool {
if src_face.id != out_face.id
&& match (src_face.mcast_group.as_ref(), out_face.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
&& (out_face.mcast_group.is_none() || src_face.mcast_group.is_none())
{
let dst_master = out_face.whatami != WhatAmI::Peer
|| hat!(tables).linkstatepeers_net.is_none()
Expand Down

0 comments on commit dee257f

Please sign in to comment.