Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close builder #1576

Merged
merged 20 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,21 +110,23 @@ impl TaskController {
/// The call blocks until all tasks yield or timeout duration expires.
/// Returns 0 in case of success, number of non terminated tasks otherwise.
pub fn terminate_all(&self, timeout: Duration) -> usize {
ResolveFuture::new(async move { self.terminate_all_async(timeout).await }).wait()
ResolveFuture::new(async move {
if tokio::time::timeout(timeout, self.terminate_all_async())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
}
self.tracker.len()
})
.wait()
}

/// Async version of [`TaskController::terminate_all()`].
pub async fn terminate_all_async(&self, timeout: Duration) -> usize {
pub async fn terminate_all_async(&self) {
self.tracker.close();
self.token.cancel();
if tokio::time::timeout(timeout, self.tracker.wait())
.await
.is_err()
{
tracing::error!("Failed to terminate {} tasks", self.tracker.len());
return self.tracker.len();
}
0
self.tracker.wait().await
}
}

Expand Down Expand Up @@ -181,18 +183,24 @@ impl TerminatableTask {
/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
ResolveFuture::new(async move {
if tokio::time::timeout(timeout, self.terminate_async())
.await
.is_err()
{
tracing::error!("Failed to terminate the task");
return false;
};
true
})
.wait()
}

/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self) {
self.token.cancel();
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
let _ = handle.await;
}
true
}
}
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,7 @@ impl TransportManager {

pub async fn close(&self) {
self.close_unicast().await;
self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;
}

/*************************************/
Expand Down
4 changes: 1 addition & 3 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,7 @@ impl TransportMulticastInner {
cb.closed();
}

self.task_controller
.terminate_all_async(Duration::from_secs(10))
.await;
self.task_controller.terminate_all_async().await;

Ok(())
}
Expand Down
152 changes: 152 additions & 0 deletions zenoh/src/api/builders/close.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
future::{Future, IntoFuture},
pin::Pin,
time::Duration,
};

use async_trait::async_trait;
use zenoh_core::{Resolvable, Wait};
use zenoh_result::ZResult;
use zenoh_runtime::ZRuntime;

/// A builder for close operations.
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
pub struct CloseBuilder<TCloseable: Closeable> {
closee: TCloseable::TClosee,
timeout: Duration,
}

// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
impl<TCloseable: Closeable> CloseBuilder<TCloseable> {
pub(crate) fn new(closeable: &'_ TCloseable) -> Self {
Self {
closee: closeable.get_closee(),
timeout: Duration::from_secs(10),
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
/// Set the timeout for close operation
///
/// # Arguments
///
/// * `timeout` - The timeout value for close operation (10s by default)
///
#[doc(hidden)]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

#[cfg(all(feature = "unstable", feature = "internal"))]
/// Run Close operation concurrently
#[doc(hidden)]
pub fn in_background(
self,
) -> BackgroundCloseBuilder<<CloseBuilder<TCloseable> as Resolvable>::To> {
BackgroundCloseBuilder::new(self.into_future())
}
}

impl<TCloseable: Closeable> Resolvable for CloseBuilder<TCloseable> {
type To = ZResult<()>;
}

impl<TCloseable: Closeable> Wait for CloseBuilder<TCloseable> {
fn wait(self) -> Self::To {
ZRuntime::Application.block_in_place(self.into_future())
}
}

impl<TCloseable: Closeable> IntoFuture for CloseBuilder<TCloseable> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(
async move {
if tokio::time::timeout(self.timeout, self.closee.close_inner())
.await
.is_err()
{
bail!("close operation timed out!")
}
Ok(())
}
.into_future(),
)
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
/// A builder for close operations running in background
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[doc(hidden)]
#[allow(private_bounds)]
pub struct BackgroundCloseBuilder<TOutput: Send + 'static> {
inner: Pin<Box<dyn Future<Output = TOutput> + Send>>,
}

#[cfg(all(feature = "unstable", feature = "internal"))]
#[doc(hidden)]
// NOTE: `Closeable` is only pub(crate) because it is zenoh-internal trait, so we don't
// care about the `private_bounds` lint in this particular case.
#[allow(private_bounds)]
impl<TOutput: Send + 'static> BackgroundCloseBuilder<TOutput> {
fn new(inner: Pin<Box<dyn Future<Output = TOutput> + Send>>) -> Self {
Self { inner }
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
impl<TOutput: Send + 'static> Resolvable for BackgroundCloseBuilder<TOutput> {
type To = tokio::task::JoinHandle<TOutput>;
}

#[cfg(all(feature = "unstable", feature = "internal"))]
impl<TOutput: Send + 'static> Wait for BackgroundCloseBuilder<TOutput> {
fn wait(self) -> Self::To {
ZRuntime::Application.block_in_place(self.into_future())
}
}

