From 7fc04e7e70d08c0b51e91536cd76cab7be33bea6 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 19 Mar 2024 15:48:13 +0100 Subject: [PATCH] Add connection retry (#770) - Introduce connection retry config for listen and connect endpoints. - Listener initialisation reworked to add the connection retry. - Connection initialization reworked to be consistent with listening where it posible - Some configuration refactoring (ModeDependent related code moved to separate file) --- Cargo.lock | 121 +------- DEFAULT_CONFIG.json5 | 44 +++ commons/zenoh-config/Cargo.toml | 1 + commons/zenoh-config/src/connection_retry.rs | 200 +++++++++++++ commons/zenoh-config/src/defaults.rs | 27 ++ commons/zenoh-config/src/lib.rs | 207 ++----------- commons/zenoh-config/src/mode_dependent.rs | 281 ++++++++++++++++++ commons/zenoh-core/src/macros.rs | 19 ++ zenoh/src/net/runtime/orchestrator.rs | 291 +++++++++++++++---- zenoh/tests/connection_retry.rs | 171 +++++++++++ 10 files changed, 996 insertions(+), 366 deletions(-) create mode 100644 commons/zenoh-config/src/connection_retry.rs create mode 100644 commons/zenoh-config/src/mode_dependent.rs create mode 100644 zenoh/tests/connection_retry.rs diff --git a/Cargo.lock b/Cargo.lock index ba84d51ee8..8878aa255a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1091,15 +1091,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -[[package]] -name = "encoding_rs" -version = "0.8.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "env_filter" version = "0.1.0" @@ -1463,25 +1454,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "h2" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http 0.2.9", - "indexmap 1.9.3", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "half" version = "1.8.2" @@ -1567,17 +1539,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "http" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.0.0" @@ -1589,17 +1550,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" -dependencies = [ - "bytes", - "http 0.2.9", - "pin-project-lite 0.2.13", -] - [[package]] name = "http-client" version = "6.5.3" @@ -1646,30 +1596,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "hyper" -version = "0.14.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "h2", - "http 0.2.9", - "http-body", - "httparse", - "httpdate", - "itoa", - "pin-project-lite 0.2.13", - "socket2 0.4.9", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "iana-time-zone" version = "0.1.57" @@ -2752,40 +2678,6 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" -[[package]] -name = "reqwest" -version = "0.11.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" -dependencies = [ - "base64 0.21.4", - "bytes", - "encoding_rs", - "futures-core", - "futures-util", - "h2", - "http 0.2.9", - "http-body", - "hyper", - "ipnet", - "js-sys", - "log", - "mime", - "once_cell", - "percent-encoding", - "pin-project-lite 0.2.13", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", - "winreg", -] - [[package]] name = "ring" version = "0.16.20" @@ -3816,15 +3708,8 @@ dependencies = [ "hashbrown 0.14.0", "pin-project-lite 0.2.13", "tokio", - "tracing", ] -[[package]] -name = "tower-service" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" - [[package]] name = "tracing" version = "0.1.37" @@ -3867,7 +3752,7 @@ dependencies = [ "byteorder", "bytes", "data-encoding", - "http 1.0.0", + "http", "httparse", "log", "rand 0.8.5", @@ -4562,6 +4447,7 @@ version = "0.11.0-dev" dependencies = [ "flume", "json5", + "log", "num_cpus", "secrecy", "serde", @@ -4631,6 +4517,7 @@ dependencies = [ "zenoh-core", "zenoh-macros", "zenoh-result", + "zenoh-runtime", "zenoh-sync", "zenoh-util", ] @@ -4679,13 +4566,11 @@ dependencies = [ "flume", "futures", "log", - "lz4_flex", "rustls 0.22.2", "rustls-webpki 0.102.2", "serde", "tokio", "tokio-util", - "typenum", "zenoh-buffers", "zenoh-codec", "zenoh-core", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 66352fe141..1f9094efa6 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -22,9 +22,31 @@ /// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to: /// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0 connect: { + /// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout) + /// Accepts a single value or different values for router, peer and client. + timeout_ms: { router: -1, peer: -1, client: 0 }, + endpoints: [ // "/
" ], + + /// Global connect configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000" + + /// exit from application, if timeout exceed + exit_on_failure: { router: false, peer: false, client: true }, + /// connect establishing retry configuration + retry: { + /// intial wait timeout until next connect try + period_init_ms: 1000, + /// maximum wait timeout until next connect try + period_max_ms: 4000, + /// increase factor for the next timeout until nexti connect try + period_increase_factor: 2, + }, }, /// Which endpoints to listen on. E.g. tcp/localhost:7447. @@ -33,9 +55,31 @@ /// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to: /// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0 listen: { + /// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout) + /// Accepts a single value or different values for router, peer and client. + timeout_ms: 0, + endpoints: [ // "/
" ], + + /// Global listen configuration, + /// Accepts a single value or different values for router, peer and client. + /// The configuration can also be specified for the separate endpoint + /// it will override the global one + /// E.g. tcp/192.168.0.1:7447#exit_on_failure=false;retry_period_max_ms=1000" + + /// exit from application, if timeout exceed + exit_on_failure: true, + /// listen retry configuration + retry: { + /// intial wait timeout until next try + period_init_ms: 1000, + /// maximum wait timeout until next try + period_max_ms: 4000, + /// increase factor for the next timeout until next try + period_increase_factor: 2, + }, }, /// Configure the scouting mechanisms and their behaviours scouting: { diff --git a/commons/zenoh-config/Cargo.toml b/commons/zenoh-config/Cargo.toml index f0189ff3e7..feade8cc10 100644 --- a/commons/zenoh-config/Cargo.toml +++ b/commons/zenoh-config/Cargo.toml @@ -24,6 +24,7 @@ categories = { workspace = true } description = "Internal crate for zenoh." [dependencies] +log = { workspace = true } flume = { workspace = true } json5 = { workspace = true } num_cpus = { workspace = true } diff --git a/commons/zenoh-config/src/connection_retry.rs b/commons/zenoh-config/src/connection_retry.rs new file mode 100644 index 0000000000..a845fbfe6a --- /dev/null +++ b/commons/zenoh-config/src/connection_retry.rs @@ -0,0 +1,200 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use crate::{ + defaults::{ + self, DEFAULT_CONNECT_EXIT_ON_FAIL, DEFAULT_CONNECT_TIMEOUT_MS, + DEFAULT_LISTEN_EXIT_ON_FAIL, DEFAULT_LISTEN_TIMEOUT_MS, + }, + Config, +}; +use serde::{Deserialize, Serialize}; +use zenoh_core::zparse_default; +use zenoh_protocol::core::WhatAmI; + +use crate::mode_dependent::*; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ConnectionRetryModeDependentConf { + // intial wait timeout until next try + pub period_init_ms: Option>, + // maximum wait timeout until next try + pub period_max_ms: Option>, + // increase factor for the next timeout until next try + pub period_increase_factor: Option>, +} + +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] +pub struct ConnectionRetryConf { + pub exit_on_failure: bool, + pub period_init_ms: i64, + pub period_max_ms: i64, + pub period_increase_factor: f64, +} + +impl ConnectionRetryConf { + pub fn new( + whatami: WhatAmI, + exit_on_failure: bool, + retry: ConnectionRetryModeDependentConf, + default_retry: ConnectionRetryModeDependentConf, + ) -> ConnectionRetryConf { + ConnectionRetryConf { + exit_on_failure, + period_init_ms: *retry + .period_init_ms + .get(whatami) + .unwrap_or(default_retry.period_init_ms.get(whatami).unwrap()), + period_max_ms: *retry + .period_max_ms + .get(whatami) + .unwrap_or(default_retry.period_max_ms.get(whatami).unwrap()), + period_increase_factor: *retry + .period_increase_factor + .get(whatami) + .unwrap_or(default_retry.period_increase_factor.get(whatami).unwrap()), + } + } + + pub fn timeout(&self) -> std::time::Duration { + ms_to_duration(self.period_init_ms) + } + + pub fn period(&self) -> ConnectionRetryPeriod { + ConnectionRetryPeriod::new(self) + } +} + +pub struct ConnectionRetryPeriod { + conf: ConnectionRetryConf, + delay: i64, +} + +impl ConnectionRetryPeriod { + pub fn new(conf: &ConnectionRetryConf) -> ConnectionRetryPeriod { + ConnectionRetryPeriod { + conf: conf.clone(), + delay: conf.period_init_ms, + } + } + + pub fn duration(&self) -> std::time::Duration { + if self.conf.period_init_ms < 0 { + return std::time::Duration::MAX; + } + + if self.conf.period_init_ms == 0 { + return std::time::Duration::from_millis(0); + } + + std::time::Duration::from_millis(self.delay as u64) + } + + pub fn next_duration(&mut self) -> std::time::Duration { + let res = self.duration(); + + self.delay = (self.delay as f64 * self.conf.period_increase_factor) as i64; + if self.conf.period_max_ms > 0 && self.delay > self.conf.period_max_ms { + self.delay = self.conf.period_max_ms; + } + + res + } +} + +fn ms_to_duration(ms: i64) -> std::time::Duration { + if ms >= 0 { + std::time::Duration::from_millis(ms as u64) + } else { + std::time::Duration::MAX + } +} + +pub fn get_global_listener_timeout(config: &Config) -> std::time::Duration { + let whatami = config.mode().unwrap_or(defaults::mode); + ms_to_duration( + *config + .listen() + .timeout_ms() + .get(whatami) + .unwrap_or(DEFAULT_LISTEN_TIMEOUT_MS.get(whatami).unwrap()), + ) +} + +pub fn get_global_connect_timeout(config: &Config) -> std::time::Duration { + let whatami = config.mode().unwrap_or(defaults::mode); + ms_to_duration( + *config + .connect() + .timeout_ms() + .get(whatami) + .unwrap_or(DEFAULT_CONNECT_TIMEOUT_MS.get(whatami).unwrap()), + ) +} + +pub fn get_retry_config( + config: &Config, + endpoint: Option<&EndPoint>, + listen: bool, +) -> ConnectionRetryConf { + let whatami = config.mode().unwrap_or(defaults::mode); + + let default_retry = ConnectionRetryModeDependentConf::default(); + let retry: ConnectionRetryModeDependentConf; + let exit_on_failure: bool; + if listen { + retry = config + .listen() + .retry() + .clone() + .unwrap_or_else(|| default_retry.clone()); + + exit_on_failure = *config + .listen() + .exit_on_failure() + .get(whatami) + .unwrap_or(DEFAULT_LISTEN_EXIT_ON_FAIL.get(whatami).unwrap()); + } else { + retry = config + .connect() + .retry() + .clone() + .unwrap_or_else(|| default_retry.clone()); + + exit_on_failure = *config + .connect() + .exit_on_failure() + .get(whatami) + .unwrap_or(DEFAULT_CONNECT_EXIT_ON_FAIL.get(whatami).unwrap()); + } + + let mut res = ConnectionRetryConf::new(whatami, exit_on_failure, retry, default_retry); + + if let Some(endpoint) = endpoint { + let config = endpoint.config(); + if let Some(val) = config.get("exit_on_failure") { + res.exit_on_failure = zparse_default!(val, res.exit_on_failure); + } + if let Some(val) = config.get("retry_period_init_ms") { + res.period_init_ms = zparse_default!(val, res.period_init_ms); + } + if let Some(val) = config.get("retry_period_max_ms") { + res.period_max_ms = zparse_default!(val, res.period_max_ms); + } + if let Some(val) = config.get("retry_period_increase_factor") { + res.period_increase_factor = zparse_default!(val, res.period_increase_factor); + } + } + res +} diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..7c9184702f 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -211,3 +211,30 @@ impl Default for SharedMemoryConf { Self { enabled: false } } } + +pub const DEFAULT_CONNECT_TIMEOUT_MS: ModeDependentValue = + ModeDependentValue::Dependent(ModeValues { + client: Some(0), + peer: Some(-1), + router: Some(-1), + }); + +pub const DEFAULT_CONNECT_EXIT_ON_FAIL: ModeDependentValue = + ModeDependentValue::Dependent(ModeValues { + client: Some(true), + peer: Some(false), + router: Some(false), + }); + +pub const DEFAULT_LISTEN_TIMEOUT_MS: ModeDependentValue = ModeDependentValue::Unique(0); +pub const DEFAULT_LISTEN_EXIT_ON_FAIL: ModeDependentValue = ModeDependentValue::Unique(true); + +impl Default for ConnectionRetryModeDependentConf { + fn default() -> Self { + Self { + period_init_ms: Some(ModeDependentValue::Unique(1000)), + period_max_ms: Some(ModeDependentValue::Unique(4000)), + period_increase_factor: Some(ModeDependentValue::Unique(2.)), + } + } +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 9346e9825e..3a0a9c3c20 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -15,12 +15,10 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; + use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; -use serde::{ - de::{self, MapAccess, Visitor}, - Deserialize, Serialize, -}; +use serde::{Deserialize, Serialize}; use serde_json::Value; #[allow(unused_imports)] use std::convert::TryFrom; // This is a false positive from the rust analyser @@ -29,7 +27,6 @@ use std::{ collections::HashSet, fmt, io::Read, - marker::PhantomData, net::SocketAddr, path::Path, sync::{Arc, Mutex, MutexGuard, Weak}, @@ -47,6 +44,12 @@ use zenoh_protocol::{ use zenoh_result::{bail, zerror, ZResult}; use zenoh_util::LibLoader; +pub mod mode_dependent; +pub use mode_dependent::*; + +pub mod connection_retry; +pub use connection_retry::*; + // Wrappers for secrecy of values #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct SecretString(String); @@ -178,12 +181,22 @@ validated_struct::validator! { /// Which zenoh nodes to connect to. pub connect: #[derive(Default)] ConnectConfig { + /// global timeout for full connect cycle + pub timeout_ms: Option>, pub endpoints: Vec, + /// if connection timeout exceed, exit from application + pub exit_on_failure: Option>, + pub retry: Option, }, /// Which endpoints to listen on. `zenohd` will add `tcp/[::]:7447` to these locators if left empty. pub listen: #[derive(Default)] ListenConfig { + /// global timeout for full listen cycle + pub timeout_ms: Option>, pub endpoints: Vec, + /// if connection timeout exceed, exit from application + pub exit_on_failure: Option>, + pub retry: Option, }, pub scouting: #[derive(Default)] ScoutingConf { @@ -1245,190 +1258,6 @@ impl validated_struct::ValidatedMap for PluginsConfig { } } -pub trait ModeDependent { - fn router(&self) -> Option<&T>; - fn peer(&self) -> Option<&T>; - fn client(&self) -> Option<&T>; - #[inline] - fn get(&self, whatami: WhatAmI) -> Option<&T> { - match whatami { - WhatAmI::Router => self.router(), - WhatAmI::Peer => self.peer(), - WhatAmI::Client => self.client(), - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ModeValues { - #[serde(skip_serializing_if = "Option::is_none")] - router: Option, - #[serde(skip_serializing_if = "Option::is_none")] - peer: Option, - #[serde(skip_serializing_if = "Option::is_none")] - client: Option, -} - -impl ModeDependent for ModeValues { - #[inline] - fn router(&self) -> Option<&T> { - self.router.as_ref() - } - - #[inline] - fn peer(&self) -> Option<&T> { - self.peer.as_ref() - } - - #[inline] - fn client(&self) -> Option<&T> { - self.client.as_ref() - } -} - -#[derive(Clone, Debug)] -pub enum ModeDependentValue { - Unique(T), - Dependent(ModeValues), -} - -impl ModeDependent for ModeDependentValue { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.router(), - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.peer(), - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Self::Unique(v) => Some(v), - Self::Dependent(o) => o.client(), - } - } -} - -impl serde::Serialize for ModeDependentValue -where - T: Serialize, -{ - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self { - ModeDependentValue::Unique(value) => value.serialize(serializer), - ModeDependentValue::Dependent(options) => options.serialize(serializer), - } - } -} -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("bool or mode dependent bool") - } - - fn visit_bool(self, value: bool) -> Result - where - E: de::Error, - { - Ok(ModeDependentValue::Unique(value)) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl<'a> serde::Deserialize<'a> for ModeDependentValue { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'a>, - { - struct UniqueOrDependent(PhantomData U>); - - impl<'de> Visitor<'de> for UniqueOrDependent> { - type Value = ModeDependentValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") - } - - fn visit_str(self, value: &str) -> Result - where - E: de::Error, - { - WhatAmIMatcherVisitor {} - .visit_str(value) - .map(ModeDependentValue::Unique) - } - - fn visit_map(self, map: M) -> Result - where - M: MapAccess<'de>, - { - ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) - .map(ModeDependentValue::Dependent) - } - } - deserializer.deserialize_any(UniqueOrDependent(PhantomData)) - } -} - -impl ModeDependent for Option> { - #[inline] - fn router(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.router(), - None => None, - } - } - - #[inline] - fn peer(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.peer(), - None => None, - } - } - - #[inline] - fn client(&self) -> Option<&T> { - match self { - Some(ModeDependentValue::Unique(v)) => Some(v), - Some(ModeDependentValue::Dependent(o)) => o.client(), - None => None, - } - } -} - #[macro_export] macro_rules! unwrap_or_default { ($val:ident$(.$field:ident($($param:ident)?))*) => { diff --git a/commons/zenoh-config/src/mode_dependent.rs b/commons/zenoh-config/src/mode_dependent.rs new file mode 100644 index 0000000000..91e366f452 --- /dev/null +++ b/commons/zenoh-config/src/mode_dependent.rs @@ -0,0 +1,281 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use serde::{ + de::{self, MapAccess, Visitor}, + Deserialize, Serialize, +}; +use std::fmt; +use std::marker::PhantomData; +pub use zenoh_protocol::core::{ + whatami, EndPoint, Locator, Priority, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, ZenohId, +}; + +pub trait ModeDependent { + fn router(&self) -> Option<&T>; + fn peer(&self) -> Option<&T>; + fn client(&self) -> Option<&T>; + #[inline] + fn get(&self, whatami: WhatAmI) -> Option<&T> { + match whatami { + WhatAmI::Router => self.router(), + WhatAmI::Peer => self.peer(), + WhatAmI::Client => self.client(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ModeValues { + #[serde(skip_serializing_if = "Option::is_none")] + pub router: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub peer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub client: Option, +} + +impl ModeDependent for ModeValues { + #[inline] + fn router(&self) -> Option<&T> { + self.router.as_ref() + } + + #[inline] + fn peer(&self) -> Option<&T> { + self.peer.as_ref() + } + + #[inline] + fn client(&self) -> Option<&T> { + self.client.as_ref() + } +} + +#[derive(Clone, Debug)] +pub enum ModeDependentValue { + Unique(T), + Dependent(ModeValues), +} + +impl ModeDependent for ModeDependentValue { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.router(), + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.peer(), + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Self::Unique(v) => Some(v), + Self::Dependent(o) => o.client(), + } + } +} + +impl serde::Serialize for ModeDependentValue +where + T: Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + ModeDependentValue::Unique(value) => value.serialize(serializer), + ModeDependentValue::Dependent(options) => options.serialize(serializer), + } + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("bool or mode dependent bool") + } + + fn visit_bool(self, value: bool) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("i64 or mode dependent i64") + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("f64 or mode dependent f64") + } + + fn visit_f64(self, value: f64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value)) + } + + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(ModeDependentValue::Unique(value as f64)) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl<'a> serde::Deserialize<'a> for ModeDependentValue { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'a>, + { + struct UniqueOrDependent(PhantomData U>); + + impl<'de> Visitor<'de> for UniqueOrDependent> { + type Value = ModeDependentValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("WhatAmIMatcher or mode dependent WhatAmIMatcher") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + WhatAmIMatcherVisitor {} + .visit_str(value) + .map(ModeDependentValue::Unique) + } + + fn visit_map(self, map: M) -> Result + where + M: MapAccess<'de>, + { + ModeValues::deserialize(de::value::MapAccessDeserializer::new(map)) + .map(ModeDependentValue::Dependent) + } + } + deserializer.deserialize_any(UniqueOrDependent(PhantomData)) + } +} + +impl ModeDependent for Option> { + #[inline] + fn router(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.router(), + None => None, + } + } + + #[inline] + fn peer(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.peer(), + None => None, + } + } + + #[inline] + fn client(&self) -> Option<&T> { + match self { + Some(ModeDependentValue::Unique(v)) => Some(v), + Some(ModeDependentValue::Dependent(o)) => o.client(), + None => None, + } + } +} diff --git a/commons/zenoh-core/src/macros.rs b/commons/zenoh-core/src/macros.rs index 20b84f213f..5e8cefcf5a 100644 --- a/commons/zenoh-core/src/macros.rs +++ b/commons/zenoh-core/src/macros.rs @@ -192,6 +192,25 @@ macro_rules! zparse { }; } +// This macro allows to parse a string to the target type +// No faili, but log the error and use default +#[macro_export] +macro_rules! zparse_default { + ($str:expr, $default:expr) => { + match $str.parse() { + Ok(value) => value, + Err(_) => { + let e = zenoh_result::zerror!( + "Failed to read configuration: {} is not a valid value", + $str + ); + log::warn!("{}", e); + $default + } + } + }; +} + // This macro allows to do conditional compilation #[macro_export] macro_rules! zcondfeat { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 298548f3b7..3feee6fb1b 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -20,7 +20,9 @@ use tokio::net::UdpSocket; use zenoh_buffers::reader::DidntRead; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_config::{unwrap_or_default, ModeDependent}; +use zenoh_config::{ + get_global_connect_timeout, get_global_listener_timeout, unwrap_or_default, ModeDependent, +}; use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, WhatAmI, ZenohId}, @@ -32,10 +34,6 @@ const RCV_BUF_SIZE: usize = u16::MAX as usize; const SCOUT_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); const SCOUT_MAX_PERIOD: Duration = Duration::from_millis(8_000); const SCOUT_PERIOD_INCREASE_FACTOR: u32 = 2; -const CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000); -const CONNECTION_RETRY_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); -const CONNECTION_RETRY_MAX_PERIOD: Duration = Duration::from_millis(4_000); -const CONNECTION_RETRY_PERIOD_INCREASE_FACTOR: u32 = 2; const ROUTER_DEFAULT_LISTENER: &str = "tcp/[::]:7447"; const PEER_DEFAULT_LISTENER: &str = "tcp/[::]:0"; @@ -87,27 +85,7 @@ impl Runtime { bail!("No peer specified and multicast scouting desactivated!") } } - _ => { - for locator in &peers { - match tokio::time::timeout( - CONNECTION_TIMEOUT, - self.manager().open_transport_unicast(locator.clone()), - ) - .await - { - Ok(Ok(_)) => return Ok(()), - Ok(Err(e)) => log::warn!("Unable to connect to {}! {}", locator, e), - Err(e) => log::warn!("Unable to connect to {}! {}", locator, e), - } - } - let e = zerror!( - "{:?} Unable to connect to any of {:?}! ", - self.manager().get_locators(), - peers - ); - log::error!("{}", &e); - Err(e.into()) - } + _ => self.connect_peers(&peers, true).await, } } @@ -146,9 +124,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - for peer in peers { - self.spawn_peer_connector(peer).await?; - } + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -191,9 +167,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - for peer in peers { - self.spawn_peer_connector(peer).await?; - } + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -244,6 +218,116 @@ impl Runtime { Ok(()) } + async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + let timeout = self.get_global_connect_timeout(); + if timeout.is_zero() { + self.connect_peers_impl(peers, single_link).await + } else { + let res = tokio::time::timeout(timeout, async { + self.connect_peers_impl(peers, single_link).await.ok() + }) + .await; + match res { + Ok(_) => Ok(()), + Err(_) => { + let e = zerror!( + "{:?} Unable to connect to any of {:?}! ", + self.manager().get_locators(), + peers + ); + log::error!("{}", &e); + Err(e.into()) + } + } + } + } + + async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + if single_link { + self.connect_peers_single_link(peers).await + } else { + self.connect_peers_multiply_links(peers).await + } + } + + async fn connect_peers_single_link(&self, peers: &[EndPoint]) -> ZResult<()> { + for peer in peers { + let endpoint = peer.clone(); + let retry_config = self.get_connect_retry_config(&endpoint); + log::debug!( + "Try to connect: {:?}: global timeout: {:?}, retry: {:?}", + endpoint, + self.get_global_connect_timeout(), + retry_config + ); + if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { + // try to connect and exit immediately without retry + if self + .peer_connector(endpoint, retry_config.timeout()) + .await + .is_ok() + { + return Ok(()); + } + } else { + // try to connect with retry waiting + self.peer_connector_retry(endpoint).await; + return Ok(()); + } + } + let e = zerror!( + "{:?} Unable to connect to any of {:?}! ", + self.manager().get_locators(), + peers + ); + log::error!("{}", &e); + Err(e.into()) + } + + async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> { + for peer in peers { + let endpoint = peer.clone(); + let retry_config = self.get_connect_retry_config(&endpoint); + log::debug!( + "Try to connect: {:?}: global timeout: {:?}, retry: {:?}", + endpoint, + self.get_global_connect_timeout(), + retry_config + ); + if retry_config.timeout().is_zero() || self.get_global_connect_timeout().is_zero() { + // try to connect and exit immediately without retry + if let Err(e) = self.peer_connector(endpoint, retry_config.timeout()).await { + if retry_config.exit_on_failure { + return Err(e); + } + } + } else if retry_config.exit_on_failure { + // try to connect with retry waiting + self.peer_connector_retry(endpoint).await; + } else { + // try to connect in background + self.spawn_peer_connector(endpoint).await? + } + } + Ok(()) + } + + async fn peer_connector(&self, peer: EndPoint, timeout: std::time::Duration) -> ZResult<()> { + match tokio::time::timeout(timeout, self.manager().open_transport_unicast(peer.clone())) + .await + { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => { + log::warn!("Unable to connect to {}! {}", peer, e); + Err(e) + } + Err(e) => { + log::warn!("Unable to connect to {}! {}", peer, e); + Err(e.into()) + } + } + } + pub(crate) async fn update_peers(&self) -> ZResult<()> { let peers = { self.state.config.lock().connect().endpoints().clone() }; let tranports = self.manager().get_transports_unicast().await; @@ -293,24 +377,118 @@ impl Runtime { Ok(()) } + fn get_listen_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, Some(endpoint), true) + } + + fn get_connect_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, Some(endpoint), false) + } + + fn get_global_connect_retry_config(&self) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, None, false) + } + + fn get_global_listener_timeout(&self) -> std::time::Duration { + let guard = &self.state.config.lock(); + get_global_listener_timeout(guard) + } + + fn get_global_connect_timeout(&self) -> std::time::Duration { + let guard = &self.state.config.lock(); + get_global_connect_timeout(guard) + } + async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> { + let timeout = self.get_global_listener_timeout(); + if timeout.is_zero() { + self.bind_listeners_impl(listeners).await + } else { + let res = tokio::time::timeout(timeout, async { + self.bind_listeners_impl(listeners).await.ok() + }) + .await; + match res { + Ok(_) => Ok(()), + Err(e) => { + log::error!("Unable to open listeners: {}", e); + Err(Box::new(e)) + } + } + } + } + + async fn bind_listeners_impl(&self, listeners: &[EndPoint]) -> ZResult<()> { for listener in listeners { let endpoint = listener.clone(); - match self.manager().add_listener(endpoint).await { - Ok(listener) => log::debug!("Listener added: {}", listener), - Err(err) => { - log::error!("Unable to open listener {}: {}", listener, err); - return Err(err); - } + let retry_config = self.get_listen_retry_config(&endpoint); + log::debug!("Try to add listener: {:?}: {:?}", endpoint, retry_config); + if retry_config.timeout().is_zero() || self.get_global_listener_timeout().is_zero() { + // try to add listener and exit immediately without retry + if let Err(e) = self.add_listener(endpoint).await { + if retry_config.exit_on_failure { + return Err(e); + } + }; + } else if retry_config.exit_on_failure { + // try to add listener with retry waiting + self.add_listener_retry(endpoint, retry_config).await + } else { + // try to add listener in background + self.spawn_add_listener(endpoint, retry_config).await } } + self.print_locators(); + Ok(()) + } + async fn spawn_add_listener( + &self, + listener: EndPoint, + retry_config: zenoh_config::ConnectionRetryConf, + ) { + let this = self.clone(); + self.spawn(async move { + this.add_listener_retry(listener, retry_config).await; + this.print_locators(); + }); + } + + async fn add_listener_retry( + &self, + listener: EndPoint, + retry_config: zenoh_config::ConnectionRetryConf, + ) { + let mut period = retry_config.period(); + loop { + if self.add_listener(listener.clone()).await.is_ok() { + break; + } + tokio::time::sleep(period.next_duration()).await; + } + } + + async fn add_listener(&self, listener: EndPoint) -> ZResult<()> { + let endpoint = listener.clone(); + match self.manager().add_listener(endpoint).await { + Ok(listener) => log::debug!("Listener added: {}", listener), + Err(err) => { + log::warn!("Unable to open listener {}: {}", listener, err); + return Err(err); + } + } + Ok(()) + } + + fn print_locators(&self) { let mut locators = self.state.locators.write().unwrap(); *locators = self.manager().get_locators(); for locator in &*locators { log::info!("Zenoh can be reached at: {}", locator); } - Ok(()) } pub fn get_interfaces(names: &str) -> Vec { @@ -470,20 +648,21 @@ impl Runtime { .await? { let this = self.clone(); - self.spawn(async move { this.peer_connector(peer).await }); + self.spawn(async move { this.peer_connector_retry(peer).await }); Ok(()) } else { bail!("Forbidden multicast endpoint in connect list!") } } - async fn peer_connector(&self, peer: EndPoint) { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + async fn peer_connector_retry(&self, peer: EndPoint) { + let retry_config = self.get_connect_retry_config(&peer); + let mut period = retry_config.period(); loop { log::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.timeout(), self.manager().open_transport_unicast(endpoint), ) .await @@ -505,7 +684,7 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } Err(e) => { @@ -513,15 +692,11 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } } - tokio::time::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + tokio::time::sleep(period.next_duration()).await; } } @@ -636,10 +811,11 @@ impl Runtime { }; let endpoint = locator.to_owned().into(); + let retry_config = self.get_connect_retry_config(&endpoint); let manager = self.manager(); if is_multicast { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.timeout(), manager.open_transport_multicast(endpoint), ) .await @@ -656,7 +832,7 @@ impl Runtime { } } else { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.timeout(), manager.open_transport_unicast(endpoint), ) .await @@ -843,13 +1019,10 @@ impl Runtime { WhatAmI::Client => { let runtime = session.runtime.clone(); session.runtime.spawn(async move { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + let retry_config = runtime.get_global_connect_retry_config(); + let mut period = retry_config.period(); while runtime.start_client().await.is_err() { - tokio::time::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + tokio::time::sleep(period.next_duration()).await; } }); } @@ -870,7 +1043,7 @@ impl Runtime { let runtime = session.runtime.clone(); session .runtime - .spawn(async move { runtime.peer_connector(endpoint).await }); + .spawn(async move { runtime.peer_connector_retry(endpoint).await }); } } } diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs new file mode 100644 index 0000000000..db84d7bd5d --- /dev/null +++ b/zenoh/tests/connection_retry.rs @@ -0,0 +1,171 @@ +use config::ConnectionRetryConf; + +use zenoh::prelude::sync::*; + +#[test] +fn retry_config_overriding() { + let mut config = Config::default(); + config + .insert_json5( + "listen/endpoints", + r#" + [ + "tcp/1.2.3.4:0", + "tcp/1.2.3.4:0#retry_period_init_ms=30000", + "tcp/1.2.3.4:0#retry_period_init_ms=30000;retry_period_max_ms=60000;retry_period_increase_factor=15;exit_on_failure=true", + ] + "#, + ) + .unwrap(); + + config + .insert_json5( + "listen/retry", + r#" + { + try_timeout_ms: 2000, + period_init_ms: 3000, + period_max_ms: 6000, + period_increase_factor: 1.5, + } + "#, + ) + .unwrap(); + + config + .insert_json5("listen/exit_on_failure", "false") + .unwrap(); + + let expected = vec![ + // global value + ConnectionRetryConf { + period_init_ms: 3000, + period_max_ms: 6000, + period_increase_factor: 1.5, + exit_on_failure: false, + }, + // override one key + ConnectionRetryConf { + period_init_ms: 30000, + period_max_ms: 6000, + period_increase_factor: 1.5, + exit_on_failure: false, + }, + // override all keys + ConnectionRetryConf { + period_init_ms: 30000, + period_max_ms: 60000, + period_increase_factor: 15., + exit_on_failure: true, + }, + ]; + + for (i, endpoint) in config.listen().endpoints().iter().enumerate() { + let retry_config = zenoh_config::get_retry_config(&config, Some(endpoint), true); + assert_eq!(retry_config, expected[i]); + } +} + +#[test] +fn retry_config_parsing() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: 1000, + period_max_ms: 6000, + period_increase_factor: 2, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + let expected = vec![1000, 2000, 4000, 6000, 6000, 6000, 6000]; + + for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); + assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); + } +} + +#[test] +fn retry_config_const_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: 1000, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + let expected = vec![1000, 1000, 1000, 1000]; + + for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); + assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); + } +} + +#[test] +fn retry_config_infinit_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: -1, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + + assert_eq!(period.duration(), std::time::Duration::MAX); + assert_eq!(period.next_duration(), std::time::Duration::MAX); +} + +#[test] +#[should_panic(expected = "Can not create a new TCP listener")] +fn listen_no_retry() { + let mut config = Config::default(); + config + .insert_json5("listen/endpoints", r#"["tcp/8.8.8.8:8"]"#) + .unwrap(); + + config.insert_json5("listen/timeout_ms", "0").unwrap(); + zenoh::open(config).res().unwrap(); +} + +#[test] +#[should_panic(expected = "value: Elapsed(())")] +fn listen_with_retry() { + let mut config = Config::default(); + config + .insert_json5("listen/endpoints", r#"["tcp/8.8.8.8:8"]"#) + .unwrap(); + + config.insert_json5("listen/timeout_ms", "1000").unwrap(); + + zenoh::open(config).res().unwrap(); +}