Skip to content

Commit

Permalink
fix zenoh-ext example args parsing; (eclipse-zenoh#934)
Browse files Browse the repository at this point in the history
fix zenoh-ext::Group task termination to prevent panic in z_view_size example;
  • Loading branch information
DenisBiryukov91 authored Apr 16, 2024
1 parent 4aa6e3d commit 88573f3
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 388 deletions.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ members = [
"plugins/zenoh-plugin-trait",
"zenoh",
"zenoh-ext",
"zenoh-ext/examples",
"zenohd",
]
exclude = ["ci/nostd-check", "ci/valgrind-check"]
Expand Down
19 changes: 0 additions & 19 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,5 @@ zenoh-sync = { workspace = true }
zenoh-runtime = { workspace = true }
zenoh-task = { workspace = true }

[dev-dependencies]
clap = { workspace = true, features = ["derive"] }

[[example]]
name = "z_query_sub"
path = "examples/z_query_sub.rs"

[[example]]
name = "z_pub_cache"
path = "examples/z_pub_cache.rs"

[[example]]
name = "z_member"
path = "examples/z_member.rs"

[[example]]
name = "z_view_size"
path = "examples/z_view_size.rs"

[package.metadata.docs.rs]
features = ["unstable"]
62 changes: 62 additions & 0 deletions zenoh-ext/examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#
# Copyright (c) 2023 ZettaScale Technology
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
# which is available at https://www.apache.org/licenses/LICENSE-2.0.
#
# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
#
# Contributors:
# ZettaScale Zenoh Team, <[email protected]>
#
[package]
rust-version = { workspace = true }
name = "zenoh-ext-examples"
version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
authors = ["kydos <[email protected]>", "Julien Enoch <[email protected]>"]
edition = { workspace = true }
license = { workspace = true }
categories = { workspace = true }
description = "Internal crate for zenoh"

[badges]
maintenance = { status = "actively-developed" }

[features]
unstable = []
default = []

[dependencies]
tokio = { workspace = true, features = ["rt", "sync", "time", "macros", "io-std"] }
bincode = { workspace = true }
zenoh-util = {workspace = true }
flume = { workspace = true }
futures = { workspace = true }
tracing = {workspace = true}
serde = { workspace = true, features = ["default"] }
zenoh = { workspace = true, features = ["unstable"], default-features = false }
zenoh-ext = {workspace = true }
clap = { workspace = true, features = ["derive"] }

[[example]]
name = "z_query_sub"
path = "examples/z_query_sub.rs"

[[example]]
name = "z_pub_cache"
path = "examples/z_pub_cache.rs"

[[example]]
name = "z_member"
path = "examples/z_member.rs"

[[example]]
name = "z_view_size"
path = "examples/z_view_size.rs"

[package.metadata.docs.rs]
features = ["unstable"]
File renamed without changes.
File renamed without changes.
93 changes: 93 additions & 0 deletions zenoh-ext/examples/examples/z_pub_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::{arg, Parser};
use std::time::Duration;
use zenoh::config::{Config, ModeDependentValue};
use zenoh::prelude::r#async::*;
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh_util::init_log_from_env();

let (config, key_expr, value, history, prefix, complete) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
let _publication_cache = publication_cache_builder.res().await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Put Data ('{}': '{}')", &key_expr, buf);
session.put(&key_expr, buf).res().await.unwrap();
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/zenoh-rs-pub")]
/// The key expression to publish.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Pub from Rust!")]
/// The value to reply to queries.
value: String,
#[arg(short = 'i', long, default_value = "1")]
/// The number of publications to keep in cache.
history: usize,
#[arg(short = 'o', long)]
/// Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.
complete: bool,
#[arg(short = 'x', long)]
/// An optional queryable prefix.
prefix: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (
Config,
KeyExpr<'static>,
String,
usize,
Option<String>,
bool,
) {
let args = Args::parse();
let mut config: Config = args.common.into();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
(
config,
args.key,
args.value,
args.history,
args.prefix,
args.complete,
)
}
81 changes: 81 additions & 0 deletions zenoh-ext/examples/examples/z_query_sub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::arg;
use clap::Parser;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh::query::ReplyKeyExpr;
use zenoh_ext::*;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
// Initiate logging
zenoh_util::init_log_from_env();

let (config, key_expr, query) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!(
"Declaring QueryingSubscriber on {} with an initial query on {}",
key_expr,
query.as_ref().unwrap_or(&key_expr)
);
let subscriber = if let Some(selector) = query {
session
.declare_subscriber(key_expr)
.querying()
.query_selector(&selector)
.query_accept_replies(ReplyKeyExpr::Any)
.res()
.await
.unwrap()
} else {
session
.declare_subscriber(key_expr)
.querying()
.res()
.await
.unwrap()
};

println!("Press CTRL-C to quit...");
while let Ok(sample) = subscriber.recv_async().await {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The key expression to subscribe onto.
key: String,
#[arg(short, long)]
/// The selector to use for queries (by default it's same as 'key' option)
query: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.query)
}
80 changes: 80 additions & 0 deletions zenoh-ext/examples/examples/z_view_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::{arg, Parser};
use std::sync::Arc;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_ext::group::*;
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
async fn main() {
zenoh_util::init_log_from_env();

let (config, group_name, id, size, timeout) = parse_args();

let z = Arc::new(zenoh::open(config).res().await.unwrap());
let member_id = id.unwrap_or_else(|| z.zid().to_string());
let member = Member::new(member_id.as_str())
.unwrap()
.lease(Duration::from_secs(3));

let group = Group::join(z.clone(), group_name.as_str(), member)
.await
.unwrap();
println!(
"Member {member_id} waiting for {size} members in group {group_name} for {timeout} seconds..."
);
if group
.wait_for_view_size(size, Duration::from_secs(timeout))
.await
{
println!("Established view size of {size} with members:");
for m in group.view().await {
println!(" - {}", m.id());
}
} else {
println!("Failed to establish view size of {size}");
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "zgroup")]
///"The group name".
group: String,
#[arg(short, long, default_value = "3")]
/// The expected group size. The example will wait for the group to reach this size.
size: usize,
#[arg(short, long, default_value = "15")]
/// The duration (in seconds) this example will wait for the group to reach the expected size
timeout: u64,
#[arg(short, long)]
/// The group member id (default is the zenoh ID)
id: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, String, Option<String>, usize, u64) {
let args = Args::parse();
(
args.common.into(),
args.group,
args.id,
args.size,
args.timeout,
)
}
Loading

0 comments on commit 88573f3

Please sign in to comment.