Skip to content

Commit

Permalink
Forbid multicast endpoint in connect list (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Sep 20, 2023
1 parent d2ed08d commit a8cd82c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 91 deletions.
138 changes: 50 additions & 88 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ impl Runtime {
self.bind_listeners(&listeners).await?;

for peer in peers {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}

if scouting {
Expand Down Expand Up @@ -188,8 +187,7 @@ impl Runtime {
self.bind_listeners(&listeners).await?;

for peer in peers {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}

if scouting {
Expand Down Expand Up @@ -283,8 +281,7 @@ impl Runtime {
}
false
}) {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}
}
}
Expand Down Expand Up @@ -447,98 +444,63 @@ impl Runtime {
Ok(std::net::UdpSocket::from(socket).into())
}

async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> {
if !LocatorInspector::default()
.is_multicast(&peer.to_locator())
.await?
{
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
Ok(())
} else {
bail!("Forbidden multicast endpoint in connect list!")
}
}

async fn peer_connector(&self, peer: EndPoint) {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
loop {
log::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
if let Ok(is_mcast) = LocatorInspector::default()
.is_multicast(&endpoint.to_locator())
match self
.manager()
.open_transport_unicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
if is_mcast {
match self
.manager()
.open_transport_multicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>(
) {
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
} else {
match self
.manager()
.open_transport_unicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>(
) {
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>()
{
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions zenoh/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) {
async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, Session) {
// Open the sessions
let mut config = config::peer();
config.connect.endpoints = vec![endpoint01.parse().unwrap()];
config.listen.endpoints = vec![endpoint01.parse().unwrap()];
config.scouting.multicast.set_enabled(Some(true)).unwrap();
println!("[ ][01a] Opening peer01 session: {}", endpoint01);
let peer01 = ztimeout!(zenoh::open(config).res_async()).unwrap();

let mut config = config::peer();
config.connect.endpoints = vec![endpoint02.parse().unwrap()];
config.listen.endpoints = vec![endpoint02.parse().unwrap()];
config.scouting.multicast.set_enabled(Some(true)).unwrap();
println!("[ ][02a] Opening peer02 session: {}", endpoint02);
let peer02 = ztimeout!(zenoh::open(config).res_async()).unwrap();
Expand Down Expand Up @@ -203,7 +203,7 @@ fn zenoh_session_multicast() {
let _ = env_logger::try_init();

let (peer01, peer02) =
open_session_multicast("udp/224.0.0.1:17448", "udp/224.0.0.1:17449").await;
open_session_multicast("udp/224.0.0.1:17448", "udp/224.0.0.1:17448").await;
test_session_pubsub(&peer01, &peer02, Reliability::BestEffort).await;
close_session(peer01, peer02).await;
});
Expand Down

0 comments on commit a8cd82c

Please sign in to comment.