#[cfg(all(feature = "unstable", feature = "internal"))]
impl<TOutput: Send + 'static> IntoFuture for BackgroundCloseBuilder<TOutput> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Pin<Box<dyn Future<Output = <Self as IntoFuture>::Output> + Send>>;

// NOTE: yes, we need to return a future that returns JoinHandle
#[allow(clippy::async_yields_async)]
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move { ZRuntime::Application.spawn(self.inner) }.into_future())
}
}

#[async_trait]
pub(crate) trait Closee: Send + Sync + 'static {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
async fn close_inner(&self);
}

pub(crate) trait Closeable {
type TClosee: Closee;
fn get_closee(&self) -> Self::TClosee;
}
1 change: 1 addition & 0 deletions zenoh/src/api/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub(crate) mod close;
pub(crate) mod info;
pub(crate) mod matching_listener;
pub(crate) mod publisher;
Expand Down
94 changes: 53 additions & 41 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use tracing::{error, info, trace, warn};
use uhlc::Timestamp;
#[cfg(feature = "internal")]
Expand Down Expand Up @@ -67,6 +68,7 @@ use zenoh_result::ZResult;
use zenoh_shm::api::client_storage::ShmClientStorage;
use zenoh_task::TaskController;

use super::builders::close::{CloseBuilder, Closeable, Closee};
#[cfg(feature = "unstable")]
use crate::api::selector::ZenohParameters;
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -733,8 +735,8 @@ impl Session {
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
pub fn close(&self) -> CloseBuilder<Self> {
CloseBuilder::new(self)
}

/// Check if the session has been closed.
Expand Down Expand Up @@ -1236,50 +1238,12 @@ impl Session {
})
}
}

impl SessionInner {
pub fn zid(&self) -> ZenohId {
self.runtime.zid()
}

fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
if self.owns_runtime {
info!(zid = %self.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
} else {
primitives.send_close();
}
// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
Ok(())
})
}

pub(crate) fn declare_prefix<'a>(
&'a self,
prefix: &'a str,
Expand Down Expand Up @@ -3146,3 +3110,51 @@ where
{
OpenBuilder::new(config)
}

#[async_trait]
impl Closee for Arc<SessionInner> {
async fn close_inner(&self) {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return;
};

if self.owns_runtime {
info!(zid = %self.zid(), "close session");
self.task_controller.terminate_all_async().await;
self.runtime.get_closee().close_inner().await;
} else {
self.task_controller.terminate_all_async().await;
primitives.send_close();
}

// defer the cleanup of internal data structures by taking them out of the locked state
// this is needed because callbacks may contain entities which need to acquire the
// lock to be dropped, so callback must be dropped without the lock held
let mut state = zwrite!(self.state);
let _queryables = std::mem::take(&mut state.queryables);
let _subscribers = std::mem::take(&mut state.subscribers);
let _liveliness_subscribers = std::mem::take(&mut state.liveliness_subscribers);
let _local_resources = std::mem::take(&mut state.local_resources);
let _remote_resources = std::mem::take(&mut state.remote_resources);
drop(state);
#[cfg(feature = "unstable")]
{
// the lock from the outer scope cannot be reused because the declared variables
// would be undeclared at the end of the block, with the lock held, and we want
// to avoid that; so we reacquire the lock in the block
// anyway, it doesn't really matter, and this code will be cleaned up when the APIs
// will be stabilized.
let mut state = zwrite!(self.state);
let _matching_listeners = std::mem::take(&mut state.matching_listeners);
drop(state);
}
}
}

impl Closeable for Session {
type TClosee = Arc<SessionInner>;

fn get_closee(&self) -> Self::TClosee {
self.0.clone()
}
}
1 change: 1 addition & 0 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub mod session {
pub use crate::api::builders::session::{init, InitBuilder};
pub use crate::api::{
builders::{
close::CloseBuilder,
info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, ZenohIdBuilder},
publisher::{SessionDeleteBuilder, SessionPutBuilder},
query::SessionGetBuilder,
Expand Down
Loading
Loading