diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 3aef55b4fe..ccd2e68f6a 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -143,8 +143,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; for peer in peers { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } if scouting { @@ -188,8 +187,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; for peer in peers { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } if scouting { @@ -283,8 +281,7 @@ impl Runtime { } false }) { - let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn_peer_connector(peer).await?; } } } @@ -447,98 +444,63 @@ impl Runtime { Ok(std::net::UdpSocket::from(socket).into()) } + async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> { + if !LocatorInspector::default() + .is_multicast(&peer.to_locator()) + .await? + { + let this = self.clone(); + self.spawn(async move { this.peer_connector(peer).await }); + Ok(()) + } else { + bail!("Forbidden multicast endpoint in connect list!") + } + } + async fn peer_connector(&self, peer: EndPoint) { let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; loop { log::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); - if let Ok(is_mcast) = LocatorInspector::default() - .is_multicast(&endpoint.to_locator()) + match self + .manager() + .open_transport_unicast(endpoint) + .timeout(CONNECTION_TIMEOUT) .await { - if is_mcast { - match self - .manager() - .open_transport_multicast(endpoint) - .timeout(CONNECTION_TIMEOUT) - .await - { - Ok(Ok(transport)) => { - log::debug!("Successfully connected to configured peer {}", peer); - if let Ok(Some(orch_transport)) = transport.get_callback() { - if let Some(orch_transport) = orch_transport - .as_any() - .downcast_ref::( - ) { - *zwrite!(orch_transport.endpoint) = Some(peer); - } - } - break; - } - Ok(Err(e)) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - Err(e) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - } - async_std::task::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } - } else { - match self - .manager() - .open_transport_unicast(endpoint) - .timeout(CONNECTION_TIMEOUT) - .await - { - Ok(Ok(transport)) => { - log::debug!("Successfully connected to configured peer {}", peer); - if let Ok(Some(orch_transport)) = transport.get_callback() { - if let Some(orch_transport) = orch_transport - .as_any() - .downcast_ref::( - ) { - *zwrite!(orch_transport.endpoint) = Some(peer); - } - } - break; - } - Ok(Err(e)) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); - } - Err(e) => { - log::debug!( - "Unable to connect to configured peer {}! {}. Retry in {:?}.", - peer, - e, - delay - ); + Ok(Ok(transport)) => { + log::debug!("Successfully connected to configured peer {}", peer); + if let Ok(Some(orch_transport)) = transport.get_callback() { + if let Some(orch_transport) = orch_transport + .as_any() + .downcast_ref::() + { + *zwrite!(orch_transport.endpoint) = Some(peer); } } - async_std::task::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + break; } + Ok(Err(e)) => { + log::debug!( + "Unable to connect to configured peer {}! {}. Retry in {:?}.", + peer, + e, + delay + ); + } + Err(e) => { + log::debug!( + "Unable to connect to configured peer {}! {}. Retry in {:?}.", + peer, + e, + delay + ); + } + } + async_std::task::sleep(delay).await; + delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; + if delay > CONNECTION_RETRY_MAX_PERIOD { + delay = CONNECTION_RETRY_MAX_PERIOD; } } }