diff --git a/Cargo.lock b/Cargo.lock index 7c140c1c31..71547ab506 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5350,6 +5350,7 @@ dependencies = [ "tracing", "uhlc", "unwrap-infallible", + "validated_struct", "vec_map", "zenoh-buffers", "zenoh-codec", @@ -5479,6 +5480,7 @@ dependencies = [ "tokio", "tracing", "zenoh", + "zenoh-config", "zenoh-macros", "zenoh-util", ] @@ -5491,6 +5493,7 @@ dependencies = [ "futures", "tokio", "zenoh", + "zenoh-config", "zenoh-ext", ] @@ -5987,6 +5990,7 @@ dependencies = [ "tracing-subscriber", "url", "zenoh", + "zenoh-config", "zenoh-util", ] diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index b57c62edde..799c4bbad0 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -20,13 +20,7 @@ pub mod wrappers; #[allow(unused_imports)] use std::convert::TryFrom; // This is a false positive from the rust analyser use std::{ - any::Any, - collections::HashSet, - fmt, - io::Read, - net::SocketAddr, - path::Path, - sync::{Arc, Mutex, MutexGuard, Weak}, + any::Any, collections::HashSet, fmt, io::Read, net::SocketAddr, ops, path::Path, sync::Weak, }; use include::recursive_include; @@ -36,7 +30,6 @@ use serde_json::{Map, Value}; use validated_struct::ValidatedMapAssociatedTypes; pub use validated_struct::{GetError, ValidatedMap}; pub use wrappers::ZenohId; -use zenoh_core::zlock; pub use zenoh_protocol::core::{ whatami, EndPoint, Locator, WhatAmI, WhatAmIMatcher, WhatAmIMatcherVisitor, }; @@ -57,7 +50,7 @@ pub use connection_retry::*; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub struct SecretString(String); -impl Deref for SecretString { +impl ops::Deref for SecretString { type Target = String; fn deref(&self) -> &Self::Target { @@ -229,16 +222,12 @@ fn config_keys() { } validated_struct::validator! { - /// The main configuration structure for Zenoh. - /// - /// Most fields are optional as a way to keep defaults flexible. Some of the fields have different default values depending on the rest of the configuration. - /// - /// To construct a configuration, we advise that you use a configuration file (JSON, JSON5 and YAML are currently supported, please use the proper extension for your format as the deserializer will be picked according to it). #[derive(Default)] #[recursive_attrs] #[derive(serde::Deserialize, serde::Serialize, Clone, Debug)] #[serde(default)] #[serde(deny_unknown_fields)] + #[doc(hidden)] Config { /// The Zenoh ID of the instance. This ID MUST be unique throughout your Zenoh infrastructure and cannot exceed 16 bytes of length. If left unset, a random u128 will be generated. id: ZenohId, @@ -708,10 +697,7 @@ impl Config { pub fn remove>(&mut self, key: K) -> ZResult<()> { let key = key.as_ref(); - self._remove(key) - } - fn _remove(&mut self, key: &str) -> ZResult<()> { let key = key.strip_prefix('/').unwrap_or(key); if !key.starts_with("plugins/") { bail!( @@ -820,226 +806,92 @@ fn config_from_json() { println!("{}", serde_json::to_string_pretty(&config).unwrap()); } -pub type Notification = Arc; - -struct NotifierInner { - inner: Mutex, - subscribers: Mutex>>, -} -pub struct Notifier { - inner: Arc>, -} -impl Clone for Notifier { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} -impl Notifier { - pub fn remove>(&self, key: K) -> ZResult<()> { - let key = key.as_ref(); - self._remove(key) - } - - fn _remove(&self, key: &str) -> ZResult<()> { - { - let mut guard = zlock!(self.inner.inner); - guard.remove(key)?; - } - self.notify(key); - Ok(()) - } -} -impl Notifier { - pub fn new(inner: T) -> Self { - Notifier { - inner: Arc::new(NotifierInner { - inner: Mutex::new(inner), - subscribers: Mutex::new(Vec::new()), - }), - } - } - pub fn subscribe(&self) -> flume::Receiver { - let (tx, rx) = flume::unbounded(); - { - zlock!(self.inner.subscribers).push(tx); - } - rx - } - pub fn notify>(&self, key: K) { - let key = key.as_ref(); - self._notify(key); - } - fn _notify(&self, key: &str) { - let key: Arc = Arc::from(key); - let mut marked = Vec::new(); - let mut guard = zlock!(self.inner.subscribers); - for (i, sub) in guard.iter().enumerate() { - if sub.send(key.clone()).is_err() { - marked.push(i) - } - } - for i in marked.into_iter().rev() { - guard.swap_remove(i); - } - } - - pub fn lock(&self) -> MutexGuard { - zlock!(self.inner.inner) - } -} - -impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for Notifier { - type Accessor = GetGuard<'a, T>; -} -impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for &Notifier { - type Accessor = GetGuard<'a, T>; -} -impl ValidatedMap for Notifier -where - T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>, -{ - fn insert<'d, D: serde::Deserializer<'d>>( - &mut self, - key: &str, - value: D, - ) -> Result<(), validated_struct::InsertionError> - where - validated_struct::InsertionError: From, - { - { - let mut guard = zlock!(self.inner.inner); - guard.insert(key, value)?; - } - self.notify(key); - Ok(()) - } - fn get<'a>( - &'a self, - key: &str, - ) -> Result<>::Accessor, GetError> - { - let guard: MutexGuard<'a, T> = zlock!(self.inner.inner); - // SAFETY: MutexGuard pins the mutex behind which the value is held. - let subref = guard.get(key.as_ref())? as *const _; - Ok(GetGuard { - _guard: guard, - subref, - }) - } - fn get_json(&self, key: &str) -> Result { - self.lock().get_json(key) - } - type Keys = T::Keys; - fn keys(&self) -> Self::Keys { - self.lock().keys() - } -} -impl ValidatedMap for &Notifier -where - T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>, -{ - fn insert<'d, D: serde::Deserializer<'d>>( - &mut self, - key: &str, - value: D, - ) -> Result<(), validated_struct::InsertionError> - where - validated_struct::InsertionError: From, - { - { - let mut guard = zlock!(self.inner.inner); - guard.insert(key, value)?; - } - self.notify(key); - Ok(()) - } - fn get<'a>( - &'a self, - key: &str, - ) -> Result<>::Accessor, GetError> - { - let guard: MutexGuard<'a, T> = zlock!(self.inner.inner); - // SAFETY: MutexGuard pins the mutex behind which the value is held. - let subref = guard.get(key.as_ref())? as *const _; - Ok(GetGuard { - _guard: guard, - subref, - }) - } - fn get_json(&self, key: &str) -> Result { - self.lock().get_json(key) - } - type Keys = T::Keys; - fn keys(&self) -> Self::Keys { - self.lock().keys() - } -} -impl Notifier -where - T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>, -{ - pub fn insert<'d, D: serde::Deserializer<'d>>( - &self, - key: &str, - value: D, - ) -> Result<(), validated_struct::InsertionError> - where - validated_struct::InsertionError: From, - { - self.lock().insert(key, value)?; - self.notify(key); - Ok(()) - } - - pub fn get( - &self, - key: &str, - ) -> Result<::Accessor, GetError> { - let guard = zlock!(self.inner.inner); - // SAFETY: MutexGuard pins the mutex behind which the value is held. - let subref = guard.get(key.as_ref())? as *const _; - Ok(GetGuard { - _guard: guard, - subref, - }) - } - - pub fn get_json(&self, key: &str) -> Result { - self.lock().get_json(key) - } - - pub fn insert_json5( - &self, - key: &str, - value: &str, - ) -> Result<(), validated_struct::InsertionError> { - self.insert(key, &mut json5::Deserializer::from_str(value)?) - } - - pub fn keys(&self) -> impl Iterator { - self.lock().keys().into_iter() - } -} - -pub struct GetGuard<'a, T> { - _guard: MutexGuard<'a, T>, - subref: *const dyn Any, -} -use std::ops::Deref; -impl<'a, T> Deref for GetGuard<'a, T> { - type Target = dyn Any; - - fn deref(&self) -> &Self::Target { - unsafe { &*self.subref } - } -} -impl<'a, T> AsRef for GetGuard<'a, T> { - fn as_ref(&self) -> &dyn Any { - self.deref() - } -} +// impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for Notifier { +// type Accessor = GetGuard<'a, T>; +// } +// impl<'a, T: 'a> ValidatedMapAssociatedTypes<'a> for &Notifier { +// type Accessor = GetGuard<'a, T>; +// } +// impl ValidatedMap for Notifier +// where +// T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>, +// { +// fn insert<'d, D: serde::Deserializer<'d>>( +// &mut self, +// key: &str, +// value: D, +// ) -> Result<(), validated_struct::InsertionError> +// where +// validated_struct::InsertionError: From, +// { +// { +// let mut guard = zlock!(self.inner.inner); +// guard.insert(key, value)?; +// } +// self.notify(key); +// Ok(()) +// } +// fn get<'a>( +// &'a self, +// key: &str, +// ) -> Result<>::Accessor, GetError> +// { +// let guard: MutexGuard<'a, T> = zlock!(self.inner.inner); +// // SAFETY: MutexGuard pins the mutex behind which the value is held. +// let subref = guard.get(key.as_ref())? as *const _; +// Ok(GetGuard { +// _guard: guard, +// subref, +// }) +// } +// fn get_json(&self, key: &str) -> Result { +// self.lock().get_json(key) +// } +// type Keys = T::Keys; +// fn keys(&self) -> Self::Keys { +// self.lock().keys() +// } +// } +// impl ValidatedMap for &Notifier +// where +// T: for<'a> ValidatedMapAssociatedTypes<'a, Accessor = &'a dyn Any>, +// { +// fn insert<'d, D: serde::Deserializer<'d>>( +// &mut self, +// key: &str, +// value: D, +// ) -> Result<(), validated_struct::InsertionError> +// where +// validated_struct::InsertionError: From, +// { +// { +// let mut guard = zlock!(self.inner.inner); +// guard.insert(key, value)?; +// } +// self.notify(key); +// Ok(()) +// } +// fn get<'a>( +// &'a self, +// key: &str, +// ) -> Result<>::Accessor, GetError> +// { +// let guard: MutexGuard<'a, T> = zlock!(self.inner.inner); +// // SAFETY: MutexGuard pins the mutex behind which the value is held. +// let subref = guard.get(key.as_ref())? as *const _; +// Ok(GetGuard { +// _guard: guard, +// subref, +// }) +// } +// fn get_json(&self, key: &str) -> Result { +// self.lock().get_json(key) +// } +// type Keys = T::Keys; +// fn keys(&self) -> Self::Keys { +// self.lock().keys() +// } +// } fn sequence_number_resolution_validator(b: &Bits) -> bool { b <= &Bits::from(TransportSn::MAX) diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs index f3533a6aea..da007e1f18 100644 --- a/commons/zenoh-macros/src/lib.rs +++ b/commons/zenoh-macros/src/lib.rs @@ -181,6 +181,27 @@ pub fn unstable(attr: TokenStream, tokens: TokenStream) -> TokenStream { TokenStream::from(item.to_token_stream()) } +// FIXME(fuzzypixelz): refactor `unstable` macro to accept arguments +#[proc_macro_attribute] +pub fn unstable_config(args: TokenStream, tokens: TokenStream) -> TokenStream { + let tokens = unstable_doc(args, tokens); + let mut item = match parse_annotable_item!(tokens) { + Ok(item) => item, + Err(err) => return err.into_compile_error().into(), + }; + + let attrs = match item.attributes_mut() { + Ok(attrs) => attrs, + Err(err) => return err.into_compile_error().into(), + }; + + let feature_gate: Attribute = + parse_quote!(#[cfg(any(feature = "unstable", feature = "unstable_config"))]); + attrs.push(feature_gate); + + TokenStream::from(item.to_token_stream()) +} + #[proc_macro_attribute] /// Adds a `#[cfg(feature = "internal")]` and `#[doc(hidden)]` attributes to the item. pub fn internal(_attr: TokenStream, tokens: TokenStream) -> TokenStream { diff --git a/examples/src/lib.rs b/examples/src/lib.rs index 20409b4f85..7766d379d2 100644 --- a/examples/src/lib.rs +++ b/examples/src/lib.rs @@ -3,7 +3,7 @@ //! Check ../README.md for usage. //! -use zenoh::config::Config; +use zenoh::{config::WhatAmI, Config}; #[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum Wai { @@ -59,9 +59,9 @@ impl From<&CommonArgs> for Config { None => Config::default(), }; match args.mode { - Some(Wai::Peer) => config.set_mode(Some(zenoh::config::WhatAmI::Peer)), - Some(Wai::Client) => config.set_mode(Some(zenoh::config::WhatAmI::Client)), - Some(Wai::Router) => config.set_mode(Some(zenoh::config::WhatAmI::Router)), + Some(Wai::Peer) => config.set_mode(Some(WhatAmI::Peer)), + Some(Wai::Client) => config.set_mode(Some(WhatAmI::Client)), + Some(Wai::Router) => config.set_mode(Some(WhatAmI::Router)), None => Ok(None), } .unwrap(); diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 63516bb253..4b06a97264 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -50,6 +50,7 @@ zenoh-macros = { workspace = true } [dev-dependencies] zenoh = { workspace = true, features = ["unstable"], default-features = true } +zenoh-config = { workspace = true } [package.metadata.docs.rs] features = ["unstable"] diff --git a/zenoh-ext/examples/Cargo.toml b/zenoh-ext/examples/Cargo.toml index 8dae03e596..de8c5ad49a 100644 --- a/zenoh-ext/examples/Cargo.toml +++ b/zenoh-ext/examples/Cargo.toml @@ -39,6 +39,7 @@ clap = { workspace = true, features = ["derive"] } zenoh-ext = { workspace = true } [dev-dependencies] +zenoh-config = { workspace = true } [[example]] name = "z_query_sub" diff --git a/zenoh-ext/examples/examples/z_pub_cache.rs b/zenoh-ext/examples/examples/z_pub_cache.rs index 6c47fb0862..d8e13faec4 100644 --- a/zenoh-ext/examples/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/examples/z_pub_cache.rs @@ -14,10 +14,8 @@ use std::time::Duration; use clap::{arg, Parser}; -use zenoh::{ - config::{Config, ModeDependentValue}, - key_expr::KeyExpr, -}; +use zenoh::{config::Config, key_expr::KeyExpr}; +use zenoh_config::ModeDependentValue; use zenoh_ext::*; use zenoh_ext_examples::CommonArgs; diff --git a/zenoh-ext/examples/src/lib.rs b/zenoh-ext/examples/src/lib.rs index 881d60c138..04d1223022 100644 --- a/zenoh-ext/examples/src/lib.rs +++ b/zenoh-ext/examples/src/lib.rs @@ -2,7 +2,7 @@ //! See the code in ../examples/ //! Check ../README.md for usage. //! -use zenoh::config::Config; +use zenoh::{config::WhatAmI, Config}; #[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum Wai { @@ -43,9 +43,9 @@ impl From<&CommonArgs> for Config { None => Config::default(), }; match value.mode { - Some(Wai::Peer) => config.set_mode(Some(zenoh::config::WhatAmI::Peer)), - Some(Wai::Client) => config.set_mode(Some(zenoh::config::WhatAmI::Client)), - Some(Wai::Router) => config.set_mode(Some(zenoh::config::WhatAmI::Router)), + Some(Wai::Peer) => config.set_mode(Some(WhatAmI::Peer)), + Some(Wai::Client) => config.set_mode(Some(WhatAmI::Client)), + Some(Wai::Router) => config.set_mode(Some(WhatAmI::Router)), None => Ok(None), } .unwrap(); diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 625494a757..f35c2ecbdb 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -633,7 +633,7 @@ where /// use zenoh::Wait; /// use zenoh_ext::*; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expr") /// .fetching( |cb| { @@ -777,7 +777,7 @@ impl FetchingSubscriber { /// use zenoh::Wait; /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut subscriber = session /// .declare_subscriber("key/expr") /// .fetching( |cb| { @@ -855,7 +855,7 @@ impl Drop for RepliesHandler { /// # use zenoh::Wait; /// # use zenoh_ext::*; /// # -/// # let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// # let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// # let mut fetching_subscriber = session /// # .declare_subscriber("key/expr") /// # .fetching( |cb| { diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 7c68f88ddb..facf0ebed1 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -18,14 +18,16 @@ use super::PublicationCacheBuilder; /// Some extensions to the [`zenoh::Session`](zenoh::Session) pub trait SessionExt<'s, 'a> { + // REVIEW(fuzzypixelz): this doc test is the only one to use the programmatic configuration API.. /// Examples: /// ``` /// # #[tokio::main] /// # async fn main() { - /// use zenoh::config::ModeDependentValue::Unique; /// use zenoh_ext::SessionExt; + /// use zenoh_config::ModeDependentValue::Unique; /// - /// let mut config = zenoh::config::default(); + /// + /// let mut config = zenoh::Config::default(); /// config.timestamping.set_enabled(Some(Unique(true))); /// let session = zenoh::open(config).await.unwrap(); /// let publication_cache = session.declare_publication_cache("key/expression").await.unwrap(); diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a48c4fe500..1c14f31b82 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -63,7 +63,7 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> { /// use zenoh::Wait; /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expr") /// .fetching( |cb| { @@ -106,7 +106,7 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> { /// # async fn main() { /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expr") /// .querying() @@ -140,7 +140,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde /// use zenoh::Wait; /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expr") /// .fetching( |cb| { @@ -195,7 +195,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde /// # async fn main() { /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expr") /// .querying() @@ -249,7 +249,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> /// use zenoh::Wait; /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .liveliness() /// .declare_subscriber("key/expr") @@ -307,7 +307,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> /// # async fn main() { /// use zenoh_ext::*; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .liveliness() /// .declare_subscriber("key/expr") diff --git a/zenoh-ext/tests/liveliness.rs b/zenoh-ext/tests/liveliness.rs index d211505c4c..14b6101f79 100644 --- a/zenoh-ext/tests/liveliness.rs +++ b/zenoh-ext/tests/liveliness.rs @@ -12,11 +12,8 @@ // ZettaScale Zenoh Team, // -use zenoh::{ - config::{self, EndPoint, WhatAmI}, - sample::SampleKind, - Wait, -}; +use zenoh::{sample::SampleKind, Wait}; +use zenoh_config::{EndPoint, WhatAmI}; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_liveliness_querying_subscriber_clique() { @@ -36,7 +33,7 @@ async fn test_liveliness_querying_subscriber_clique() { zenoh_util::init_log_from_env_or("error"); let peer1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -49,7 +46,7 @@ async fn test_liveliness_querying_subscriber_clique() { }; let peer2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -114,7 +111,7 @@ async fn test_liveliness_querying_subscriber_brokered() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -127,7 +124,7 @@ async fn test_liveliness_querying_subscriber_brokered() { }; let client1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -140,7 +137,7 @@ async fn test_liveliness_querying_subscriber_brokered() { }; let client2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -153,7 +150,7 @@ async fn test_liveliness_querying_subscriber_brokered() { }; let client3 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -220,7 +217,7 @@ async fn test_liveliness_fetching_subscriber_clique() { zenoh_util::init_log_from_env_or("error"); let peer1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -233,7 +230,7 @@ async fn test_liveliness_fetching_subscriber_clique() { }; let peer2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -302,7 +299,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -315,7 +312,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { }; let client1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -328,7 +325,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { }; let client2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -341,7 +338,7 @@ async fn test_liveliness_fetching_subscriber_brokered() { }; let client3 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 32371823e1..1d557b8530 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -64,6 +64,7 @@ transport_unixsock-stream = ["zenoh-transport/transport_unixsock-stream"] transport_ws = ["zenoh-transport/transport_ws"] transport_vsock = ["zenoh-transport/transport_vsock"] unstable = ["zenoh-keyexpr/unstable"] +unstable_config = [] [dependencies] tokio = { workspace = true, features = ["rt", "macros", "time"] } @@ -108,6 +109,7 @@ zenoh-util = { workspace = true } zenoh-runtime = { workspace = true } zenoh-task = { workspace = true } once_cell = { workspace = true } +validated_struct = { workspace = true } [dev-dependencies] tokio = { workspace = true } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 1eef911fd9..40389ff546 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -61,7 +61,7 @@ pub struct PublicationBuilderDelete; /// # async fn main() { /// use zenoh::{bytes::Encoding, qos::CongestionControl}; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// session /// .put("key/expression", "payload") /// .encoding(Encoding::TEXT_PLAIN) @@ -253,7 +253,7 @@ impl IntoFuture for PublicationBuilder, PublicationBuil /// # async fn main() { /// use zenoh::qos::CongestionControl; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session /// .declare_publisher("key/expression") /// .congestion_control(CongestionControl::Block) diff --git a/zenoh/src/api/config.rs b/zenoh/src/api/config.rs new file mode 100644 index 0000000000..ddbd33ccc2 --- /dev/null +++ b/zenoh/src/api/config.rs @@ -0,0 +1,200 @@ +use std::{ + any::Any, + error::Error, + fmt, + ops::{self, Deref}, + path::Path, + sync::{Arc, Mutex, MutexGuard}, +}; + +use zenoh_result::ZResult; + +/// Zenoh configuration. +/// +/// Most options are optional as a way to keep defaults flexible. Some of the options have different +/// default values depending on the rest of the configuration. +/// +/// To construct a configuration, we advise that you use a configuration file (JSON, JSON5 and YAML +/// are currently supported, please use the proper extension for your format as the deserializer +/// will be picked according to it). +#[derive(Default, Debug, Clone)] +pub struct Config(pub(crate) zenoh_config::Config); + +impl Config { + pub fn from_env() -> ZResult { + Ok(Config(zenoh_config::Config::from_env()?)) + } + + pub fn from_file>(path: P) -> ZResult { + Ok(Config(zenoh_config::Config::from_file(path)?)) + } + + pub fn insert_json5(&mut self, key: &str, value: &str) -> Result<(), InsertionError> { + ::insert_json5( + &mut self.0, + key, + value, + ) + .map_err(InsertionError) + } + + pub fn get<'a>(&'a self, key: &str) -> Result<&'a dyn Any, LookupError> { + ::get(&self.0, key) + .map_err(LookupError) + } + + pub fn remove>(&mut self, key: K) -> ZResult<()> { + self.0.remove(key) + } +} + +#[derive(Debug)] +pub struct InsertionError(validated_struct::InsertionError); + +impl fmt::Display for InsertionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", &self.0) + } +} + +impl Error for InsertionError {} + +#[derive(Debug)] +pub struct LookupError(validated_struct::GetError); + +impl fmt::Display for LookupError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", &self.0) + } +} + +impl Error for LookupError {} + +#[zenoh_macros::unstable_config] +impl std::ops::Deref for Config { + type Target = zenoh_config::Config; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[zenoh_macros::unstable_config] +impl std::ops::DerefMut for Config { + fn deref_mut(&mut self) -> &mut ::Target { + &mut self.0 + } +} + +impl fmt::Display for Config { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", &self.0) + } +} + +pub type Notification = Arc; + +struct NotifierInner { + inner: Mutex, + subscribers: Mutex>>, +} +pub struct Notifier { + inner: Arc>, +} +impl Clone for Notifier { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} +impl Notifier { + pub fn new(inner: Config) -> Self { + Notifier { + inner: Arc::new(NotifierInner { + inner: Mutex::new(inner), + subscribers: Mutex::new(Vec::new()), + }), + } + } + + pub fn subscribe(&self) -> flume::Receiver { + let (tx, rx) = flume::unbounded(); + self.lock_subscribers().push(tx); + rx + } + + pub fn notify>(&self, key: K) { + let key = key.as_ref(); + let key: Arc = Arc::from(key); + let mut marked = Vec::new(); + let mut subscribers = self.lock_subscribers(); + + for (i, sub) in subscribers.iter().enumerate() { + if sub.send(key.clone()).is_err() { + marked.push(i) + } + } + + for i in marked.into_iter().rev() { + subscribers.swap_remove(i); + } + } + + pub fn lock(&self) -> MutexGuard { + self.lock_config() + } + + fn lock_subscribers(&self) -> MutexGuard>> { + self.inner + .subscribers + .lock() + .expect("acquiring Notifier's subscribers Mutex should not fail") + } + + fn lock_config(&self) -> MutexGuard { + self.inner + .inner + .lock() + .expect("acquiring Notifier's Config Mutex should not fail") + } + + pub fn remove>(&self, key: K) -> ZResult<()> { + self.lock_config().remove(key.as_ref())?; + self.notify(key); + Ok(()) + } + + pub fn insert_json5(&self, key: &str, value: &str) -> Result<(), InsertionError> { + self.lock_config().insert_json5(key, value) + } + + pub fn get<'a>(&'a self, key: &str) -> Result, LookupError> { + let config = self.lock_config(); + // SAFETY: MutexGuard pins the mutex behind which the value is held. + let subref = config.get(key.as_ref())? as *const _; + Ok(LookupGuard { + _guard: config, + subref, + }) + } +} + +pub struct LookupGuard<'a, T> { + _guard: MutexGuard<'a, T>, + subref: *const dyn Any, +} + +impl<'a, T> ops::Deref for LookupGuard<'a, T> { + type Target = dyn Any; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.subref } + } +} + +impl<'a, T> AsRef for LookupGuard<'a, T> { + fn as_ref(&self) -> &dyn Any { + self.deref() + } +} diff --git a/zenoh/src/api/info.rs b/zenoh/src/api/info.rs index a63797c86c..95131eb6c6 100644 --- a/zenoh/src/api/info.rs +++ b/zenoh/src/api/info.rs @@ -29,7 +29,7 @@ use crate::net::runtime::Runtime; /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let zid = session.info().zid().await; /// # } /// ``` @@ -66,7 +66,7 @@ impl<'a> IntoFuture for ZenohIdBuilder<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut routers_zid = session.info().routers_zid().await; /// while let Some(router_zid) = routers_zid.next() {} /// # } @@ -113,7 +113,7 @@ impl<'a> IntoFuture for RoutersZenohIdBuilder<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let zid = session.info().zid().await; /// let mut peers_zid = session.info().peers_zid().await; /// while let Some(peer_zid) = peers_zid.next() {} @@ -161,7 +161,7 @@ impl<'a> IntoFuture for PeersZenohIdBuilder<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let info = session.info(); /// let zid = info.zid().await; /// # } @@ -178,7 +178,7 @@ impl SessionInfo { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let zid = session.info().zid().await; /// # } /// ``` @@ -196,7 +196,7 @@ impl SessionInfo { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut routers_zid = session.info().routers_zid().await; /// while let Some(router_zid) = routers_zid.next() {} /// # } @@ -214,7 +214,7 @@ impl SessionInfo { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut peers_zid = session.info().peers_zid().await; /// while let Some(peer_zid) = peers_zid.next() {} /// # } diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index fff40a5286..e94e61d6be 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -567,7 +567,7 @@ impl<'a> UndeclarableSealed<&'a Session> for KeyExpr<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let key_expr = session.declare_keyexpr("key/expression").await.unwrap(); /// session.undeclare(key_expr).await.unwrap(); /// # } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index c8a0b6f194..b5acc52bb7 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -52,7 +52,7 @@ use crate::api::session::WeakSession; /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -66,7 +66,7 @@ use crate::api::session::WeakSession; /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session.liveliness().get("key/**").await.unwrap(); /// while let Ok(reply) = replies.recv_async().await { /// if let Ok(sample) = reply.result() { @@ -82,7 +82,7 @@ use crate::api::session::WeakSession; /// # async fn main() { /// use zenoh::sample::SampleKind; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.liveliness().declare_subscriber("key/**").await.unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { /// match sample.kind() { @@ -110,7 +110,7 @@ impl<'a> Liveliness<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -145,7 +145,7 @@ impl<'a> Liveliness<'a> { /// # async fn main() { /// use zenoh::sample::SampleKind; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.liveliness().declare_subscriber("key/expression").await.unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { /// match sample.kind() { @@ -184,7 +184,7 @@ impl<'a> Liveliness<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session.liveliness().get("key/expression").await.unwrap(); /// while let Ok(reply) = replies.recv_async().await { /// if let Ok(sample) = reply.result() { @@ -223,7 +223,7 @@ impl<'a> Liveliness<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -295,7 +295,7 @@ pub(crate) struct LivelinessTokenState { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -318,7 +318,7 @@ pub struct LivelinessToken { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -363,7 +363,7 @@ impl LivelinessToken { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -412,7 +412,7 @@ impl Drop for LivelinessToken { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .liveliness() /// .declare_subscriber("key/expression") @@ -446,7 +446,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .liveliness() /// .declare_subscriber("key/expression") @@ -480,7 +480,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut n = 0; /// let subscriber = session /// .liveliness() @@ -509,7 +509,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .liveliness() /// .declare_subscriber("key/expression") @@ -631,7 +631,7 @@ where /// # async fn main() { /// # use std::convert::TryFrom; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let tokens = session /// .liveliness() /// .get("key/expression") @@ -662,7 +662,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session /// .liveliness() /// .get("key/expression") @@ -689,7 +689,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut n = 0; /// let queryable = session /// .liveliness() @@ -717,7 +717,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session /// .liveliness() /// .get("key/expression") diff --git a/zenoh/src/api/mod.rs b/zenoh/src/api/mod.rs index d3053cb3c9..c755a45f26 100644 --- a/zenoh/src/api/mod.rs +++ b/zenoh/src/api/mod.rs @@ -17,6 +17,7 @@ pub(crate) type Id = u32; pub(crate) mod admin; pub(crate) mod builders; pub(crate) mod bytes; +pub(crate) mod config; pub(crate) mod encoding; pub(crate) mod handlers; pub(crate) mod info; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 4870fac445..7db74eb89d 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -75,7 +75,7 @@ impl fmt::Debug for PublisherState { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.put("value").await.unwrap(); /// # } @@ -89,7 +89,7 @@ impl fmt::Debug for PublisherState { /// # async fn main() { /// use futures::StreamExt; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut subscriber = session.declare_subscriber("key/expression").await.unwrap(); /// let publisher = session.declare_publisher("another/key/expression").await.unwrap(); /// subscriber.stream().map(Ok).forward(publisher).await.unwrap(); @@ -120,7 +120,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression") /// .await /// .unwrap(); @@ -173,7 +173,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.put("value").await.unwrap(); /// # } @@ -203,7 +203,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.delete().await.unwrap(); /// # } @@ -229,7 +229,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_subscribers: bool = publisher /// .matching_status() @@ -256,7 +256,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher.matching_listener().await.unwrap(); /// while let Ok(matching_status) = matching_listener.recv_async().await { @@ -284,7 +284,7 @@ impl<'a> Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.undeclare().await.unwrap(); /// # } @@ -322,7 +322,7 @@ impl<'a> UndeclarableSealed<()> for Publisher<'a> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// publisher.undeclare().await.unwrap(); /// # } @@ -503,7 +503,7 @@ impl TryFrom for Priority { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_status = publisher.matching_status().await.unwrap(); /// # } @@ -523,7 +523,7 @@ impl MatchingStatus { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_subscribers: bool = publisher /// .matching_status() @@ -555,7 +555,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher /// .matching_listener() @@ -587,7 +587,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// # async fn main() { /// /// let mut n = 0; - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher /// .matching_listener() @@ -615,7 +615,7 @@ impl<'a, 'b> MatchingListenerBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher /// .matching_listener() @@ -754,7 +754,7 @@ pub(crate) struct MatchingListenerInner { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher.matching_listener().await.unwrap(); /// while let Ok(matching_status) = matching_listener.recv_async().await { @@ -781,7 +781,7 @@ impl MatchingListener { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression").await.unwrap(); /// let matching_listener = publisher.matching_listener().await.unwrap(); /// matching_listener.undeclare().await.unwrap(); @@ -856,10 +856,7 @@ impl IntoFuture for MatchingListenerUndeclaration { #[cfg(test)] mod tests { - use zenoh_config::Config; - use zenoh_core::Wait; - - use crate::api::sample::SampleKind; + use crate::{sample::SampleKind, Config, Wait}; #[cfg(feature = "internal")] #[test] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 7cb8d5eb84..ff9aba7520 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -180,7 +180,7 @@ impl QueryState { /// # async fn main() { /// use zenoh::{query::{ConsolidationMode, QueryTarget}}; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session /// .get("key/expression?value>1") /// .target(QueryTarget::All) @@ -266,7 +266,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session /// .get("key/expression") /// .callback(|reply| {println!("Received {:?}", reply.result());}) @@ -292,7 +292,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut n = 0; /// let queryable = session /// .get("key/expression") @@ -319,7 +319,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session /// .get("key/expression") /// .with(flume::bounded(32)) diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 97675336b7..cca8558cef 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -558,7 +558,7 @@ pub(crate) struct QueryableInner { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session.declare_queryable("key/expression").await.unwrap(); /// queryable.undeclare().await.unwrap(); /// # } @@ -592,7 +592,7 @@ impl IntoFuture for QueryableUndeclaration { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session.declare_queryable("key/expression").await.unwrap(); /// # } /// ``` @@ -621,7 +621,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session /// .declare_queryable("key/expression") /// .callback(|query| {println!(">> Handling query '{}'", query.selector());}) @@ -650,7 +650,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut n = 0; /// let queryable = session /// .declare_queryable("key/expression") @@ -677,7 +677,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session /// .declare_queryable("key/expression") /// .with(flume::bounded(32)) @@ -759,7 +759,7 @@ impl QueryableBuilder<'_, '_, Handler> { /// # async fn main() { /// use futures::prelude::*; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let (tx, rx) = flume::bounded(32); /// session /// .declare_queryable("key/expression") @@ -781,7 +781,7 @@ impl QueryableBuilder<'_, '_, Handler> { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session /// .declare_queryable("key/expression") /// .with(flume::bounded(32)) @@ -811,7 +811,7 @@ impl Queryable { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session.declare_queryable("key/expression") /// .await /// .unwrap(); @@ -848,7 +848,7 @@ impl Queryable { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session.declare_queryable("key/expression") /// .await /// .unwrap(); diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 73c0afcbdf..a9447bbc91 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -29,6 +29,7 @@ use zenoh_task::TerminatableTask; use crate::{ api::handlers::{locked, Callback, DefaultHandler, IntoHandler}, net::runtime::{orchestrator::Loop, Runtime}, + Config, }; /// A builder for initializing a [`Scout`]. @@ -39,7 +40,7 @@ use crate::{ /// # async fn main() { /// use zenoh::config::WhatAmI; /// -/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) +/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .await /// .unwrap(); /// while let Ok(hello) = receiver.recv_async().await { @@ -64,7 +65,7 @@ impl ScoutBuilder { /// # async fn main() { /// use zenoh::config::WhatAmI; /// - /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) + /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .callback(|hello| { println!("{}", hello); }) /// .await /// .unwrap(); @@ -90,7 +91,7 @@ impl ScoutBuilder { /// use zenoh::config::WhatAmI; /// /// let mut n = 0; - /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) + /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .callback_mut(move |_hello| { n += 1; }) /// .await /// .unwrap(); @@ -115,7 +116,7 @@ impl ScoutBuilder { /// # async fn main() { /// use zenoh::config::WhatAmI; /// - /// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) + /// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .with(flume::bounded(32)) /// .await /// .unwrap(); @@ -182,7 +183,7 @@ where /// # async fn main() { /// use zenoh::config::WhatAmI; /// -/// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) +/// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .callback(|hello| { println!("{}", hello); }) /// .await /// .unwrap(); @@ -202,7 +203,7 @@ impl ScoutInner { /// # async fn main() { /// use zenoh::config::WhatAmI; /// - /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) + /// let scout = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .callback(|hello| { println!("{}", hello); }) /// .await /// .unwrap(); @@ -237,7 +238,7 @@ impl fmt::Debug for ScoutInner { /// # async fn main() { /// use zenoh::config::WhatAmI; /// -/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) +/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .with(flume::bounded(32)) /// .await /// .unwrap(); @@ -270,7 +271,7 @@ impl Scout { /// # async fn main() { /// use zenoh::config::WhatAmI; /// - /// let scout = zenoh::scout(WhatAmI::Router, zenoh::config::default()) + /// let scout = zenoh::scout(WhatAmI::Router, zenoh::Config::default()) /// .with(flume::bounded(32)) /// .await /// .unwrap(); @@ -283,11 +284,7 @@ impl Scout { } } -fn _scout( - what: WhatAmIMatcher, - config: zenoh_config::Config, - callback: Callback, -) -> ZResult { +fn _scout(what: WhatAmIMatcher, config: Config, callback: Callback) -> ZResult { tracing::trace!("scout({}, {})", what, &config); let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address); let addr = config.scouting.multicast.address().unwrap_or(default_addr); @@ -352,7 +349,7 @@ fn _scout( /// # async fn main() { /// use zenoh::config::WhatAmI; /// -/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::config::default()) +/// let receiver = zenoh::scout(WhatAmI::Peer | WhatAmI::Router, zenoh::Config::default()) /// .await /// .unwrap(); /// while let Ok(hello) = receiver.recv_async().await { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4d98820468..6518c61fa5 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -30,7 +30,7 @@ use tracing::{error, info, trace, warn}; use uhlc::{Timestamp, HLC}; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; -use zenoh_config::{unwrap_or_default, wrappers::ZenohId, Config, Notifier}; +use zenoh_config::{unwrap_or_default, wrappers::ZenohId}; use zenoh_core::{zconfigurable, zread, Resolvable, Resolve, ResolveClosure, ResolveFuture, Wait}; #[cfg(feature = "unstable")] use zenoh_protocol::network::{ @@ -75,6 +75,7 @@ use super::{ SessionPutBuilder, }, bytes::ZBytes, + config::Notifier, encoding::Encoding, handlers::{Callback, DefaultHandler}, info::SessionInfo, @@ -100,10 +101,13 @@ use super::{ }; #[cfg(feature = "unstable")] use crate::api::selector::ZenohParameters; -use crate::net::{ - primitives::Primitives, - routing::dispatcher::face::Face, - runtime::{Runtime, RuntimeBuilder}, +use crate::{ + net::{ + primitives::Primitives, + routing::dispatcher::face::Face, + runtime::{Runtime, RuntimeBuilder}, + }, + Config, }; zconfigurable! { @@ -424,7 +428,7 @@ impl fmt::Debug for SessionInner { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// session.put("key/expression", "value").await.unwrap(); /// # } pub struct Session(pub(crate) Arc); @@ -577,7 +581,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .await @@ -604,7 +608,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// assert!(!session.is_closed()); /// session.close().await.unwrap(); /// assert!(session.is_closed()); @@ -633,7 +637,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let peers = session.config().get("connect/endpoints").unwrap(); /// # } /// ``` @@ -643,7 +647,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let _ = session.config().insert_json5("connect/endpoints", r#"["tcp/127.0.0.1/7447"]"#); /// # } /// ``` @@ -661,7 +665,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let timestamp = session.new_timestamp(); /// # } /// ``` @@ -692,7 +696,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let info = session.info(); /// # } /// ``` @@ -713,7 +717,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.declare_subscriber("key/expression") /// .await /// .unwrap(); @@ -753,7 +757,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let queryable = session.declare_queryable("key/expression") /// .await /// .unwrap(); @@ -796,7 +800,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let publisher = session.declare_publisher("key/expression") /// .await /// .unwrap(); @@ -831,7 +835,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -856,7 +860,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let key_expr = session.declare_keyexpr("key/expression").await.unwrap(); /// # } /// ``` @@ -911,7 +915,7 @@ impl Session { /// # async fn main() { /// use zenoh::bytes::Encoding; /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// session /// .put("key/expression", "payload") /// .encoding(Encoding::TEXT_PLAIN) @@ -954,7 +958,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// session.delete("key/expression").await.unwrap(); /// # } /// ``` @@ -990,7 +994,7 @@ impl Session { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session.get("key/expression").await.unwrap(); /// while let Ok(reply) = replies.recv_async().await { /// println!(">> Received {:?}", reply.result()); @@ -2809,7 +2813,7 @@ impl crate::net::primitives::EPrimitives for WeakSession { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// # } /// ``` /// @@ -2819,7 +2823,7 @@ impl crate::net::primitives::EPrimitives for WeakSession { /// use std::str::FromStr; /// use zenoh::session::ZenohId; /// -/// let mut config = zenoh::config::peer(); +/// let mut config = zenoh::Config::default(); /// config.set_id(ZenohId::from_str("221b72df20924c15b8794c6bdb471150").unwrap()); /// config.connect.endpoints.set( /// ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"].iter().map(|s|s.parse().unwrap()).collect()); @@ -2846,7 +2850,7 @@ where /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// # } /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 3cbe16044e..dbe1b63dc0 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -69,7 +69,7 @@ pub(crate) struct SubscriberInner { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .await @@ -106,7 +106,7 @@ impl IntoFuture for SubscriberUndeclaration { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .await @@ -156,7 +156,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()); }) @@ -185,7 +185,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let mut n = 0; /// let subscriber = session /// .declare_subscriber("key/expression") @@ -212,7 +212,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .with(flume::bounded(32)) @@ -364,7 +364,7 @@ where /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// session /// .declare_subscriber("key/expression") /// .callback(|sample| { println!("Received: {} {:?}", sample.key_expr(), sample.payload()) }) @@ -379,7 +379,7 @@ where /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session /// .declare_subscriber("key/expression") /// .with(flume::bounded(32)) @@ -406,7 +406,7 @@ impl Subscriber { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.declare_subscriber("key/expression") /// .await /// .unwrap(); @@ -448,7 +448,7 @@ impl Subscriber { /// # #[tokio::main] /// # async fn main() { /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.declare_subscriber("key/expression") /// .await /// .unwrap(); diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 3fa24e3c36..c67a82b4a3 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -37,7 +37,7 @@ //! //! #[tokio::main] //! async fn main() { -//! let session = zenoh::open(zenoh::config::default()).await.unwrap(); +//! let session = zenoh::open(zenoh::Config::default()).await.unwrap(); //! session.put("key/expression", "value").await.unwrap(); //! session.close().await.unwrap(); //! } @@ -50,7 +50,7 @@ //! //! #[tokio::main] //! async fn main() { -//! let session = zenoh::open(zenoh::config::default()).await.unwrap(); +//! let session = zenoh::open(zenoh::Config::default()).await.unwrap(); //! let subscriber = session.declare_subscriber("key/expression").await.unwrap(); //! while let Ok(sample) = subscriber.recv_async().await { //! println!("Received: {:?}", sample); @@ -66,7 +66,7 @@ //! //! #[tokio::main] //! async fn main() { -//! let session = zenoh::open(zenoh::config::default()).await.unwrap(); +//! let session = zenoh::open(zenoh::Config::default()).await.unwrap(); //! let replies = session.get("key/expression").await.unwrap(); //! while let Ok(reply) = replies.recv_async().await { //! println!(">> Received {:?}", reply.result()); @@ -296,7 +296,7 @@ pub mod scouting { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let liveliness = session /// .liveliness() /// .declare_token("key/expression") @@ -310,7 +310,7 @@ pub mod scouting { /// # #[tokio::main] /// # async fn main() { /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let replies = session.liveliness().get("key/**").await.unwrap(); /// while let Ok(reply) = replies.recv_async().await { /// if let Ok(sample) = reply.result() { @@ -326,7 +326,7 @@ pub mod scouting { /// # async fn main() { /// use zenoh::sample::SampleKind; /// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); /// let subscriber = session.liveliness().declare_subscriber("key/**").await.unwrap(); /// while let Ok(sample) = subscriber.recv_async().await { /// match sample.kind() { @@ -351,11 +351,9 @@ pub mod time { /// Configuration to pass to [`open`] and [`scout`] functions and associated constants pub mod config { - // pub use zenoh_config::{ - // client, default, peer, Config, EndPoint, Locator, ModeDependentValue, PermissionsConf, - // PluginLoad, ValidatedMap, ZenohId, - // }; - pub use zenoh_config::*; + pub use zenoh_config::{WhatAmI, WhatAmIMatcher}; + + pub use crate::api::config::{Config, InsertionError, LookupError, LookupGuard, Notifier}; } #[cfg(all( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index f9e1674c3e..3dedbfa462 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -180,7 +180,8 @@ pub(crate) struct HatCode {} impl HatBaseTrait for HatCode { fn init(&self, tables: &mut Tables, runtime: Runtime) { - let config = runtime.config().lock(); + let config_guard = runtime.config().lock(); + let config = &config_guard.0; let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); @@ -194,7 +195,7 @@ impl HatBaseTrait for HatCode { unwrap_or_default!(config.routing().peer().mode()) == *"linkstate"; let router_peers_failover_brokering = unwrap_or_default!(config.routing().router().peers_failover_brokering()); - drop(config); + drop(config_guard); hat_mut!(tables).linkstatepeers_net = Some(Network::new( "[Peers network]".to_string(), diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index a2d3c66aa3..7ffc568d5c 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -307,7 +307,8 @@ pub(crate) struct HatCode {} impl HatBaseTrait for HatCode { fn init(&self, tables: &mut Tables, runtime: Runtime) { - let config = runtime.config().lock(); + let config_guard = runtime.config().lock(); + let config = &config_guard.0; let whatami = tables.whatami; let gossip = unwrap_or_default!(config.scouting().gossip().enabled()); let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop()); @@ -322,7 +323,7 @@ impl HatBaseTrait for HatCode { unwrap_or_default!(config.routing().peer().mode()) == *"linkstate"; let router_peers_failover_brokering = unwrap_or_default!(config.routing().router().peers_failover_brokering()); - drop(config); + drop(config_guard); if router_full_linkstate | gossip { hat_mut!(tables).routers_net = Some(Network::new( diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 1a02513494..c86e9665cb 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -36,7 +36,7 @@ use futures::{stream::StreamExt, Future}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use uhlc::{HLCBuilder, HLC}; -use zenoh_config::wrappers::ZenohId; +use zenoh_config::{unwrap_or_default, ModeDependent, ZenohId}; use zenoh_link::{EndPoint, Link}; use zenoh_plugin_trait::{PluginStartArgs, StructVersion}; use zenoh_protocol::{ @@ -61,10 +61,7 @@ use super::{primitives::DeMux, routing, routing::router::Router}; use crate::api::loader::{load_plugins, start_plugins}; #[cfg(feature = "plugins")] use crate::api::plugins::PluginsManager; -use crate::{ - config::{unwrap_or_default, Config, ModeDependent, Notifier}, - GIT_VERSION, LONG_VERSION, -}; +use crate::{config::Notifier, Config, GIT_VERSION, LONG_VERSION}; pub(crate) struct RuntimeState { zid: ZenohId, diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 6bff4e75be..ce08e1b6ba 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -1180,6 +1180,7 @@ impl Runtime { .state .config .lock() + .0 .connect() .endpoints() .get(session.runtime.state.whatami) diff --git a/zenoh/tests/acl.rs b/zenoh/tests/acl.rs index 285e68b254..7c2d1aed94 100644 --- a/zenoh/tests/acl.rs +++ b/zenoh/tests/acl.rs @@ -19,12 +19,8 @@ mod test { }; use tokio::runtime::Handle; - use zenoh::{ - config, - config::{EndPoint, WhatAmI}, - sample::SampleKind, - Config, Session, - }; + use zenoh::{config::WhatAmI, sample::SampleKind, Config, Session}; + use zenoh_config::{EndPoint, ModeDependentValue}; use zenoh_core::{zlock, ztimeout}; const TIMEOUT: Duration = Duration::from_secs(60); @@ -59,7 +55,7 @@ mod test { } async fn get_basic_router_config(port: u16) -> Config { - let mut config = config::default(); + let mut config = Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -77,9 +73,29 @@ mod test { async fn get_client_sessions(port: u16) -> (Session, Session) { println!("Opening client sessions"); - let config = config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); + let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); + + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); let s02 = ztimeout!(zenoh::open(config)).unwrap(); (s01, s02) } diff --git a/zenoh/tests/authentication.rs b/zenoh/tests/authentication.rs index 63ddfcc03c..b755bbec70 100644 --- a/zenoh/tests/authentication.rs +++ b/zenoh/tests/authentication.rs @@ -21,11 +21,8 @@ mod test { use once_cell::sync::Lazy; use tokio::runtime::Handle; - use zenoh::{ - config, - config::{EndPoint, WhatAmI}, - Config, Session, - }; + use zenoh::{config::WhatAmI, Config, Session}; + use zenoh_config::{EndPoint, ModeDependentValue}; use zenoh_core::{zlock, ztimeout}; const TIMEOUT: Duration = Duration::from_secs(60); @@ -270,7 +267,7 @@ client2name:client2passwd"; async fn get_basic_router_config_tls(port: u16, lowlatency: bool) -> Config { let cert_path = TESTFILES_PATH.to_string_lossy(); - let mut config = config::default(); + let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -323,7 +320,7 @@ client2name:client2passwd"; } async fn get_basic_router_config_quic(port: u16) -> Config { let cert_path = TESTFILES_PATH.to_string_lossy(); - let mut config = config::default(); + let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -369,7 +366,7 @@ client2name:client2passwd"; } async fn get_basic_router_config_usrpswd(port: u16) -> Config { - let mut config = config::default(); + let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -408,7 +405,7 @@ client2name:client2passwd"; async fn get_basic_router_config_quic_usrpswd(port: u16) -> Config { let cert_path = TESTFILES_PATH.to_string_lossy(); - let mut config = config::default(); + let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -474,9 +471,16 @@ client2name:client2passwd"; async fn get_client_sessions_tls(port: u16, lowlatency: bool) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); - let mut config = config::client([format!("tls/127.0.0.1:{}", port) + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tls/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -520,9 +524,16 @@ client2name:client2passwd"; .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client([format!("tls/127.0.0.1:{}", port) + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tls/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -571,9 +582,16 @@ client2name:client2passwd"; async fn get_client_sessions_quic(port: u16) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); - let mut config = config::client([format!("quic/127.0.0.1:{}", port) + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "quic/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -609,9 +627,16 @@ client2name:client2passwd"; .set_root_ca_certificate(Some(format!("{}/ca.pem", cert_path))) .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client([format!("quic/127.0.0.1:{}", port) + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "quic/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -652,8 +677,16 @@ client2name:client2passwd"; async fn get_client_sessions_usrpswd(port: u16) -> (Session, Session) { println!("Opening client sessions"); - let mut config = - config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -668,8 +701,16 @@ client2name:client2passwd"; ) .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = - config::client([format!("tcp/127.0.0.1:{port}").parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "tcp/127.0.0.1:{port}" + ) + .parse::() + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -690,9 +731,16 @@ client2name:client2passwd"; async fn get_client_sessions_quic_usrpswd(port: u16) -> (Session, Session) { let cert_path = TESTFILES_PATH.to_string_lossy(); println!("Opening client sessions"); - let mut config = config::client([format!("quic/127.0.0.1:{port}") + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "quic/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", @@ -735,9 +783,16 @@ client2name:client2passwd"; .unwrap(); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::client([format!("quic/127.0.0.1:{}", port) + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec![format!( + "quic/127.0.0.1:{port}" + ) .parse::() - .unwrap()]); + .unwrap()])) + .unwrap(); config .insert_json5( "transport", diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs index 6bb655851b..6f0d8a6e47 100644 --- a/zenoh/tests/connection_retry.rs +++ b/zenoh/tests/connection_retry.rs @@ -11,10 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh::{ - config::{ConnectionRetryConf, EndPoint, ModeDependent}, - Config, Wait, -}; +use zenoh::{Config, Wait}; +use zenoh_config::{ConnectionRetryConf, EndPoint, ModeDependent}; #[test] fn retry_config_overriding() { diff --git a/zenoh/tests/events.rs b/zenoh/tests/events.rs index e3a4d61656..f8d1801ee6 100644 --- a/zenoh/tests/events.rs +++ b/zenoh/tests/events.rs @@ -13,13 +13,13 @@ // use std::time::Duration; -use zenoh::{config, query::Reply, sample::SampleKind, Session}; +use zenoh::{query::Reply, sample::SampleKind, Session}; use zenoh_core::ztimeout; const TIMEOUT: Duration = Duration::from_secs(10); async fn open_session(listen: &[&str], connect: &[&str]) -> Session { - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 57ba51d5ba..7559faf22d 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -21,11 +21,8 @@ use std::{ }, }; -use zenoh::{ - config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}, - key_expr::KeyExpr, - Config, Wait, -}; +use zenoh::{key_expr::KeyExpr, Config, Wait}; +use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; // Tokio's time granularity on different platforms #[cfg(target_os = "windows")] diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index ff0bb4ee99..3982e10ee3 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -19,9 +19,8 @@ use zenoh_core::ztimeout; async fn test_liveliness_subscriber_clique() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; - use zenoh_config::WhatAmI; - use zenoh_link::EndPoint; + use zenoh::{config::WhatAmI, sample::SampleKind}; + use zenoh_config::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); const PEER1_ENDPOINT: &str = "tcp/localhost:47447"; @@ -30,10 +29,10 @@ async fn test_liveliness_subscriber_clique() { zenoh_util::init_log_from_env_or("error"); let peer1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints - .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) + .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) .unwrap(); c.scouting.multicast.set_enabled(Some(false)).unwrap(); let _ = c.set_mode(Some(WhatAmI::Peer)); @@ -43,7 +42,7 @@ async fn test_liveliness_subscriber_clique() { }; let peer2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -83,7 +82,7 @@ async fn test_liveliness_subscriber_clique() { async fn test_liveliness_query_clique() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); @@ -94,7 +93,7 @@ async fn test_liveliness_query_clique() { zenoh_util::init_log_from_env_or("error"); let peer1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -107,7 +106,7 @@ async fn test_liveliness_query_clique() { }; let peer2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![PEER1_ENDPOINT.parse::().unwrap()]) @@ -140,7 +139,7 @@ async fn test_liveliness_query_clique() { async fn test_liveliness_subscriber_brokered() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -152,7 +151,7 @@ async fn test_liveliness_subscriber_brokered() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -165,7 +164,7 @@ async fn test_liveliness_subscriber_brokered() { }; let client1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -178,7 +177,7 @@ async fn test_liveliness_subscriber_brokered() { }; let client2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -219,7 +218,7 @@ async fn test_liveliness_subscriber_brokered() { async fn test_liveliness_query_brokered() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); @@ -230,7 +229,7 @@ async fn test_liveliness_query_brokered() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -243,7 +242,7 @@ async fn test_liveliness_query_brokered() { }; let client1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -256,7 +255,7 @@ async fn test_liveliness_query_brokered() { }; let client2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -290,7 +289,7 @@ async fn test_liveliness_query_brokered() { async fn test_liveliness_subscriber_local() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -299,7 +298,7 @@ async fn test_liveliness_subscriber_local() { zenoh_util::init_log_from_env_or("error"); let peer = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.scouting.multicast.set_enabled(Some(false)).unwrap(); let _ = c.set_mode(Some(WhatAmI::Peer)); let s = ztimeout!(zenoh::open(c)).unwrap(); @@ -333,8 +332,7 @@ async fn test_liveliness_subscriber_local() { async fn test_liveliness_query_local() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; - use zenoh_config::WhatAmI; + use zenoh::{config::WhatAmI, sample::SampleKind}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); const LIVELINESS_KEYEXPR: &str = "test/liveliness/query/local"; @@ -342,7 +340,7 @@ async fn test_liveliness_query_local() { zenoh_util::init_log_from_env_or("error"); let peer = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.scouting.multicast.set_enabled(Some(false)).unwrap(); let _ = c.set_mode(Some(WhatAmI::Peer)); let s = ztimeout!(zenoh::open(c)).unwrap(); diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index d7825e85b8..efa377863d 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -12,10 +12,12 @@ // ZettaScale Zenoh Team, // #![cfg(feature = "unstable")] -use std::{str::FromStr, time::Duration}; + +use std::time::Duration; use flume::RecvTimeoutError; -use zenoh::{config, config::Locator, sample::Locality, Result as ZResult, Session}; +use zenoh::{sample::Locality, Result as ZResult, Session}; +use zenoh_config::{ModeDependentValue, WhatAmI}; use zenoh_core::ztimeout; const TIMEOUT: Duration = Duration::from_secs(60); @@ -23,7 +25,7 @@ const RECV_TIMEOUT: Duration = Duration::from_secs(1); async fn create_session_pair(locator: &str) -> (Session, Session) { let config1 = { - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); config .listen @@ -32,7 +34,12 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { .unwrap(); config }; - let config2 = config::client([Locator::from_str(locator).unwrap()]); + let mut config2 = zenoh::Config::default(); + config2.set_mode(Some(WhatAmI::Client)).unwrap(); + config2 + .connect + .set_endpoints(ModeDependentValue::Unique(vec![locator.parse().unwrap()])) + .unwrap(); let session1 = ztimeout!(zenoh::open(config1)).unwrap(); let session2 = ztimeout!(zenoh::open(config2)).unwrap(); @@ -95,8 +102,8 @@ async fn zenoh_matching_status_any() -> ZResult<()> { async fn zenoh_matching_status_remote() -> ZResult<()> { zenoh_util::init_log_from_env_or("error"); - let session1 = ztimeout!(zenoh::open(config::peer())).unwrap(); - let session2 = ztimeout!(zenoh::open(config::peer())).unwrap(); + let session1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_remote_test") @@ -150,8 +157,8 @@ async fn zenoh_matching_status_remote() -> ZResult<()> { async fn zenoh_matching_status_local() -> ZResult<()> { zenoh_util::init_log_from_env_or("error"); - let session1 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); - let session2 = ztimeout!(zenoh::open(zenoh::config::peer())).unwrap(); + let session1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_local_test") diff --git a/zenoh/tests/open_time.rs b/zenoh/tests/open_time.rs index 98714c39c5..876489ff37 100644 --- a/zenoh/tests/open_time.rs +++ b/zenoh/tests/open_time.rs @@ -37,7 +37,7 @@ async fn time_open( lowlatency: bool, ) { /* [ROUTER] */ - let mut router_config = Config::default(); + let mut router_config = zenoh::Config::default(); router_config.set_mode(Some(WhatAmI::Router)).unwrap(); router_config .listen @@ -67,7 +67,7 @@ async fn time_open( ); /* [APP] */ - let mut app_config = Config::default(); + let mut app_config = zenoh::Config::default(); app_config.set_mode(Some(connect_mode)).unwrap(); app_config .connect diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 2ba3226310..4995d05e70 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -24,8 +24,8 @@ const SLEEP: Duration = Duration::from_secs(1); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn qos_pubsub() { - let session1 = ztimeout!(zenoh::open(zenoh_config::peer())).unwrap(); - let session2 = ztimeout!(zenoh::open(zenoh_config::peer())).unwrap(); + let session1 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh::Config::default())).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("test/qos") diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index e9c8fd899f..c291ef4bf1 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -20,11 +20,8 @@ use std::{ }; use tokio_util::sync::CancellationToken; -use zenoh::{ - config::{ModeDependentValue, WhatAmI, WhatAmIMatcher}, - qos::CongestionControl, - Config, Result, Session, -}; +use zenoh::{config::WhatAmI, qos::CongestionControl, Config, Result, Session}; +use zenoh_config::{ModeDependentValue, WhatAmIMatcher}; use zenoh_core::ztimeout; use zenoh_result::bail; diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 04f3bde4f9..a13018683a 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -23,7 +23,7 @@ use std::{ use zenoh::internal::runtime::{Runtime, RuntimeBuilder}; #[cfg(feature = "unstable")] use zenoh::pubsub::Reliability; -use zenoh::{config, key_expr::KeyExpr, qos::CongestionControl, sample::SampleKind, Session}; +use zenoh::{key_expr::KeyExpr, qos::CongestionControl, sample::SampleKind, Session}; use zenoh_core::ztimeout; #[cfg(not(feature = "unstable"))] use zenoh_protocol::core::Reliability; @@ -36,7 +36,7 @@ const MSG_SIZE: [usize; 2] = [1_024, 100_000]; async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -51,7 +51,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { println!("[ ][01a] Opening peer01 session: {:?}", endpoints); let peer01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .connect .endpoints @@ -71,7 +71,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, Session) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -81,7 +81,7 @@ async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, println!("[ ][01a] Opening peer01 session: {}", endpoint01); let peer01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -286,7 +286,7 @@ async fn zenoh_session_multicast() { #[cfg(feature = "internal")] async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -302,7 +302,7 @@ async fn open_session_unicast_runtime(endpoints: &[&str]) -> (Runtime, Runtime) let mut r1 = RuntimeBuilder::new(config).build().await.unwrap(); r1.start().await.unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .connect .endpoints diff --git a/zenoh/tests/shm.rs b/zenoh/tests/shm.rs index 8e1205f711..ace7a0ce49 100644 --- a/zenoh/tests/shm.rs +++ b/zenoh/tests/shm.rs @@ -21,7 +21,6 @@ use std::{ }; use zenoh::{ - config, pubsub::Reliability, qos::CongestionControl, shm::{ @@ -40,7 +39,7 @@ const MSG_SIZE: [usize; 2] = [1_024, 100_000]; async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -55,7 +54,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { println!("[ ][01a] Opening peer01 session: {:?}", endpoints); let peer01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .connect .endpoints @@ -75,7 +74,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, Session) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -85,7 +84,7 @@ async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, println!("[ ][01a] Opening peer01 session: {}", endpoint01); let peer01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index de63b0a97e..efa4d57a24 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -20,13 +20,8 @@ use std::{ }; use tokio::runtime::Handle; -use zenoh::{ - config, - config::{EndPoint, WhatAmI}, - key_expr::KeyExpr, - qos::CongestionControl, - Session, -}; +use zenoh::{config::WhatAmI, key_expr::KeyExpr, qos::CongestionControl, Session}; +use zenoh_config::{EndPoint, ModeDependentValue}; use zenoh_core::ztimeout; const TIMEOUT: Duration = Duration::from_secs(60); @@ -36,7 +31,7 @@ const MSG_SIZE: [usize; 2] = [1_024, 100_000]; async fn open_p2p_sessions() -> (Session, Session, Session) { // Open the sessions - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -46,7 +41,7 @@ async fn open_p2p_sessions() -> (Session, Session, Session) { println!("[ ][01a] Opening s01 session"); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .listen .endpoints @@ -61,7 +56,7 @@ async fn open_p2p_sessions() -> (Session, Session, Session) { println!("[ ][02a] Opening s02 session"); let s02 = ztimeout!(zenoh::open(config)).unwrap(); - let mut config = config::peer(); + let mut config = zenoh::Config::default(); config .connect .endpoints @@ -79,7 +74,7 @@ async fn open_p2p_sessions() -> (Session, Session, Session) { async fn open_router_session() -> Session { // Open the sessions - let mut config = config::default(); + let mut config = zenoh::Config::default(); config.set_mode(Some(WhatAmI::Router)).unwrap(); config .listen @@ -98,15 +93,36 @@ async fn close_router_session(s: Session) { async fn open_client_sessions() -> (Session, Session, Session) { // Open the sessions - let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .parse::() + .unwrap()])) + .unwrap(); println!("[ ][01a] Opening s01 session"); let s01 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .parse::() + .unwrap()])) + .unwrap(); println!("[ ][02a] Opening s02 session"); let s02 = ztimeout!(zenoh::open(config)).unwrap(); - let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); + let mut config = zenoh::Config::default(); + config.set_mode(Some(WhatAmI::Client)).unwrap(); + config + .connect + .set_endpoints(ModeDependentValue::Unique(vec!["tcp/127.0.0.1:37447" + .parse::() + .unwrap()])) + .unwrap(); println!("[ ][03a] Opening s03 session"); let s03 = ztimeout!(zenoh::open(config)).unwrap(); diff --git a/zenohd/Cargo.toml b/zenohd/Cargo.toml index b0320ce648..9347c7f8ee 100644 --- a/zenohd/Cargo.toml +++ b/zenohd/Cargo.toml @@ -43,6 +43,7 @@ tracing-subscriber = {workspace = true} tracing-loki = {workspace = true, optional = true } url = {workspace = true, optional = true } zenoh = { workspace = true, features = ["unstable", "internal", "plugins"] } +zenoh-config = { workspace = true } [dev-dependencies] rand = { workspace = true, features = ["default"] } diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 85a063e3a2..e618b6f3db 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -17,10 +17,8 @@ use git_version::git_version; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; #[cfg(feature = "loki")] use url::Url; -use zenoh::{ - config::{Config, EndPoint, ModeDependentValue, PermissionsConf, WhatAmI}, - Result, -}; +use zenoh::{config::WhatAmI, Config, Result}; +use zenoh_config::{EndPoint, ModeDependentValue, PermissionsConf}; use zenoh_util::LibSearchDirs; #[cfg(feature = "loki")]