Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NBFTReliability #401

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ async-std = { version = "=1.12.0", default-features = false, features = [
"attributes",
"unstable",
] }
async-trait = "0.1.57"
bincode = "1.3.3"
env_logger = "0.10.0"
flume = "0.10.14"
futures = "0.3.25"
log = "0.4.17"
serde = "1.0.149"
zenoh = { version = "0.7.0-rc", path = "../zenoh", default-features = false, features = ["unstable"] }
zenoh-collections = { version = "0.7.0-rc", path = "../commons/zenoh-collections/" }
zenoh-core = { version = "0.7.0-rc", path = "../commons/zenoh-core/" }
zenoh-sync = { version = "0.7.0-rc", path = "../commons/zenoh-sync" }
zenoh-util = { version = "0.7.0-rc", path = "../commons/zenoh-util" }
Expand All @@ -64,3 +66,18 @@ path = "examples/z_member.rs"
[[example]]
name = "z_view_size"
path = "examples/z_view_size.rs"

[[example]]
name = "z_nbftr_sub"
path = "examples/z_nbftr_sub.rs"
required-features = ["unstable"]

[[example]]
name = "z_nbftr_pub"
path = "examples/z_nbftr_pub.rs"
required-features = ["unstable"]

[[example]]
name = "z_nbftr_cache"
path = "examples/z_nbftr_cache.rs"
required-features = ["unstable"]
115 changes: 115 additions & 0 deletions zenoh-ext/examples/z_nbftr_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::sleep;
use clap::{App, Arg};
use futures::prelude::*;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_ext::*;

#[async_std::main]
async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, history, prefix) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap().into_arc();

println!("Declaring NBFTReliabilityCache on {}", key_expr);
let _cache = session
.declare_reliability_cache(key_expr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking, the nbftr cache seems to be a generic cache for bounded time-series. Does it introduce anything specific for the nbftr use case? If not, I suggest to have it more generic and the nbftr to simply rely upon it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NBFTReliabilityCache relies on the source_info to discriminate the series.

.history(history)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If history is not configured, what's the default? Is it mandatory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default history is 1024

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size of the reliability cache is really dependent on the use case and should be configured accordingly. This will force the user to think how big or small it has to be. Similar thing applies for the publisher and subscriber. Should we make it mandatory?

.queryable_prefix(prefix)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does queryable_prefix do? In what does it differ from the key_expr used in the declare_reliability_cache?

Copy link
Contributor Author

@OlivierHecart OlivierHecart Dec 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As described here data published on key k by a publisher with id <publisher_id>, will be made available by the cache for queries on <publisher_id>/k. So by changing this prefix you indicate for what publishers this cache is storing data (see here). Typically the cache attached to a NFTReliablePublisher will have the publisher id as prefix to only store data from this publisher while a cache deployed in the infrastructure will have a * prefix to store data from all publishers.
The code was partially inspired from the existing publication cache and names were taken from there. But I agree better names could be found.

.res()
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
let _ = stdin.read_exact(&mut input).await;
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
}

