diff --git a/Cargo.lock b/Cargo.lock index ea6441f797..1adc8bdf8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5020,7 +5020,6 @@ name = "zenoh-ext" version = "0.11.0-dev" dependencies = [ "bincode", - "clap", "flume", "futures", "serde", @@ -5036,6 +5035,22 @@ dependencies = [ "zenoh-util", ] +[[package]] +name = "zenoh-ext-examples" +version = "0.11.0-dev" +dependencies = [ + "bincode", + "clap", + "flume", + "futures", + "serde", + "tokio", + "tracing", + "zenoh", + "zenoh-ext", + "zenoh-util", +] + [[package]] name = "zenoh-keyexpr" version = "0.11.0-dev" diff --git a/Cargo.toml b/Cargo.toml index b82735b72a..d226de35cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ members = [ "plugins/zenoh-plugin-trait", "zenoh", "zenoh-ext", + "zenoh-ext/examples", "zenohd", ] exclude = ["ci/nostd-check", "ci/valgrind-check"] diff --git a/zenoh-ext/Cargo.toml b/zenoh-ext/Cargo.toml index 99b6ecf5c1..3b842244d3 100644 --- a/zenoh-ext/Cargo.toml +++ b/zenoh-ext/Cargo.toml @@ -46,24 +46,5 @@ zenoh-sync = { workspace = true } zenoh-runtime = { workspace = true } zenoh-task = { workspace = true } -[dev-dependencies] -clap = { workspace = true, features = ["derive"] } - -[[example]] -name = "z_query_sub" -path = "examples/z_query_sub.rs" - -[[example]] -name = "z_pub_cache" -path = "examples/z_pub_cache.rs" - -[[example]] -name = "z_member" -path = "examples/z_member.rs" - -[[example]] -name = "z_view_size" -path = "examples/z_view_size.rs" - [package.metadata.docs.rs] features = ["unstable"] diff --git a/zenoh-ext/examples/Cargo.toml b/zenoh-ext/examples/Cargo.toml new file mode 100644 index 0000000000..3493016835 --- /dev/null +++ b/zenoh-ext/examples/Cargo.toml @@ -0,0 +1,62 @@ +# +# Copyright (c) 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-ext-examples" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +authors = ["kydos ", "Julien Enoch "] +edition = { workspace = true } +license = { workspace = true } +categories = { workspace = true } +description = "Internal crate for zenoh" + +[badges] +maintenance = { status = "actively-developed" } + +[features] +unstable = [] +default = [] + +[dependencies] +tokio = { workspace = true, features = ["rt", "sync", "time", "macros", "io-std"] } +bincode = { workspace = true } +zenoh-util = {workspace = true } +flume = { workspace = true } +futures = { workspace = true } +tracing = {workspace = true} +serde = { workspace = true, features = ["default"] } +zenoh = { workspace = true, features = ["unstable"], default-features = false } +zenoh-ext = {workspace = true } +clap = { workspace = true, features = ["derive"] } + +[[example]] +name = "z_query_sub" +path = "examples/z_query_sub.rs" + +[[example]] +name = "z_pub_cache" +path = "examples/z_pub_cache.rs" + +[[example]] +name = "z_member" +path = "examples/z_member.rs" + +[[example]] +name = "z_view_size" +path = "examples/z_view_size.rs" + +[package.metadata.docs.rs] +features = ["unstable"] diff --git a/zenoh-ext/examples/README.md b/zenoh-ext/examples/examples/README.md similarity index 100% rename from zenoh-ext/examples/README.md rename to zenoh-ext/examples/examples/README.md diff --git a/zenoh-ext/examples/z_member.rs b/zenoh-ext/examples/examples/z_member.rs similarity index 100% rename from zenoh-ext/examples/z_member.rs rename to zenoh-ext/examples/examples/z_member.rs diff --git a/zenoh-ext/examples/examples/z_pub_cache.rs b/zenoh-ext/examples/examples/z_pub_cache.rs new file mode 100644 index 0000000000..332e3f7822 --- /dev/null +++ b/zenoh-ext/examples/examples/z_pub_cache.rs @@ -0,0 +1,93 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::{arg, Parser}; +use std::time::Duration; +use zenoh::config::{Config, ModeDependentValue}; +use zenoh::prelude::r#async::*; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh_util::init_log_from_env(); + + let (config, key_expr, value, history, prefix, complete) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!("Declaring PublicationCache on {}", &key_expr); + let mut publication_cache_builder = session + .declare_publication_cache(&key_expr) + .history(history) + .queryable_complete(complete); + if let Some(prefix) = prefix { + publication_cache_builder = publication_cache_builder.queryable_prefix(prefix); + } + let _publication_cache = publication_cache_builder.res().await.unwrap(); + + println!("Press CTRL-C to quit..."); + for idx in 0..u32::MAX { + tokio::time::sleep(Duration::from_secs(1)).await; + let buf = format!("[{idx:4}] {value}"); + println!("Put Data ('{}': '{}')", &key_expr, buf); + session.put(&key_expr, buf).res().await.unwrap(); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-pub")] + /// The key expression to publish. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Pub from Rust!")] + /// The value to reply to queries. + value: String, + #[arg(short = 'i', long, default_value = "1")] + /// The number of publications to keep in cache. + history: usize, + #[arg(short = 'o', long)] + /// Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables. + complete: bool, + #[arg(short = 'x', long)] + /// An optional queryable prefix. + prefix: Option, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + KeyExpr<'static>, + String, + usize, + Option, + bool, +) { + let args = Args::parse(); + let mut config: Config = args.common.into(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + ( + config, + args.key, + args.value, + args.history, + args.prefix, + args.complete, + ) +} diff --git a/zenoh-ext/examples/examples/z_query_sub.rs b/zenoh-ext/examples/examples/z_query_sub.rs new file mode 100644 index 0000000000..3622c78e7c --- /dev/null +++ b/zenoh-ext/examples/examples/z_query_sub.rs @@ -0,0 +1,81 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::arg; +use clap::Parser; +use zenoh::config::Config; +use zenoh::prelude::r#async::*; +use zenoh::query::ReplyKeyExpr; +use zenoh_ext::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + // Initiate logging + zenoh_util::init_log_from_env(); + + let (config, key_expr, query) = parse_args(); + + println!("Opening session..."); + let session = zenoh::open(config).res().await.unwrap(); + + println!( + "Declaring QueryingSubscriber on {} with an initial query on {}", + key_expr, + query.as_ref().unwrap_or(&key_expr) + ); + let subscriber = if let Some(selector) = query { + session + .declare_subscriber(key_expr) + .querying() + .query_selector(&selector) + .query_accept_replies(ReplyKeyExpr::Any) + .res() + .await + .unwrap() + } else { + session + .declare_subscriber(key_expr) + .querying() + .res() + .await + .unwrap() + }; + + println!("Press CTRL-C to quit..."); + while let Ok(sample) = subscriber.recv_async().await { + println!( + ">> [Subscriber] Received {} ('{}': '{}')", + sample.kind, + sample.key_expr.as_str(), + sample.value + ); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The key expression to subscribe onto. + key: String, + #[arg(short, long)] + /// The selector to use for queries (by default it's same as 'key' option) + query: Option, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, String, Option) { + let args = Args::parse(); + (args.common.into(), args.key, args.query) +} diff --git a/zenoh-ext/examples/examples/z_view_size.rs b/zenoh-ext/examples/examples/z_view_size.rs new file mode 100644 index 0000000000..a7d95ae603 --- /dev/null +++ b/zenoh-ext/examples/examples/z_view_size.rs @@ -0,0 +1,80 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::{arg, Parser}; +use std::sync::Arc; +use std::time::Duration; +use zenoh::config::Config; +use zenoh::prelude::r#async::*; +use zenoh_ext::group::*; +use zenoh_ext_examples::CommonArgs; + +#[tokio::main] +async fn main() { + zenoh_util::init_log_from_env(); + + let (config, group_name, id, size, timeout) = parse_args(); + + let z = Arc::new(zenoh::open(config).res().await.unwrap()); + let member_id = id.unwrap_or_else(|| z.zid().to_string()); + let member = Member::new(member_id.as_str()) + .unwrap() + .lease(Duration::from_secs(3)); + + let group = Group::join(z.clone(), group_name.as_str(), member) + .await + .unwrap(); + println!( + "Member {member_id} waiting for {size} members in group {group_name} for {timeout} seconds..." + ); + if group + .wait_for_view_size(size, Duration::from_secs(timeout)) + .await + { + println!("Established view size of {size} with members:"); + for m in group.view().await { + println!(" - {}", m.id()); + } + } else { + println!("Failed to establish view size of {size}"); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "zgroup")] + ///"The group name". + group: String, + #[arg(short, long, default_value = "3")] + /// The expected group size. The example will wait for the group to reach this size. + size: usize, + #[arg(short, long, default_value = "15")] + /// The duration (in seconds) this example will wait for the group to reach the expected size + timeout: u64, + #[arg(short, long)] + /// The group member id (default is the zenoh ID) + id: Option, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, String, Option, usize, u64) { + let args = Args::parse(); + ( + args.common.into(), + args.group, + args.id, + args.size, + args.timeout, + ) +} diff --git a/zenoh-ext/examples/src/lib.rs b/zenoh-ext/examples/src/lib.rs new file mode 100644 index 0000000000..4c203d5cff --- /dev/null +++ b/zenoh-ext/examples/src/lib.rs @@ -0,0 +1,60 @@ +//! Examples on using Zenoh. +//! See the code in ../examples/ +//! Check ../README.md for usage. +//! +use zenoh::config::Config; + +#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub enum Wai { + Peer, + Client, + Router, +} +impl core::fmt::Display for Wai { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + core::fmt::Debug::fmt(&self, f) + } +} +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +pub struct CommonArgs { + #[arg(short, long)] + /// A configuration file. + config: Option, + #[arg(short, long)] + /// The Zenoh session mode [default: peer]. + mode: Option, + #[arg(short = 'e', long)] + /// Endpoints to connect to. + connect: Vec, + #[arg(short, long)] + /// Endpoints to listen on. + listen: Vec, +} + +impl From for Config { + fn from(value: CommonArgs) -> Self { + (&value).into() + } +} +impl From<&CommonArgs> for Config { + fn from(value: &CommonArgs) -> Self { + let mut config = match &value.config { + Some(path) => Config::from_file(path).unwrap(), + None => Config::default(), + }; + match value.mode { + Some(Wai::Peer) => config.set_mode(Some(zenoh::scouting::WhatAmI::Peer)), + Some(Wai::Client) => config.set_mode(Some(zenoh::scouting::WhatAmI::Client)), + Some(Wai::Router) => config.set_mode(Some(zenoh::scouting::WhatAmI::Router)), + None => Ok(None), + } + .unwrap(); + if !value.connect.is_empty() { + config.connect.endpoints = value.connect.iter().map(|v| v.parse().unwrap()).collect(); + } + if !value.listen.is_empty() { + config.listen.endpoints = value.listen.iter().map(|v| v.parse().unwrap()).collect(); + } + config + } +} diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs deleted file mode 100644 index 34c13dbce8..0000000000 --- a/zenoh-ext/examples/z_pub_cache.rs +++ /dev/null @@ -1,109 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use clap::{arg, Command}; -use std::time::Duration; -use zenoh::config::{Config, ModeDependentValue}; -use zenoh::prelude::r#async::*; -use zenoh_ext::*; - -#[tokio::main] -async fn main() { - // Initiate logging - zenoh_util::init_log_from_env(); - - let (config, key_expr, value, history, prefix, complete) = parse_args(); - - println!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); - - println!("Declaring PublicationCache on {}", &key_expr); - let mut publication_cache_builder = session - .declare_publication_cache(&key_expr) - .history(history) - .queryable_complete(complete); - if let Some(prefix) = prefix { - publication_cache_builder = publication_cache_builder.queryable_prefix(prefix); - } - let _publication_cache = publication_cache_builder.res().await.unwrap(); - - println!("Press CTRL-C to quit..."); - for idx in 0..u32::MAX { - tokio::time::sleep(Duration::from_secs(1)).await; - let buf = format!("[{idx:4}] {value}"); - println!("Put Data ('{}': '{}')", &key_expr, buf); - session.put(&key_expr, buf).res().await.unwrap(); - } -} - -fn parse_args() -> (Config, String, String, usize, Option, bool) { - let args = Command::new("zenoh-ext pub cache example") - .arg( - arg!(-m --mode [MODE] "The zenoh session mode (peer by default)") - .value_parser(["peer", "client"]), - ) - .arg(arg!(-e --connect [ENDPOINT]... "Endpoints to connect to.")) - .arg(arg!(-l --listen [ENDPOINT]... "Endpoints to listen on.")) - .arg( - arg!(-k --key [KEYEXPR] "The key expression to publish.") - .default_value("demo/example/zenoh-rs-pub"), - ) - .arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!")) - .arg( - arg!(-i --history [SIZE] "The number of publications to keep in cache") - .default_value("1"), - ) - .arg(arg!(-x --prefix [STRING] "An optional queryable prefix")) - .arg(arg!(-c --config [FILE] "A configuration file.")) - .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) - .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) - .get_matches(); - - let mut config = if let Some(conf_file) = args.get_one::<&String>("config") { - Config::from_file(conf_file).unwrap() - } else { - Config::default() - }; - if let Some(Ok(mode)) = args.get_one::<&String>("mode").map(|mode| mode.parse()) { - config.set_mode(Some(mode)).unwrap(); - } - if let Some(values) = args.get_many::<&String>("connect") { - config - .connect - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if let Some(values) = args.get_many::<&String>("listen") { - config - .listen - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if args.get_flag("no-multicast-scouting") { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - - // Timestamping of publications is required for publication cache - config - .timestamping - .set_enabled(Some(ModeDependentValue::Unique(true))) - .unwrap(); - - let key_expr = args.get_one::("key").unwrap().to_string(); - let value = args.get_one::("value").unwrap().to_string(); - let history: usize = args.get_one::("history").unwrap().parse().unwrap(); - let prefix = args.get_one::("prefix").map(|s| (*s).to_owned()); - let complete = args.get_flag("complete"); - - (config, key_expr, value, history, prefix, complete) -} diff --git a/zenoh-ext/examples/z_query_sub.rs b/zenoh-ext/examples/z_query_sub.rs deleted file mode 100644 index c7491cfa61..0000000000 --- a/zenoh-ext/examples/z_query_sub.rs +++ /dev/null @@ -1,112 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use clap::arg; -use clap::Command; -use zenoh::config::Config; -use zenoh::prelude::r#async::*; -use zenoh::query::ReplyKeyExpr; -use zenoh_ext::*; - -#[tokio::main] -async fn main() { - // Initiate logging - zenoh_util::init_log_from_env(); - - let (config, key_expr, query) = parse_args(); - - println!("Opening session..."); - let session = zenoh::open(config).res().await.unwrap(); - - println!( - "Declaring QueryingSubscriber on {} with an initial query on {}", - key_expr, - query.as_ref().unwrap_or(&key_expr) - ); - let subscriber = if let Some(selector) = query { - session - .declare_subscriber(key_expr) - .querying() - .query_selector(&selector) - .query_accept_replies(ReplyKeyExpr::Any) - .res() - .await - .unwrap() - } else { - session - .declare_subscriber(key_expr) - .querying() - .res() - .await - .unwrap() - }; - - println!("Press CTRL-C to quit..."); - while let Ok(sample) = subscriber.recv_async().await { - println!( - ">> [Subscriber] Received {} ('{}': '{}')", - sample.kind, - sample.key_expr.as_str(), - sample.value - ); - } -} - -fn parse_args() -> (Config, String, Option) { - let args = Command::new("zenoh-ext query sub example") - .arg( - arg!(-m --mode [MODE] "The zenoh session mode (peer by default).") - .value_parser(["peer", "client"]), - ) - .arg(arg!(-e --connect [ENDPOINT]... "Endpoints to connect to.")) - .arg(arg!(-l --listen [ENDPOINT]... "Endpoints to listen on.")) - .arg( - arg!(-k --key [KEYEXPR] "The key expression to subscribe onto") - .default_value("demo/example/**"), - ) - .arg( - arg!(-q --query [SELECTOR] "The selector to use for queries (by default it's same than 'selector' option)") - ) - .arg(arg!(-c --config [FILE] "A configuration file.")) - .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) - .get_matches(); - - let mut config = if let Some(conf_file) = args.get_one::<&String>("config") { - Config::from_file(conf_file).unwrap() - } else { - Config::default() - }; - if let Some(Ok(mode)) = args.get_one::<&String>("mode").map(|mode| mode.parse()) { - config.set_mode(Some(mode)).unwrap(); - } - if let Some(values) = args.get_many::<&String>("connect") { - config - .connect - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if let Some(values) = args.get_many::<&String>("listen") { - config - .listen - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if args.get_flag("no-multicast-scouting") { - config.scouting.multicast.set_enabled(Some(false)).unwrap(); - } - - let key_expr = args.get_one::("key").unwrap().to_string(); - let query = args.get_one::("query").map(ToString::to_string); - - (config, key_expr, query) -} diff --git a/zenoh-ext/examples/z_view_size.rs b/zenoh-ext/examples/z_view_size.rs deleted file mode 100644 index e23b122928..0000000000 --- a/zenoh-ext/examples/z_view_size.rs +++ /dev/null @@ -1,108 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use clap::{arg, Command}; -use std::sync::Arc; -use std::time::Duration; -use zenoh::config::Config; -use zenoh::prelude::r#async::*; -use zenoh_ext::group::*; - -#[tokio::main] -async fn main() { - zenoh_util::init_log_from_env(); - - let (config, group_name, id, size, timeout) = parse_args(); - - let z = Arc::new(zenoh::open(config).res().await.unwrap()); - let member_id = id.unwrap_or_else(|| z.zid().to_string()); - let member = Member::new(member_id.as_str()) - .unwrap() - .lease(Duration::from_secs(3)); - - let group = Group::join(z.clone(), group_name.as_str(), member) - .await - .unwrap(); - println!( - "Member {member_id} waiting for {size} members in group {group_name} for {timeout} seconds..." - ); - if group - .wait_for_view_size(size, Duration::from_secs(timeout)) - .await - { - println!("Established view size of {size} with members:"); - for m in group.view().await { - println!(" - {}", m.id()); - } - } else { - println!("Failed to establish view size of {size}"); - } -} - -fn parse_args() -> (Config, String, Option, usize, u64) { - let args = Command::new("zenoh-ext group view size example") - .arg( - arg!(-m --mode [MODE] "The zenoh session mode (peer by default).") - .value_parser(["peer", "client"]), - ) - .arg(arg!( - -e --connect [ENDPOINT]... "Endpoints to connect to." - )) - .arg(arg!( - -l --listen [ENDPOINT]... "Endpoints to listen on." - )) - .arg(arg!( - -c --config [FILE] "A configuration file." - )) - .arg(arg!( - -g --group [STRING] "The group name" - ).default_value("zgroup")) - .arg(arg!( - -i --id [STRING] "The group member id (default is the zenoh ID)" - )) - .arg(arg!( - -s --size [INT] "The expected group size. The example will wait for the group to reach this size" - ).default_value("3")) - .arg(arg!( - -t --timeout [SEC] "The duration (in seconds) this example will wait for the group to reach the expected size." - ).default_value("15")) - .get_matches(); - - let mut config = if let Some(conf_file) = args.get_one::<&String>("config") { - Config::from_file(conf_file).unwrap() - } else { - Config::default() - }; - if let Some(Ok(mode)) = args.get_one::<&String>("mode").map(|mode| mode.parse()) { - config.set_mode(Some(mode)).unwrap(); - } - if let Some(values) = args.get_many::<&String>("connect") { - config - .connect - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - if let Some(values) = args.get_many::<&String>("listen") { - config - .listen - .endpoints - .extend(values.map(|v| v.parse().unwrap())) - } - - let group = args.get_one::("group").unwrap().to_string(); - let id = args.get_one::("id").map(|v| (*v).to_owned()); - let size: usize = args.get_one::("size").unwrap().parse().unwrap(); - let timeout: u64 = args.get_one::("timeout").unwrap().parse().unwrap(); - - (config, group, id, size, timeout) -} diff --git a/zenoh-ext/src/group.rs b/zenoh-ext/src/group.rs index 6bd9841c29..a1442fa5c4 100644 --- a/zenoh-ext/src/group.rs +++ b/zenoh-ext/src/group.rs @@ -24,7 +24,6 @@ use std::ops::Add; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Mutex; -use tokio::task::JoinHandle; use zenoh::prelude::r#async::*; use zenoh::publication::Publisher; use zenoh::query::ConsolidationMode; @@ -33,6 +32,7 @@ use zenoh::Result as ZResult; use zenoh::Session; use zenoh_result::bail; use zenoh_sync::Condition; +use zenoh_task::TaskController; const GROUP_PREFIX: &str = "zenoh/ext/net/group"; const EVENT_POSTFIX: &str = "evt"; @@ -163,17 +163,13 @@ struct GroupState { pub struct Group { state: Arc, - tasks: Vec>, + task_controller: TaskController, } impl Drop for Group { fn drop(&mut self) { // cancel background tasks - tokio::runtime::Handle::current().block_on(async { - while let Some(handle) = self.tasks.pop() { - handle.abort(); - } - }); + self.task_controller.terminate_all(Duration::from_secs(10)); } } @@ -192,36 +188,33 @@ async fn keep_alive_task(state: Arc) { } } -fn spawn_watchdog(s: Arc, period: Duration) -> JoinHandle<()> { - let watch_dog = async move { - loop { - tokio::time::sleep(period).await; - let now = Instant::now(); - let mut ms = s.members.lock().await; - let expired_members: Vec = ms - .iter() - .filter(|e| e.1 .1 < now) - .map(|e| e.0.clone()) - .collect(); - - for e in &expired_members { - tracing::debug!("Member with lease expired: {}", e); - ms.remove(e); - } - if !expired_members.is_empty() { - tracing::debug!("Other members list: {:?}", ms.keys()); - drop(ms); - let u_evt = &*s.user_events_tx.lock().await; - for e in expired_members { - if let Some(tx) = u_evt { - tx.send(GroupEvent::LeaseExpired(LeaseExpiredEvent { mid: e })) - .unwrap() - } +async fn watchdog_task(s: Arc, period: Duration) { + loop { + tokio::time::sleep(period).await; + let now = Instant::now(); + let mut ms = s.members.lock().await; + let expired_members: Vec = ms + .iter() + .filter(|e| e.1 .1 < now) + .map(|e| e.0.clone()) + .collect(); + + for e in &expired_members { + tracing::debug!("Member with lease expired: {}", e); + ms.remove(e); + } + if !expired_members.is_empty() { + tracing::debug!("Other members list: {:?}", ms.keys()); + drop(ms); + let u_evt = &*s.user_events_tx.lock().await; + for e in expired_members { + if let Some(tx) = u_evt { + tx.send(GroupEvent::LeaseExpired(LeaseExpiredEvent { mid: e })) + .unwrap() } } } - }; - tokio::task::spawn(watch_dog) + } } async fn query_handler(z: Arc, state: Arc) { @@ -395,16 +388,17 @@ impl Group { let buf = bincode::serialize(&join_evt).unwrap(); let _ = state.group_publisher.put(buf).res().await; + let task_controller = TaskController::default(); // If the liveliness is manual it is the user who has to assert it. if is_auto_liveliness { - tokio::task::spawn(keep_alive_task(state.clone())); + task_controller.spawn_abortable(keep_alive_task(state.clone())); } - let events_task = tokio::task::spawn(net_event_handler(z.clone(), state.clone())); - let queries_task = tokio::task::spawn(query_handler(z.clone(), state.clone())); - let watchdog_task = spawn_watchdog(state.clone(), Duration::from_secs(1)); + task_controller.spawn_abortable(net_event_handler(z.clone(), state.clone())); + task_controller.spawn_abortable(query_handler(z.clone(), state.clone())); + task_controller.spawn_abortable(watchdog_task(state.clone(), Duration::from_secs(1))); Ok(Group { state, - tasks: Vec::from([events_task, queries_task, watchdog_task]), + task_controller, }) }