Skip to content

Commit

Permalink
Forbid multicast endpoint in connect list
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Sep 14, 2023
1 parent b143371 commit 4ce2bd8
Showing 1 changed file with 50 additions and 88 deletions.
138 changes: 50 additions & 88 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ impl Runtime {
self.bind_listeners(&listeners).await?;

for peer in peers {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}

if scouting {
Expand Down Expand Up @@ -188,8 +187,7 @@ impl Runtime {
self.bind_listeners(&listeners).await?;

for peer in peers {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}

if scouting {
Expand Down Expand Up @@ -283,8 +281,7 @@ impl Runtime {
}
false
}) {
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
self.spawn_peer_connector(peer).await?;
}
}
}
Expand Down Expand Up @@ -447,98 +444,63 @@ impl Runtime {
Ok(std::net::UdpSocket::from(socket).into())
}

async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> {
if !LocatorInspector::default()
.is_multicast(&peer.to_locator())
.await?
{
let this = self.clone();
self.spawn(async move { this.peer_connector(peer).await });
Ok(())
} else {
bail!("Forbidden multicast endpoint in connect list!")
}
}

async fn peer_connector(&self, peer: EndPoint) {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
loop {
log::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
if let Ok(is_mcast) = LocatorInspector::default()
.is_multicast(&endpoint.to_locator())
match self
.manager()
.open_transport_unicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
if is_mcast {
match self
.manager()
.open_transport_multicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>(
) {
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
} else {
match self
.manager()
.open_transport_unicast(endpoint)
.timeout(CONNECTION_TIMEOUT)
.await
{
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>(
) {
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>()
{
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
}
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
}
}
Expand Down

0 comments on commit 4ce2bd8

Please sign in to comment.