fn parse_args() -> (Config, String, usize, String) {
let args = App::new("zenoh-ext non blocking fault tolerant reliability cache example")
.arg(
Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode (peer by default).")
.possible_values(["peer", "client"]),
)
.arg(Arg::from_usage(
"-e, --connect=[ENDPOINT]... 'Endpoints to connect to.'",
))
.arg(Arg::from_usage(
"-l, --listen=[ENDPOINT]... 'Endpoints to listen on.'",
))
.arg(
Arg::from_usage("-k, --key=[KEYEXPR] 'The key expression to subscribe onto'")
.default_value("demo/example/**"),
)
.arg(Arg::from_usage(
"-c, --config=[FILE] 'A configuration file.'",
))
.arg(
Arg::from_usage("-h, --history=[SIZE] 'The number of publications to keep in cache'")
.default_value("1024"),
)
.arg(
Arg::from_usage("-x, --prefix=[STRING] 'The id of publishers to cache'")
.default_value("*"),
)
.arg(Arg::from_usage(
"--no-multicast-scouting 'Disable the multicast-based scouting mechanism.'",
))
.get_matches();

let mut config = if let Some(conf_file) = args.value_of("config") {
Config::from_file(conf_file).unwrap()
} else {
Config::default()
};
if let Some(Ok(mode)) = args.value_of("mode").map(|mode| mode.parse()) {
config.set_mode(Some(mode)).unwrap();
}
if let Some(values) = args.values_of("connect") {
config
.connect
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if let Some(values) = args.values_of("listen") {
config
.listen
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if args.is_present("no-multicast-scouting") {
config.scouting.multicast.set_enabled(Some(false)).unwrap();
}

let key_expr = args.value_of("key").unwrap().to_string();
let history: usize = args.value_of("history").unwrap().parse().unwrap();
let prefix = args.value_of("prefix").unwrap().to_string();

(config, key_expr, history, prefix)
}
113 changes: 113 additions & 0 deletions zenoh-ext/examples/z_nbftr_pub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::sleep;
use clap::{App, Arg};
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_ext::*;

#[async_std::main]
async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value, cache, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring NBFTReliablePublisher on {}", &key_expr);
let publ = session
.declare_reliable_publisher(&key_expr)
.with_cache(cache)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I was expecting with_cache to accept a cache object created by declare_reliability_cache. Instead, it seems to take a bool. This is a bit counter-intuitive for the with_X pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cache associated to a publisher will be preconfigured for it. It will typically have the publisher id as queryable prefix. So taking a cache declared independently does not seem the best way to me. Still a better name for the function could be found...

.history(history)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If history is not configured, what's the default? Is it mandatory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default history is 1024

.res()
.await
.unwrap();

for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{:4}] {}", idx, value);
println!("Pub Data ('{}': '{}')", &key_expr, buf);
publ.put(buf).res().await.unwrap();
}
}

fn parse_args() -> (Config, String, String, bool, usize) {
let args = App::new("zenoh-ext non blocking fault tolerant reliable publisher example")
.arg(
Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode (peer by default).")
.possible_values(["peer", "client"]),
)
.arg(Arg::from_usage(
"-e, --connect=[ENDPOINT]... 'Endpoints to connect to.'",
))
.arg(Arg::from_usage(
"-l, --listen=[ENDPOINT]... 'Endpoints to listen on.'",
))
.arg(
Arg::from_usage("-k, --key=[KEYEXPR] 'The key expression to publish.'")
.default_value("demo/example/zenoh-rs-pub"),
)
.arg(
Arg::from_usage("-v, --value=[VALUE] 'The value to publish.'")
.default_value("Pub from Rust!"),
)
.arg(Arg::from_usage(
"-n, --no-cache 'Disable local reliability cache'",
))
.arg(
Arg::from_usage("-h, --history=[SIZE] 'The number of publications to keep in cache'")
.default_value("1024"),
)
.arg(Arg::from_usage(
"-c, --config=[FILE] 'A configuration file.'",
))
.arg(Arg::from_usage(
"--no-multicast-scouting 'Disable the multicast-based scouting mechanism.'",
))
.get_matches();

let mut config = if let Some(conf_file) = args.value_of("config") {
Config::from_file(conf_file).unwrap()
} else {
Config::default()
};
if let Some(Ok(mode)) = args.value_of("mode").map(|mode| mode.parse()) {
config.set_mode(Some(mode)).unwrap();
}
if let Some(values) = args.values_of("connect") {
config
.connect
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if let Some(values) = args.values_of("listen") {
config
.listen
.endpoints
.extend(values.map(|v| v.parse().unwrap()))
}
if args.is_present("no-multicast-scouting") {
config.scouting.multicast.set_enabled(Some(false)).unwrap();
}

let key_expr = args.value_of("key").unwrap().to_string();
let value = args.value_of("value").unwrap().to_string();
let cache = !args.is_present("no-cache");
let history: usize = args.value_of("history").unwrap().parse().unwrap();

(config, key_expr, value, cache, history)
}
Loading