Skip to content

Commit

Permalink
feat!: make session an arc-like object
Browse files Browse the repository at this point in the history
The refactoring is quite deep, so this is the first (dirty) iteration
which passes the tests.
  • Loading branch information
wyfo committed Sep 3, 2024
1 parent e745b8f commit 2efff1f
Show file tree
Hide file tree
Showing 38 changed files with 672 additions and 1,182 deletions.
2 changes: 1 addition & 1 deletion examples/examples/z_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;
use zenoh_ext::SubscriberForward;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{prelude::*, session::ZenohId};
use zenoh::session::ZenohId;
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {

let (config, express) = parse_args();

let session = zenoh::open(config).wait().unwrap().into_arc();
let session = zenoh::open(config).wait().unwrap();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, prelude::*, Config};
use zenoh::{handlers::RingChannel, key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use zenoh::{
bytes::ZBytes,
key_expr::KeyExpr,
prelude::*,
shm::{
zshm, BlockOn, GarbageCollect, PosixShmProviderBackend, ShmProviderBuilder,
POSIX_PROTOCOL_ID,
Expand Down
1 change: 0 additions & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use futures::select;
use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::*,
sample::{Sample, SampleKind},
Config,
};
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, Config};
use zenoh::{key_expr::KeyExpr, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::{key_expr::KeyExpr, prelude::*, sample::SampleKind, Config};
use zenoh::{key_expr::KeyExpr, sample::SampleKind, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_sub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use clap::Parser;
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
use zenoh::shm::zshm;
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*};
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand Down
1 change: 0 additions & 1 deletion plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use zenoh::{
key_expr::{keyexpr, KeyExpr},
prelude::ZResult,
sample::Sample,
session::SessionDeclarations,
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};

Expand Down
1 change: 0 additions & 1 deletion plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use zenoh::{
config::Config,
key_expr::keyexpr,
qos::{CongestionControl, QoSBuilderTrait},
session::SessionDeclarations,
};

const HTML: &str = r#"
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use zenoh::{
prelude::*,
query::{Parameters, QueryConsolidation, Reply, Selector, ZenohParameters},
sample::{Sample, SampleKind},
session::{Session, SessionDeclarations},
session::Session,
};
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};

Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
use flume::{Receiver, Sender};
use futures::{pin_mut, select, FutureExt};
use tokio::{sync::RwLock, time::interval};
use zenoh::{key_expr::keyexpr, prelude::*};
use zenoh::key_expr::keyexpr;
use zenoh_backend_traits::config::{ReplicaConfig, StorageConfig};

use crate::{backends_mgt::StoreIntercept, storages_mgt::StorageMessage};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use zenoh::{
},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
session::{Session, SessionDeclarations},
session::Session,
time::{Timestamp, NTP64},
Result as ZResult,
};
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/examples/examples/z_query_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::{arg, Parser};
use zenoh::{config::Config, prelude::*, query::ReplyKeyExpr};
use zenoh::{config::Config, query::ReplyKeyExpr};
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

Expand Down
25 changes: 12 additions & 13 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use zenoh::{
pubsub::FlumeSubscriber,
query::{Query, Queryable, ZenohParameters},
sample::{Locality, Sample},
session::{SessionDeclarations, SessionRef},
Error, Resolvable, Resolve, Result as ZResult,
Error, Resolvable, Resolve, Result as ZResult, Session,
};

/// The builder of PublicationCache, allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: SessionRef<'a>,
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Option<Locality>,
Expand All @@ -43,7 +42,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {

impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
pub(crate) fn new(
session: SessionRef<'a>,
session: &'a Session,
pub_key_expr: ZResult<KeyExpr<'b>>,
) -> PublicationCacheBuilder<'a, 'b, 'c> {
PublicationCacheBuilder {
Expand Down Expand Up @@ -95,8 +94,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
}
}

