From 199219fce967c1c1a439cb15e7ba23bd42f4e672 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 27 Feb 2024 19:42:14 +0100 Subject: [PATCH] [skip ci] Start draft implementation --- commons/zenoh-config/src/lib.rs | 10 +++++ zenoh/src/net/runtime/orchestrator.rs | 59 +++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index eba3a4aa55..c5ab959f4d 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -15,6 +15,7 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; +use core::time; use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{ @@ -97,6 +98,15 @@ pub struct DownsamplingItemConf { pub flow: DownsamplingFlow, } +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ConnectionRetryConf { + //TODO(sashacmc): add comments + // Do we need some skip/fail/etc.? + pub count: i32, + pub timeout_ms: time::Duration, + pub timeout_increase_factor: f32, +} + pub trait ConfigValidator: Send + Sync { fn check_config( &self, diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index a1a2c8db48..c79bd1b413 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -14,6 +14,7 @@ use super::{Runtime, RuntimeSession}; use async_std::net::UdpSocket; use async_std::prelude::FutureExt; +use core::time; use futures::prelude::*; use socket2::{Domain, Socket, Type}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; @@ -291,24 +292,67 @@ impl Runtime { Ok(()) } + fn get_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + let config = endpoint.config(); + let retry = config.get("TODO"); + + zenoh_config::ConnectionRetryConf { + count: 0, + timeout_ms: time::Duration::from_micros(0), + timeout_increase_factor: 1., + } + } + async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> { for listener in listeners { let endpoint = listener.clone(); - match self.manager().add_listener(endpoint).await { - Ok(listener) => log::debug!("Listener added: {}", listener), - Err(err) => { - log::error!("Unable to open listener {}: {}", listener, err); - return Err(err); - } + let retry_config = self.get_retry_config(&endpoint); + if retry_config.count > 0 { + self.spawn_add_listener(endpoint, retry_config).await? + } else { + self.add_listener(endpoint).await? + } + } + + self.print_locators(); + + Ok(()) + } + + async fn spawn_add_listener( + &self, + listener: EndPoint, + retry_config: zenoh_config::ConnectionRetryConf, + ) -> ZResult<()> { + let this = self.clone(); + self.spawn(async move { + // TODO(sashacmc): do retry + if this.add_listener(listener).await.is_ok() { + this.print_locators(); + } + }); + Ok(()) + } + + async fn add_listener(&self, listener: EndPoint) -> ZResult<()> { + let endpoint = listener.clone(); + match self.manager().add_listener(endpoint).await { + Ok(listener) => log::debug!("Listener added: {}", listener), + Err(err) => { + log::error!("Unable to open listener {}: {}", listener, err); + return Err(err); } } + Ok(()) + } + fn print_locators(&self) { let mut locators = self.state.locators.write().unwrap(); *locators = self.manager().get_locators(); for locator in &*locators { log::info!("Zenoh can be reached at: {}", locator); } - Ok(()) } pub fn get_interfaces(names: &str) -> Vec { @@ -499,6 +543,7 @@ impl Runtime { ); } } + //TODO(sashacmc): rework async_std::task::sleep(delay).await; delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; if delay > CONNECTION_RETRY_MAX_PERIOD {