-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NBFTReliability #401
NBFTReliability #401
Changes from all commits
1d364b3
01e69ac
bfa393a
ed3850d
84ea3a0
38555f6
2b18a11
f3d0f28
b2840ac
64283a7
e74ddde
75433d8
330eb23
72235b2
477ac32
15e39ac
3464907
89957a1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
// | ||
// Copyright (c) 2022 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
use async_std::task::sleep; | ||
use clap::{App, Arg}; | ||
use futures::prelude::*; | ||
use std::time::Duration; | ||
use zenoh::config::Config; | ||
use zenoh::prelude::r#async::*; | ||
use zenoh_ext::*; | ||
|
||
#[async_std::main] | ||
async fn main() { | ||
// Initiate logging | ||
env_logger::init(); | ||
|
||
let (config, key_expr, history, prefix) = parse_args(); | ||
|
||
println!("Opening session..."); | ||
let session = zenoh::open(config).res().await.unwrap().into_arc(); | ||
|
||
println!("Declaring NBFTReliabilityCache on {}", key_expr); | ||
let _cache = session | ||
.declare_nbftreliability_cache(key_expr) | ||
.history(history) | ||
.queryable_prefix(prefix) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As described here data published on key |
||
.res() | ||
.await | ||
.unwrap(); | ||
|
||
println!("Enter 'q' to quit..."); | ||
let mut stdin = async_std::io::stdin(); | ||
let mut input = [0_u8]; | ||
loop { | ||
let _ = stdin.read_exact(&mut input).await; | ||
match input[0] { | ||
b'q' => break, | ||
0 => sleep(Duration::from_secs(1)).await, | ||
_ => (), | ||
} | ||
} | ||
} | ||
|
||
fn parse_args() -> (Config, String, usize, String) { | ||
let args = App::new("zenoh-ext non blocking fault tolerant reliability cache example") | ||
.arg( | ||
Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode (peer by default).") | ||
.possible_values(["peer", "client"]), | ||
) | ||
.arg(Arg::from_usage( | ||
"-e, --connect=[ENDPOINT]... 'Endpoints to connect to.'", | ||
)) | ||
.arg(Arg::from_usage( | ||
"-l, --listen=[ENDPOINT]... 'Endpoints to listen on.'", | ||
)) | ||
.arg( | ||
Arg::from_usage("-k, --key=[KEYEXPR] 'The key expression to subscribe onto'") | ||
.default_value("demo/example/**"), | ||
) | ||
.arg(Arg::from_usage( | ||
"-c, --config=[FILE] 'A configuration file.'", | ||
)) | ||
.arg( | ||
Arg::from_usage("-h, --history=[SIZE] 'The number of publications to keep in cache'") | ||
.default_value("1024"), | ||
) | ||
.arg( | ||
Arg::from_usage("-x, --prefix=[STRING] 'The id of publishers to cache'") | ||
.default_value("*"), | ||
) | ||
.arg(Arg::from_usage( | ||
"--no-multicast-scouting 'Disable the multicast-based scouting mechanism.'", | ||
)) | ||
.get_matches(); | ||
|
||
let mut config = if let Some(conf_file) = args.value_of("config") { | ||
Config::from_file(conf_file).unwrap() | ||
} else { | ||
Config::default() | ||
}; | ||
if let Some(Ok(mode)) = args.value_of("mode").map(|mode| mode.parse()) { | ||
config.set_mode(Some(mode)).unwrap(); | ||
} | ||
if let Some(values) = args.values_of("connect") { | ||
config | ||
.connect | ||
.endpoints | ||
.extend(values.map(|v| v.parse().unwrap())) | ||
} | ||
if let Some(values) = args.values_of("listen") { | ||
config | ||
.listen | ||
.endpoints | ||
.extend(values.map(|v| v.parse().unwrap())) | ||
} | ||
if args.is_present("no-multicast-scouting") { | ||
config.scouting.multicast.set_enabled(Some(false)).unwrap(); | ||
} | ||
|
||
let key_expr = args.value_of("key").unwrap().to_string(); | ||
let history: usize = args.value_of("history").unwrap().parse().unwrap(); | ||
let prefix = args.value_of("prefix").unwrap().to_string(); | ||
|
||
(config, key_expr, history, prefix) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// | ||
// Copyright (c) 2022 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
use async_std::task::sleep; | ||
use clap::{App, Arg}; | ||
use std::time::Duration; | ||
use zenoh::config::Config; | ||
use zenoh::prelude::r#async::*; | ||
use zenoh_ext::*; | ||
|
||
#[async_std::main] | ||
async fn main() { | ||
// Initiate logging | ||
env_logger::init(); | ||
|
||
let (config, key_expr, value, cache, history) = parse_args(); | ||
|
||
println!("Opening session..."); | ||
let session = zenoh::open(config).res().await.unwrap(); | ||
|
||
println!("Declaring NBFTReliablePublisher on {}", &key_expr); | ||
let publ = session | ||
.declare_nbftreliable_publisher(&key_expr) | ||
.with_cache(cache) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here I was expecting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A cache associated to a publisher will be preconfigured for it. It will typically have the publisher id as queryable prefix. So taking a cache declared independently does not seem the best way to me. Still a better name for the function could be found... |
||
.history(history) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If history is not configured, what's the default? Is it mandatory? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The default history is 1024 |
||
.res() | ||
.await | ||
.unwrap(); | ||
|
||
for idx in 0..u32::MAX { | ||
sleep(Duration::from_secs(1)).await; | ||
let buf = format!("[{:4}] {}", idx, value); | ||
println!("Pub Data ('{}': '{}')", &key_expr, buf); | ||
publ.put(buf).res().await.unwrap(); | ||
} | ||
} | ||
|
||
fn parse_args() -> (Config, String, String, bool, usize) { | ||
let args = App::new("zenoh-ext non blocking fault tolerant reliable publisher example") | ||
.arg( | ||
Arg::from_usage("-m, --mode=[MODE] 'The zenoh session mode (peer by default).") | ||
.possible_values(["peer", "client"]), | ||
) | ||
.arg(Arg::from_usage( | ||
"-e, --connect=[ENDPOINT]... 'Endpoints to connect to.'", | ||
)) | ||
.arg(Arg::from_usage( | ||
"-l, --listen=[ENDPOINT]... 'Endpoints to listen on.'", | ||
)) | ||
.arg( | ||
Arg::from_usage("-k, --key=[KEYEXPR] 'The key expression to publish.'") | ||
.default_value("demo/example/zenoh-rs-pub"), | ||
) | ||
.arg( | ||
Arg::from_usage("-v, --value=[VALUE] 'The value to publish.'") | ||
.default_value("Pub from Rust!"), | ||
) | ||
.arg(Arg::from_usage( | ||
"-n, --no-cache 'Disable local reliability cache'", | ||
)) | ||
.arg( | ||
Arg::from_usage("-h, --history=[SIZE] 'The number of publications to keep in cache'") | ||
.default_value("1024"), | ||
) | ||
.arg(Arg::from_usage( | ||
"-c, --config=[FILE] 'A configuration file.'", | ||
)) | ||
.arg(Arg::from_usage( | ||
"--no-multicast-scouting 'Disable the multicast-based scouting mechanism.'", | ||
)) | ||
.get_matches(); | ||
|
||
let mut config = if let Some(conf_file) = args.value_of("config") { | ||
Config::from_file(conf_file).unwrap() | ||
} else { | ||
Config::default() | ||
}; | ||
if let Some(Ok(mode)) = args.value_of("mode").map(|mode| mode.parse()) { | ||
config.set_mode(Some(mode)).unwrap(); | ||
} | ||
if let Some(values) = args.values_of("connect") { | ||
config | ||
.connect | ||
.endpoints | ||
.extend(values.map(|v| v.parse().unwrap())) | ||
} | ||
if let Some(values) = args.values_of("listen") { | ||
config | ||
.listen | ||
.endpoints | ||
.extend(values.map(|v| v.parse().unwrap())) | ||
} | ||
if args.is_present("no-multicast-scouting") { | ||
config.scouting.multicast.set_enabled(Some(false)).unwrap(); | ||
} | ||
|
||
let key_expr = args.value_of("key").unwrap().to_string(); | ||
let value = args.value_of("value").unwrap().to_string(); | ||
let cache = !args.is_present("no-cache"); | ||
let history: usize = args.value_of("history").unwrap().parse().unwrap(); | ||
|
||
(config, key_expr, value, cache, history) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If
history
is not configured, what's the default? Is it mandatory?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default history is 1024
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The size of the reliability cache is really dependent on the use case and should be configured accordingly. This will force the user to think how big or small it has to be. Similar thing applies for the publisher and subscriber. Should we make it mandatory?