From 7afbc0d144a92fc0df983941de3537d157a68817 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Sun, 31 Mar 2024 17:33:01 +0200 Subject: [PATCH] unfinished --- Cargo.lock | 6 - zenoh-ext/Cargo.toml | 20 ++- zenoh/src/api.rs | 2 +- zenoh/src/api/publication.rs | 3 +- zenoh/src/api/query.rs | 3 +- zenoh/src/api/queryable.rs | 3 +- zenoh/src/api/sample.rs | 231 +---------------------------- zenoh/src/api/sample/attachment.rs | 217 +++++++++++++++++++++++++++ zenoh/src/api/session.rs | 10 +- zenoh/src/lib.rs | 50 ++++--- zenoh/src/prelude.rs | 45 ++++-- zenoh/tests/attachments.rs | 16 +- zenoh/tests/connection_retry.rs | 19 ++- zenoh/tests/events.rs | 12 +- zenoh/tests/formatters.rs | 11 +- zenoh/tests/handler.rs | 13 +- zenoh/tests/interceptors.rs | 13 +- zenoh/tests/liveliness.rs | 11 +- zenoh/tests/matching.rs | 28 ++-- zenoh/tests/qos.rs | 11 +- zenoh/tests/routing.rs | 20 ++- zenoh/tests/session.rs | 15 +- zenoh/tests/unicity.rs | 22 +-- 23 files changed, 422 insertions(+), 359 deletions(-) create mode 100644 zenoh/src/api/sample/attachment.rs diff --git a/Cargo.lock b/Cargo.lock index d3ea8978b5..3cbed04284 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4623,12 +4623,6 @@ dependencies = [ "serde_json", "tokio", "zenoh", - "zenoh-core", - "zenoh-macros", - "zenoh-result", - "zenoh-runtime", - "zenoh-sync", - "zenoh-util", ] [[package]] diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 65f1d47af1..5c6ab85c5c 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -31,7 +31,13 @@ unstable = [] default = [] [dependencies] -tokio = { workspace = true, features = ["rt", "sync", "time", "macros", "io-std"] } +tokio = { workspace = true, features = [ + "rt", + "sync", + "time", + "macros", + "io-std", +] } bincode = { workspace = true } env_logger = { workspace = true } flume = { workspace = true } @@ -42,12 +48,12 @@ serde = { workspace = true, features = ["default"] } serde_cbor = { workspace = true } serde_json = { workspace = true } zenoh = { workspace = true, features = ["unstable"], default-features = false } -zenoh-core = { workspace = true } -zenoh-macros = { workspace = true } -zenoh-result = { workspace = true } -zenoh-sync = { workspace = true } -zenoh-util = { workspace = true } -zenoh-runtime = { workspace = true } +# zenoh-core = { workspace = true } +# zenoh-macros = { workspace = true } +# zenoh-result = { workspace = true } +# zenoh-sync = { workspace = true } +# zenoh-util = { workspace = true } +# zenoh-runtime = { workspace = true } [dev-dependencies] clap = { workspace = true, features = ["derive"] } diff --git a/zenoh/src/api.rs b/zenoh/src/api.rs index 9219db364a..2585a682da 100644 --- a/zenoh/src/api.rs +++ b/zenoh/src/api.rs @@ -22,7 +22,7 @@ pub mod handlers; mod info; pub mod key_expr; #[cfg(feature = "unstable")] -mod liveliness; +pub mod liveliness; pub mod payload; pub mod publication; pub mod query; diff --git a/zenoh/src/api/publication.rs b/zenoh/src/api/publication.rs index 0fcdec8a55..7cbad9af84 100644 --- a/zenoh/src/api/publication.rs +++ b/zenoh/src/api/publication.rs @@ -19,8 +19,9 @@ use crate::api::{ key_expr::{KeyExpr, KeyExprInner}, payload::Payload, sample::{ + attachment::Attachment, builder::{QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait}, - Attachment, DataInfo, Locality, QoS, Sample, SampleFields, SampleKind, SourceInfo, + DataInfo, Locality, QoS, Sample, SampleFields, SampleKind, SourceInfo, }, session::{SessionRef, Undeclarable}, value::Value, diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index c8908ba490..9359750e76 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -20,8 +20,9 @@ use crate::api::{ payload::Payload, publication::Priority, sample::{ + attachment::Attachment, builder::{QoSBuilderTrait, SampleBuilderTrait, ValueBuilderTrait}, - Attachment, Locality, QoSBuilder, Sample, SourceInfo, + Locality, QoSBuilder, Sample, SourceInfo, }, selector::Selector, session::Session, diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index dcd5fa667b..557f79e4f7 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -21,11 +21,12 @@ use crate::api::{ publication::Priority, query::ReplyKeyExpr, sample::{ + attachment::Attachment, builder::{ DeleteSampleBuilder, PutSampleBuilder, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, }, - Attachment, Locality, Sample, SampleKind, SourceInfo, + Locality, Sample, SampleKind, SourceInfo, }, selector::{Parameters, Selector}, session::{SessionRef, Undeclarable}, diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 6c343745a9..dc76beea65 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -29,6 +29,9 @@ use zenoh_protocol::core::EntityGlobalId; use zenoh_protocol::network::declare::ext::QoSType; use zenoh_protocol::{core::CongestionControl, zenoh}; +use self::attachment::Attachment; + +pub mod attachment; pub mod builder; pub type SourceSn = u64; @@ -205,231 +208,6 @@ impl From> for SourceInfo { } } -mod attachment { - #[zenoh_macros::unstable] - use zenoh_buffers::{ - reader::{HasReader, Reader}, - writer::HasWriter, - ZBuf, ZBufReader, ZSlice, - }; - #[zenoh_macros::unstable] - use zenoh_codec::{RCodec, WCodec, Zenoh080}; - #[zenoh_macros::unstable] - use zenoh_protocol::zenoh::ext::AttachmentType; - - /// A builder for [`Attachment`] - #[zenoh_macros::unstable] - #[derive(Debug)] - pub struct AttachmentBuilder { - pub(crate) inner: Vec, - } - #[zenoh_macros::unstable] - impl Default for AttachmentBuilder { - fn default() -> Self { - Self::new() - } - } - #[zenoh_macros::unstable] - impl AttachmentBuilder { - pub fn new() -> Self { - Self { inner: Vec::new() } - } - fn _insert(&mut self, key: &[u8], value: &[u8]) { - let codec = Zenoh080; - let mut writer = self.inner.writer(); - codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure - codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure - } - /// Inserts a key-value pair to the attachment. - /// - /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. - pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( - &mut self, - key: &Key, - value: &Value, - ) { - self._insert(key.as_ref(), value.as_ref()) - } - pub fn build(self) -> Attachment { - Attachment { - inner: self.inner.into(), - } - } - } - #[zenoh_macros::unstable] - impl From for Attachment { - fn from(value: AttachmentBuilder) -> Self { - Attachment { - inner: value.inner.into(), - } - } - } - #[zenoh_macros::unstable] - impl From for Option { - fn from(value: AttachmentBuilder) -> Self { - if value.inner.is_empty() { - None - } else { - Some(value.into()) - } - } - } - - #[zenoh_macros::unstable] - #[derive(Clone)] - pub struct Attachment { - pub(crate) inner: ZBuf, - } - #[zenoh_macros::unstable] - impl Default for Attachment { - fn default() -> Self { - Self::new() - } - } - #[zenoh_macros::unstable] - impl From for AttachmentType { - fn from(this: Attachment) -> Self { - AttachmentType { buffer: this.inner } - } - } - #[zenoh_macros::unstable] - impl From> for Attachment { - fn from(this: AttachmentType) -> Self { - Attachment { inner: this.buffer } - } - } - #[zenoh_macros::unstable] - impl Attachment { - pub fn new() -> Self { - Self { - inner: ZBuf::empty(), - } - } - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - pub fn len(&self) -> usize { - self.iter().count() - } - pub fn iter(&self) -> AttachmentIterator { - self.into_iter() - } - fn _get(&self, key: &[u8]) -> Option { - self.iter() - .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) - } - pub fn get>(&self, key: &Key) -> Option { - self._get(key.as_ref()) - } - fn _insert(&mut self, key: &[u8], value: &[u8]) { - let codec = Zenoh080; - let mut writer = self.inner.writer(); - codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure - codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure - } - /// Inserts a key-value pair to the attachment. - /// - /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. - /// - /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] - pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( - &mut self, - key: &Key, - value: &Value, - ) { - self._insert(key.as_ref(), value.as_ref()) - } - fn _extend(&mut self, with: Self) -> &mut Self { - for slice in with.inner.zslices().cloned() { - self.inner.push_zslice(slice); - } - self - } - pub fn extend(&mut self, with: impl Into) -> &mut Self { - let with = with.into(); - self._extend(with) - } - } - #[zenoh_macros::unstable] - pub struct AttachmentIterator<'a> { - reader: ZBufReader<'a>, - } - #[zenoh_macros::unstable] - impl<'a> core::iter::IntoIterator for &'a Attachment { - type Item = (ZSlice, ZSlice); - type IntoIter = AttachmentIterator<'a>; - fn into_iter(self) -> Self::IntoIter { - AttachmentIterator { - reader: self.inner.reader(), - } - } - } - #[zenoh_macros::unstable] - impl core::fmt::Debug for Attachment { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{{")?; - for (key, value) in self { - let key = key.as_slice(); - let value = value.as_slice(); - match core::str::from_utf8(key) { - Ok(key) => write!(f, "\"{key}\": ")?, - Err(_) => { - write!(f, "0x")?; - for byte in key { - write!(f, "{byte:02X}")? - } - } - } - match core::str::from_utf8(value) { - Ok(value) => write!(f, "\"{value}\", ")?, - Err(_) => { - write!(f, "0x")?; - for byte in value { - write!(f, "{byte:02X}")? - } - write!(f, ", ")? - } - } - } - write!(f, "}}") - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::Iterator for AttachmentIterator<'a> { - type Item = (ZSlice, ZSlice); - fn next(&mut self) -> Option { - let key = Zenoh080.read(&mut self.reader).ok()?; - let value = Zenoh080.read(&mut self.reader).ok()?; - Some((key, value)) - } - fn size_hint(&self) -> (usize, Option) { - ( - (self.reader.remaining() != 0) as usize, - Some(self.reader.remaining() / 2), - ) - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { - fn from_iter>(iter: T) -> Self { - let codec = Zenoh080; - let mut buffer: Vec = Vec::new(); - let mut writer = buffer.writer(); - for (key, value) in iter { - codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures - codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures - } - Self { inner: buffer } - } - } - #[zenoh_macros::unstable] - impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { - fn from_iter>(iter: T) -> Self { - AttachmentBuilder::from_iter(iter).into() - } - } -} - /// The kind of a `Sample`. #[repr(u8)] #[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] @@ -461,9 +239,6 @@ impl TryFrom for SampleKind { } } -#[zenoh_macros::unstable] -pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; - /// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields. pub struct SampleFields { pub key_expr: KeyExpr<'static>, diff --git a/zenoh/src/api/sample/attachment.rs b/zenoh/src/api/sample/attachment.rs new file mode 100644 index 0000000000..8eefdf10d7 --- /dev/null +++ b/zenoh/src/api/sample/attachment.rs @@ -0,0 +1,217 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh_buffers::{ + reader::{HasReader, Reader}, + writer::HasWriter, + ZBuf, ZBufReader, ZSlice, +}; +use zenoh_codec::{RCodec, WCodec, Zenoh080}; +use zenoh_protocol::zenoh::ext::AttachmentType; + +/// A builder for [`Attachment`] +#[derive(Debug)] +pub struct AttachmentBuilder { + pub(crate) inner: Vec, +} +impl Default for AttachmentBuilder { + fn default() -> Self { + Self::new() + } +} +impl AttachmentBuilder { + pub fn new() -> Self { + Self { inner: Vec::new() } + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + pub fn build(self) -> Attachment { + Attachment { + inner: self.inner.into(), + } + } +} +impl From for Attachment { + fn from(value: AttachmentBuilder) -> Self { + Attachment { + inner: value.inner.into(), + } + } +} +impl From for Option { + fn from(value: AttachmentBuilder) -> Self { + if value.inner.is_empty() { + None + } else { + Some(value.into()) + } + } +} + +#[derive(Clone)] +pub struct Attachment { + pub(crate) inner: ZBuf, +} +impl Default for Attachment { + fn default() -> Self { + Self::new() + } +} +impl From for AttachmentType { + fn from(this: Attachment) -> Self { + AttachmentType { buffer: this.inner } + } +} +impl From> for Attachment { + fn from(this: AttachmentType) -> Self { + Attachment { inner: this.buffer } + } +} +impl Attachment { + pub fn new() -> Self { + Self { + inner: ZBuf::empty(), + } + } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn len(&self) -> usize { + self.iter().count() + } + pub fn iter(&self) -> AttachmentIterator { + self.into_iter() + } + fn _get(&self, key: &[u8]) -> Option { + self.iter() + .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) + } + pub fn get>(&self, key: &Key) -> Option { + self._get(key.as_ref()) + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + /// + /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + fn _extend(&mut self, with: Self) -> &mut Self { + for slice in with.inner.zslices().cloned() { + self.inner.push_zslice(slice); + } + self + } + pub fn extend(&mut self, with: impl Into) -> &mut Self { + let with = with.into(); + self._extend(with) + } +} +pub struct AttachmentIterator<'a> { + reader: ZBufReader<'a>, +} +impl<'a> core::iter::IntoIterator for &'a Attachment { + type Item = (ZSlice, ZSlice); + type IntoIter = AttachmentIterator<'a>; + fn into_iter(self) -> Self::IntoIter { + AttachmentIterator { + reader: self.inner.reader(), + } + } +} +impl core::fmt::Debug for Attachment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{")?; + for (key, value) in self { + let key = key.as_slice(); + let value = value.as_slice(); + match core::str::from_utf8(key) { + Ok(key) => write!(f, "\"{key}\": ")?, + Err(_) => { + write!(f, "0x")?; + for byte in key { + write!(f, "{byte:02X}")? + } + } + } + match core::str::from_utf8(value) { + Ok(value) => write!(f, "\"{value}\", ")?, + Err(_) => { + write!(f, "0x")?; + for byte in value { + write!(f, "{byte:02X}")? + } + write!(f, ", ")? + } + } + } + write!(f, "}}") + } +} +impl<'a> core::iter::Iterator for AttachmentIterator<'a> { + type Item = (ZSlice, ZSlice); + fn next(&mut self) -> Option { + let key = Zenoh080.read(&mut self.reader).ok()?; + let value = Zenoh080.read(&mut self.reader).ok()?; + Some((key, value)) + } + fn size_hint(&self) -> (usize, Option) { + ( + (self.reader.remaining() != 0) as usize, + Some(self.reader.remaining() / 2), + ) + } +} +impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { + fn from_iter>(iter: T) -> Self { + let codec = Zenoh080; + let mut buffer: Vec = Vec::new(); + let mut writer = buffer.writer(); + for (key, value) in iter { + codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures + codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures + } + Self { inner: buffer } + } +} +impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { + fn from_iter>(iter: T) -> Self { + AttachmentBuilder::from_iter(iter).into() + } +} diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 8549dfba9d..0022a3078e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -11,21 +11,21 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::api::liveliness::PREFIX_LIVELINESS; use crate::api::{ admin, encoding::Encoding, handlers::{Callback, DefaultHandler}, info::*, key_expr::{KeyExpr, KeyExprInner}, - liveliness::{Liveliness, LivelinessTokenState}, + liveliness::{Liveliness, LivelinessTokenState, PREFIX_LIVELINESS}, payload::Payload, publication::*, query::*, queryable::*, - sample::DataInfoIntoSample, - sample::{Attachment, DataInfo}, - sample::{Locality, QoS, Sample, SampleKind, SourceInfo}, + sample::{ + attachment::Attachment, DataInfo, DataInfoIntoSample, Locality, QoS, Sample, SampleKind, + SourceInfo, + }, selector::{Parameters, Selector, TIME_RANGE_KEY}, subscriber::*, value::Value, diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 94195b0c4f..6661c67721 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -105,40 +105,53 @@ pub const FEATURES: &str = concat_enabled_features!( "default" ] ); - -// Reexport prelude::common into root of "zenoh::" -pub mod prelude; -pub use prelude::common::*; - mod api; mod net; mod plugins; -pub use zenoh_result::ZResult as Result; +pub mod prelude; +// Reexport useful types from external zenoh crates into root namespace +// pub use prelude::common::*; + +// +// Explicitly define zenoh API split to logical modules +// + +pub mod core { + pub use zenoh_core::{zlock, ztimeout, AsyncResolve, Result, SyncResolve}; + pub use zenoh_result::bail; +} pub mod session { - pub use crate::api::session::open; - pub use crate::api::session::Session; - pub use crate::api::session::SessionDeclarations; + pub use crate::api::session::{open, Session, SessionDeclarations}; } pub mod key_expr { - pub use zenoh_keyexpr::*; - pub use zenoh_macros::kedefine; - pub use zenoh_macros::keformat; + pub use crate::api::key_expr::KeyExpr; + pub use zenoh_keyexpr::key_expr; + pub use zenoh_keyexpr::OwnedKeyExpr; + pub use zenoh_macros::{kedefine, keformat}; } pub mod sample { - pub use crate::api::sample::builder::{ - QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, + pub use crate::api::{ + sample::{ + builder::{ + QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, + }, + Sample, SampleKind, + }, + value::Value, }; + #[zenoh_macros::unstable] - pub use crate::api::sample::Attachment; + pub use crate::api::sample::attachment::Attachment; #[zenoh_macros::unstable] pub use crate::api::sample::Locality; #[zenoh_macros::unstable] pub use crate::api::sample::SourceInfo; - pub use crate::api::sample::{Sample, SampleKind}; + + pub use zenoh_protocol::core::CongestionControl; } pub mod queryable { @@ -159,5 +172,8 @@ pub mod handlers { } pub mod config { - pub use zenoh_config::*; + pub use zenoh_config::{ + client, default, peer, Config, ConnectionRetryConf, EndPoint, Locator, ModeDependentValue, + ValidatedMap, WhatAmI, WhatAmIMatcher, + }; } diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 3cccfac9e1..3c5b5a9bf4 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -18,12 +18,41 @@ //! almost always want to import its entire contents, but unlike the standard //! library's prelude you'll have to do so manually. An example of using this is: //! +//! There are 2 variants of prelude: sync and async. The only difference is that the async prelude +//! reexports the [`AsyncResolve`] trait and the sync prelude reexports the [`SyncResolve`] trait. +//! Both traits provides method `res()` for finalizing the operation, whcih is synchronous in the +//! sync API and asynchronous in the async API. +//! +//! If both sync and async preludes are imported, the [`res_sync()`] and [`res_async()`] methods +//! should be used explicitly. +//! +//! # Examples +//! //! ``` //! use zenoh::prelude::r#async::*; //! ``` +//! +//! ``` +//! use zenoh::prelude::sync::*; +//! ``` + +/// Prelude to import when using Zenoh's sync API. +pub mod sync { + pub use super::common::*; + pub use zenoh_core::SyncResolve; +} +/// Prelude to import when using Zenoh's async API. +pub mod r#async { + pub use super::common::*; + pub use zenoh_core::AsyncResolve; +} pub(crate) mod common { - pub use crate::config::{Config, ValidatedMap}; + pub use zenoh_config::Config; + pub use zenoh_config::Locator; + pub use zenoh_config::ValidatedMap; + pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI}; + pub use zenoh_result::ZResult as Result; #[zenoh_macros::unstable] pub use crate::sample::Attachment; @@ -38,7 +67,7 @@ pub(crate) mod common { pub use crate::publication::Priority; - pub use crate::key_expr::kedefine; + pub use crate::key_expr::{kedefine, keformat}; // /// A zenoh error. // // pub use zenoh_result::Error; @@ -83,7 +112,6 @@ pub(crate) mod common { // pub use crate::publication::Priority; // #[zenoh_macros::unstable] // pub use crate::publication::PublisherDeclarations; - // pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI}; // pub use crate::sample::builder::{ // QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait, @@ -94,14 +122,3 @@ pub(crate) mod common { // pub use scouting::scout; // pub use session::open; } - -/// Prelude to import when using Zenoh's sync API. -pub mod sync { - pub use super::common::*; - pub use zenoh_core::SyncResolve; -} -/// Prelude to import when using Zenoh's async API. -pub mod r#async { - pub use super::common::*; - pub use zenoh_core::AsyncResolve; -} diff --git a/zenoh/tests/attachments.rs b/zenoh/tests/attachments.rs index f37ed63688..957199010b 100644 --- a/zenoh/tests/attachments.rs +++ b/zenoh/tests/attachments.rs @@ -11,12 +11,18 @@ // Contributors: // ZettaScale Zenoh Team, // +use zenoh::config::Config; +use zenoh::core::SyncResolve; +use zenoh::sample::Attachment; +use zenoh::sample::SampleBuilderTrait; +use zenoh::sample::ValueBuilderTrait; +use zenoh::session::open; +use zenoh::session::SessionDeclarations; + #[cfg(feature = "unstable")] #[test] fn pubsub() { - use zenoh::prelude::sync::*; - - let zenoh = zenoh::open(Config::default()).res().unwrap(); + let zenoh = open(Config::default()).res().unwrap(); let _sub = zenoh .declare_subscriber("test/attachment") .callback(|sample| { @@ -61,9 +67,7 @@ fn pubsub() { #[cfg(feature = "unstable")] #[test] fn queries() { - use zenoh::prelude::sync::*; - - let zenoh = zenoh::open(Config::default()).res().unwrap(); + let zenoh = open(Config::default()).res().unwrap(); let _sub = zenoh .declare_queryable("test/attachment") .callback(|query| { diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs index 72b156c4df..435258f035 100644 --- a/zenoh/tests/connection_retry.rs +++ b/zenoh/tests/connection_retry.rs @@ -1,6 +1,9 @@ +use zenoh::config::Config; use zenoh::config::ConnectionRetryConf; -use zenoh::prelude::sync::*; -use zenoh_link::EndPoint; +use zenoh::config::EndPoint; +use zenoh::config::ValidatedMap; +use zenoh::core::SyncResolve; +use zenoh::session::open; #[test] fn retry_config_overriding() { @@ -22,7 +25,7 @@ fn retry_config_overriding() { .insert_json5( "listen/retry", r#" - { + { try_timeout_ms: 2000, period_init_ms: 3000, period_max_ms: 6000, @@ -72,7 +75,7 @@ fn retry_config_parsing() { .insert_json5( "listen/retry", r#" - { + { period_init_ms: 1000, period_max_ms: 6000, period_increase_factor: 2, @@ -100,7 +103,7 @@ fn retry_config_const_period() { .insert_json5( "listen/retry", r#" - { + { period_init_ms: 1000, period_increase_factor: 1, } @@ -127,7 +130,7 @@ fn retry_config_infinit_period() { .insert_json5( "listen/retry", r#" - { + { period_init_ms: -1, period_increase_factor: 1, } @@ -153,7 +156,7 @@ fn listen_no_retry() { .unwrap(); config.insert_json5("listen/timeout_ms", "0").unwrap(); - zenoh::open(config).res().unwrap(); + open(config).res().unwrap(); } #[test] @@ -166,5 +169,5 @@ fn listen_with_retry() { config.insert_json5("listen/timeout_ms", "1000").unwrap(); - zenoh::open(config).res().unwrap(); + open(config).res().unwrap(); } diff --git a/zenoh/tests/events.rs b/zenoh/tests/events.rs index f999800451..c8030a5ede 100644 --- a/zenoh/tests/events.rs +++ b/zenoh/tests/events.rs @@ -12,8 +12,14 @@ // ZettaScale Zenoh Team, // use std::time::Duration; -use zenoh::{config, prelude::r#async::*}; -use zenoh_core::ztimeout; + +use zenoh::config; +use zenoh::core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::query::Reply; +use zenoh::sample::SampleKind; +use zenoh::session::SessionDeclarations; +use zenoh::session::{open, Session}; const TIMEOUT: Duration = Duration::from_secs(10); @@ -29,7 +35,7 @@ async fn open_session(listen: &[&str], connect: &[&str]) -> Session { .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][01a] Opening session"); - ztimeout!(zenoh::open(config).res_async()).unwrap() + ztimeout!(open(config).res_async()).unwrap() } async fn close_session(session: Session) { diff --git a/zenoh/tests/formatters.rs b/zenoh/tests/formatters.rs index a70bee0a8d..d0ea81fc3e 100644 --- a/zenoh/tests/formatters.rs +++ b/zenoh/tests/formatters.rs @@ -11,14 +11,15 @@ // Contributors: // ZettaScale Zenoh Team, // +use zenoh::key_expr::{kedefine, keformat}; + #[test] fn reuse() { - use zenoh::prelude::r#async::*; - zenoh::kedefine!( + kedefine!( pub gkeys: "zenoh/${group:*}/${member:*}", ); let mut formatter = gkeys::formatter(); - let k1 = zenoh::keformat!(formatter, group = "foo", member = "bar").unwrap(); + let k1 = keformat!(formatter, group = "foo", member = "bar").unwrap(); assert_eq!(dbg!(k1).as_str(), "zenoh/foo/bar"); formatter.set("member", "*").unwrap(); @@ -30,8 +31,8 @@ fn reuse() { let k2 = dbg!(&mut formatter).build().unwrap(); assert_eq!(dbg!(k2).as_str(), "zenoh/foo/*"); - let k3 = zenoh::keformat!(formatter, group = "foo", member = "*").unwrap(); + let k3 = keformat!(formatter, group = "foo", member = "*").unwrap(); assert_eq!(dbg!(k3).as_str(), "zenoh/foo/*"); - zenoh::keformat!(formatter, group = "**", member = "**").unwrap_err(); + keformat!(formatter, group = "**", member = "**").unwrap_err(); } diff --git a/zenoh/tests/handler.rs b/zenoh/tests/handler.rs index ceed15e2c3..00309c4120 100644 --- a/zenoh/tests/handler.rs +++ b/zenoh/tests/handler.rs @@ -1,3 +1,9 @@ +use zenoh::config::Config; +use zenoh::core::SyncResolve; +use zenoh::handlers::RingBuffer; +use zenoh::sample::ValueBuilderTrait; +use zenoh::session::{open, SessionDeclarations}; + // // Copyright (c) 2024 ZettaScale Technology // @@ -14,9 +20,8 @@ #[test] fn pubsub_with_ringbuffer() { use std::{thread, time::Duration}; - use zenoh::{handlers::RingBuffer, prelude::sync::*}; - let zenoh = zenoh::open(Config::default()).res().unwrap(); + let zenoh = open(Config::default()).res().unwrap(); let sub = zenoh .declare_subscriber("test/ringbuffer") .with(RingBuffer::new(3)) @@ -46,9 +51,7 @@ fn pubsub_with_ringbuffer() { #[test] fn query_with_ringbuffer() { - use zenoh::{handlers::RingBuffer, prelude::sync::*}; - - let zenoh = zenoh::open(Config::default()).res().unwrap(); + let zenoh = open(Config::default()).res().unwrap(); let queryable = zenoh .declare_queryable("test/ringbuffer_query") .with(RingBuffer::new(1)) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 1ee93e4949..68e59bc032 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -12,7 +12,8 @@ // ZettaScale Zenoh Team, // use std::sync::{Arc, Mutex}; -use zenoh_core::zlock; +use zenoh::core::zlock; +use zenoh::session::open; struct IntervalCounter { first_tick: bool, @@ -90,7 +91,7 @@ fn downsampling_by_keyexpr_impl(egress: bool) { .multicast .set_enabled(Some(false)) .unwrap(); - let zenoh_sub = zenoh::open(config_sub).res().unwrap(); + let zenoh_sub = open(config_sub).res().unwrap(); let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); let counter_r100_clone = counter_r100.clone(); @@ -127,7 +128,7 @@ fn downsampling_by_keyexpr_impl(egress: bool) { .multicast .set_enabled(Some(false)) .unwrap(); - let zenoh_pub = zenoh::open(config_pub).res().unwrap(); + let zenoh_pub = open(config_pub).res().unwrap(); let publisher_r100 = zenoh_pub .declare_publisher("test/downsamples_by_keyexp/r100") .res() @@ -208,7 +209,7 @@ fn downsampling_by_interface_impl(egress: bool) { if !egress { config_sub.insert_json5("downsampling", &ds_cfg).unwrap(); }; - let zenoh_sub = zenoh::open(config_sub).res().unwrap(); + let zenoh_sub = open(config_sub).res().unwrap(); let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); let counter_r100_clone = counter_r100.clone(); @@ -236,7 +237,7 @@ fn downsampling_by_interface_impl(egress: bool) { if egress { config_pub.insert_json5("downsampling", &ds_cfg).unwrap(); } - let zenoh_pub = zenoh::open(config_pub).res().unwrap(); + let zenoh_pub = open(config_pub).res().unwrap(); let publisher_r100 = zenoh_pub .declare_publisher("test/downsamples_by_interface/r100") .res() @@ -300,5 +301,5 @@ fn downsampling_config_error_wrong_strategy() { ) .unwrap(); - zenoh::open(config).res().unwrap(); + open(config).res().unwrap(); } diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index 694b9ba211..f8f832f133 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -13,8 +13,11 @@ // use std::time::Duration; use zenoh::config; -use zenoh::prelude::r#async::*; -use zenoh_core::ztimeout; +use zenoh::core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::sample::SampleKind; +use zenoh::session::open; +use zenoh::session::SessionDeclarations; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -27,13 +30,13 @@ async fn zenoh_liveliness() { .set_endpoints(vec!["tcp/localhost:47447".parse().unwrap()]) .unwrap(); c1.scouting.multicast.set_enabled(Some(false)).unwrap(); - let session1 = ztimeout!(zenoh::open(c1).res_async()).unwrap(); + let session1 = ztimeout!(open(c1).res_async()).unwrap(); let mut c2 = config::peer(); c2.connect .set_endpoints(vec!["tcp/localhost:47447".parse().unwrap()]) .unwrap(); c2.scouting.multicast.set_enabled(Some(false)).unwrap(); - let session2 = ztimeout!(zenoh::open(c2).res_async()).unwrap(); + let session2 = ztimeout!(open(c2).res_async()).unwrap(); let sub = ztimeout!(session2 .liveliness() diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs index 2c9e22c7b7..20e6860736 100644 --- a/zenoh/tests/matching.rs +++ b/zenoh/tests/matching.rs @@ -11,11 +11,17 @@ // Contributors: // ZettaScale Zenoh Team, // +use flume::RecvTimeoutError; use std::str::FromStr; use std::time::Duration; -use zenoh::prelude::r#async::*; -use zenoh_core::ztimeout; -use zenoh_link::Locator; +use zenoh::config::Locator; +use zenoh::core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::core::Result; +use zenoh::sample::Locality; +use zenoh::session::open; +use zenoh::session::Session; +use zenoh::session::SessionDeclarations; const TIMEOUT: Duration = Duration::from_secs(60); const RECV_TIMEOUT: Duration = Duration::from_secs(1); @@ -33,16 +39,14 @@ async fn create_session_pair(locator: &str) -> (Session, Session) { }; let config2 = zenoh::config::client([Locator::from_str(locator).unwrap()]); - let session1 = ztimeout!(zenoh::open(config1).res_async()).unwrap(); - let session2 = ztimeout!(zenoh::open(config2).res_async()).unwrap(); + let session1 = ztimeout!(open(config1).res_async()).unwrap(); + let session2 = ztimeout!(open(config2).res_async()).unwrap(); (session1, session2) } #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_any() -> Result<()> { - use flume::RecvTimeoutError; - let (session1, session2) = create_session_pair("tcp/127.0.0.1:18001").await; let publisher1 = ztimeout!(session1 @@ -102,11 +106,9 @@ async fn zenoh_matching_status_any() -> Result<()> { #[cfg(feature = "unstable")] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn zenoh_matching_status_remote() -> Result<()> { - use flume::RecvTimeoutError; - - let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session1 = ztimeout!(open(zenoh::config::peer()).res_async()).unwrap(); - let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(open(zenoh::config::peer()).res_async()).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_remote_test") @@ -168,9 +170,9 @@ async fn zenoh_matching_status_remote() -> Result<()> { async fn zenoh_matching_status_local() -> Result<()> { use flume::RecvTimeoutError; - let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session1 = ztimeout!(open(zenoh::config::peer()).res_async()).unwrap(); - let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(open(zenoh::config::peer()).res_async()).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("zenoh_matching_status_local_test") diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 5e45cdaec7..ae16fa9300 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -12,16 +12,19 @@ // ZettaScale Zenoh Team, // use std::time::Duration; -use zenoh::prelude::r#async::*; -use zenoh_core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::publication::Priority; +use zenoh::sample::{CongestionControl, QoSBuilderTrait}; +use zenoh::session::SessionDeclarations; +use zenoh::{core::ztimeout, session::open}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn pubsub() { - let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); - let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); + let session1 = ztimeout!(open(zenoh_config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(open(zenoh_config::peer()).res_async()).unwrap(); let publisher1 = ztimeout!(session1 .declare_publisher("test/qos") diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 830f22a475..f5611bf189 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -16,13 +16,17 @@ use std::sync::atomic::Ordering; use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use zenoh::config::{Config, ModeDependentValue}; -use zenoh::prelude::r#async::*; -use zenoh::sample::builder::QoSBuilderTrait; -use zenoh::Result; -use zenoh_core::ztimeout; -use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher}; -use zenoh_result::bail; +use zenoh::config::{Config, WhatAmI}; +use zenoh::config::{ModeDependentValue, WhatAmIMatcher}; +use zenoh::core::bail; +use zenoh::core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::core::Result; +use zenoh::key_expr::KeyExpr; +use zenoh::sample::CongestionControl; +use zenoh::sample::QoSBuilderTrait; +use zenoh::session::SessionDeclarations; +use zenoh::session::{open, Session}; const TIMEOUT: Duration = Duration::from_secs(10); const MSG_COUNT: usize = 50; @@ -280,7 +284,7 @@ impl Recipe { // In case of client can't connect to some peers/routers loop { - if let Ok(session) = zenoh::open(config.clone()).res_async().await { + if let Ok(session) = open(config.clone()).res_async().await { break session.into_arc(); } else { tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index cd7335c28e..69612686df 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -14,9 +14,12 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use zenoh::config; +use zenoh::core::ztimeout; +use zenoh::key_expr::KeyExpr; use zenoh::prelude::r#async::*; -use zenoh::sample::builder::QoSBuilderTrait; -use zenoh_core::ztimeout; +use zenoh::sample::Value; +use zenoh::session::open; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -33,7 +36,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][01a] Opening peer01 session: {:?}", endpoints); - let peer01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let peer01 = ztimeout!(open(config).res_async()).unwrap(); let mut config = config::peer(); config.connect.endpoints = endpoints @@ -42,7 +45,7 @@ async fn open_session_unicast(endpoints: &[&str]) -> (Session, Session) { .collect::>(); config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][02a] Opening peer02 session: {:?}", endpoints); - let peer02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let peer02 = ztimeout!(open(config).res_async()).unwrap(); (peer01, peer02) } @@ -53,13 +56,13 @@ async fn open_session_multicast(endpoint01: &str, endpoint02: &str) -> (Session, config.listen.endpoints = vec![endpoint01.parse().unwrap()]; config.scouting.multicast.set_enabled(Some(true)).unwrap(); println!("[ ][01a] Opening peer01 session: {}", endpoint01); - let peer01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let peer01 = ztimeout!(open(config).res_async()).unwrap(); let mut config = config::peer(); config.listen.endpoints = vec![endpoint02.parse().unwrap()]; config.scouting.multicast.set_enabled(Some(true)).unwrap(); println!("[ ][02a] Opening peer02 session: {}", endpoint02); - let peer02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let peer02 = ztimeout!(open(config).res_async()).unwrap(); (peer01, peer02) } diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index a71a0a8034..42f7d476e4 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -15,9 +15,11 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Handle; -use zenoh::prelude::r#async::*; -use zenoh::sample::builder::QoSBuilderTrait; -use zenoh_core::ztimeout; +use zenoh::config; +use zenoh::core::ztimeout; +use zenoh::core::AsyncResolve; +use zenoh::sample::QoSBuilderTrait; +use zenoh::session::{open, Session}; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); @@ -30,14 +32,14 @@ async fn open_p2p_sessions() -> (Session, Session, Session) { config.listen.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][01a] Opening s01 session"); - let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s01 = ztimeout!(open(config).res_async()).unwrap(); let mut config = config::peer(); config.listen.endpoints = vec!["tcp/127.0.0.1:27448".parse().unwrap()]; config.connect.endpoints = vec!["tcp/127.0.0.1:27447".parse().unwrap()]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][02a] Opening s02 session"); - let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s02 = ztimeout!(open(config).res_async()).unwrap(); let mut config = config::peer(); config.connect.endpoints = vec![ @@ -46,7 +48,7 @@ async fn open_p2p_sessions() -> (Session, Session, Session) { ]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][03a] Opening s03 session"); - let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s03 = ztimeout!(open(config).res_async()).unwrap(); (s01, s02, s03) } @@ -58,7 +60,7 @@ async fn open_router_session() -> Session { config.listen.endpoints = vec!["tcp/127.0.0.1:37447".parse().unwrap()]; config.scouting.multicast.set_enabled(Some(false)).unwrap(); println!("[ ][00a] Opening router session"); - ztimeout!(zenoh::open(config).res_async()).unwrap() + ztimeout!(open(config).res_async()).unwrap() } async fn close_router_session(s: Session) { @@ -70,15 +72,15 @@ async fn open_client_sessions() -> (Session, Session, Session) { // Open the sessions let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); println!("[ ][01a] Opening s01 session"); - let s01 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s01 = ztimeout!(open(config).res_async()).unwrap(); let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); println!("[ ][02a] Opening s02 session"); - let s02 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s02 = ztimeout!(open(config).res_async()).unwrap(); let config = config::client(["tcp/127.0.0.1:37447".parse::().unwrap()]); println!("[ ][03a] Opening s03 session"); - let s03 = ztimeout!(zenoh::open(config).res_async()).unwrap(); + let s03 = ztimeout!(open(config).res_async()).unwrap(); (s01, s02, s03) }