Skip to content

Commit

Permalink
Protocol attachment to payload (#923)
Browse files Browse the repository at this point in the history
* Attachment typedef

* Fix io::Write for ZBuf and Payload

* FIx doc

* Add payload serializer test

* OptionPayload for API ergonomicity
  • Loading branch information
Mallets authored Apr 11, 2024
1 parent 834be85 commit a1b50dd
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 291 deletions.
8 changes: 4 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,12 +720,12 @@ impl BacktrackableWriter for ZBufWriter<'_> {
#[cfg(feature = "std")]
impl<'a> io::Write for ZBufWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
match <Self as Writer>::write(self, buf) {
Ok(n) => Ok(n.get()),
Err(_) => Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"UnexpectedEof",
)),
Err(_) => Err(io::ErrorKind::UnexpectedEof.into()),
}
}

Expand Down
27 changes: 6 additions & 21 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,12 @@ async fn main() {
tokio::time::sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.attachment(Some(
attachment
.split('&')
.map(|pair| split_once(pair, '='))
.collect(),
))
}
put.res().await.unwrap();
publisher
.put(buf)
.attachment(&attachment)
.res()
.await
.unwrap();
}
}

Expand All @@ -65,17 +61,6 @@ struct Args {
common: CommonArgs,
}

fn split_once(s: &str, c: char) -> (&[u8], &[u8]) {
let s_bytes = s.as_bytes();
match s.find(c) {
Some(index) => {
let (l, r) = s_bytes.split_at(index);
(l, &r[1..])
}
None => (s_bytes, &[]),
}
}

fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.value, args.attach)
Expand Down
9 changes: 8 additions & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@ async fn main() {
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
print!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind(),
sample.key_expr().as_str(),
payload
);
if let Some(att) = sample.attachment() {
let att = att
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
print!(" ({})", att);
}
println!();
}
}

Expand Down
106 changes: 88 additions & 18 deletions zenoh/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ impl Payload {
Ok(Payload::new(buf))
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Get a [`PayloadReader`] implementing [`std::io::Read`] trait.
pub fn iter<T>(&self) -> PayloadIterator<'_, T>
where
Expand All @@ -104,12 +109,7 @@ impl Payload {
}
}

/// Get a [`PayloadWriter`] implementing [`std::io::Write`] trait.
pub fn writer(&mut self) -> PayloadWriter<'_> {
PayloadWriter(self.0.writer())
}

/// Encode an object of type `T` as a [`Value`] using the [`ZSerde`].
/// Serialize an object of type `T` as a [`Value`] using the [`ZSerde`].
///
/// ```rust
/// use zenoh::payload::Payload;
Expand All @@ -126,7 +126,7 @@ impl Payload {
ZSerde.serialize(t)
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn deserialize<'a, T>(&'a self) -> ZResult<T>
where
ZSerde: Deserialize<'a, T>,
Expand All @@ -137,7 +137,7 @@ impl Payload {
.map_err(|e| zerror!("{:?}", e).into())
}

/// Decode an object of type `T` from a [`Value`] using the [`ZSerde`].
/// Infallibly deserialize an object of type `T` from a [`Value`] using the [`ZSerde`].
pub fn into<'a, T>(&'a self) -> T
where
ZSerde: Deserialize<'a, T, Error = Infallible>,
Expand Down Expand Up @@ -231,6 +231,50 @@ where
}
}

/// Wrapper type for API ergonomicity to allow any type `T` to be converted into `Option<Payload>` where `T` implements `Into<Payload>`.
#[repr(transparent)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct OptionPayload(Option<Payload>);

impl<T> From<T> for OptionPayload
where
T: Into<Payload>,
{
fn from(value: T) -> Self {
Self(Some(value.into()))
}
}

impl<T> From<Option<T>> for OptionPayload
where
T: Into<Payload>,
{
fn from(mut value: Option<T>) -> Self {
match value.take() {
Some(v) => Self(Some(v.into())),
None => Self(None),
}
}
}

impl<T> From<&Option<T>> for OptionPayload
where
for<'a> &'a T: Into<Payload>,
{
fn from(value: &Option<T>) -> Self {
match value.as_ref() {
Some(v) => Self(Some(v.into())),
None => Self(None),
}
}
}

impl From<OptionPayload> for Option<Payload> {
fn from(value: OptionPayload) -> Self {
value.0
}
}

/// The default serializer for Zenoh payload. It supports primitives types, such as: vec<u8>, int, uint, float, string, bool.
/// It also supports common Rust serde values.
#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -858,7 +902,7 @@ impl Serialize<&serde_yaml::Value> for ZSerde {

