Skip to content

Commit

Permalink
feat: remove Value from the public API, make it internal for plugins (
Browse files Browse the repository at this point in the history
#1119)

* feat: remove `Value` from the public API, make it internal for plugins

* fix: remove `ReplyError::payload_mut`

* fix: fix tests

* fix: fix examples

* fix: fix examples

* fix: fix doctests
  • Loading branch information
wyfo authored Jun 12, 2024
1 parent 115e894 commit 73961dd
Show file tree
Hide file tree
Showing 26 changed files with 146 additions and 216 deletions.
12 changes: 6 additions & 6 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
use std::time::Duration;

use clap::Parser;
use zenoh::{prelude::*, query::QueryTarget, selector::Selector, Config};
use zenoh::{query::QueryTarget, selector::Selector, Config};
use zenoh_examples::CommonArgs;

#[tokio::main]
async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (config, selector, value, target, timeout) = parse_args();
let (config, selector, payload, target, timeout) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();
Expand All @@ -34,7 +34,7 @@ async fn main() {
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
.value(value)
.payload(payload.unwrap_or_default())
.target(target)
.timeout(timeout)
.await
Expand Down Expand Up @@ -77,8 +77,8 @@ struct Args {
/// The selection of resources to query
selector: Selector<'static>,
#[arg(short, long)]
/// An optional value to put in the query.
value: Option<String>,
/// An optional payload to put in the query.
payload: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
Expand All @@ -100,7 +100,7 @@ fn parse_args() -> (
(
args.common.into(),
args.selector,
args.value,
args.payload,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Expand Down
13 changes: 6 additions & 7 deletions examples/examples/z_get_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::time::Duration;

use clap::Parser;
use zenoh::{
prelude::*,
query::QueryTarget,
selector::Selector,
shm::{
Expand All @@ -33,7 +32,7 @@ async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (mut config, selector, mut value, target, timeout) = parse_args();
let (mut config, selector, mut payload, target, timeout) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
Expand Down Expand Up @@ -67,15 +66,15 @@ async fn main() {
.await
.unwrap();

let content = value
let content = payload
.take()
.unwrap_or_else(|| "Get from SHM Rust!".to_string());
sbuf[0..content.len()].copy_from_slice(content.as_bytes());

println!("Sending Query '{selector}'...");
let replies = session
.get(&selector)
.value(sbuf)
.payload(sbuf)
.target(target)
.timeout(timeout)
.await
Expand Down Expand Up @@ -114,8 +113,8 @@ struct Args {
#[arg(short, long, default_value = "demo/example/**")]
/// The selection of resources to query
selector: Selector<'static>,
/// The value to publish.
value: Option<String>,
/// The payload to publish.
payload: Option<String>,
#[arg(short, long, default_value = "BEST_MATCHING")]
/// The target queryables of the query.
target: Qt,
Expand All @@ -137,7 +136,7 @@ fn parse_args() -> (
(
args.common.into(),
args.selector,
args.value,
args.payload,
match args.target {
Qt::BestMatching => QueryTarget::BestMatching,
Qt::All => QueryTarget::All,
Expand Down
10 changes: 5 additions & 5 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() {
// Initiate logging
zenoh::try_init_log_from_env();

let (config, key_expr, value, attachment) = parse_args();
let (config, key_expr, payload, attachment) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();
Expand All @@ -33,7 +33,7 @@ async fn main() {
println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
let buf = format!("[{idx:4}] {payload}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
publisher.put(buf).attachment(&attachment).await.unwrap();
}
Expand All @@ -45,8 +45,8 @@ struct Args {
/// The key expression to write to.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Pub from Rust!")]
/// The value to write.
value: String,
/// The payload to write.
payload: String,
#[arg(short, long)]
/// The attachments to add to each put.
///
Expand All @@ -58,5 +58,5 @@ struct Args {

fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.attach)
(args.common.into(), args.key, args.payload, args.attach)
}
12 changes: 6 additions & 6 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), ZError> {
// Initiate logging
zenoh::try_init_log_from_env();

let (mut config, path, value) = parse_args();
let (mut config, path, payload) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
Expand Down Expand Up @@ -74,10 +74,10 @@ async fn main() -> Result<(), ZError> {
// of the write. This is simply to have the same format as zn_pub.
let prefix = format!("[{idx:4}] ");
let prefix_len = prefix.as_bytes().len();
let slice_len = prefix_len + value.as_bytes().len();
let slice_len = prefix_len + payload.as_bytes().len();

sbuf[0..prefix_len].copy_from_slice(prefix.as_bytes());
sbuf[prefix_len..slice_len].copy_from_slice(value.as_bytes());
sbuf[prefix_len..slice_len].copy_from_slice(payload.as_bytes());

// Write the data
println!(
Expand All @@ -97,13 +97,13 @@ struct Args {
/// The key expression to publish onto.
path: KeyExpr<'static>,
#[arg(short, long, default_value = "Pub from SHM Rust!")]
/// The value of to publish.
value: String,
/// The payload of to publish.
payload: String,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String) {
let args = Args::parse();
(args.common.into(), args.path, args.value)
(args.common.into(), args.path, args.payload)
}
12 changes: 6 additions & 6 deletions examples/examples/z_put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (config, key_expr, value) = parse_args();
let (config, key_expr, payload) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Putting Data ('{key_expr}': '{value}')...");
session.put(&key_expr, value).await.unwrap();
println!("Putting Data ('{key_expr}': '{payload}')...");
session.put(&key_expr, payload).await.unwrap();
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand All @@ -35,13 +35,13 @@ struct Args {
/// The key expression to write to.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Put from Rust!")]
/// The value to write.
value: String,
/// The payload to write.
payload: String,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String) {
let args = Args::parse();
(args.common.into(), args.key, args.value)
(args.common.into(), args.key, args.payload)
}
12 changes: 6 additions & 6 deletions examples/examples/z_put_float.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (config, key_expr, value) = parse_args();
let (config, key_expr, payload) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();

println!("Putting Float ('{key_expr}': '{value}')...");
session.put(&key_expr, value).await.unwrap();
println!("Putting Float ('{key_expr}': '{payload}')...");
session.put(&key_expr, payload).await.unwrap();

session.close().await.unwrap();
}
Expand All @@ -37,13 +37,13 @@ struct Args {
/// The key expression to write to.
key: KeyExpr<'static>,
#[arg(short, long, default_value_t = std::f64::consts::PI)]
/// The value to write.
value: f64,
/// The payload to write.
payload: f64,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, f64) {
let args = Args::parse();
(args.common.into(), args.key, args.value)
(args.common.into(), args.key, args.payload)
}
21 changes: 10 additions & 11 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (mut config, key_expr, value, complete) = parse_args();
let (mut config, key_expr, payload, complete) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
Expand All @@ -43,27 +43,26 @@ async fn main() {

println!("Press CTRL-C to quit...");
while let Ok(query) = queryable.recv_async().await {
match query.value() {
match query.payload() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => {
let payload = value
.payload()
Some(query_payload) => {
let deserialized_payload = query_payload
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Queryable ] Received Query '{}' with payload '{}'",
query.selector(),
payload
deserialized_payload
)
}
}
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
payload,
);
query
.reply(key_expr.clone(), value.clone())
.reply(key_expr.clone(), payload.clone())
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
}
Expand All @@ -75,8 +74,8 @@ struct Args {
/// The key expression matching queries to reply to.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Queryable from Rust!")]
/// The value to reply to queries.
value: String,
/// The payload to reply to queries.
payload: String,
#[arg(long)]
/// Declare the queryable as complete w.r.t. the key expression.
complete: bool,
Expand All @@ -86,5 +85,5 @@ struct Args {

fn parse_args() -> (Config, KeyExpr<'static>, String, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.complete)
(args.common.into(), args.key, args.payload, args.complete)
}
18 changes: 9 additions & 9 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn main() {
// initiate logging
zenoh::try_init_log_from_env();

let (mut config, key_expr, value, complete) = parse_args();
let (mut config, key_expr, payload, complete) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
Expand Down Expand Up @@ -68,9 +68,9 @@ async fn main() {
query.selector(),
query.key_expr().as_str(),
);
if let Some(payload) = query.payload() {
match payload.deserialize::<&zshm>() {
Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)),
if let Some(query_payload) = query.payload() {
match query_payload.deserialize::<&zshm>() {
Ok(p) => print!(": '{}'", String::from_utf8_lossy(p)),
Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e),
}
}
Expand All @@ -86,12 +86,12 @@ async fn main() {
.await
.unwrap();

sbuf[0..value.len()].copy_from_slice(value.as_bytes());
sbuf[0..payload.len()].copy_from_slice(payload.as_bytes());

println!(
">> [Queryable] Responding ('{}': '{}')",
key_expr.as_str(),
value,
payload,
);
query
.reply(key_expr.clone(), sbuf)
Expand All @@ -106,8 +106,8 @@ struct Args {
/// The key expression matching queries to reply to.
key: KeyExpr<'static>,
#[arg(short, long, default_value = "Queryable from SHM Rust!")]
/// The value to reply to queries.
value: String,
/// The payload to reply to queries.
payload: String,
#[arg(long)]
/// Declare the queryable as complete w.r.t. the key expression.
complete: bool,
Expand All @@ -117,5 +117,5 @@ struct Args {

fn parse_args() -> (Config, KeyExpr<'static>, String, bool) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.complete)
(args.common.into(), args.key, args.payload, args.complete)
}
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::{hash_map::Entry, HashMap};

use async_std::sync::RwLock;
use async_trait::async_trait;
use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp, value::Value};
use zenoh::{internal::Value, key_expr::OwnedKeyExpr, time::Timestamp};
use zenoh_backend_traits::{
config::{StorageConfig, VolumeConfig},
Capability, History, Persistence, Storage, StorageInsertionResult, StoredData, Volume,
Expand Down
2 changes: 1 addition & 1 deletion plugins/zenoh-backend-traits/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async-std = { workspace = true, features = ["default"] }
async-trait = { workspace = true }
derive_more = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true, features = ["unstable"] }
zenoh = { workspace = true, features = ["unstable", "internal"] }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
schemars = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions plugins/zenoh-backend-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! ```
//! use std::sync::Arc;
//! use async_trait::async_trait;
//! use zenoh::{key_expr::OwnedKeyExpr, prelude::*, time::Timestamp, value::Value};
//! use zenoh::{key_expr::OwnedKeyExpr, prelude::*, time::Timestamp, internal::Value};
//! use zenoh_backend_traits::*;
//! use zenoh_backend_traits::config::*;
//!
Expand Down Expand Up @@ -125,9 +125,9 @@ use async_trait::async_trait;
use const_format::concatcp;
use zenoh::{
core::Result as ZResult,
internal::Value,
key_expr::{keyexpr, OwnedKeyExpr},
time::Timestamp,
value::Value,
};
use zenoh_plugin_trait::{PluginControl, PluginInstance, PluginStatusRec, StructVersion};
use zenoh_util::concat_enabled_features;
Expand Down
Loading

0 comments on commit 73961dd

Please sign in to comment.