From 2378d5c3f9eb16a6bd6a815bc1478f705f848484 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Tue, 2 Apr 2024 14:25:06 +0200 Subject: [PATCH] session move unfinished --- plugins/zenoh-plugin-example/src/lib.rs | 2 +- plugins/zenoh-plugin-rest/src/lib.rs | 4 +- .../src/backends_mgt.rs | 2 +- .../zenoh-plugin-storage-manager/src/lib.rs | 4 +- .../src/replica/align_queryable.rs | 2 +- .../src/replica/aligner.rs | 2 +- .../src/replica/mod.rs | 2 +- .../src/replica/storage.rs | 378 ++++++++++-------- .../src/storages_mgt.rs | 2 +- zenoh-ext/src/group.rs | 2 +- zenoh-ext/src/publication_cache.rs | 2 +- zenoh-ext/src/querying_subscriber.rs | 2 +- zenoh-ext/src/session_ext.rs | 2 +- zenoh/src/admin.rs | 4 +- zenoh/src/api.rs | 3 +- zenoh/src/api/key_expr.rs | 2 +- zenoh/src/api/session.rs | 156 ++++++++ zenoh/src/handlers.rs | 3 +- zenoh/src/info.rs | 2 +- zenoh/src/lib.rs | 171 +------- zenoh/src/liveliness.rs | 2 +- zenoh/src/publication.rs | 8 +- zenoh/src/queryable.rs | 4 +- zenoh/src/subscriber.rs | 6 +- zenoh/tests/qos.rs | 2 +- 25 files changed, 416 insertions(+), 353 deletions(-) diff --git a/plugins/zenoh-plugin-example/src/lib.rs b/plugins/zenoh-plugin-example/src/lib.rs index ad254278e3..5615ce68af 100644 --- a/plugins/zenoh-plugin-example/src/lib.rs +++ b/plugins/zenoh-plugin-example/src/lib.rs @@ -144,7 +144,7 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc) { env_logger::init(); // create a zenoh Session that shares the same Runtime than zenohd - let session = zenoh::init(runtime).res().await.unwrap(); + let session = zenoh::session::init(runtime).res().await.unwrap(); // the HasMap used as a storage by this example of storage plugin let mut stored: HashMap = HashMap::new(); diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 43c3f33776..49c58f5074 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -35,7 +35,7 @@ use zenoh::prelude::r#async::*; use zenoh::query::{QueryConsolidation, Reply}; use zenoh::runtime::Runtime; use zenoh::selector::TIME_RANGE_KEY; -use zenoh::Session; +use zenoh::session::Session; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; use zenoh_result::{bail, zerror, ZResult}; @@ -490,7 +490,7 @@ pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> { let _ = env_logger::try_init(); let zid = runtime.zid().to_string(); - let session = zenoh::init(runtime).res().await.unwrap(); + let session = zenoh::session::init(runtime).res().await.unwrap(); let mut app = Server::with_state((Arc::new(session), zid)); app.with( diff --git a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs index 90a6ae6250..dcce49f5da 100644 --- a/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/backends_mgt.rs @@ -14,7 +14,7 @@ use super::storages_mgt::*; use flume::Sender; use std::sync::Arc; -use zenoh::Session; +use zenoh::session::Session; use zenoh_backend_traits::config::StorageConfig; use zenoh_backend_traits::{Capability, VolumeInstance}; use zenoh_result::ZResult; diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index 91df2f108d..78a9814179 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -30,7 +30,7 @@ use storages_mgt::StorageMessage; use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::sync::*; use zenoh::runtime::Runtime; -use zenoh::Session; +use zenoh::session::Session; use zenoh_backend_traits::config::ConfigDiff; use zenoh_backend_traits::config::PluginConfig; use zenoh_backend_traits::config::StorageConfig; @@ -114,7 +114,7 @@ impl StorageRuntimeInner { let plugins_manager = PluginsManager::dynamic(lib_loader.clone(), BACKEND_LIB_PREFIX) .declare_static_plugin::(); - let session = Arc::new(zenoh::init(runtime.clone()).res_sync()?); + let session = Arc::new(zenoh::session::init(runtime.clone()).res_sync()?); // After this moment result should be only Ok. Failure of loading of one voulme or storage should not affect others. diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs index 1ce6a1cb16..8654927f9f 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/align_queryable.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use zenoh::payload::StringOrBase64; use zenoh::prelude::r#async::*; use zenoh::time::Timestamp; -use zenoh::Session; +use zenoh::session::Session; pub struct AlignQueryable { session: Arc, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index ca93651e46..9c54bcf461 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -22,7 +22,7 @@ use zenoh::payload::StringOrBase64; use zenoh::prelude::r#async::*; use zenoh::sample::builder::SampleBuilder; use zenoh::time::Timestamp; -use zenoh::Session; +use zenoh::session::Session; pub struct Aligner { session: Arc, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs index 5dda032029..9a4fd35a11 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/mod.rs @@ -29,7 +29,7 @@ use urlencoding::encode; use zenoh::payload::StringOrBase64; use zenoh::prelude::r#async::*; use zenoh::time::Timestamp; -use zenoh::Session; +use zenoh::session::Session; use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig}; pub mod align_queryable; diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 06c5882408..69ecf9477c 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -19,19 +19,15 @@ use async_trait::async_trait; use flume::{Receiver, Sender}; use futures::select; use std::collections::{HashMap, HashSet}; -use std::str::{self, FromStr}; +use std::str::FromStr; use std::time::{SystemTime, UNIX_EPOCH}; -use zenoh::buffers::buffer::SplitBuffer; use zenoh::buffers::ZBuf; use zenoh::prelude::r#async::*; -use zenoh::query::{ConsolidationMode, QueryTarget}; -use zenoh::sample::builder::SampleBuilder; -use zenoh::sample::{Sample, SampleKind}; +use zenoh::query::ConsolidationMode; use zenoh::time::{new_reception_timestamp, Timestamp, NTP64}; -use zenoh::value::Value; -use zenoh::{Result as ZResult, Session}; +use zenoh::{Result as ZResult, session::Session}; use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig}; -use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData}; +use zenoh_backend_traits::{Capability, History, Persistence, Storage, StorageInsertionResult, StoredData}; use zenoh_keyexpr::key_expr::OwnedKeyExpr; use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider; use zenoh_keyexpr::keyexpr_tree::{support::NonWild, support::UnknownWildness, KeBoxTree}; @@ -42,15 +38,148 @@ use zenoh_util::{zenoh_home, Timed, TimedEvent, Timer}; pub const WILDCARD_UPDATES_FILENAME: &str = "wildcard_updates"; pub const TOMBSTONE_FILENAME: &str = "tombstones"; +#[derive(Clone, Debug)] +pub enum StorageSampleKind { + Put(Value), + Delete, +} + +#[derive(Clone, Debug)] +pub struct StorageSample { + pub key_expr: KeyExpr<'static>, + pub timestamp: Timestamp, + pub kind: StorageSampleKind, +} + +impl From for StorageSample { + fn from(sample: Sample) -> Self { + let timestamp = *sample.timestamp().unwrap_or(&new_reception_timestamp()); + // TODO: add API for disassembly of Sample + let key_expr = sample.key_expr().clone(); + let payload = sample.payload().clone(); + let encoding = sample.encoding().clone(); + let kind = match sample.kind() { + SampleKind::Put => StorageSampleKind::Put(Value::new(payload).with_encoding(encoding)), + SampleKind::Delete => StorageSampleKind::Delete, + }; + StorageSample { + key_expr, + timestamp, + kind, + } + } +} + #[derive(Clone)] -struct Update { - kind: SampleKind, - data: StoredData, +enum Update { + Put(StoredData), + Delete(Timestamp), +} + +impl From for Update { + fn from(value: StorageSample) -> Self { + match value.kind { + StorageSampleKind::Put(data) => Update::Put(StoredData { + value: data, + timestamp: value.timestamp, + }), + StorageSampleKind::Delete => Update::Delete(value.timestamp), + } + } +} + +impl Update { + fn timestamp(&self) -> &Timestamp { + match self { + Update::Put(data) => &data.timestamp, + Update::Delete(ts) => ts, + } + } +} + +// implement from String for Update +impl TryFrom for Update { + type Error = zenoh::Error; + + fn try_from(value: String) -> Result { + let result: (String, String, String, Vec<&[u8]>) = serde_json::from_str(&value)?; + let mut payload = ZBuf::default(); + for slice in result.3 { + payload.push_zslice(slice.to_vec().into()); + } + let value = Value::new(payload).with_encoding(result.2); + let timestamp = Timestamp::from_str(&result.1).map_err(|_| "Error parsing timestamp")?; + if result.0.eq(&(SampleKind::Put).to_string()) { + Ok(Update::Put(StoredData { value, timestamp })) + } else { + Ok(Update::Delete(timestamp)) + } + } +} + +// implement to_string for Update +impl ToString for Update { + fn to_string(&self) -> String { + let result = match self { + Update::Put(data) => ( + SampleKind::Put.to_string(), + data.timestamp.to_string(), + data.value.encoding.to_string(), + data.value.payload.slices().collect::>(), + ), + Update::Delete(ts) => ( + SampleKind::Delete.to_string(), + ts.to_string(), + "".to_string(), + vec![], + ), + }; + serde_json::to_string_pretty(&result).unwrap() + } +} + +trait IntoStorageSample { + fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample + where + IntoKeyExpr: Into>; +} + +impl IntoStorageSample for StoredData { + fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample + where + IntoKeyExpr: Into>, + { + StorageSample { + key_expr: key_expr.into(), + timestamp: self.timestamp, + kind: StorageSampleKind::Put(self.value), + } + } +} + +impl IntoStorageSample for Update { + fn into_sample(self, key_expr: IntoKeyExpr) -> StorageSample + where + IntoKeyExpr: Into>, + { + match self { + Update::Put(data) => StorageSample { + key_expr: key_expr.into(), + timestamp: data.timestamp, + kind: StorageSampleKind::Put(data.value), + }, + Update::Delete(ts) => StorageSample { + key_expr: key_expr.into(), + timestamp: ts, + kind: StorageSampleKind::Delete, + }, + } + } } pub struct ReplicationService { pub empty_start: bool, - pub aligner_updates: Receiver, + pub aligner_updates: Receiver, pub log_propagation: Sender<(OwnedKeyExpr, Timestamp)>, } @@ -109,10 +238,11 @@ impl StorageService { let saved_wc = std::fs::read_to_string(zenoh_home().join(WILDCARD_UPDATES_FILENAME)).unwrap(); let saved_wc: HashMap = - serde_json::from_str(&saved_wc).unwrap(); + serde_json::from_str(&saved_wc).unwrap(); // TODO: Remove unwrap let mut wildcard_updates = storage_service.wildcard_updates.write().await; for (k, data) in saved_wc { - wildcard_updates.insert(&k, construct_update(data)); + wildcard_updates.insert(&k, Update::try_from(data).unwrap()); + // TODO: Remove unwrap } } } @@ -183,7 +313,7 @@ impl StorageService { log::error!("Sample {:?} is not timestamped. Please timestamp samples meant for replicated storage.", sample); } else { - self.process_sample(sample).await; + self.process_sample(sample.into()).await; } }, // on query on key_expr @@ -223,16 +353,15 @@ impl StorageService { select!( // on sample for key_expr sample = storage_sub.recv_async() => { - let sample = match sample { + let mut sample = match sample { Ok(sample) => sample, Err(e) => { log::error!("Error in sample: {}", e); continue; } }; - let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp()); - let sample = SampleBuilder::from(sample).timestamp(timestamp).into(); - self.process_sample(sample).await; + sample.ensure_timestamp(); + self.process_sample(sample.into()).await; }, // on query on key_expr query = storage_queryable.recv_async() => { @@ -262,61 +391,48 @@ impl StorageService { // The storage should only simply save the key, sample pair while put and retrieve the same during get // the trimming during PUT and GET should be handled by the plugin - async fn process_sample(&self, sample: Sample) { + async fn process_sample(&self, sample: StorageSample) { log::trace!("[STORAGE] Processing sample: {:?}", sample); + // if wildcard, update wildcard_updates - if sample.key_expr().is_wild() { + if sample.key_expr.is_wild() { self.register_wildcard_update(sample.clone()).await; } - let matching_keys = if sample.key_expr().is_wild() { - self.get_matching_keys(sample.key_expr()).await + let matching_keys = if sample.key_expr.is_wild() { + self.get_matching_keys(&sample.key_expr).await } else { - vec![sample.key_expr().clone().into()] + vec![sample.key_expr.clone().into()] }; log::trace!( "The list of keys matching `{}` is : {:?}", - sample.key_expr(), + sample.key_expr, matching_keys ); for k in matching_keys { if !self - .is_deleted(&k.clone(), sample.timestamp().unwrap()) + .is_deleted(&k.clone(), &sample.timestamp) .await && (self.capability.history.eq(&History::All) || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample.timestamp().unwrap()).await)) + && self.is_latest(&k, &sample.timestamp).await)) { log::trace!( "Sample `{:?}` identified as neded processing for key {}", sample, - k + &k ); // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. // get the relevant wild card entry and use that value and timestamp to update the storage - let sample_to_store: Sample = if let Some(update) = self - .ovderriding_wild_update(&k, sample.timestamp().unwrap()) - .await - { - match update.kind { - SampleKind::Put => { - SampleBuilder::put(KeyExpr::from(k.clone()), update.data.value.payload) - .encoding(update.data.value.encoding) - .timestamp(update.data.timestamp) - .into() - } - SampleKind::Delete => SampleBuilder::delete(KeyExpr::from(k.clone())) - .timestamp(update.data.timestamp) - .into(), - } - } else { - SampleBuilder::from(sample.clone()) - .keyexpr(k.clone()) - .into() - }; + let sample_to_store = + match self.ovderriding_wild_update(&k, &sample.timestamp).await { + Some(overriding_update) => overriding_update.into_sample(k.clone()), - let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { + None => sample.clone().into(), + }; + + let stripped_key = match self.strip_prefix(&sample_to_store.key_expr) { Ok(stripped) => stripped, Err(e) => { log::error!("{}", e); @@ -324,25 +440,21 @@ impl StorageService { } }; let mut storage = self.storage.lock().await; - let result = match sample.kind() { - SampleKind::Put => { + let result = match sample_to_store.kind { + StorageSampleKind::Put(data) => { storage .put( stripped_key, - Value::new(sample_to_store.payload().clone()) - .encoding(sample_to_store.encoding().clone()), - *sample_to_store.timestamp().unwrap(), + data, + sample_to_store.timestamp, ) .await - } - SampleKind::Delete => { + }, + StorageSampleKind::Delete => { // register a tombstone - self.mark_tombstone(&k, *sample_to_store.timestamp().unwrap()) - .await; - storage - .delete(stripped_key, *sample_to_store.timestamp().unwrap()) - .await - } + self.mark_tombstone(&k, sample_to_store.timestamp).await; + storage.delete(stripped_key, sample_to_store.timestamp).await + }, }; drop(storage); if self.replication.is_some() @@ -354,7 +466,7 @@ impl StorageService { .as_ref() .unwrap() .log_propagation - .send((k.clone(), *sample_to_store.timestamp().unwrap())); + .send((k.clone(), sample_to_store.timestamp)); match sending { Ok(_) => (), Err(e) => { @@ -385,26 +497,16 @@ impl StorageService { } } - async fn register_wildcard_update(&self, sample: Sample) { + async fn register_wildcard_update(&self, sample: StorageSample) { // @TODO: change into a better store that does incremental writes - let key = sample.key_expr().clone(); + let key = sample.key_expr.clone(); let mut wildcards = self.wildcard_updates.write().await; - let timestamp = *sample.timestamp().unwrap(); - wildcards.insert( - &key, - Update { - kind: sample.kind(), - data: StoredData { - value: Value::from(sample), - timestamp, - }, - }, - ); + wildcards.insert(&key, sample.into()); if self.capability.persistence.eq(&Persistence::Durable) { // flush to disk to makeit durable let mut serialized_data = HashMap::new(); for (k, update) in wildcards.key_value_pairs() { - serialized_data.insert(k, serialize_update(update)); + serialized_data.insert(k, update.to_string()); } if let Err(e) = std::fs::write( zenoh_home().join(WILDCARD_UPDATES_FILENAME), @@ -433,34 +535,36 @@ impl StorageService { let mut update = None; for node in wildcards.intersecting_keys(key_expr) { let weight = wildcards.weight_at(&node); - if weight.is_some() && weight.unwrap().data.timestamp > *ts { - // if the key matches a wild card update, check whether it was saved in storage - // remember that wild card updates change only existing keys - let stripped_key = match self.strip_prefix(&key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - log::error!("{}", e); - break; - } - }; - let mut storage = self.storage.lock().await; - match storage.get(stripped_key, "").await { - Ok(stored_data) => { - for entry in stored_data { - if entry.timestamp > *ts { - return None; + if let Some(weight) = weight { + if weight.timestamp() > ts { + // if the key matches a wild card update, check whether it was saved in storage + // remember that wild card updates change only existing keys + let stripped_key = match self.strip_prefix(&key_expr.into()) { + Ok(stripped) => stripped, + Err(e) => { + log::error!("{}", e); + break; + } + }; + let mut storage = self.storage.lock().await; + match storage.get(stripped_key, "").await { + Ok(stored_data) => { + for entry in stored_data { + if entry.timestamp > *ts { + return None; + } } } - } - Err(e) => { - log::warn!( - "Storage '{}' raised an error fetching a query on key {} : {}", - self.name, - key_expr, - e - ); - ts = &weight.unwrap().data.timestamp; - update = Some(weight.unwrap().clone()); + Err(e) => { + log::warn!( + "Storage '{}' raised an error fetching a query on key {} : {}", + self.name, + key_expr, + e + ); + ts = weight.timestamp(); + update = Some(weight.clone()); + } } } } @@ -513,13 +617,8 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - if let Err(e) = q - .reply(key.clone(), entry.value.payload) - .encoding(entry.value.encoding) - .timestamp(entry.timestamp) - .res() - .await - { + let sample = entry.into_sample(key.clone()); + if let Err(e) = q.reply_sample(sample).res().await { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -545,13 +644,13 @@ impl StorageService { match storage.get(stripped_key, q.parameters()).await { Ok(stored_data) => { for entry in stored_data { - if let Err(e) = q - .reply(q.key_expr().clone(), entry.value.payload) - .encoding(entry.value.encoding) - .timestamp(entry.timestamp) - .res() - .await - { + let Value { + payload, encoding, .. + } = entry.value; + let sample = Sample::put(q.key_expr().clone(), payload) + .with_encoding(encoding) + .with_timestamp(entry.timestamp); + if let Err(e) = q.reply_sample(sample).res().await { log::warn!( "Storage '{}' raised an error replying a query: {}", self.name, @@ -658,7 +757,7 @@ impl StorageService { while let Ok(reply) = replies.recv_async().await { match reply.sample { Ok(sample) => { - self.process_sample(sample).await; + self.process_sample(sample.into()).await; } Err(e) => log::warn!( "Storage '{}' received an error to align query: {:?}", @@ -671,47 +770,6 @@ impl StorageService { } } -fn serialize_update(update: &Update) -> String { - let Update { - kind, - data: - StoredData { - value: Value { - payload, encoding, .. - }, - timestamp, - }, - } = update; - let zbuf: ZBuf = payload.into(); - - let result = ( - kind.to_string(), - timestamp.to_string(), - encoding.to_string(), - zbuf.slices().collect::>(), - ); - serde_json::to_string_pretty(&result).unwrap() -} - -fn construct_update(data: String) -> Update { - let result: (String, String, String, Vec<&[u8]>) = serde_json::from_str(&data).unwrap(); // @TODO: remove the unwrap() - let mut payload = ZBuf::default(); - for slice in result.3 { - payload.push_zslice(slice.to_vec().into()); - } - let value = Value::new(payload).encoding(result.2); - let data = StoredData { - value, - timestamp: Timestamp::from_str(&result.1).unwrap(), // @TODO: remove the unwrap() - }; - let kind = if result.0.eq(&(SampleKind::Put).to_string()) { - SampleKind::Put - } else { - SampleKind::Delete - }; - Update { kind, data } -} - // Periodic event cleaning-up data info for old metadata struct GarbageCollectionEvent { config: GarbageCollectionConfig, @@ -743,7 +801,7 @@ impl Timed for GarbageCollectionEvent { let mut to_be_removed = HashSet::new(); for (k, update) in wildcard_updates.key_value_pairs() { - let ts = update.data.timestamp; + let ts = update.timestamp(); if ts.get_time() < &time_limit { // mark key to be removed to_be_removed.insert(k); diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs index 6de5e2f2ca..8643429a65 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt.rs @@ -12,7 +12,7 @@ // ZettaScale Zenoh Team, // use async_std::sync::Arc; -use zenoh::Session; +use zenoh::session::Session; use zenoh_backend_traits::config::StorageConfig; use zenoh_result::ZResult; diff --git a/zenoh-ext/src/group.rs b/zenoh-ext/src/group.rs index 8a7823ed72..f74d9d547a 100644 --- a/zenoh-ext/src/group.rs +++ b/zenoh-ext/src/group.rs @@ -31,7 +31,7 @@ use zenoh::publication::Publisher; use zenoh::query::ConsolidationMode; use zenoh::Error as ZError; use zenoh::Result as ZResult; -use zenoh::Session; +use zenoh::session::Session; use zenoh_result::bail; use zenoh_sync::Condition; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 03f0814e5c..fdba3af231 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -18,7 +18,7 @@ use std::future::Ready; use zenoh::prelude::r#async::*; use zenoh::queryable::{Query, Queryable}; use zenoh::subscriber::FlumeSubscriber; -use zenoh::SessionRef; +use zenoh::session::SessionRef; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; use zenoh_result::{bail, ZResult}; use zenoh_util::core::ResolveFuture; diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index d749a94ed9..4a9469c835 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -24,7 +24,7 @@ use zenoh::sample::builder::SampleBuilder; use zenoh::subscriber::{Reliability, Subscriber}; use zenoh::time::{new_reception_timestamp, Timestamp}; use zenoh::Result as ZResult; -use zenoh::SessionRef; +use zenoh::session::SessionRef; use zenoh_core::{zlock, AsyncResolve, Resolvable, SyncResolve}; use crate::ExtractSample; diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 73fbd7dfc4..2c9826c98b 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -15,7 +15,7 @@ use super::PublicationCacheBuilder; use std::convert::TryInto; use std::sync::Arc; use zenoh::prelude::KeyExpr; -use zenoh::{Session, SessionRef}; +use zenoh::session::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) pub trait SessionExt<'s, 'a> { diff --git a/zenoh/src/admin.rs b/zenoh/src/admin.rs index 3c76ca468a..260617cda2 100644 --- a/zenoh/src/admin.rs +++ b/zenoh/src/admin.rs @@ -17,14 +17,14 @@ use crate::{ prelude::sync::{KeyExpr, Locality, SampleKind}, queryable::Query, sample::DataInfo, - Payload, Session, ZResult, + Payload, Session }; use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, sync::Arc, }; -use zenoh_core::SyncResolve; +use zenoh_core::{Result as ZResult, SyncResolve}; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, diff --git a/zenoh/src/api.rs b/zenoh/src/api.rs index 94893aca68..1e7cec5380 100644 --- a/zenoh/src/api.rs +++ b/zenoh/src/api.rs @@ -12,4 +12,5 @@ // ZettaScale Zenoh Team, // -pub(crate) mod key_expr; \ No newline at end of file +pub(crate) mod key_expr; +pub(crate) mod session; \ No newline at end of file diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 4cbe6409f2..47d3a71c56 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -57,7 +57,7 @@ use zenoh_protocol::{ }; use zenoh_result::ZResult; -use crate::{net::primitives::Primitives, prelude::Selector, Session, Undeclarable}; +use crate::{net::primitives::Primitives, prelude::Selector, Session, api::session::Undeclarable}; #[derive(Clone, Debug)] pub(crate) enum KeyExprInner<'a> { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 9af5ee1d5c..89cd249bdb 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -44,10 +44,12 @@ use crate::Selector; use crate::SourceInfo; use crate::Value; use log::{error, trace, warn}; +use zenoh_core::Resolvable; use std::collections::HashMap; use std::convert::TryFrom; use std::convert::TryInto; use std::fmt; +use std::future::Ready; use std::ops::Deref; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; @@ -2595,3 +2597,157 @@ impl crate::net::primitives::EPrimitives for Session { self } } + +/// Open a zenoh [`Session`]. +/// +/// # Arguments +/// +/// * `config` - The [`Config`] for the zenoh session +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap(); +/// # } +/// ``` +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use std::str::FromStr; +/// use zenoh::prelude::r#async::*; +/// +/// let mut config = config::peer(); +/// config.set_id(ZenohId::from_str("221b72df20924c15b8794c6bdb471150").unwrap()); +/// config.connect.endpoints.extend("tcp/10.10.10.10:7447,tcp/11.11.11.11:7447".split(',').map(|s|s.parse().unwrap())); +/// +/// let session = zenoh::open(config).res().await.unwrap(); +/// # } +/// ``` +pub fn open(config: TryIntoConfig) -> OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + OpenBuilder { config } +} + +/// A builder returned by [`open`] used to open a zenoh [`Session`]. +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap(); +/// # } +/// ``` +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +pub struct OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + config: TryIntoConfig, +} + +impl Resolvable for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + type To = ZResult; +} + +impl SyncResolve for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + fn res_sync(self) -> ::To { + let config: crate::config::Config = self + .config + .try_into() + .map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?; + Session::new(config).res_sync() + } +} + +impl AsyncResolve for OpenBuilder +where + TryIntoConfig: std::convert::TryInto + Send + 'static, + >::Error: std::fmt::Debug, +{ + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +/// Initialize a Session with an existing Runtime. +/// This operation is used by the plugins to share the same Runtime as the router. +#[doc(hidden)] +#[zenoh_macros::unstable] +pub fn init(runtime: Runtime) -> InitBuilder { + InitBuilder { + runtime, + aggregated_subscribers: vec![], + aggregated_publishers: vec![], + } +} + +/// A builder returned by [`init`] and used to initialize a Session with an existing Runtime. +#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] +#[doc(hidden)] +#[zenoh_macros::unstable] +pub struct InitBuilder { + runtime: Runtime, + aggregated_subscribers: Vec, + aggregated_publishers: Vec, +} + +#[zenoh_macros::unstable] +impl InitBuilder { + #[inline] + pub fn aggregated_subscribers(mut self, exprs: Vec) -> Self { + self.aggregated_subscribers = exprs; + self + } + + #[inline] + pub fn aggregated_publishers(mut self, exprs: Vec) -> Self { + self.aggregated_publishers = exprs; + self + } +} + +#[zenoh_macros::unstable] +impl Resolvable for InitBuilder { + type To = ZResult; +} + +#[zenoh_macros::unstable] +impl SyncResolve for InitBuilder { + fn res_sync(self) -> ::To { + Ok(Session::init( + self.runtime, + self.aggregated_subscribers, + self.aggregated_publishers, + ) + .res_sync()) + } +} + +#[zenoh_macros::unstable] +impl AsyncResolve for InitBuilder { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} diff --git a/zenoh/src/handlers.rs b/zenoh/src/handlers.rs index c5d2c6bb90..6aecda34b9 100644 --- a/zenoh/src/handlers.rs +++ b/zenoh/src/handlers.rs @@ -13,8 +13,7 @@ // //! Callback handler trait. -use crate::API_DATA_RECEPTION_CHANNEL_SIZE; - +use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; use std::sync::{Arc, Mutex, Weak}; use zenoh_collections::RingBuffer as RingBufferInner; use zenoh_result::ZResult; diff --git a/zenoh/src/info.rs b/zenoh/src/info.rs index 3e0efdf134..1f7a903ba4 100644 --- a/zenoh/src/info.rs +++ b/zenoh/src/info.rs @@ -13,7 +13,7 @@ // //! Tools to access information about the current zenoh [`Session`](crate::Session). -use crate::SessionRef; +use crate::api::session::SessionRef; use std::future::Ready; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; use zenoh_protocol::core::{WhatAmI, ZenohId}; diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index c0bf501cc9..2f1beb5413 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -84,14 +84,10 @@ pub(crate) type Id = u32; use git_version::git_version; use handlers::DefaultHandler; #[cfg(feature = "unstable")] -use net::runtime::Runtime; use prelude::*; use scouting::ScoutBuilder; -use std::future::Ready; -use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; pub use zenoh_macros::{ke, kedefine, keformat, kewrite}; use zenoh_protocol::core::WhatAmIMatcher; -use zenoh_result::{zerror, ZResult}; use zenoh_util::concat_enabled_features; /// A zenoh error. @@ -123,6 +119,8 @@ pub const FEATURES: &str = concat_enabled_features!( ] ); +pub use crate::api::session::open; + pub mod key_expr { pub use crate::api::key_expr::keyexpr; pub use crate::api::key_expr::OwnedKeyExpr; @@ -137,11 +135,16 @@ pub mod key_expr { } } +pub mod session { + pub use crate::api::session::open; + pub use crate::api::session::init; + pub use crate::api::session::Session; + pub use crate::api::session::SessionRef; + pub use crate::api::session::SessionDeclarations; +} mod admin; #[macro_use] -mod session; -pub use session::*; mod api; pub(crate) mod net; @@ -231,158 +234,4 @@ where config: config.try_into().map_err(|e| e.into()), handler: DefaultHandler, } -} - -/// Open a zenoh [`Session`]. -/// -/// # Arguments -/// -/// * `config` - The [`Config`] for the zenoh session -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap(); -/// # } -/// ``` -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use std::str::FromStr; -/// use zenoh::prelude::r#async::*; -/// -/// let mut config = config::peer(); -/// config.set_id(ZenohId::from_str("221b72df20924c15b8794c6bdb471150").unwrap()); -/// config.connect.endpoints.extend("tcp/10.10.10.10:7447,tcp/11.11.11.11:7447".split(',').map(|s|s.parse().unwrap())); -/// -/// let session = zenoh::open(config).res().await.unwrap(); -/// # } -/// ``` -pub fn open(config: TryIntoConfig) -> OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - OpenBuilder { config } -} - -/// A builder returned by [`open`] used to open a zenoh [`Session`]. -/// -/// # Examples -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap(); -/// # } -/// ``` -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -pub struct OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - config: TryIntoConfig, -} - -impl Resolvable for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - type To = ZResult; -} - -impl SyncResolve for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - fn res_sync(self) -> ::To { - let config: crate::config::Config = self - .config - .try_into() - .map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?; - Session::new(config).res_sync() - } -} - -impl AsyncResolve for OpenBuilder -where - TryIntoConfig: std::convert::TryInto + Send + 'static, - >::Error: std::fmt::Debug, -{ - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} - -/// Initialize a Session with an existing Runtime. -/// This operation is used by the plugins to share the same Runtime as the router. -#[doc(hidden)] -#[zenoh_macros::unstable] -pub fn init(runtime: Runtime) -> InitBuilder { - InitBuilder { - runtime, - aggregated_subscribers: vec![], - aggregated_publishers: vec![], - } -} - -/// A builder returned by [`init`] and used to initialize a Session with an existing Runtime. -#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] -#[doc(hidden)] -#[zenoh_macros::unstable] -pub struct InitBuilder { - runtime: Runtime, - aggregated_subscribers: Vec, - aggregated_publishers: Vec, -} - -#[zenoh_macros::unstable] -impl InitBuilder { - #[inline] - pub fn aggregated_subscribers(mut self, exprs: Vec) -> Self { - self.aggregated_subscribers = exprs; - self - } - - #[inline] - pub fn aggregated_publishers(mut self, exprs: Vec) -> Self { - self.aggregated_publishers = exprs; - self - } -} - -#[zenoh_macros::unstable] -impl Resolvable for InitBuilder { - type To = ZResult; -} - -#[zenoh_macros::unstable] -impl SyncResolve for InitBuilder { - fn res_sync(self) -> ::To { - Ok(Session::init( - self.runtime, - self.aggregated_subscribers, - self.aggregated_publishers, - ) - .res_sync()) - } -} - -#[zenoh_macros::unstable] -impl AsyncResolve for InitBuilder { - type Future = Ready; - - fn res_async(self) -> Self::Future { - std::future::ready(self.res_sync()) - } -} +} \ No newline at end of file diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 23e1846741..8ce5386c3f 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -26,7 +26,7 @@ use { handlers::DefaultHandler, prelude::*, subscriber::{Subscriber, SubscriberInner}, - SessionRef, Undeclarable, + api::session::SessionRef, api::session::Undeclarable, }, std::convert::TryInto, std::future::Ready, diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 1f6ad17333..f634a14dd1 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -18,8 +18,8 @@ use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; use crate::sample::{DataInfo, QoS, Sample, SampleFields, SampleKind}; -use crate::SessionRef; -use crate::Undeclarable; +use crate::api::session::SessionRef; +use crate::api::session::Undeclarable; #[cfg(feature = "unstable")] use crate::{ handlers::{Callback, DefaultHandler, IntoHandler}, @@ -1511,7 +1511,7 @@ mod tests { #[test] fn sample_kind_integrity_in_publication() { - use crate::{open, prelude::sync::*}; + use crate::{api::session::open, prelude::sync::*}; const KEY_EXPR: &str = "test/sample_kind_integrity/publication"; const VALUE: &str = "zenoh"; @@ -1539,7 +1539,7 @@ mod tests { #[test] fn sample_kind_integrity_in_put_builder() { - use crate::{open, prelude::sync::*}; + use crate::{api::session::open, prelude::sync::*}; const KEY_EXPR: &str = "test/sample_kind_integrity/put_builder"; const VALUE: &str = "zenoh"; diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 0696fcbe33..447dfc81b6 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -21,8 +21,8 @@ use crate::prelude::*; use crate::sample::builder::SampleBuilder; use crate::sample::{QoSBuilder, SourceInfo}; use crate::Id; -use crate::SessionRef; -use crate::Undeclarable; +use crate::api::session::SessionRef; +use crate::api::session::Undeclarable; #[cfg(feature = "unstable")] use crate::{query::ReplyKeyExpr, sample::Attachment}; use std::fmt; diff --git a/zenoh/src/subscriber.rs b/zenoh/src/subscriber.rs index 1fc6e82b46..64f8d5e026 100644 --- a/zenoh/src/subscriber.rs +++ b/zenoh/src/subscriber.rs @@ -13,13 +13,13 @@ // //! Subscribing primitives. -use crate::handlers::{locked, Callback, DefaultHandler, IntoHandler}; use crate::api::key_expr::KeyExpr; +use crate::api::session::Undeclarable; +use crate::handlers::{locked, Callback, DefaultHandler, IntoHandler}; use crate::prelude::Locality; use crate::sample::Sample; use crate::Id; -use crate::Undeclarable; -use crate::{Result as ZResult, SessionRef}; +use crate::{api::session::SessionRef, Result as ZResult}; use std::fmt; use std::future::Ready; use std::ops::{Deref, DerefMut}; diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 5fd3edd985..f64784399c 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -14,7 +14,7 @@ use std::time::Duration; use zenoh::prelude::r#async::*; use zenoh::sample::builder::QoSBuilderTrait; -use zenoh::{publication::Priority, SessionDeclarations}; +use zenoh::{publication::Priority, session::SessionDeclarations}; use zenoh_core::ztimeout; const TIMEOUT: Duration = Duration::from_secs(60);