Skip to content

Commit

Permalink
Merge/main into dev/1.0.0 (#1279)
Browse files Browse the repository at this point in the history
* Add NOTE for LowLatency transport. (#1088)

Signed-off-by: ChenYing Kuo <[email protected]>

* fix: Improve debug messages in `zenoh-transport` (#1090)

* fix: Improve debug messages for failing RX/TX tasks

* fix: Improve debug message for `accept_link` timeout

* chore: Fix `clippy::redundant_pattern_matching` error

* Improve pipeline backoff (#1097)

* Yield task for backoff

* Improve comments and error handling in backoff

* Simplify pipeline pull

* Consider backoff configuration

* Add typos check to CI (#1065)

* Fix typos

* Add typos check to CI

* Start link tx_task before notifying router (#1098)

* Fix typos (#1110)

* bump quinn & rustls (#1086)

* bump quinn & rustls

* fix ci windows check

* add comments

* Fix interface name scanning when listening on IP unspecified for TCP/TLS/QUIC/WS (#1123)

Co-authored-by: Julien Enoch <[email protected]>

* Enable releasing from any branch (#1136)

* Fix cargo clippy (#1145)

* Release tables locks before propagating subscribers and queryables declarations to void dead locks (#1150)

* Send simple sub and qabl declarations using a given function

* Send simple sub and qabl declarations after releasing tables lock

* Send simple sub and qabl declarations after releasing tables lock (missing places)

* feat: make `TerminatableTask` terminate itself when dropped (#1151)

* Fix bug in keyexpr::includes leading to call get_unchecked on empty array UB (#1208)

* REST plugin uses unbounded flume channels for queries (#1213)

* fix: typo in selector.rs (#1228)

* fix: zenohd --cfg (#1263)

* fix: zenohd --cfg

* ci: trigger

* Update zenohd/src/main.rs

---------

Co-authored-by: Luca Cominardi <[email protected]>

* Fix failover brokering bug reacting to linkstate changes (#1272)

* Change missleading log

* Fix failover brokering bug reacting to linkstate changes

* Retrigger CI

---------

Co-authored-by: Luca Cominardi <[email protected]>

* Code format

* Fix clippy warnings

* Code format

* Fix Clippy errors from Rust 1.80 (#1273)

* Allow unexpected `doc_auto_cfg` flag

* Keep never-constructed logger interceptor

* Ignore interior mutability of `Resource`

* Fix typo

* Resolve `clippy::doc-lazy-continuation` errors

* Upgrade `[email protected]` to `[email protected]`

See time-rs/time#693

* Update Cargo.toml (#1277)

Updated description to be aligned with what we use everywhere else

* Merge ci.yaml

---------

Signed-off-by: ChenYing Kuo <[email protected]>
Co-authored-by: ChenYing Kuo (CY) <[email protected]>
Co-authored-by: Mahmoud Mazouz <[email protected]>
Co-authored-by: Tavo Annus <[email protected]>
Co-authored-by: JLer <[email protected]>
Co-authored-by: Julien Enoch <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Yuyuan Yuan <[email protected]>
Co-authored-by: Diogo Matsubara <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: kydos <[email protected]>
  • Loading branch information
11 people authored Jul 30, 2024
1 parent 86f0848 commit 502d3be
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ authors = [
edition = "2021"
license = "EPL-2.0 OR Apache-2.0"
categories = ["network-programming"]
description = "Zenoh: Zero Overhead Pub/sub, Store/Query and Compute."
description = "Zenoh: The Zero Overhead Pub/Sub/Query Protocol."

# DEFAULT-FEATURES NOTE: Be careful with default-features and additivity!
# (https://github.com/rust-lang/cargo/issues/11329)
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-keyexpr/src/key_expr/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl keyexpr {
/// For instance, if `self` is `"a/**/c/*" and `prefix` is `a/b/c` then:
/// - the `prefix` matches `"a/**/c"` leading to a result of `"*"` when stripped from `self`
/// - the `prefix` matches `"a/**"` leading to a result of `"**/c/*"` when stripped from `self`
///
/// So the result is `["*", "**/c/*"]`.
/// If `prefix` cannot match the beginning of `self`, an empty list is reuturned.
///
Expand Down
26 changes: 17 additions & 9 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ impl TaskController {
}

pub struct TerminatableTask {
handle: JoinHandle<()>,
handle: Option<JoinHandle<()>>,
token: CancellationToken,
}

impl Drop for TerminatableTask {
fn drop(&mut self) {
self.terminate(std::time::Duration::from_secs(10));
}
}

impl TerminatableTask {
pub fn create_cancellation_token() -> CancellationToken {
CancellationToken::new()
Expand All @@ -146,7 +152,7 @@ impl TerminatableTask {
T: Send + 'static,
{
TerminatableTask {
handle: rt.spawn(future.map(|_f| ())),
handle: Some(rt.spawn(future.map(|_f| ()))),
token,
}
}
Expand All @@ -167,24 +173,26 @@ impl TerminatableTask {
};

TerminatableTask {
handle: rt.spawn(task),
handle: Some(rt.spawn(task)),
token,
}
}

/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
pub fn terminate(self, timeout: Duration) -> bool {
pub fn terminate(&mut self, timeout: Duration) -> bool {
ResolveFuture::new(async move { self.terminate_async(timeout).await }).wait()
}

/// Async version of [`TerminatableTask::terminate()`].
pub async fn terminate_async(self, timeout: Duration) -> bool {
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
self.token.cancel();
if tokio::time::timeout(timeout, self.handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
if let Some(handle) = self.handle.take() {
if tokio::time::timeout(timeout, handle).await.is_err() {
tracing::error!("Failed to terminate the task");
return false;
};
}
true
}
}
3 changes: 2 additions & 1 deletion plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.state()
.0
.get(Selector::borrowed(&key_expr, &parameters))
.consolidation(consolidation);
.consolidation(consolidation)
.with(flume::unbounded());
if !body.is_empty() {
let encoding: Encoding = req
.content_type()
Expand Down
1 change: 0 additions & 1 deletion zenoh-ext/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//

//! To manage groups and group memberships
use std::{
collections::HashMap,
convert::TryInto,
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl<'a> PublicationCache<'a> {
let PublicationCache {
_queryable,
local_sub,
task,
mut task,
} = self;
_queryable.undeclare().await?;
local_sub.undeclare().await?;
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use std::{
convert::{TryFrom, TryInto},
future::{IntoFuture, Ready},
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ fn propagate_simple_queryable(
.local_qabls
.insert(res.clone(), (id, info));
let key_expr = Resource::decl_key(res, &mut dst_face);
println!("Decled key = {key_expr:?}");
send_declare(
&dst_face.primitives,
RoutingContext::with_expr(
Expand Down
3 changes: 1 addition & 2 deletions zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ struct HatTables {

impl Drop for HatTables {
fn drop(&mut self) {
if self.linkstatepeers_trees_task.is_some() {
let task = self.linkstatepeers_trees_task.take().unwrap();
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,10 @@ struct HatTables {

impl Drop for HatTables {
fn drop(&mut self) {
if self.linkstatepeers_trees_task.is_some() {
let task = self.linkstatepeers_trees_task.take().unwrap();
if let Some(mut task) = self.linkstatepeers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
if self.routers_trees_task.is_some() {
let task = self.routers_trees_task.take().unwrap();
if let Some(mut task) = self.routers_trees_task.take() {
task.terminate(Duration::from_secs(10));
}
}
Expand Down Expand Up @@ -253,7 +251,9 @@ impl HatTables {
.as_ref()
.map(|net| {
let links = net.get_links(peer1);
HatTables::failover_brokering_to(links, peer2)
let res = HatTables::failover_brokering_to(links, peer2);
tracing::trace!("failover_brokering {} {} : {}", peer1, peer2, res);
res
})
.unwrap_or(false)
}
Expand Down
5 changes: 5 additions & 0 deletions zenohd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ fn config_from_args(args: &Args) -> Config {
}
Err(e) => tracing::warn!("Couldn't perform configuration {}: {}", json, e),
}
} else {
panic!(
"--cfg accepts KEY:VALUE pairs. {} is not a valid KEY:VALUE pair.",
json
)
}
}
tracing::debug!("Config: {:?}", &config);
Expand Down

0 comments on commit 502d3be

Please sign in to comment.