From 502d3be20b78f2873ec069cbcd5fefd308ecf5c7 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 30 Jul 2024 15:26:32 +0200 Subject: [PATCH] Merge/main into dev/1.0.0 (#1279) * Add NOTE for LowLatency transport. (#1088) Signed-off-by: ChenYing Kuo * fix: Improve debug messages in `zenoh-transport` (#1090) * fix: Improve debug messages for failing RX/TX tasks * fix: Improve debug message for `accept_link` timeout * chore: Fix `clippy::redundant_pattern_matching` error * Improve pipeline backoff (#1097) * Yield task for backoff * Improve comments and error handling in backoff * Simplify pipeline pull * Consider backoff configuration * Add typos check to CI (#1065) * Fix typos * Add typos check to CI * Start link tx_task before notifying router (#1098) * Fix typos (#1110) * bump quinn & rustls (#1086) * bump quinn & rustls * fix ci windows check * add comments * Fix interface name scanning when listening on IP unspecified for TCP/TLS/QUIC/WS (#1123) Co-authored-by: Julien Enoch * Enable releasing from any branch (#1136) * Fix cargo clippy (#1145) * Release tables locks before propagating subscribers and queryables declarations to void dead locks (#1150) * Send simple sub and qabl declarations using a given function * Send simple sub and qabl declarations after releasing tables lock * Send simple sub and qabl declarations after releasing tables lock (missing places) * feat: make `TerminatableTask` terminate itself when dropped (#1151) * Fix bug in keyexpr::includes leading to call get_unchecked on empty array UB (#1208) * REST plugin uses unbounded flume channels for queries (#1213) * fix: typo in selector.rs (#1228) * fix: zenohd --cfg (#1263) * fix: zenohd --cfg * ci: trigger * Update zenohd/src/main.rs --------- Co-authored-by: Luca Cominardi * Fix failover brokering bug reacting to linkstate changes (#1272) * Change missleading log * Fix failover brokering bug reacting to linkstate changes * Retrigger CI --------- Co-authored-by: Luca Cominardi * Code format * Fix clippy warnings * Code format * Fix Clippy errors from Rust 1.80 (#1273) * Allow unexpected `doc_auto_cfg` flag * Keep never-constructed logger interceptor * Ignore interior mutability of `Resource` * Fix typo * Resolve `clippy::doc-lazy-continuation` errors * Upgrade `time@0.3.28` to `time@0.3.36` See https://github.com/time-rs/time/issues/693 * Update Cargo.toml (#1277) Updated description to be aligned with what we use everywhere else * Merge ci.yaml --------- Signed-off-by: ChenYing Kuo Co-authored-by: ChenYing Kuo (CY) Co-authored-by: Mahmoud Mazouz Co-authored-by: Tavo Annus Co-authored-by: JLer Co-authored-by: Julien Enoch Co-authored-by: OlivierHecart Co-authored-by: Yuyuan Yuan Co-authored-by: Diogo Matsubara Co-authored-by: OlivierHecart Co-authored-by: kydos --- Cargo.toml | 2 +- .../zenoh-keyexpr/src/key_expr/borrowed.rs | 1 + commons/zenoh-task/src/lib.rs | 26 ++++++++++++------- plugins/zenoh-plugin-rest/src/lib.rs | 3 ++- zenoh-ext/src/group.rs | 1 - zenoh-ext/src/publication_cache.rs | 2 +- zenoh/src/api/key_expr.rs | 1 - zenoh/src/net/routing/hat/client/queries.rs | 1 - .../src/net/routing/hat/linkstate_peer/mod.rs | 3 +-- zenoh/src/net/routing/hat/router/mod.rs | 10 +++---- zenohd/src/main.rs | 5 ++++ 11 files changed, 33 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 254cdc19b9..b686656f77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ authors = [ edition = "2021" license = "EPL-2.0 OR Apache-2.0" categories = ["network-programming"] -description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute." +description = "Zenoh: The Zero Overhead Pub/Sub/Query Protocol." # DEFAULT-FEATURES NOTE: Be careful with default-features and additivity! # (https://github.com/rust-lang/cargo/issues/11329) diff --git a/commons/zenoh-keyexpr/src/key_expr/borrowed.rs b/commons/zenoh-keyexpr/src/key_expr/borrowed.rs index a98337b987..e2afa9712f 100644 --- a/commons/zenoh-keyexpr/src/key_expr/borrowed.rs +++ b/commons/zenoh-keyexpr/src/key_expr/borrowed.rs @@ -187,6 +187,7 @@ impl keyexpr { /// For instance, if `self` is `"a/**/c/*" and `prefix` is `a/b/c` then: /// - the `prefix` matches `"a/**/c"` leading to a result of `"*"` when stripped from `self` /// - the `prefix` matches `"a/**"` leading to a result of `"**/c/*"` when stripped from `self` + /// /// So the result is `["*", "**/c/*"]`. /// If `prefix` cannot match the beginning of `self`, an empty list is reuturned. /// diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index 7eab9d316f..2a06b56b5c 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -129,10 +129,16 @@ impl TaskController { } pub struct TerminatableTask { - handle: JoinHandle<()>, + handle: Option>, token: CancellationToken, } +impl Drop for TerminatableTask { + fn drop(&mut self) { + self.terminate(std::time::Duration::from_secs(10)); + } +} + impl TerminatableTask { pub fn create_cancellation_token() -> CancellationToken { CancellationToken::new() @@ -146,7 +152,7 @@ impl TerminatableTask { T: Send + 'static, { TerminatableTask { - handle: rt.spawn(future.map(|_f| ())), + handle: Some(rt.spawn(future.map(|_f| ()))), token, } } @@ -167,24 +173,26 @@ impl TerminatableTask { }; TerminatableTask { - handle: rt.spawn(task), + handle: Some(rt.spawn(task)), token, } } /// Attempts to terminate the task. /// Returns true if task completed / aborted within timeout duration, false otherwise. - pub fn terminate(self, timeout: Duration) -> bool { + pub fn terminate(&mut self, timeout: Duration) -> bool { ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait() } /// Async version of [`TerminatableTask::terminate()`]. - pub async fn terminate_async(self, timeout: Duration) -> bool { + pub async fn terminate_async(&mut self, timeout: Duration) -> bool { self.token.cancel(); - if tokio::time::timeout(timeout, self.handle).await.is_err() { - tracing::error!("Failed to terminate the task"); - return false; - }; + if let Some(handle) = self.handle.take() { + if tokio::time::timeout(timeout, handle).await.is_err() { + tracing::error!("Failed to terminate the task"); + return false; + }; + } true } } diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 7ef21ace7c..898e9ae2df 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -402,7 +402,8 @@ async fn query(mut req: Request<(Arc, String)>) -> tide::Result PublicationCache<'a> { let PublicationCache { _queryable, local_sub, - task, + mut task, } = self; _queryable.undeclare().await?; local_sub.undeclare().await?; diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index 50ce79180b..81d9aecc20 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // - use std::{ convert::{TryFrom, TryInto}, future::{IntoFuture, Ready}, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 0c394da851..8ef3ec1fb7 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -105,7 +105,6 @@ fn propagate_simple_queryable( .local_qabls .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); - println!("Decled key = {key_expr:?}"); send_declare( &dst_face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index ded87f18ee..f861e1bed3 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -126,8 +126,7 @@ struct HatTables { impl Drop for HatTables { fn drop(&mut self) { - if self.linkstatepeers_trees_task.is_some() { - let task = self.linkstatepeers_trees_task.take().unwrap(); + if let Some(mut task) = self.linkstatepeers_trees_task.take() { task.terminate(Duration::from_secs(10)); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 4f3a6ab62b..cf7d1d14b6 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -134,12 +134,10 @@ struct HatTables { impl Drop for HatTables { fn drop(&mut self) { - if self.linkstatepeers_trees_task.is_some() { - let task = self.linkstatepeers_trees_task.take().unwrap(); + if let Some(mut task) = self.linkstatepeers_trees_task.take() { task.terminate(Duration::from_secs(10)); } - if self.routers_trees_task.is_some() { - let task = self.routers_trees_task.take().unwrap(); + if let Some(mut task) = self.routers_trees_task.take() { task.terminate(Duration::from_secs(10)); } } @@ -253,7 +251,9 @@ impl HatTables { .as_ref() .map(|net| { let links = net.get_links(peer1); - HatTables::failover_brokering_to(links, peer2) + let res = HatTables::failover_brokering_to(links, peer2); + tracing::trace!("failover_brokering {} {} : {}", peer1, peer2, res); + res }) .unwrap_or(false) } diff --git a/zenohd/src/main.rs b/zenohd/src/main.rs index 60d898d84f..d25260d606 100644 --- a/zenohd/src/main.rs +++ b/zenohd/src/main.rs @@ -264,6 +264,11 @@ fn config_from_args(args: &Args) -> Config { } Err(e) => tracing::warn!("Couldn't perform configuration {}: {}", json, e), } + } else { + panic!( + "--cfg accepts KEY:VALUE pairs. {} is not a valid KEY:VALUE pair.", + json + ) } } tracing::debug!("Config: {:?}", &config);