diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 3aef55b4fe..ccd2e68f6a 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -143,8 +143,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; for peer in peers { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } if scouting { @@ -188,8 +187,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; for peer in peers { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } if scouting { @@ -283,8 +281,7 @@ impl Runtime { } false }) { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } } } @@ -447,98 +444,63 @@ impl Runtime { Ok(std::net::UdpSocket::from(socket).into()) } + async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> { + if !LocatorInspector::default() + .is_multicast(&peer.to_locator()) + .await? + { + let this = self.clone(); + self.spawn(async move { this.peer_connector(peer).await }); + Ok(()) + } else { + bail!("Forbidden multicast endpoint in connect list!") + } + } + async fn peer_connector(&self, peer: EndPoint) { let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; loop { log::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); - if let Ok(is_mcast) = LocatorInspector::default() - .is_multicast(&endpoint.to_locator()) + match self + .manager() + .open_transport_unicast(endpoint) + .timeout(CONNECTION_TIMEOUT) .await { - if is_mcast { - match self - .manager() - .open_transport_multicast(endpoint) - .timeout(CONNECTION_TIMEOUT) - .await - { - Ok(Ok(transport)) => { - log::debug!("Successfully connected to configured peer {}", peer); - if let Ok(Some(orch_transport)) = transport.get_callback() { - if let Some(orch_transport) = orch_transport - .as_any() - .downcast_ref::( - ) { - *zwrite!(orch_transport.endpoint) = Some(peer); - } - } - break; - } - Ok(Err(e)) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - Err(e) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - } - async_std::task::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } - } else { - match self - .manager() - .open_transport_unicast(endpoint) - .timeout(CONNECTION_TIMEOUT) - .await - { - Ok(Ok(transport)) => { - log::debug!("Successfully connected to configured peer {}", peer); - if let Ok(Some(orch_transport)) = transport.get_callback() { - if let Some(orch_transport) = orch_transport - .as_any() - .downcast_ref::( - ) { - *zwrite!(orch_transport.endpoint) = Some(peer); - } - } - break; - } - Ok(Err(e)) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - Err(e) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); + Ok(Ok(transport)) => { + log::debug!("Successfully connected to configured peer {}", peer); + if let Ok(Some(orch_transport)) = transport.get_callback() { + if let Some(orch_transport) = orch_transport + .as_any() + .downcast_ref::() + { + *zwrite!(orch_transport.endpoint) = Some(peer); } } - async_std::task::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + break; } + Ok(Err(e)) => { + log::debug!( + "Unable to connect to configured peer {}! {}. Retry in {:?}.", + peer, + e, + delay + ); + } + Err(e) => { + log::debug!( + "Unable to connect to configured peer {}! {}. Retry in {:?}.", + peer, + e, + delay + ); + } + } + async_std::task::sleep(delay).await; + delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; + if delay > CONNECTION_RETRY_MAX_PERIOD { + delay = CONNECTION_RETRY_MAX_PERIOD; } } } diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index eab7070efa..c94cb36510 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -57,13 +57,13 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, Session) { // Open the sessions let mut config = config::peer(); - config.connect.endpoints = vec![endpoint01.parse().unwrap()]; + config.listen.endpoints = vec![endpoint01.parse().unwrap()]; config.scouting.multicast.set_enabled(Some(true)).unwrap(); println!("[ ][01a] Opening peer01 session: {}", endpoint01); let peer01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); let mut config = config::peer(); - config.connect.endpoints = vec![endpoint02.parse().unwrap()]; + config.listen.endpoints = vec![endpoint02.parse().unwrap()]; config.scouting.multicast.set_enabled(Some(true)).unwrap(); println!("[ ][02a] Opening peer02 session: {}", endpoint02); let peer02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); @@ -203,7 +203,7 @@ fn zenoh_session_multicast() { let _ = env_logger::try_init(); let (peer01, peer02) = - open_session_multicast("udp/224.0.0.1:17448", "udp/224.0.0.1:17449").await; + open_session_multicast("udp/224.0.0.1:17448", "udp/224.0.0.1:17448").await; test_session_pubsub(&peer01, &peer02, Reliability::BestEffort).await; close_session(peer01, peer02).await; });