Skip to content

Commit

Permalink
Refactor Liveliness implementation (#865)
Browse files Browse the repository at this point in the history
* Router implements interests protocol for clients

* Send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients for pico

* Fix WireExprExt M flag encoding/decoding

* Fix decl_key

* Clients send all samples and queries to routers and peers

* Avoid self declaration loop on interest

* Fix query/replies copy/paste bugs

* Peers implement interests protocol for clients

* Don't send WireExpr in UndeclareSubscriber/UndeclareQueryable to clients

* Add client writer-side filtering (#863)

* Add client writer-side filtering

* Reimplement liveliness with interests

* Fix writer-side filtering before receiving FinalInterest

* Fix pubsub interest based routing after router failover

* refactor: Add boilerplace for new liveliness router hat

* refactor: Handle incoming liveliness token declaration/undeclaration

* refactor: Take `TokenId` id in `HatLivelinessTrait` methods

* refactor: Implement router `HatLivelinessTrait`

* chore: Add copyright headers

* refactor: Implement client `HatLivelinessTrait`

* refactor: Implement liveliness dispatching logic

* refactor: Use `DeclareToken`/`UndeclareToken` in liveliness declaration/undeclaration

* refactor: Implement Token Interest protocol for client & router

* fix: Use correct token state in Liveliness query replies

* chore: Temporarily Allow dead_code and unused_variables in p2p_peer and linkstate_peer hat

* style: Rename `Primitives` to `IngressPrimitives` and `EPrimitives` to `EgressPrimitives`

Ingress means "entering the router" and Egress means "leaving the router".

This change could of course be reverted (through r-a) right before merging
the refactor/liveliness-declaration branch.

It would've been even better to move Session's IngressPrimitives impl to an
EgressPrimitives, but that's not possible since a Session can recieve Query
replies from itself.

Ideally, Session in/egress should be Router e/ingress but the `EPrimitives`
trait breaks this symmetry. Messages leaving the router are not the same as
messages leaving the Session. This could be solved by making the `EgressPrimitives`
trait generic over the message context, but the necessary refactoring would
be a hude undertaking, and I don't have enough time nor familiarity with the
codebase for it.

* ci: Fix `clippy::suspicious_open_options` lint

* chore: Fix naming issue after rebasing against `interests`

* style: Rename liveliness to token

* refactor: Add p2p_peer token implementation

* fix: Set token id to 0 in multicast token declaration

* fix: Mark unused arguments in p2p_peer `HatTokenTrait` impl

* refactor: Add linkstate_peer token implementation

* fix: Declare `Interest::TOKEN` in liveliness subscriber declaration

* wip: Implement liveliness subscriber without liveliness prefix

* Declare message can be Push/Request/RequestContinuous/Response

* Address review comments

* Remove F: Future flag from DeclareInterest

* cargo fmt --all

* Remove unused Interest flags field

* Update doc

* Remove unneeded interest_id field

* Update commons/zenoh-protocol/src/network/declare.rs

* Remove unused UndeclareInterest

* Implement proper Declare Request/Response id correlation

* Add new Interest network message

* Update doc

* Update codec

* Fix stable build

* Fix test_acl

* Fix writer side filtering

* fix: Cleanup token ressources

* fix: Remove multicast token propagation

* fix: Logic error in `Session::undeclare_subscriber_inner`

* fix: Change `log::debug` to `log::trace` in `IngressPrimitives`

* revert: Restore names of `Primitives` and `EPrimitives` methods

* Add separate functions to compute matching status

* Fix unstable imports

* Remove useless checks

* fix: Correctly set `interest_id` field everywhere

* feat: Implement liveliness queries in `Session`

* fix: Discard liveliness query on `DeclareFinal` with known interest id

* feat: Propagate client liveliness queries, send local replies

* feat: Handle liveliness query (final) replies

* fix: Apply Clippy lints from Rust 1.77

* fix: `RwLock` deadlock in `Session::undeclare_subscriber_inner`

* fix: Remove `zenoh::net::routing::PREFIX_LIVELINESS`

The key assumption here is that no key expression
can start with '@/liveliness' anymore.

* fix: Set `ext_qos` to `QoSType::DEFAULT` when for liveliness queries

* fix: Don't immediatly reply with `DeclareFinal` to token interest

* fix: Send `DeclareToken` w/ interest id for `InterestMode::Current`

* fix: Send `DeclareFinal` in response to `Current` token interest

* fix: Set `ext_qos: QoSType::DEFAULT` in routing interest declaration

* fix: Share ownership of `TokenQuery` b/w destination faces

* fix: Remove unused code

* fix: Rustfmt errors with CI config

* test: Liveliness clique/brokered and subscriber/query scenarios

* fix: Don't declare tokens with interest ids for `CurrentFuture`

* fix: Incorrect wire expr for liveliness undeclaration callbacks

* fix: Support liveliness queries to linkstate/p2p peers

* test: Querying & fetching liveliness subscribers

* fix: Incorrect keyexpr in liveliness query replies

* fix: Remove `LivelinessQueryState::key_expr`

* Fix locking

* Only store tokens in response of a Future interest

* Rename interest_id_counter

* Remove dataroute related code from token dispatcher

* Fix client token interest local replies

* Remove no more needed compute_local_replies

* Move TokenQuery to token module

* Fix details

* Use proper id in DeclareToken for p2p_peer

* Remove comments

* Add needed token_new_face functions

* Don't register declares sent as response to a current interest

* Code reorg

* Add missing token_remove_node, token_tree_change and token_linkstate_change

* Fix querying subscriber liveliness test

* Send one shot Undeclares

* Fix clippy warnings

* Add misssing token related code in close_face

* Properly propagate one shot undeclare token

* Fix querying subscriber liveliness test

* Fix invalid interest aggregate option handling

* Fix interest propagation when pub client connecting to peer

* Code reorg

* Peers wait for DeclareFinal from routers before propagating to clients

* Fix InterestFinal propagation

* Only send back DeclareFinal if interest is current

* Terminate liveliness interest for subsystems of peers

* Fix liveliness test includes

* Add liveliness local tests

* Address review comments

* Fix compilation

* Fix stable build

* Apply 2024 copyright to new files (interests new files as well)

* Avoid clones

* Remove unwraps

* Fix Value usage after merge

* Remove useless checks

* Change &Option<T> tp Option<&T>

* Revert wrongly commited change in z_liveliness example

---------

Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
4 people authored Jun 18, 2024
1 parent 338af96 commit a5b4a65
Show file tree
Hide file tree
Showing 38 changed files with 4,359 additions and 716 deletions.
7 changes: 4 additions & 3 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ maintenance = { status = "actively-developed" }
[features]
unstable = []
default = []
shared-memory = [
"zenoh/shared-memory",
]
shared-memory = ["zenoh/shared-memory"]

[dependencies]
tokio = { workspace = true, features = [
Expand All @@ -53,5 +51,8 @@ serde_json = { workspace = true }
zenoh = { workspace = true, features = ["unstable", "internal"], default-features = false }
zenoh-macros = { workspace = true }

[dev-dependencies]
zenoh = { workspace = true, features = ["unstable"], default-features = true }

[package.metadata.docs.rs]
features = ["unstable"]
354 changes: 354 additions & 0 deletions zenoh-ext/tests/liveliness.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,354 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use zenoh::{
config::{self, EndPoint, WhatAmI},
sample::SampleKind,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_querying_subscriber_clique() {
use std::time::Duration;

use zenoh::{internal::ztimeout, prelude::*};
use zenoh_ext::SubscriberBuilderExt;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const PEER1_ENDPOINT: &str = "udp/localhost:47447";

const LIVELINESS_KEYEXPR_1: &str = "test/liveliness/querying-subscriber/brokered/1";
const LIVELINESS_KEYEXPR_2: &str = "test/liveliness/querying-subscriber/brokered/2";
const LIVELINESS_KEYEXPR_ALL: &str = "test/liveliness/querying-subscriber/brokered/*";

zenoh_util::try_init_log_from_env();

let peer1 = {
let mut c = config::default();
c.listen
.set_endpoints(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (1) ZID: {}", s.zid());
s
};

let peer2 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (2) ZID: {}", s.zid());
s
};

let token1 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_1)).unwrap();
tokio::time::sleep(SLEEP).await;

let sub = ztimeout!(peer1
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_ALL)
.querying())
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_querying_subscriber_brokered() {
use std::time::Duration;

use zenoh::{internal::ztimeout, prelude::*};
use zenoh_ext::SubscriberBuilderExt;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const ROUTER_ENDPOINT: &str = "tcp/localhost:47448";

const LIVELINESS_KEYEXPR_1: &str = "test/liveliness/querying-subscriber/brokered/1";
const LIVELINESS_KEYEXPR_2: &str = "test/liveliness/querying-subscriber/brokered/2";
const LIVELINESS_KEYEXPR_ALL: &str = "test/liveliness/querying-subscriber/brokered/*";

zenoh_util::try_init_log_from_env();

let _router = {
let mut c = config::default();
c.listen
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Router ZID: {}", s.zid());
s
};

let client1 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (1) ZID: {}", s.zid());
s
};

let client2 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (2) ZID: {}", s.zid());
s
};

let client3 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (3) ZID: {}", s.zid());
s
};

let token1 = ztimeout!(client2.liveliness().declare_token(LIVELINESS_KEYEXPR_1)).unwrap();
tokio::time::sleep(SLEEP).await;

let sub = ztimeout!(client1
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_ALL)
.querying())
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_fetching_subscriber_clique() {
use std::time::Duration;

use zenoh::{internal::ztimeout, prelude::*};
use zenoh_ext::SubscriberBuilderExt;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const PEER1_ENDPOINT: &str = "udp/localhost:47449";

const LIVELINESS_KEYEXPR_1: &str = "test/liveliness/querying-subscriber/brokered/1";
const LIVELINESS_KEYEXPR_2: &str = "test/liveliness/querying-subscriber/brokered/2";
const LIVELINESS_KEYEXPR_ALL: &str = "test/liveliness/querying-subscriber/brokered/*";

zenoh_util::try_init_log_from_env();

let peer1 = {
let mut c = config::default();
c.listen
.set_endpoints(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (1) ZID: {}", s.zid());
s
};

let peer2 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![PEER1_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Peer));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Peer (2) ZID: {}", s.zid());
s
};