impl<'a> Resolvable for PublicationCacheBuilder<'a, '_, '_> {
type To = ZResult<PublicationCache<'a>>;
impl Resolvable for PublicationCacheBuilder<'_, '_, '_> {
type To = ZResult<PublicationCache>;
}

impl Wait for PublicationCacheBuilder<'_, '_, '_> {
Expand All @@ -105,7 +104,7 @@ impl Wait for PublicationCacheBuilder<'_, '_, '_> {
}
}

impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> {
impl IntoFuture for PublicationCacheBuilder<'_, '_, '_> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

Expand All @@ -114,14 +113,14 @@ impl<'a> IntoFuture for PublicationCacheBuilder<'a, '_, '_> {
}
}

pub struct PublicationCache<'a> {
local_sub: FlumeSubscriber<'a>,
_queryable: Queryable<'a, flume::Receiver<Query>>,
pub struct PublicationCache {
local_sub: FlumeSubscriber,
_queryable: Queryable<flume::Receiver<Query>>,
task: TerminatableTask,
}

impl<'a> PublicationCache<'a> {
fn new(conf: PublicationCacheBuilder<'a, '_, '_>) -> ZResult<PublicationCache<'a>> {
impl PublicationCache {
fn new(conf: PublicationCacheBuilder<'_, '_, '_>) -> ZResult<PublicationCache> {
let key_expr = conf.pub_key_expr?;
// the queryable_prefix (optional), and the key_expr for PublicationCache's queryable ("[<queryable_prefix>]/<pub_key_expr>")
let (queryable_prefix, queryable_key_expr): (Option<OwnedKeyExpr>, KeyExpr) =
Expand Down Expand Up @@ -258,7 +257,7 @@ impl<'a> PublicationCache<'a> {

/// Undeclare this [`PublicationCache`]`.
#[inline]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
ResolveFuture::new(async move {
let PublicationCache {
_queryable,
Expand Down
24 changes: 12 additions & 12 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,16 @@ use zenoh::{
pubsub::{Reliability, Subscriber},
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait},
session::{SessionDeclarations, SessionRef},
time::Timestamp,
Error, Resolvable, Resolve, Result as ZResult,
Error, Resolvable, Resolve, Result as ZResult, Session,
};

use crate::ExtractSample;

/// The builder of [`FetchingSubscriber`], allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) session: SessionRef<'a>,
pub(crate) session: &'a Session,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
Expand Down Expand Up @@ -224,7 +223,7 @@ where
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
}

impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
Expand Down Expand Up @@ -362,7 +361,7 @@ pub struct FetchingSubscriberBuilder<
> where
TryIntoSample: ExtractSample,
{
pub(crate) session: SessionRef<'a>,
pub(crate) session: &'a Session,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
Expand Down Expand Up @@ -548,7 +547,7 @@ where
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
}

impl<
Expand Down Expand Up @@ -620,28 +619,29 @@ where
/// }
/// # }
/// ```
pub struct FetchingSubscriber<'a, Handler> {
subscriber: Subscriber<'a, ()>,
pub struct FetchingSubscriber<Handler> {
subscriber: Subscriber<()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}

impl<Handler> std::ops::Deref for FetchingSubscriber<'_, Handler> {
impl<Handler> std::ops::Deref for FetchingSubscriber<Handler> {
type Target = Handler;
fn deref(&self) -> &Self::Target {
&self.handler
}
}

impl<Handler> std::ops::DerefMut for FetchingSubscriber<'_, Handler> {
impl<Handler> std::ops::DerefMut for FetchingSubscriber<Handler> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.handler
}
}

impl<'a, Handler> FetchingSubscriber<'a, Handler> {
impl<Handler> FetchingSubscriber<Handler> {
fn new<
'a,
KeySpace,
InputHandler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Expand Down Expand Up @@ -724,7 +724,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {

/// Undeclare this [`FetchingSubscriber`]`.
#[inline]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> {
self.subscriber.undeclare()
}

Expand Down
Loading

0 comments on commit 2efff1f

Please sign in to comment.