From c6f52c2b8f51e7b0d27e09b6aaab81fde0bc9538 Mon Sep 17 00:00:00 2001 From: yellowhatter <104833606+yellowhatter@users.noreply.github.com> Date: Tue, 3 Dec 2024 18:15:10 +0300 Subject: [PATCH] Close builder (#1576) * atexit-safe close builder draft * - get rid of nolocal crate - add timeout for async task controller finalization * Close builder trait with proper timeout support for Runtime and Session * fix close behavior * fix clippy fix ephemeral ports in tests * - brush-up timeout implementation in the TaskController - remove backoff from CloseBuilder - simplify some code around CloseBuilder * Revert ephemeral port fix * Update close.rs * merge fix * Make Close builder functionality unstable && internal * fix docs * Session close tests --- commons/zenoh-task/src/lib.rs | 42 +++-- io/zenoh-transport/src/manager.rs | 4 +- io/zenoh-transport/src/multicast/transport.rs | 4 +- zenoh/src/api/builders/close.rs | 152 ++++++++++++++++++ zenoh/src/api/builders/mod.rs | 1 + zenoh/src/api/session.rs | 94 ++++++----- zenoh/src/lib.rs | 1 + zenoh/src/net/runtime/mod.rs | 61 ++++--- zenoh/tests/session.rs | 23 +++ 9 files changed, 296 insertions(+), 86 deletions(-) create mode 100644 zenoh/src/api/builders/close.rs diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index 9c863da4f4..89ff308e43 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -110,21 +110,23 @@ impl TaskController { /// The call blocks until all tasks yield or timeout duration expires. /// Returns 0 in case of success, number of non terminated tasks otherwise. pub fn terminate_all(&self, timeout: Duration) -> usize { - ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait() + ResolveFuture::new(async move { + if tokio::time::timeout(timeout, self.terminate_all_async()) + .await + .is_err() + { + tracing::error!("Failed to terminate {} tasks", self.tracker.len()); + } + self.tracker.len() + }) + .wait() } /// Async version of [`TaskController::terminate_all()`]. - pub async fn terminate_all_async(&self, timeout: Duration) -> usize { + pub async fn terminate_all_async(&self) { self.tracker.close(); self.token.cancel(); - if tokio::time::timeout(timeout, self.tracker.wait()) - .await - .is_err() - { - tracing::error!("Failed to terminate {} tasks", self.tracker.len()); - return self.tracker.len(); - } - 0 + self.tracker.wait().await } } @@ -181,18 +183,24 @@ impl TerminatableTask { /// Attempts to terminate the task. /// Returns true if task completed / aborted within timeout duration, false otherwise. pub fn terminate(&mut self, timeout: Duration) -> bool { - ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait() + ResolveFuture::new(async move { + if tokio::time::timeout(timeout, self.terminate_async()) + .await + .is_err() + { + tracing::error!("Failed to terminate the task"); + return false; + }; + true + }) + .wait() } /// Async version of [`TerminatableTask::terminate()`]. - pub async fn terminate_async(&mut self, timeout: Duration) -> bool { + pub async fn terminate_async(&mut self) { self.token.cancel(); if let Some(handle) = self.handle.take() { - if tokio::time::timeout(timeout, handle).await.is_err() { - tracing::error!("Failed to terminate the task"); - return false; - }; + let _ = handle.await; } - true } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index a1c45a6d71..7f3adef4a6 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -471,9 +471,7 @@ impl TransportManager { pub async fn close(&self) { self.close_unicast().await; - self.task_controller - .terminate_all_async(Duration::from_secs(10)) - .await; + self.task_controller.terminate_all_async().await; } /*************************************/ diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index bcccaa9a85..3777978a3c 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -194,9 +194,7 @@ impl TransportMulticastInner { cb.closed(); } - self.task_controller - .terminate_all_async(Duration::from_secs(10)) - .await; + self.task_controller.terminate_all_async().await; Ok(()) } diff --git a/zenoh/src/api/builders/close.rs b/zenoh/src/api/builders/close.rs new file mode 100644 index 0000000000..53fba6a2ab --- /dev/null +++ b/zenoh/src/api/builders/close.rs @@ -0,0 +1,152 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::{ + future::{Future, IntoFuture}, + pin::Pin, + time::Duration, +}; + +use async_trait::async_trait; +use zenoh_core::{Resolvable, Wait}; +use zenoh_result::ZResult; +use zenoh_runtime::ZRuntime; + +/// A builder for close operations. +// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't +// care about the `private_bounds` lint in this particular case. +#[allow(private_bounds)] +pub struct CloseBuilder { + closee: TCloseable::TClosee, + timeout: Duration, +} + +// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't +// care about the `private_bounds` lint in this particular case. +#[allow(private_bounds)] +impl CloseBuilder { + pub(crate) fn new(closeable: &'_ TCloseable) -> Self { + Self { + closee: closeable.get_closee(), + timeout: Duration::from_secs(10), + } + } + + #[cfg(all(feature = "unstable", feature = "internal"))] + /// Set the timeout for close operation + /// + /// # Arguments + /// + /// * `timeout` - The timeout value for close operation (10s by default) + /// + #[doc(hidden)] + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + #[cfg(all(feature = "unstable", feature = "internal"))] + /// Run Close operation concurrently + #[doc(hidden)] + pub fn in_background( + self, + ) -> BackgroundCloseBuilder< as Resolvable>::To> { + BackgroundCloseBuilder::new(self.into_future()) + } +} + +impl Resolvable for CloseBuilder { + type To = ZResult<()>; +} + +impl Wait for CloseBuilder { + fn wait(self) -> Self::To { + ZRuntime::Application.block_in_place(self.into_future()) + } +} + +impl IntoFuture for CloseBuilder { + type Output = ::To; + type IntoFuture = Pin::Output> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin( + async move { + if tokio::time::timeout(self.timeout, self.closee.close_inner()) + .await + .is_err() + { + bail!("close operation timed out!") + } + Ok(()) + } + .into_future(), + ) + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +/// A builder for close operations running in background +// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't +// care about the `private_bounds` lint in this particular case. +#[doc(hidden)] +#[allow(private_bounds)] +pub struct BackgroundCloseBuilder { + inner: Pin + Send>>, +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +#[doc(hidden)] +// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't +// care about the `private_bounds` lint in this particular case. +#[allow(private_bounds)] +impl BackgroundCloseBuilder { + fn new(inner: Pin + Send>>) -> Self { + Self { inner } + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl Resolvable for BackgroundCloseBuilder { + type To = tokio::task::JoinHandle; +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl Wait for BackgroundCloseBuilder { + fn wait(self) -> Self::To { + ZRuntime::Application.block_in_place(self.into_future()) + } +} + +#[cfg(all(feature = "unstable", feature = "internal"))] +impl IntoFuture for BackgroundCloseBuilder { + type Output = ::To; + type IntoFuture = Pin::Output> + Send>>; + + // NOTE: yes, we need to return a future that returns JoinHandle + #[allow(clippy::async_yields_async)] + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { ZRuntime::Application.spawn(self.inner) }.into_future()) + } +} + +#[async_trait] +pub(crate) trait Closee: Send + Sync + 'static { + async fn close_inner(&self); +} + +pub(crate) trait Closeable { + type TClosee: Closee; + fn get_closee(&self) -> Self::TClosee; +} diff --git a/zenoh/src/api/builders/mod.rs b/zenoh/src/api/builders/mod.rs index 780e25366e..f29ec9b027 100644 --- a/zenoh/src/api/builders/mod.rs +++ b/zenoh/src/api/builders/mod.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // +pub(crate) mod close; pub(crate) mod info; pub(crate) mod matching_listener; pub(crate) mod publisher; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index b679a13dbb..ca18c288db 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -25,6 +25,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; +use async_trait::async_trait; use tracing::{error, info, trace, warn}; use uhlc::Timestamp; #[cfg(feature = "internal")] @@ -67,6 +68,7 @@ use zenoh_result::ZResult; use zenoh_shm::api::client_storage::ShmClientStorage; use zenoh_task::TaskController; +use super::builders::close::{CloseBuilder, Closeable, Closee}; #[cfg(feature = "unstable")] use crate::api::selector::ZenohParameters; #[cfg(feature = "unstable")] @@ -733,8 +735,8 @@ impl Session { /// subscriber_task.await.unwrap(); /// # } /// ``` - pub fn close(&self) -> impl Resolve> + '_ { - self.0.close() + pub fn close(&self) -> CloseBuilder { + CloseBuilder::new(self) } /// Check if the session has been closed. @@ -1236,50 +1238,12 @@ impl Session { }) } } + impl SessionInner { pub fn zid(&self) -> ZenohId { self.runtime.zid() } - fn close(&self) -> impl Resolve> + '_ { - ResolveFuture::new(async move { - let Some(primitives) = zwrite!(self.state).primitives.take() else { - return Ok(()); - }; - if self.owns_runtime { - info!(zid = %self.zid(), "close session"); - } - self.task_controller.terminate_all(Duration::from_secs(10)); - if self.owns_runtime { - self.runtime.close().await?; - } else { - primitives.send_close(); - } - // defer the cleanup of internal data structures by taking them out of the locked state - // this is needed because callbacks may contain entities which need to acquire the - // lock to be dropped, so callback must be dropped without the lock held - let mut state = zwrite!(self.state); - let _queryables = std::mem::take(&mut state.queryables); - let _subscribers = std::mem::take(&mut state.subscribers); - let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers); - let _local_resources = std::mem::take(&mut state.local_resources); - let _remote_resources = std::mem::take(&mut state.remote_resources); - drop(state); - #[cfg(feature = "unstable")] - { - // the lock from the outer scope cannot be reused because the declared variables - // would be undeclared at the end of the block, with the lock held, and we want - // to avoid that; so we reacquire the lock in the block - // anyway, it doesn't really matter, and this code will be cleaned up when the APIs - // will be stabilized. - let mut state = zwrite!(self.state); - let _matching_listeners = std::mem::take(&mut state.matching_listeners); - drop(state); - } - Ok(()) - }) - } - pub(crate) fn declare_prefix<'a>( &'a self, prefix: &'a str, @@ -3146,3 +3110,51 @@ where { OpenBuilder::new(config) } + +#[async_trait] +impl Closee for Arc { + async fn close_inner(&self) { + let Some(primitives) = zwrite!(self.state).primitives.take() else { + return; + }; + + if self.owns_runtime { + info!(zid = %self.zid(), "close session"); + self.task_controller.terminate_all_async().await; + self.runtime.get_closee().close_inner().await; + } else { + self.task_controller.terminate_all_async().await; + primitives.send_close(); + } + + // defer the cleanup of internal data structures by taking them out of the locked state + // this is needed because callbacks may contain entities which need to acquire the + // lock to be dropped, so callback must be dropped without the lock held + let mut state = zwrite!(self.state); + let _queryables = std::mem::take(&mut state.queryables); + let _subscribers = std::mem::take(&mut state.subscribers); + let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers); + let _local_resources = std::mem::take(&mut state.local_resources); + let _remote_resources = std::mem::take(&mut state.remote_resources); + drop(state); + #[cfg(feature = "unstable")] + { + // the lock from the outer scope cannot be reused because the declared variables + // would be undeclared at the end of the block, with the lock held, and we want + // to avoid that; so we reacquire the lock in the block + // anyway, it doesn't really matter, and this code will be cleaned up when the APIs + // will be stabilized. + let mut state = zwrite!(self.state); + let _matching_listeners = std::mem::take(&mut state.matching_listeners); + drop(state); + } + } +} + +impl Closeable for Session { + type TClosee = Arc; + + fn get_closee(&self) -> Self::TClosee { + self.0.clone() + } +} diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 9ae97d9e7b..ec39dd29c3 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -193,6 +193,7 @@ pub mod session { pub use crate::api::builders::session::{init, InitBuilder}; pub use crate::api::{ builders::{ + close::CloseBuilder, info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder}, publisher::{SessionDeleteBuilder, SessionPutBuilder}, query::SessionGetBuilder, diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 301698eea6..aa824df7b4 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -29,10 +29,10 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, Weak, }, - time::Duration, }; pub use adminspace::AdminSpace; +use async_trait::async_trait; use futures::{stream::StreamExt, Future}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -62,8 +62,13 @@ use super::{primitives::DeMux, routing, routing::router::Router}; use crate::api::loader::{load_plugins, start_plugins}; #[cfg(feature = "plugins")] use crate::api::plugins::PluginsManager; +#[cfg(feature = "internal")] +use crate::session::CloseBuilder; use crate::{ - api::config::{Config, Notifier}, + api::{ + builders::close::{Closeable, Closee}, + config::{Config, Notifier}, + }, GIT_VERSION, LONG_VERSION, }; @@ -267,26 +272,9 @@ impl Runtime { self.state.next_id.fetch_add(1, Ordering::SeqCst) } - pub async fn close(&self) -> ZResult<()> { - tracing::trace!("Runtime::close())"); - // TODO: Plugins should be stopped - // TODO: Check this whether is able to terminate all spawned task by Runtime::spawn - self.state - .task_controller - .terminate_all(Duration::from_secs(10)); - self.manager().close().await; - // clean up to break cyclic reference of self.state to itself - self.state.transport_handlers.write().unwrap().clear(); - // TODO: the call below is needed to prevent intermittent leak - // due to not freed resource Arc, that apparently happens because - // the task responsible for resource clean up was aborted earlier than expected. - // This should be resolved by identfying correspodning task, and placing - // cancellation token manually inside it. - let router = self.router(); - let mut tables = router.tables.tables.write().unwrap(); - tables.root_res.close(); - tables.faces.clear(); - Ok(()) + #[cfg(feature = "internal")] + pub fn close(&self) -> CloseBuilder { + CloseBuilder::new(self) } pub fn is_closed(&self) -> bool { @@ -539,3 +527,32 @@ impl TransportPeerEventHandler for RuntimeMulticastSession { self } } + +#[async_trait] +impl Closee for Arc { + async fn close_inner(&self) { + tracing::trace!("Runtime::close())"); + // TODO: Plugins should be stopped + // TODO: Check this whether is able to terminate all spawned task by Runtime::spawn + self.task_controller.terminate_all_async().await; + self.manager.close().await; + // clean up to break cyclic reference of self.state to itself + self.transport_handlers.write().unwrap().clear(); + // TODO: the call below is needed to prevent intermittent leak + // due to not freed resource Arc, that apparently happens because + // the task responsible for resource clean up was aborted earlier than expected. + // This should be resolved by identfying correspodning task, and placing + // cancellation token manually inside it. + let mut tables = self.router.tables.tables.write().unwrap(); + tables.root_res.close(); + tables.faces.clear(); + } +} + +impl Closeable for Runtime { + type TClosee = Arc; + + fn get_closee(&self) -> Self::TClosee { + self.state.clone() + } +} diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index a0eb6be130..3e84249b82 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -407,3 +407,26 @@ async fn zenoh_2sessions_1runtime_init() { println!("[ ][02e] Closing r2 runtime"); ztimeout!(r2.close()).unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn zenoh_session_close() { + zenoh::init_log_from_env_or("error"); + let (peer01, peer02) = open_session_unicast(&["tcp/127.0.0.1:17457"]).await; + close_session(peer01, peer02).await; +} + +#[cfg(all(feature = "internal", feature = "unstable"))] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn zenoh_session_close_in_background() { + zenoh::init_log_from_env_or("error"); + + let (peer01, peer02) = open_session_unicast(&["tcp/127.0.0.1:17467"]).await; + let close_task_1 = peer01.close().in_background().await; + let close_task_2 = peer02.close().in_background().await; + + let close_all = async move { + close_task_1.await.unwrap().unwrap(); + close_task_2.await.unwrap().unwrap(); + }; + ztimeout!(close_all); +}