diff --git a/Cargo.toml b/Cargo.toml index 254cdc19b9..b686656f77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ authors = [ edition = "2021" license = "EPL-2.0 OR Apache-2.0" categories = ["network-programming"] -description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute." +description = "Zenoh: The Zero Overhead Pub/Sub/Query Protocol." # DEFAULT-FEATURES NOTE: Be careful with default-features and additivity! # (https://github.com/rust-lang/cargo/issues/11329) diff --git a/commons/zenoh-keyexpr/src/key_expr/borrowed.rs b/commons/zenoh-keyexpr/src/key_expr/borrowed.rs index a98337b987..e2afa9712f 100644 --- a/commons/zenoh-keyexpr/src/key_expr/borrowed.rs +++ b/commons/zenoh-keyexpr/src/key_expr/borrowed.rs @@ -187,6 +187,7 @@ impl keyexpr { /// For instance, if `self` is `"a/**/c/*" and `prefix` is `a/b/c` then: /// - the `prefix` matches `"a/**/c"` leading to a result of `"*"` when stripped from `self` /// - the `prefix` matches `"a/**"` leading to a result of `"**/c/*"` when stripped from `self` + /// /// So the result is `["*", "**/c/*"]`. /// If `prefix` cannot match the beginning of `self`, an empty list is reuturned. /// diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index 7eab9d316f..2a06b56b5c 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -129,10 +129,16 @@ impl TaskController { } pub struct TerminatableTask { - handle: JoinHandle<()>, + handle: Option>, token: CancellationToken, } +impl Drop for TerminatableTask { + fn drop(&mut self) { + self.terminate(std::time::Duration::from_secs(10)); + } +} + impl TerminatableTask { pub fn create_cancellation_token() -> CancellationToken { CancellationToken::new() @@ -146,7 +152,7 @@ impl TerminatableTask { T: Send + 'static, { TerminatableTask { - handle: rt.spawn(future.map(|_f| ())), + handle: Some(rt.spawn(future.map(|_f| ()))), token, } } @@ -167,24 +173,26 @@ impl TerminatableTask { }; TerminatableTask { - handle: rt.spawn(task), + handle: Some(rt.spawn(task)), token, } } /// Attempts to terminate the task. /// Returns true if task completed / aborted within timeout duration, false otherwise. - pub fn terminate(self, timeout: Duration) -> bool { + pub fn terminate(&mut self, timeout: Duration) -> bool { ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait() } /// Async version of [`TerminatableTask::terminate()`]. - pub async fn terminate_async(self, timeout: Duration) -> bool { + pub async fn terminate_async(&mut self, timeout: Duration) -> bool { self.token.cancel(); - if tokio::time::timeout(timeout, self.handle).await.is_err() { - tracing::error!("Failed to terminate the task"); - return false; - }; + if let Some(handle) = self.handle.take() { + if tokio::time::timeout(timeout, handle).await.is_err() { + tracing::error!("Failed to terminate the task"); + return false; + }; + } true } } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 7ef21ace7c..898e9ae2df 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -402,7 +402,8 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result PublicationCache<'a> { let PublicationCache { _queryable, local_sub, - task, + mut task, } = self; _queryable.undeclare().await?; local_sub.undeclare().await?; diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 50ce79180b..81d9aecc20 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // - use std::{ convert::{TryFrom, TryInto}, future::{IntoFuture, Ready}, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 0c394da851..8ef3ec1fb7 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -105,7 +105,6 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); - println!("Decled key = {key_expr:?}"); send_declare( &dst_face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index ded87f18ee..f861e1bed3 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -126,8 +126,7 @@ struct HatTables { impl Drop for HatTables { fn drop(&mut self) { - if self.linkstatepeers_trees_task.is_some() { - let task = self.linkstatepeers_trees_task.take().unwrap(); + if let Some(mut task) = self.linkstatepeers_trees_task.take() { task.terminate(Duration::from_secs(10)); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 4f3a6ab62b..cf7d1d14b6 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -134,12 +134,10 @@ struct HatTables { impl Drop for HatTables { fn drop(&mut self) { - if self.linkstatepeers_trees_task.is_some() { - let task = self.linkstatepeers_trees_task.take().unwrap(); + if let Some(mut task) = self.linkstatepeers_trees_task.take() { task.terminate(Duration::from_secs(10)); } - if self.routers_trees_task.is_some() { - let task = self.routers_trees_task.take().unwrap(); + if let Some(mut task) = self.routers_trees_task.take() { task.terminate(Duration::from_secs(10)); } } @@ -253,7 +251,9 @@ impl HatTables { .as_ref() .map(|net| { let links = net.get_links(peer1); - HatTables::failover_brokering_to(links, peer2) + let res = HatTables::failover_brokering_to(links, peer2); + tracing::trace!("failover_brokering {} {} : {}", peer1, peer2, res); + res }) .unwrap_or(false) } diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 60d898d84f..d25260d606 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -264,6 +264,11 @@ fn config_from_args(args: &Args) -> Config { } Err(e) => tracing::warn!("Couldn't perform configuration {}: {}", json, e), } + } else { + panic!( + "--cfg accepts KEY:VALUE pairs. {} is not a valid KEY:VALUE pair.", + json + ) } } tracing::debug!("Config: {:?}", &config);