diff --git a/Cargo.lock b/Cargo.lock index 69148b230..b79dc2f61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5078,6 +5078,7 @@ name = "zenoh" version = "1.0.0-dev" dependencies = [ "ahash", + "async-channel 2.3.1", "async-trait", "bytes", "flume", diff --git a/Cargo.toml b/Cargo.toml index f2b96a40b..a8fecea1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ async-global-executor = "2.4.1" async-io = "2.3.4" async-std = { version = "1.6.5", features = ["tokio1"] } async-trait = "0.1.82" +async-channel = "2.3.1" base64 = "0.22.1" bincode = "1.3.3" bytes = "1.7.1" diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 1359bdd1b..a169722a9 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -68,6 +68,7 @@ unstable = ["internal_config", "zenoh-keyexpr/unstable", "zenoh-config/unstable" internal_config = [] [dependencies] +async-channel = { workspace = true } tokio = { workspace = true, features = ["rt", "macros", "time"] } tokio-util = { workspace = true } ahash = { workspace = true } diff --git a/zenoh/src/api/builders/close.rs b/zenoh/src/api/builders/close.rs index daf53f1e2..9342114fc 100644 --- a/zenoh/src/api/builders/close.rs +++ b/zenoh/src/api/builders/close.rs @@ -99,19 +99,13 @@ impl IntoFuture for CloseBuilder { #[cfg(all(feature = "unstable", feature = "internal"))] /// A builder for close operations running in background -// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't -// care about the `private_bounds` lint in this particular case. #[doc(hidden)] -#[allow(private_bounds)] pub struct BackgroundCloseBuilder { inner: Pin + Send>>, } #[cfg(all(feature = "unstable", feature = "internal"))] #[doc(hidden)] -// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't -// care about the `private_bounds` lint in this particular case. -#[allow(private_bounds)] impl BackgroundCloseBuilder { fn new(inner: Pin + Send>>) -> Self { Self { inner } @@ -120,7 +114,7 @@ impl BackgroundCloseBuilder { #[cfg(all(feature = "unstable", feature = "internal"))] impl Resolvable for BackgroundCloseBuilder { - type To = tokio::task::JoinHandle; + type To = NolocalJoinHandle; } #[cfg(all(feature = "unstable", feature = "internal"))] @@ -135,10 +129,65 @@ impl IntoFuture for BackgroundCloseBuilder { type Output = ::To; type IntoFuture = Pin::Output> + Send>>; - // NOTE: yes, we need to return a future that returns JoinHandle - #[allow(clippy::async_yields_async)] fn into_future(self) -> Self::IntoFuture { - Box::pin(async move { ZRuntime::Application.spawn(self.inner) }.into_future()) + Box::pin( + async move { + let (tx, rx) = async_channel::bounded::(1); + + ZRuntime::Application.spawn(async move { + tx.send(self.inner.await) + .await + .expect("BackgroundCloseBuilder: critical error sending the result") + }); + NolocalJoinHandle::new(rx) + } + .into_future(), + ) + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +#[doc(hidden)] +pub struct NolocalJoinHandle { + rx: async_channel::Receiver, +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl NolocalJoinHandle { + fn new(rx: async_channel::Receiver) -> Self { + Self { rx } + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl Resolvable for NolocalJoinHandle { + type To = TOutput; +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl Wait for NolocalJoinHandle { + fn wait(self) -> Self::To { + self.rx + .recv_blocking() + .expect("NolocalJoinHandle: critical error receiving the result") + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl IntoFuture for NolocalJoinHandle { + type Output = ::To; + type IntoFuture = Pin::Output> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin( + async move { + self.rx + .recv() + .await + .expect("NolocalJoinHandle: critical error receiving the result") + } + .into_future(), + ) } } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 643b0e134..6de866171 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -475,6 +475,12 @@ compile_error!( #[zenoh_macros::internal] pub mod internal { + #[zenoh_macros::unstable] + pub mod builders { + pub mod close { + pub use crate::api::builders::close::{BackgroundCloseBuilder, NolocalJoinHandle}; + } + } pub mod traits { pub use crate::api::builders::sample::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index 5a1837844..9079f4fbe 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -425,8 +425,8 @@ async fn zenoh_session_close_in_background() { let close_task_2 = peer02.close().in_background().await; let close_all = async move { - close_task_1.await.unwrap().unwrap(); - close_task_2.await.unwrap().unwrap(); + close_task_1.await.unwrap(); + close_task_2.await.unwrap(); }; ztimeout!(close_all); }