fn serialize(self, t: &serde_yaml::Value) -> Self::Output {
let mut payload = Payload::empty();
serde_yaml::to_writer(payload.0.writer(), t)?;
serde_yaml::to_writer(payload.writer(), t)?;
Ok(payload)
}
}
Expand Down Expand Up @@ -1092,15 +1136,9 @@ impl TryFrom<Payload> for SharedMemoryBuf {
}

// Tuple
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
let (a, b) = t;
macro_rules! impl_tuple {
($t:expr) => {{
let (a, b) = $t;

let codec = Zenoh080::new();
let mut buffer: ZBuf = ZBuf::empty();
Expand All @@ -1117,6 +1155,29 @@ where
}

Payload::new(buffer)
}};
}
impl<A, B> Serialize<(A, B)> for ZSerde
where
A: Into<Payload>,
B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: (A, B)) -> Self::Output {
impl_tuple!(t)
}
}

impl<A, B> Serialize<&(A, B)> for ZSerde
where
for<'a> &'a A: Into<Payload>,
for<'b> &'b B: Into<Payload>,
{
type Output = Payload;

fn serialize(self, t: &(A, B)) -> Self::Output {
impl_tuple!(t)
}
}

Expand Down Expand Up @@ -1402,5 +1463,14 @@ mod tests {
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(usize, Vec<u8>)>());
assert_eq!(hm, o);

let mut hm: HashMap<String, String> = HashMap::new();
hm.insert(String::from("0"), String::from("a"));
hm.insert(String::from("1"), String::from("b"));
println!("Serialize:\t{:?}", hm);
let p = Payload::from_iter(hm.iter());
println!("Deserialize:\t{:?}\n", p);
let o = HashMap::from_iter(p.iter::<(String, String)>());
assert_eq!(hm, o);
}
}
4 changes: 3 additions & 1 deletion zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Publishing primitives.
use crate::net::primitives::Primitives;
use crate::payload::OptionPayload;
use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
Expand Down Expand Up @@ -167,7 +168,8 @@ impl<P, T> SampleBuilderTrait for PublicationBuilder<P, T> {
}
}
#[cfg(feature = "unstable")]
fn attachment<TA: Into<Option<Attachment>>>(self, attachment: TA) -> Self {
fn attachment<TA: Into<OptionPayload>>(self, attachment: TA) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Query primitives.
use crate::handlers::{locked, Callback, DefaultHandler};
use crate::payload::OptionPayload;
use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
Expand Down Expand Up @@ -144,7 +145,8 @@ impl<Handler> SampleBuilderTrait for GetBuilder<'_, '_, Handler> {
}

#[cfg(feature = "unstable")]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self {
fn attachment<T: Into<OptionPayload>>(self, attachment: T) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use crate::encoding::Encoding;
use crate::handlers::{locked, DefaultHandler};
use crate::net::primitives::Primitives;
use crate::payload::OptionPayload;
use crate::prelude::*;
use crate::sample::builder::SampleBuilder;
use crate::sample::QoSBuilder;
Expand Down Expand Up @@ -308,7 +309,8 @@ impl<T> TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
impl<T> SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
fn attachment<U: Into<Option<Attachment>>>(self, attachment: U) -> Self {
fn attachment<U: Into<OptionPayload>>(self, attachment: U) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
attachment: attachment.into(),
..self
Expand Down
8 changes: 5 additions & 3 deletions zenoh/src/sample/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

use std::marker::PhantomData;

use crate::payload::OptionPayload;
#[cfg(feature = "unstable")]
use crate::sample::{Attachment, SourceInfo};
use crate::sample::SourceInfo;
use crate::sample::{QoS, QoSBuilder};
use crate::Encoding;
use crate::KeyExpr;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub trait SampleBuilderTrait {
fn source_info(self, source_info: SourceInfo) -> Self;
/// Attach user-provided data in key-value format
#[zenoh_macros::unstable]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self;
fn attachment<T: Into<OptionPayload>>(self, attachment: T) -> Self;
}

pub trait ValueBuilderTrait {
Expand Down Expand Up @@ -177,7 +178,8 @@ impl<T> SampleBuilderTrait for SampleBuilder<T> {
}

#[zenoh_macros::unstable]
fn attachment<U: Into<Option<Attachment>>>(self, attachment: U) -> Self {
fn attachment<U: Into<OptionPayload>>(self, attachment: U) -> Self {
let attachment: OptionPayload = attachment.into();
Self {
sample: Sample {
attachment: attachment.into(),
Expand Down
Loading

0 comments on commit a1b50dd

Please sign in to comment.