let token1 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_1)).unwrap();
tokio::time::sleep(SLEEP).await;

let sub = ztimeout!(peer1
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_ALL)
.fetching(|cb| peer1
.liveliness()
.get(LIVELINESS_KEYEXPR_ALL)
.callback(cb)
.wait()))
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(peer2.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_liveliness_fetching_subscriber_brokered() {
use std::time::Duration;

use zenoh::{internal::ztimeout, prelude::*};
use zenoh_ext::SubscriberBuilderExt;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);
const ROUTER_ENDPOINT: &str = "tcp/localhost:47450";

const LIVELINESS_KEYEXPR_1: &str = "test/liveliness/querying-subscriber/brokered/1";
const LIVELINESS_KEYEXPR_2: &str = "test/liveliness/querying-subscriber/brokered/2";
const LIVELINESS_KEYEXPR_ALL: &str = "test/liveliness/querying-subscriber/brokered/*";

zenoh_util::try_init_log_from_env();

let _router = {
let mut c = config::default();
c.listen
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Router));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Router ZID: {}", s.zid());
s
};

let client1 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (1) ZID: {}", s.zid());
s
};

let client2 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (2) ZID: {}", s.zid());
s
};

let client3 = {
let mut c = config::default();
c.connect
.set_endpoints(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
let _ = c.set_mode(Some(WhatAmI::Client));
let s = ztimeout!(zenoh::open(c)).unwrap();
tracing::info!("Client (3) ZID: {}", s.zid());
s
};

let token1 = ztimeout!(client2.liveliness().declare_token(LIVELINESS_KEYEXPR_1)).unwrap();
tokio::time::sleep(SLEEP).await;

let sub = ztimeout!(client1
.liveliness()
.declare_subscriber(LIVELINESS_KEYEXPR_ALL)
.fetching(|cb| client1
.liveliness()
.get(LIVELINESS_KEYEXPR_ALL)
.callback(cb)
.wait()))
.unwrap();
tokio::time::sleep(SLEEP).await;

let _token2 = ztimeout!(client3.liveliness().declare_token(LIVELINESS_KEYEXPR_2)).unwrap();
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Put);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_2);

drop(token1);
tokio::time::sleep(SLEEP).await;

let sample = ztimeout!(sub.recv_async()).unwrap();
assert_eq!(sample.kind(), SampleKind::Delete);
assert_eq!(sample.key_expr().as_str(), LIVELINESS_KEYEXPR_1);
}
Loading

0 comments on commit a5b4a65

Please sign in to comment.