diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 22f201604a..4c1fdbb4eb 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -212,13 +212,25 @@ impl Default for SharedMemoryConf { } } -impl Default for ConnectionRetryConf { - fn default() -> Self { +impl ConnectionRetryModeDependentConf { + pub fn default_connect() -> Self { + Self { + timeout_ms: ModeDependentValue::Unique(-1), + try_timeout_ms: ModeDependentValue::Unique(10_000), + period_init_ms: ModeDependentValue::Unique(1000), + period_max_ms: ModeDependentValue::Unique(4000), + period_increase_factor: ModeDependentValue::Unique(2.), + fail_exit: ModeDependentValue::Unique(false), + } + } + pub fn default_listen() -> Self { Self { - count: 0, - timeout_ms: 1000, - timeout_increase_factor: 1., - exit_on_fail: ModeDependentValue::Unique(true), + timeout_ms: ModeDependentValue::Unique(0), + try_timeout_ms: ModeDependentValue::Unique(10_000), + period_init_ms: ModeDependentValue::Unique(1000), + period_max_ms: ModeDependentValue::Unique(4000), + period_increase_factor: ModeDependentValue::Unique(2.), + fail_exit: ModeDependentValue::Unique(true), } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index ed26993cd0..c8a483b2e3 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -15,12 +15,10 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; + use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; -use serde::{ - de::{self, MapAccess, Visitor}, - Deserialize, Serialize, -}; +use serde::{Deserialize, Serialize}; use serde_json::Value; #[allow(unused_imports)] use std::convert::TryFrom; // This is a false positive from the rust analyser @@ -29,7 +27,6 @@ use std::{ collections::HashSet, fmt, io::Read, - marker::PhantomData, net::SocketAddr, path::Path, sync::{Arc, Mutex, MutexGuard, Weak}, @@ -47,6 +44,9 @@ use zenoh_protocol::{ use zenoh_result::{bail, zerror, ZResult}; use zenoh_util::LibLoader; +pub mod mode_dependend; +pub use mode_dependend::*; + // Wrappers for secrecy of values #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct SecretString(String); @@ -99,15 +99,57 @@ pub struct DownsamplingItemConf { #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ConnectionRetryConf { - //TODO(sashacmc): add comments, and infinit value for counter - // ModeDependentValue: - // 1) only for exit_on_fail - // 2) for all - // 3) for no one (we have connect and listen config, they already separate) - pub count: u32, - pub timeout_ms: u64, - pub timeout_increase_factor: f32, - pub exit_on_fail: ModeDependentValue, + pub timeout_ms: i64, + pub try_timeout_ms: i64, + pub period_init_ms: i64, + pub period_max_ms: i64, + pub period_increase_factor: f64, + pub fail_exit: bool, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ConnectionRetryModeDependentConf { + pub timeout_ms: ModeDependentValue, + pub try_timeout_ms: ModeDependentValue, + pub period_init_ms: ModeDependentValue, + pub period_max_ms: ModeDependentValue, + pub period_increase_factor: ModeDependentValue, + pub fail_exit: ModeDependentValue, +} + +impl ConnectionRetryModeDependentConf { + pub fn get( + &self, + whatami: WhatAmI, + default: ConnectionRetryModeDependentConf, + ) -> ConnectionRetryConf { + ConnectionRetryConf { + timeout_ms: *self + .timeout_ms + .get(whatami) + .unwrap_or(&default.timeout_ms.get(whatami).unwrap()), + try_timeout_ms: *self + .try_timeout_ms + .get(whatami) + .unwrap_or(&default.timeout_ms.get(whatami).unwrap()), + period_init_ms: *self + .period_init_ms + .get(whatami) + .unwrap_or(&default.period_init_ms.get(whatami).unwrap()), + period_max_ms: *self + .period_max_ms + .get(whatami) + .unwrap_or(&default.period_max_ms.get(whatami).unwrap()), + period_increase_factor: *self + .period_increase_factor + .get(whatami) + .unwrap_or(&default.period_increase_factor.get(whatami).unwrap()), + fail_exit: *self + .fail_exit + .get(whatami) + .unwrap_or(&default.fail_exit.get(whatami).unwrap()), + } + } } pub trait ConfigValidator: Send + Sync { @@ -192,13 +234,13 @@ validated_struct::validator! { pub connect: #[derive(Default)] ConnectConfig { pub endpoints: Vec, - pub retry: Option, + pub retry: Option, }, /// Which endpoints to listen on. `zenohd` will add `tcp/[::]:7447` to these locators if left empty. pub listen: #[derive(Default)] ListenConfig { pub endpoints: Vec, - pub retry: Option, + pub retry: Option, }, pub scouting: #[derive(Default)] ScoutingConf { @@ -1260,190 +1302,6 @@ impl validated_struct::ValidatedMap for PluginsConfig { } } -pub trait ModeDependent { - fn router(&self) -> Option<&T>; - fn peer(&self) -> Option<&T>; - fn client(&self) -> Option<&T>; - #[inline] - fn get(&self, whatami: WhatAmI) -> Option<&T> { - match whatami { - WhatAmI::Router => self.router(), - WhatAmI::Peer => self.peer(), - WhatAmI::Client => self.client(), - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ModeValues { - #[serde(skip_serializing_if = "Option::is_none")] - router: Option, - #[serde(skip_serializing_if = "Option::is_none")] - peer: Option, - #[serde(skip_serializing_if = "Option::is_none")] - client: Option, -} - -impl ModeDependent for ModeValues { - #[inline] - fn router(&self) -> Option<&T> { - self.router.as_ref() - } - - #[inline] - fn peer(&self) -> Option<&T> { - self.peer.as_ref() - } - - #[inline] - fn client(&self) -> Option<&T> { - self.client.as_ref() - } -} - -#[derive(Clone, Debug)] -pub enum ModeDependentValue { - Unique(T), - Dependent(ModeValues), -} - -impl ModeDependent for ModeDependentValue { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.router(), - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.peer(), - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.client(), - } - } -} - -impl serde::Serialize for ModeDependentValue -where - T: Serialize, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - ModeDependentValue::Unique(value) => value.serialize(serializer), - ModeDependentValue::Dependent(options) => options.serialize(serializer), - } - } -} -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("bool or mode dependent bool") - } - - fn visit_bool(self, value: bool) -> Result - where - E: de::Error, - { - Ok(ModeDependentValue::Unique(value)) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") - } - - fn visit_str(self, value: &str) -> Result - where - E: de::Error, - { - WhatAmIMatcherVisitor {} - .visit_str(value) - .map(ModeDependentValue::Unique) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl ModeDependent for Option> { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.router(), - None => None, - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.peer(), - None => None, - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.client(), - None => None, - } - } -} - #[macro_export] macro_rules! unwrap_or_default { ($val:ident$(.$field:ident($($param:ident)?))*) => { diff --git a/commons/zenoh-config/src/mode_dependend.rs b/commons/zenoh-config/src/mode_dependend.rs new file mode 100644 index 0000000000..24c9095408 --- /dev/null +++ b/commons/zenoh-config/src/mode_dependend.rs @@ -0,0 +1,260 @@ +use serde::{ + de::{self, MapAccess, Visitor}, + Deserialize, Serialize, +}; +use std::fmt; +use std::marker::PhantomData; +pub use zenoh_protocol::core::{ + whatami, EndPoint, Locator, Priority, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, ZenohId, +}; + +pub trait ModeDependent { + fn router(&self) -> Option<&T>; + fn peer(&self) -> Option<&T>; + fn client(&self) -> Option<&T>; + #[inline] + fn get(&self, whatami: WhatAmI) -> Option<&T> { + match whatami { + WhatAmI::Router => self.router(), + WhatAmI::Peer => self.peer(), + WhatAmI::Client => self.client(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ModeValues { + #[serde(skip_serializing_if = "Option::is_none")] + router: Option, + #[serde(skip_serializing_if = "Option::is_none")] + peer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + client: Option, +} + +impl ModeDependent for ModeValues { + #[inline] + fn router(&self) -> Option<&T> { + self.router.as_ref() + } + + #[inline] + fn peer(&self) -> Option<&T> { + self.peer.as_ref() + } + + #[inline] + fn client(&self) -> Option<&T> { + self.client.as_ref() + } +} + +#[derive(Clone, Debug)] +pub enum ModeDependentValue { + Unique(T), + Dependent(ModeValues), +} + +impl ModeDependent for ModeDependentValue { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.router(), + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.peer(), + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.client(), + } + } +} + +impl serde::Serialize for ModeDependentValue +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + ModeDependentValue::Unique(value) => value.serialize(serializer), + ModeDependentValue::Dependent(options) => options.serialize(serializer), + } + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("bool or mode dependent bool") + } + + fn visit_bool(self, value: bool) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("i64 or mode dependent i64") + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("f64 or mode dependent f64") + } + + fn visit_f64(self, value: f64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + WhatAmIMatcherVisitor {} + .visit_str(value) + .map(ModeDependentValue::Unique) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl ModeDependent for Option> { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.router(), + None => None, + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.peer(), + None => None, + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.client(), + None => None, + } + } +} diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 4a6f8cd9a7..7c5b3b61ac 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -21,7 +21,7 @@ use std::time::Duration; use zenoh_buffers::reader::DidntRead; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_config::{unwrap_or_default, ModeDependent, ModeDependentValue}; +use zenoh_config::{unwrap_or_default, ModeDependent, ModeDependentValue, ValidatedMap}; use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, WhatAmI, ZenohId}, @@ -141,7 +141,15 @@ impl Runtime { ) }; - self.bind_listeners(&listeners).await?; + match self + .bind_listeners(&listeners) + .timeout(CONNECTION_TIMEOUT) + .await + { + Ok(Ok(_)) => return Ok(()), + Ok(Err(e)) => log::warn!("Unable to listen to {}", e), //TODO(sashacmc): fix text + Err(e) => log::warn!("Unable to listen to {}", e), + } for peer in peers { self.spawn_peer_connector(peer).await?; @@ -297,41 +305,39 @@ impl Runtime { listen: bool, ) -> zenoh_config::ConnectionRetryConf { let guard = &self.state.config.lock(); - let mut res = zenoh_config::ConnectionRetryConf::default(); - if let Some(cfg) = if listen { - guard.listen.retry() + + let mut res: zenoh_config::ConnectionRetryConf; + if listen { + let default = zenoh_config::ConnectionRetryModeDependentConf::default_listen(); + res = guard + .listen() + .retry() + .unwrap_or_else(|| default) + .get(self.whatami(), default) } else { - guard.connect.retry() - } { - res = cfg.clone(); + let default = zenoh_config::ConnectionRetryModeDependentConf::default_connect(); + res = guard + .connect() + .retry() + .unwrap_or_else(|| default) + .get(self.whatami(), default) } - //TODO(sashacmc): - // "tcp/192.168.0.1:7447#retry_count=1;retry_timeout_ms=1000;retry_timeout_increase_factor=2;retry_exit_on_fail=false" - // seems too long, but I don't thick someone will configure it often like it - // alternatives: - // "tcp/192.168.0.1:7447#retry{count=1;timeout_ms=1000;timeout_increase_factor=2;exit_on_fail=false}" - // but we have no support for it now - // - // "tcp/192.168.0.1:7447#retry=1;tm=1000;incfactor=2;exit_on_fail=false" - // but it not consistent with global config - // let config = endpoint.config(); - if let Some(val) = config.get("retry_count") { - res.count = zparse_default!(val, res.count); - } if let Some(val) = config.get("retry_timeout_ms") { res.timeout_ms = zparse_default!(val, res.timeout_ms); } - if let Some(val) = config.get("retry_timeout_increase_factor") { - res.timeout_increase_factor = zparse_default!(val, res.timeout_increase_factor); + if let Some(val) = config.get("retry_period_init_ms") { + res.period_init_ms = zparse_default!(val, res.period_init_ms); } - if let Some(val) = config.get("retry_exit_on_fail") { - //TODO(sashacmc): rewrite to use separate structure or macro - res.exit_on_fail = ModeDependentValue::Unique(zparse_default!( - val, - *res.exit_on_fail.get(self.whatami()).unwrap_or(&false) - )); + if let Some(val) = config.get("retry_period_max_ms") { + res.period_max_ms = zparse_default!(val, res.period_max_ms); + } + if let Some(val) = config.get("retry_period_increase_factor") { + res.period_increase_factor = zparse_default!(val, res.period_increase_factor); + } + if let Some(val) = config.get("retry_fail_exit") { + res.fail_exit = zparse_default!(val, res.fail_exit); } res } @@ -340,13 +346,20 @@ impl Runtime { for listener in listeners { let endpoint = listener.clone(); let retry_config = self.get_retry_config(&endpoint, true); - log::info!( - "Retry!!!!!!!!!!!!!! {}: {}", - retry_config.count, - retry_config.timeout_ms - ); - if retry_config.count > 0 { - self.spawn_add_listener(endpoint, retry_config).await? + if retry_config.timeout_ms > 0 { + match self.spawn_add_listener(endpoint, retry_config).await { + Ok(Ok(_)) => return Ok(()), + Ok(Err(e)) => log::warn!("Unable to listen to {}! {}", listener, e), + Err(e) => log::warn!("Unable to listen to {}! {}", listener, e), + } + //TODO(sashacmc): Why it was implelneted like it? + //We agree rework it to the iteration count, or we should use this logic to connections + //listener too + async_std::task::sleep(delay).await; + delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; + if delay > CONNECTION_RETRY_MAX_PERIOD { + delay = CONNECTION_RETRY_MAX_PERIOD; + } } else { self.add_listener(endpoint).await? }