Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol attachment to payload #923

Merged
merged 5 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,12 @@ impl BacktrackableWriter for ZBufWriter<'_> {
#[cfg(feature = "std")]
impl<'a> io::Write for ZBufWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
match <Self as Writer>::write(self, buf) {
Ok(n) => Ok(n.get()),
Err(_) => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
)),
Err(_) => Err(io::ErrorKind::UnexpectedEof.into()),
}
}

Expand Down
27 changes: 6 additions & 21 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,12 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.attachment(Some(
attachment
.split('&')
.map(|pair| split_once(pair, '='))
.collect(),
))
}
put.res().await.unwrap();
publisher
.put(buf)
.attachment(&attachment)
.res()
.await
.unwrap();
}
}

Expand All @@ -65,17 +61,6 @@ struct Args {
common: CommonArgs,
}

fn split_once(s: &str, c: char) -> (&[u8], &[u8]) {
let s_bytes = s.as_bytes();
match s.find(c) {
Some(index) => {
let (l, r) = s_bytes.split_at(index);
(l, &r[1..])
}
None => (s_bytes, &[]),
}
}

fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.attach)
Expand Down
9 changes: 8 additions & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ async fn main() {
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
print!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
if let Some(att) = sample.attachment() {
let att = att
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
print!(" ({})", att);
}
println!();
}
}

Expand Down
106 changes: 88 additions & 18 deletions zenoh/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ impl Payload {
Ok(Payload::new(buf))
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Get a [`PayloadReader`] implementing [`std::io::Read`] trait.
pub fn iter<T>(&self) -> PayloadIterator<'_, T>
where
Expand All @@ -104,12 +109,7 @@ impl Payload {
}
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Encode an object of type `T` as a [`Value`] using the [`ZSerde`].
/// Serialize an object of type `T` as a [`Value`] using the [`ZSerde`].
///
/// ```rust
/// use zenoh::payload::Payload;
Expand All @@ -126,7 +126,7 @@ impl Payload {
ZSerde.serialize(t)
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn deserialize<'a, T>(&'a self) -> ZResult<T>
where
ZSerde: Deserialize<'a, T>,
Expand All @@ -137,7 +137,7 @@ impl Payload {
.map_err(|e| zerror!("{:?}", e).into())
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Infallibly deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn into<'a, T>(&'a self) -> T
where
ZSerde: Deserialize<'a, T, Error = Infallible>,
Expand Down Expand Up @@ -231,6 +231,50 @@ where
}
}

/// Wrapper type for API ergonomicity to allow any type `T` to be converted into `Option<Payload>` where `T` implements `Into<Payload>`.
#[repr(transparent)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct OptionPayload(Option<Payload>);

impl<T> From<T> for OptionPayload
where
T: Into<Payload>,
{
fn from(value: T) -> Self {
Self(Some(value.into()))
}
}

impl<T> From<Option<T>> for OptionPayload
where
T: Into<Payload>,
{
fn from(mut value: Option<T>) -> Self {
match value.take() {
Some(v) => Self(Some(v.into())),
None => Self(None),
}
}
}

impl<T> From<&Option<T>> for OptionPayload
where
for<'a> &'a T: Into<Payload>,
{
fn from(value: &Option<T>) -> Self {
match value.as_ref() {
Some(v) => Self(Some(v.into())),
None => Self(None),
}
}
}

impl From<OptionPayload> for Option<Payload> {
fn from(value: OptionPayload) -> Self {
value.0
}
}

/// The default serializer for Zenoh payload. It supports primitives types, such as: vec<u8>, int, uint, float, string, bool.
/// It also supports common Rust serde values.
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -858,7 +902,7 @@ impl Serialize<&serde_yaml::Value> for ZSerde {

fn serialize(self, t: &serde_yaml::Value) -> Self::Output {
let mut payload = Payload::empty();
serde_yaml::to_writer(payload.0.writer(), t)?;
serde_yaml::to_writer(payload.writer(), t)?;
Ok(payload)
}
}
Expand Down Expand Up @@ -1092,15 +1136,9 @@ impl TryFrom<Payload> for SharedMemoryBuf {
}

// Tuple
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
let (a, b) = t;
macro_rules! impl_tuple {
($t:expr) => {{
let (a, b) = $t;

let codec = Zenoh080::new();
let mut buffer: ZBuf = ZBuf::empty();
Expand All @@ -1117,6 +1155,29 @@ where
}

Payload::new(buffer)
}};
}
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
impl_tuple!(t)
}
}

impl<A, B> Serialize<&(A, B)> for ZSerde
where
for<'a> &'a A: Into<Payload>,
for<'b> &'b B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: &(A, B)) -> Self::Output {
impl_tuple!(t)
}
}

Expand Down Expand Up @@ -1402,5 +1463,14 @@ mod tests {
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(usize, Vec<u8>)>());
assert_eq!(hm, o);

let mut hm: HashMap<String, String> = HashMap::new();
hm.insert(String::from("0"), String::from("a"));
hm.insert(String::from("1"), String::from("b"));
println!("Serialize:\t{:?}", hm);
let p = Payload::from_iter(hm.iter());
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(String, String)>());
assert_eq!(hm, o);
}
}
4 changes: 3 additions & 1 deletion zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Publishing primitives.
use crate::net::primitives::Primitives;
use crate::payload::OptionPayload;
use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
Expand Down Expand Up @@ -167,7 +168,8 @@ impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
}
}
#[cfg(feature = "unstable")]
fn attachment<TA: Into<Option<Attachment>>>(self, attachment: TA) -> Self {
fn attachment<TA: Into<OptionPayload>>(self, attachment: TA) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Query primitives.
use crate::handlers::{locked, Callback, DefaultHandler};
use crate::payload::OptionPayload;
use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
Expand Down Expand Up @@ -144,7 +145,8 @@ impl<Handler> SampleBuilderTrait for GetBuilder<'_, '_, Handler> {
}

#[cfg(feature = "unstable")]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self {
fn attachment<T: Into<OptionPayload>>(self, attachment: T) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use crate::encoding::Encoding;
use crate::handlers::{locked, DefaultHandler};
use crate::net::primitives::Primitives;
use crate::payload::OptionPayload;
use crate::prelude::*;
use crate::sample::builder::SampleBuilder;
use crate::sample::QoSBuilder;
Expand Down Expand Up @@ -308,7 +309,8 @@ impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
fn attachment<U: Into<Option<Attachment>>>(self, attachment: U) -> Self {
fn attachment<U: Into<OptionPayload>>(self, attachment: U) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
8 changes: 5 additions & 3 deletions zenoh/src/sample/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

use std::marker::PhantomData;

use crate::payload::OptionPayload;
#[cfg(feature = "unstable")]
use crate::sample::{Attachment, SourceInfo};
use crate::sample::SourceInfo;
use crate::sample::{QoS, QoSBuilder};
use crate::Encoding;
use crate::KeyExpr;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub trait SampleBuilderTrait {
fn source_info(self, source_info: SourceInfo) -> Self;
/// Attach user-provided data in key-value format
#[zenoh_macros::unstable]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self;
fn attachment<T: Into<OptionPayload>>(self, attachment: T) -> Self;
}

pub trait ValueBuilderTrait {
Expand Down Expand Up @@ -177,7 +178,8 @@ impl<T> SampleBuilderTrait for SampleBuilder<T> {
}

#[zenoh_macros::unstable]
fn attachment<U: Into<Option<Attachment>>>(self, attachment: U) -> Self {
fn attachment<U: Into<OptionPayload>>(self, attachment: U) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
sample: Sample {
attachment: attachment.into(),
Expand Down
Loading
Loading