diff --git a/commons/zenoh-buffers/src/zbuf.rs b/commons/zenoh-buffers/src/zbuf.rs index b78c2c8154..1365397966 100644 --- a/commons/zenoh-buffers/src/zbuf.rs +++ b/commons/zenoh-buffers/src/zbuf.rs @@ -20,7 +20,7 @@ use crate::{ ZSlice, }; use alloc::{sync::Arc, vec::Vec}; -use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice}; +use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr}; use zenoh_collections::SingleOrVec; fn get_mut_unchecked(arc: &mut Arc) -> &mut T { @@ -55,6 +55,85 @@ impl ZBuf { self.slices.push(zslice); } } + + pub fn splice>(&mut self, erased: Range, replacement: &[u8]) { + let start = match erased.start_bound() { + core::ops::Bound::Included(n) => *n, + core::ops::Bound::Excluded(n) => n + 1, + core::ops::Bound::Unbounded => 0, + }; + let end = match erased.end_bound() { + core::ops::Bound::Included(n) => n + 1, + core::ops::Bound::Excluded(n) => *n, + core::ops::Bound::Unbounded => self.len(), + }; + if start != end { + self.remove(start, end); + } + self.insert(start, replacement); + } + fn remove(&mut self, mut start: usize, mut end: usize) { + assert!(start <= end); + assert!(end <= self.len()); + let mut start_slice_idx = 0; + let mut start_idx_in_start_slice = 0; + let mut end_slice_idx = 0; + let mut end_idx_in_end_slice = 0; + for (i, slice) in self.slices.as_mut().iter_mut().enumerate() { + if slice.len() > start { + start_slice_idx = i; + start_idx_in_start_slice = start; + } + if slice.len() >= end { + end_slice_idx = i; + end_idx_in_end_slice = end; + break; + } + start -= slice.len(); + end -= slice.len(); + } + let start_slice = &mut self.slices.as_mut()[start_slice_idx]; + start_slice.end = start_slice.start + start_idx_in_start_slice; + let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize; + let end_slice = &mut self.slices.as_mut()[end_slice_idx]; + end_slice.start += end_idx_in_end_slice; + let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize; + self.slices.drain(drain_start..drain_end); + } + fn insert(&mut self, mut at: usize, slice: &[u8]) { + if slice.is_empty() { + return; + } + let old_at = at; + let mut slice_index = usize::MAX; + for (i, slice) in self.slices.as_ref().iter().enumerate() { + if at < slice.len() { + slice_index = i; + break; + } + if let Some(new_at) = at.checked_sub(slice.len()) { + at = new_at + } else { + panic!( + "Out of bounds insert attempted: at={old_at}, len={}", + self.len() + ) + } + } + if at != 0 { + let split = &self.slices.as_ref()[slice_index]; + let (l, r) = ( + split.subslice(0, at).unwrap(), + split.subslice(at, split.len()).unwrap(), + ); + self.slices.drain(slice_index..(slice_index + 1)); + self.slices.insert(slice_index, l); + self.slices.insert(slice_index + 1, Vec::from(slice).into()); + self.slices.insert(slice_index + 2, r); + } else { + self.slices.insert(slice_index, Vec::from(slice).into()) + } + } } // Buffer @@ -70,7 +149,7 @@ impl Buffer for ZBuf { // SplitBuffer impl SplitBuffer for ZBuf { - type Slices<'a> = iter::Map, fn(&'a ZSlice) -> &'a [u8]>; + type Slices<'a> = iter::Map, fn(&'a ZSlice) -> &'a [u8]>; fn slices(&self) -> Self::Slices<'_> { self.slices.as_ref().iter().map(ZSlice::as_slice) @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf { type Writer = ZBufWriter<'a>; fn writer(self) -> Self::Writer { + let mut cache = None; + if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() { + // Verify the ZSlice is actually a Vec + if let Some(b) = buf.as_any().downcast_ref::>() { + // Check for the length + if *end == b.len() { + cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) }) + } + } + } + ZBufWriter { inner: self, - cache: Arc::new(Vec::new()), + cache: cache.unwrap_or_else(|| Arc::new(Vec::new())), } } } @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> { } fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> { - self.write_exact(slice::from_ref(&byte)) + self.write_exact(core::slice::from_ref(&byte)) } fn remaining(&self) -> usize { diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index 2da156f8b6..c1f2489b88 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -38,7 +38,7 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output { - let NetworkMessage { body } = x; + let NetworkMessage { body, .. } = x; match body { NetworkBody::Push(b) => self.write(&mut *writer, b), diff --git a/commons/zenoh-codec/src/scouting/mod.rs b/commons/zenoh-codec/src/scouting/mod.rs index 9b462816eb..bbedce4282 100644 --- a/commons/zenoh-codec/src/scouting/mod.rs +++ b/commons/zenoh-codec/src/scouting/mod.rs @@ -31,7 +31,7 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &ScoutingMessage) -> Self::Output { - let ScoutingMessage { body } = x; + let ScoutingMessage { body, .. } = x; match body { ScoutingBody::Scout(s) => self.write(&mut *writer, s), diff --git a/commons/zenoh-codec/src/transport/mod.rs b/commons/zenoh-codec/src/transport/mod.rs index 455faa8bc3..559b5b5fda 100644 --- a/commons/zenoh-codec/src/transport/mod.rs +++ b/commons/zenoh-codec/src/transport/mod.rs @@ -81,7 +81,7 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output { - let TransportMessage { body } = x; + let TransportMessage { body, .. } = x; match body { TransportBody::Frame(b) => self.write(&mut *writer, b), diff --git a/commons/zenoh-collections/src/single_or_vec.rs b/commons/zenoh-collections/src/single_or_vec.rs index ea190395fb..0490a66a71 100644 --- a/commons/zenoh-collections/src/single_or_vec.rs +++ b/commons/zenoh-collections/src/single_or_vec.rs @@ -11,11 +11,12 @@ // Contributors: // ZettaScale Zenoh Team, // + use alloc::{vec, vec::Vec}; use core::{ cmp::PartialEq, fmt, iter, - ops::{Index, IndexMut}, + ops::{Index, IndexMut, RangeBounds}, ptr, slice, }; @@ -112,6 +113,19 @@ impl SingleOrVec { matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty()) } + fn vectorize(&mut self) -> &mut Vec { + if let SingleOrVecInner::Single(v) = &self.0 { + unsafe { + let v = core::ptr::read(v); + core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v])) + }; + } + let SingleOrVecInner::Vec(v) = &mut self.0 else { + unsafe { core::hint::unreachable_unchecked() } + }; + v + } + pub fn get(&self, index: usize) -> Option<&T> { match &self.0 { SingleOrVecInner::Single(v) => (index == 0).then_some(v), @@ -139,6 +153,55 @@ impl SingleOrVec { SingleOrVecInner::Vec(v) => v.last_mut(), } } + pub fn drain>(&mut self, range: Range) -> Drain { + match &mut self.0 { + this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain { + inner: DrainInner::Single(this), + }, + SingleOrVecInner::Vec(vec) => Drain { + inner: DrainInner::Vec(vec.drain(range)), + }, + _ => Drain { + inner: DrainInner::Done, + }, + } + } + pub fn insert(&mut self, at: usize, value: T) { + assert!(at <= self.len()); + self.vectorize().insert(at, value); + } +} +enum DrainInner<'a, T> { + Vec(alloc::vec::Drain<'a, T>), + Single(&'a mut SingleOrVecInner), + Done, +} +pub struct Drain<'a, T> { + inner: DrainInner<'a, T>, +} +impl<'a, T> Iterator for Drain<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + match &mut self.inner { + DrainInner::Vec(drain) => drain.next(), + DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } { + SingleOrVecInner::Single(value) => unsafe { + core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new())); + Some(value) + }, + SingleOrVecInner::Vec(_) => None, + }, + _ => None, + } + } +} +impl<'a, T> Drop for Drain<'a, T> { + fn drop(&mut self) { + if let DrainInner::Single(_) = self.inner { + self.next(); + } + } } impl Default for SingleOrVec { diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index c72767ad58..e67576e673 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -257,7 +257,8 @@ pub mod ext { } } - /// 7 6 5 4 3 2 1 0 + /// ```text + /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ /// % num elems % /// +-------+-+-+---+ @@ -266,6 +267,7 @@ pub mod ext { /// ~ val: ~ /// +---------------+ /// ... -- N times (key, value) tuples + /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct AttachmentType { pub buffer: ZBuf, diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index aebca309ad..097b686de9 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -23,7 +23,7 @@ async fn main() { // Initiate logging env_logger::init(); - let (config, key_expr, value) = parse_args(); + let (config, key_expr, value, attachment) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -35,7 +35,16 @@ async fn main() { sleep(Duration::from_secs(1)).await; let buf = format!("[{idx:4}] {value}"); println!("Putting Data ('{}': '{}')...", &key_expr, buf); - publisher.put(buf).res().await.unwrap(); + let mut put = publisher.put(buf); + if let Some(attachment) = &attachment { + put = put.with_attachment( + attachment + .split('&') + .map(|pair| pair.as_bytes().split_at(pair.find('=').unwrap_or(0))) + .collect(), + ) + } + put.res().await.unwrap(); } } @@ -47,11 +56,16 @@ struct Args { #[arg(short, long, default_value = "Pub from Rust!")] /// The value to write. value: String, + #[arg(short, long)] + /// The attachments to add to each put. + /// + /// The key-value pairs are &-separated, and = serves as the separator between key and value. + attach: Option, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>, String) { +fn parse_args() -> (Config, KeyExpr<'static>, String, Option) { let args = Args::parse(); - (args.common.into(), args.key, args.value) + (args.common.into(), args.key, args.value, args.attach) } diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index 433444b8de..618115e6f4 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -11,9 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // + use clap::Parser; use std::convert::TryInto; -use zenoh::config::Config; use zenoh::prelude::sync::*; use zenoh::publication::CongestionControl; use zenoh_examples::CommonArgs; @@ -21,14 +21,21 @@ use zenoh_examples::CommonArgs; fn main() { // initiate logging env_logger::init(); - let (config, size, prio, print, number) = parse_args(); + let args = Args::parse(); + + let mut prio = Priority::default(); + if let Some(p) = args.priority { + prio = p.try_into().unwrap(); + } + + let payload_size = args.payload_size; - let data: Value = (0usize..size) + let data: Value = (0usize..dbg!(payload_size)) .map(|i| (i % 10) as u8) .collect::>() .into(); - let session = zenoh::open(config).res().unwrap(); + let session = zenoh::open(args.common).res().unwrap(); let publisher = session .declare_publisher("test/thr") @@ -42,8 +49,8 @@ fn main() { loop { publisher.put(data.clone()).res().unwrap(); - if print { - if count < number { + if args.print { + if count < args.number { count += 1; } else { let thpt = count as f64 / start.elapsed().as_secs_f64(); @@ -57,34 +64,17 @@ fn main() { #[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)] struct Args { - #[arg(short, long)] /// Priority for sending data + #[arg(short, long)] priority: Option, - #[arg(short = 't', long)] /// Print the statistics + #[arg(short = 't', long)] print: bool, - #[arg(short, long, default_value = "100000")] /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "100000")] number: usize, /// Sets the size of the payload to publish payload_size: usize, #[command(flatten)] common: CommonArgs, } - -fn parse_args() -> (Config, usize, Priority, bool, usize) { - let args = Args::parse(); - - let mut prio = Priority::default(); - if let Some(p) = args.priority { - prio = p.try_into().unwrap(); - } - - ( - args.common.into(), - args.payload_size, - prio, - args.print, - args.number, - ) -} diff --git a/examples/src/lib.rs b/examples/src/lib.rs index a766bd0695..255ac01917 100644 --- a/examples/src/lib.rs +++ b/examples/src/lib.rs @@ -4,9 +4,8 @@ //! use zenoh::config::Config; -#[derive(clap::ValueEnum, Default, Clone, Copy, PartialEq, Eq, Hash, Debug)] +#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum Wai { - #[default] Peer, Client, Router, diff --git a/zenoh/src/admin.rs b/zenoh/src/admin.rs index a8aad9c809..8cdf638af5 100644 --- a/zenoh/src/admin.rs +++ b/zenoh/src/admin.rs @@ -153,6 +153,8 @@ impl TransportMulticastEventHandler for Handler { &expr, Some(info), serde_json::to_vec(&peer).unwrap().into(), + #[cfg(feature = "unstable")] + None, ); Ok(Arc::new(PeerHandler { expr, @@ -200,6 +202,8 @@ impl TransportPeerEventHandler for PeerHandler { .with_suffix(&format!("/link/{}", s.finish())), Some(info), serde_json::to_vec(&link).unwrap().into(), + #[cfg(feature = "unstable")] + None, ); } @@ -218,6 +222,8 @@ impl TransportPeerEventHandler for PeerHandler { .with_suffix(&format!("/link/{}", s.finish())), Some(info), vec![0u8; 0].into(), + #[cfg(feature = "unstable")] + None, ); } @@ -228,8 +234,14 @@ impl TransportPeerEventHandler for PeerHandler { kind: SampleKind::Delete, ..Default::default() }; - self.session - .handle_data(true, &self.expr, Some(info), vec![0u8; 0].into()); + self.session.handle_data( + true, + &self.expr, + Some(info), + vec![0u8; 0].into(), + #[cfg(feature = "unstable")] + None, + ); } fn as_any(&self) -> &dyn std::any::Any { diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 1e36cb8f69..a29d4b5d4a 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -749,6 +749,8 @@ where Locality::default(), self.timeout, None, + #[cfg(feature = "unstable")] + None, callback, ) .map(|_| receiver) diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 08b00c5047..96ea85f6b4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -378,6 +378,8 @@ impl Primitives for AdminSpace { qid: msg.id, zid, primitives, + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }), }; diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index bad74733a6..8a84e49566 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -19,6 +19,8 @@ use crate::handlers::Callback; use crate::handlers::DefaultHandler; use crate::net::transport::primitives::Primitives; use crate::prelude::*; +#[zenoh_macros::unstable] +use crate::sample::Attachment; use crate::sample::DataInfo; use crate::Encoding; use crate::SessionRef; @@ -78,6 +80,8 @@ pub struct PutBuilder<'a, 'b> { pub(crate) publisher: PublisherBuilder<'a, 'b>, pub(crate) value: Value, pub(crate) kind: SampleKind, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl PutBuilder<'_, '_> { @@ -117,6 +121,12 @@ impl PutBuilder<'_, '_> { self.kind = kind; self } + + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl Resolvable for PutBuilder<'_, '_> { @@ -130,6 +140,8 @@ impl SyncResolve for PutBuilder<'_, '_> { publisher, value, kind, + #[cfg(feature = "unstable")] + attachment, } = self; let key_expr = publisher.key_expr?; log::trace!("write({:?}, [...])", &key_expr); @@ -151,22 +163,42 @@ impl SyncResolve for PutBuilder<'_, '_> { ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), payload: match kind { - SampleKind::Put => PushBody::Put(Put { - timestamp, - encoding: value.encoding.clone(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, // @TODO: expose it in the API - ext_unknown: vec![], - payload: value.payload.clone(), - }), - SampleKind::Delete => PushBody::Del(Del { - timestamp, - ext_sinfo: None, - ext_attachment: None, // @TODO: expose it in the API - ext_unknown: vec![], - }), + SampleKind::Put => { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } + PushBody::Put(Put { + timestamp, + encoding: value.encoding.clone(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment, + ext_unknown: vec![], + payload: value.payload.clone(), + }) + } + SampleKind::Delete => { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } + PushBody::Del(Del { + timestamp, + ext_sinfo: None, + ext_attachment, + ext_unknown: vec![], + }) + } }, }); } @@ -183,6 +215,8 @@ impl SyncResolve for PutBuilder<'_, '_> { &key_expr.to_wire(&publisher.session), Some(data_info), value.payload, + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) @@ -339,6 +373,8 @@ impl<'a> Publisher<'a> { publisher: self, value, kind, + #[cfg(feature = "unstable")] + attachment: None, } } @@ -623,6 +659,16 @@ pub struct Publication<'a> { publisher: &'a Publisher<'a>, value: Value, kind: SampleKind, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, +} + +impl<'a> Publication<'a> { + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl Resolvable for Publication<'_> { @@ -635,6 +681,8 @@ impl SyncResolve for Publication<'_> { publisher, value, kind, + #[cfg(feature = "unstable")] + attachment, } = self; log::trace!("write({:?}, [...])", publisher.key_expr); let primitives = zread!(publisher.session.state) @@ -645,6 +693,14 @@ impl SyncResolve for Publication<'_> { let timestamp = publisher.session.runtime.new_timestamp(); if publisher.destination != Locality::SessionLocal { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } primitives.send_push(Push { wire_expr: publisher.key_expr.to_wire(&publisher.session).to_owned(), ext_qos: ext::QoSType::new( @@ -660,7 +716,7 @@ impl SyncResolve for Publication<'_> { ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, - ext_attachment: None, // @TODO: expose it in the API + ext_attachment, ext_unknown: vec![], payload: value.payload.clone(), }), @@ -678,6 +734,8 @@ impl SyncResolve for Publication<'_> { &publisher.key_expr.to_wire(&publisher.session), Some(data_info), value.payload, + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index 18cb7e882e..c4f3fb35e9 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -16,6 +16,8 @@ use crate::handlers::{locked, Callback, DefaultHandler}; use crate::prelude::*; +#[zenoh_macros::unstable] +use crate::sample::Attachment; use crate::Session; use std::collections::HashMap; use std::future::Ready; @@ -126,6 +128,8 @@ pub struct GetBuilder<'a, 'b, Handler> { pub(crate) timeout: Duration, pub(crate) handler: Handler, pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { @@ -159,6 +163,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: _, } = self; GetBuilder { @@ -170,6 +176,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: callback, } } @@ -238,6 +246,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler: _, } = self; GetBuilder { @@ -249,6 +259,8 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> { destination, timeout, value, + #[cfg(feature = "unstable")] + attachment, handler, } } @@ -294,6 +306,12 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { self } + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } + /// By default, `get` guarantees that it will only receive replies whose key expressions intersect /// with the queried key expression. /// @@ -310,6 +328,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { destination, timeout, value, + attachment, handler, } = self; Self { @@ -321,6 +340,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> { destination, timeout, value, + attachment, handler, } } @@ -369,6 +389,8 @@ where self.destination, self.timeout, self.value, + #[cfg(feature = "unstable")] + self.attachment, callback, ) .map(|_| receiver) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 6afd0516b1..914684f76f 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -18,6 +18,9 @@ use crate::handlers::{locked, DefaultHandler}; use crate::prelude::*; #[zenoh_macros::unstable] use crate::query::ReplyKeyExpr; +#[zenoh_macros::unstable] +use crate::sample::Attachment; +use crate::sample::DataInfo; use crate::SessionRef; use crate::Undeclarable; @@ -45,6 +48,8 @@ pub(crate) struct QueryInner { pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -91,6 +96,11 @@ impl Query { self.inner.value.as_ref() } + #[zenoh_macros::unstable] + pub fn attachment(&self) -> Option<&Attachment> { + self.inner.attachment.as_ref() + } + /// Sends a reply to this Query. /// /// By default, queries only accept replies whose key expression intersects with the query's. @@ -150,6 +160,20 @@ pub struct ReplyBuilder<'a> { result: Result, } +impl<'a> ReplyBuilder<'a> { + #[allow(clippy::result_large_err)] + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Result { + match &mut self.result { + Ok(sample) => { + sample.attachment = Some(attachment); + Ok(self) + } + Err(_) => Err((self, attachment)), + } + } +} + impl<'a> Resolvable for ReplyBuilder<'a> { type To = ZResult<()>; } @@ -163,7 +187,34 @@ impl SyncResolve for ReplyBuilder<'_> { { bail!("Attempted to reply on `{}`, which does not intersect with query `{}`, despite query only allowing replies on matching key expressions", sample.key_expr, self.query.key_expr()) } - let (key_expr, payload, data_info) = sample.split(); + let Sample { + key_expr, + value: Value { payload, encoding }, + kind, + timestamp, + #[cfg(feature = "unstable")] + source_info, + #[cfg(feature = "unstable")] + attachment, + } = sample; + #[allow(unused_mut)] + let mut data_info = DataInfo { + kind, + encoding: Some(encoding), + timestamp, + source_id: None, + source_sn: None, + }; + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + data_info.source_id = source_info.source_id; + data_info.source_sn = source_info.source_sn; + if let Some(attachment) = attachment { + ext_attachment = Some(attachment.into()); + } + } self.query.inner.primitives.send_response(Response { rid: self.query.inner.qid, wire_expr: WireExpr { @@ -187,7 +238,7 @@ impl SyncResolve for ReplyBuilder<'_> { ext_consolidation: ConsolidationType::default(), #[cfg(feature = "shared-memory")] ext_shm: None, - ext_attachment: None, // @TODO: expose it in the API + ext_attachment, ext_unknown: vec![], payload, }), diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 1d3c168e40..083e6fced5 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -98,6 +98,221 @@ impl From> for SourceInfo { } } +mod attachment { + #[zenoh_macros::unstable] + use zenoh_buffers::{ + reader::{HasReader, Reader}, + writer::HasWriter, + ZBuf, ZBufReader, ZSlice, + }; + #[zenoh_macros::unstable] + use zenoh_codec::{RCodec, WCodec, Zenoh080}; + #[zenoh_macros::unstable] + use zenoh_protocol::zenoh::ext::AttachmentType; + + /// A builder for [`Attachment`] + #[zenoh_macros::unstable] + pub struct AttachmentBuilder { + pub(crate) inner: Vec, + } + #[zenoh_macros::unstable] + impl Default for AttachmentBuilder { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl AttachmentBuilder { + pub fn new() -> Self { + Self { inner: Vec::new() } + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + pub fn build(self) -> Attachment { + Attachment { + inner: self.inner.into(), + } + } + } + #[zenoh_macros::unstable] + impl From for Attachment { + fn from(value: AttachmentBuilder) -> Self { + Attachment { + inner: value.inner.into(), + } + } + } + #[zenoh_macros::unstable] + #[derive(Clone)] + pub struct Attachment { + pub(crate) inner: ZBuf, + } + #[zenoh_macros::unstable] + impl Default for Attachment { + fn default() -> Self { + Self::new() + } + } + #[zenoh_macros::unstable] + impl From for AttachmentType { + fn from(this: Attachment) -> Self { + AttachmentType { buffer: this.inner } + } + } + #[zenoh_macros::unstable] + impl From> for Attachment { + fn from(this: AttachmentType) -> Self { + Attachment { inner: this.buffer } + } + } + #[zenoh_macros::unstable] + impl Attachment { + pub fn new() -> Self { + Self { + inner: ZBuf::empty(), + } + } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn len(&self) -> usize { + self.iter().count() + } + pub fn iter(&self) -> AttachmentIterator { + self.into_iter() + } + fn _get(&self, key: &[u8]) -> Option { + self.iter() + .find_map(|(k, v)| (k.as_slice() == key).then_some(v)) + } + pub fn get>(&self, key: &Key) -> Option { + self._get(key.as_ref()) + } + fn _insert(&mut self, key: &[u8], value: &[u8]) { + let codec = Zenoh080; + let mut writer = self.inner.writer(); + codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure + codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure + } + /// Inserts a key-value pair to the attachment. + /// + /// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key. + /// + /// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`] + pub fn insert + ?Sized, Value: AsRef<[u8]> + ?Sized>( + &mut self, + key: &Key, + value: &Value, + ) { + self._insert(key.as_ref(), value.as_ref()) + } + fn _extend(&mut self, with: Self) -> &mut Self { + for slice in with.inner.zslices().cloned() { + self.inner.push_zslice(slice); + } + self + } + pub fn extend(&mut self, with: impl Into) -> &mut Self { + let with = with.into(); + self._extend(with) + } + } + #[zenoh_macros::unstable] + pub struct AttachmentIterator<'a> { + reader: ZBufReader<'a>, + } + #[zenoh_macros::unstable] + impl<'a> core::iter::IntoIterator for &'a Attachment { + type Item = (ZSlice, ZSlice); + type IntoIter = AttachmentIterator<'a>; + fn into_iter(self) -> Self::IntoIter { + AttachmentIterator { + reader: self.inner.reader(), + } + } + } + #[zenoh_macros::unstable] + impl core::fmt::Debug for Attachment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{{")?; + for (key, value) in self { + let key = key.as_slice(); + let value = value.as_slice(); + match core::str::from_utf8(key) { + Ok(key) => write!(f, "\"{key}\": ")?, + Err(_) => { + write!(f, "0x")?; + for byte in key { + write!(f, "{byte:02X}")? + } + } + } + match core::str::from_utf8(value) { + Ok(value) => write!(f, "\"{value}\", ")?, + Err(_) => { + write!(f, "0x")?; + for byte in value { + write!(f, "{byte:02X}")? + } + write!(f, ", ")? + } + } + } + write!(f, "}}") + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::Iterator for AttachmentIterator<'a> { + type Item = (ZSlice, ZSlice); + fn next(&mut self) -> Option { + let key = Zenoh080.read(&mut self.reader).ok()?; + let value = Zenoh080.read(&mut self.reader).ok()?; + Some((key, value)) + } + fn size_hint(&self) -> (usize, Option) { + ( + (self.reader.remaining() != 0) as usize, + Some(self.reader.remaining() / 2), + ) + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder { + fn from_iter>(iter: T) -> Self { + let codec = Zenoh080; + let mut buffer: Vec = Vec::new(); + let mut writer = buffer.writer(); + for (key, value) in iter { + codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures + codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures + } + Self { inner: buffer } + } + } + #[zenoh_macros::unstable] + impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment { + fn from_iter>(iter: T) -> Self { + AttachmentBuilder::from_iter(iter).into() + } + } +} +#[zenoh_macros::unstable] +pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; + /// A zenoh sample. #[non_exhaustive] #[derive(Clone, Debug)] @@ -120,6 +335,16 @@ pub struct Sample { /// /// Infos on the source of this Sample. pub source_info: SourceInfo, + + #[cfg(feature = "unstable")] + ///
+ /// 🔬 + /// This API has been marked as unstable: it works as advertised, but we may change it in a future release. + /// To use it, you must enable zenoh's unstable feature flag. + ///
+ /// + /// A map of key-value pairs, where each key and value are byte-slices. + pub attachment: Option, } impl Sample { @@ -137,6 +362,8 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, } } /// Creates a new Sample. @@ -157,6 +384,8 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, }) } @@ -179,6 +408,8 @@ impl Sample { timestamp: data_info.timestamp, #[cfg(feature = "unstable")] source_info: data_info.into(), + #[cfg(feature = "unstable")] + attachment: None, } } else { Sample { @@ -188,28 +419,12 @@ impl Sample { timestamp: None, #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, } } } - #[inline] - pub(crate) fn split(self) -> (KeyExpr<'static>, ZBuf, DataInfo) { - let info = DataInfo { - kind: self.kind, - encoding: Some(self.value.encoding), - timestamp: self.timestamp, - #[cfg(feature = "unstable")] - source_id: self.source_info.source_id, - #[cfg(not(feature = "unstable"))] - source_id: None, - #[cfg(feature = "unstable")] - source_sn: self.source_info.source_sn, - #[cfg(not(feature = "unstable"))] - source_sn: None, - }; - (self.key_expr, self.value.payload, info) - } - /// Gets the timestamp of this Sample. #[inline] pub fn get_timestamp(&self) -> Option<&Timestamp> { @@ -244,6 +459,23 @@ impl Sample { self.timestamp.as_ref().unwrap() } } + + #[zenoh_macros::unstable] + pub fn attachment(&self) -> Option<&Attachment> { + self.attachment.as_ref() + } + + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> &mut Option { + &mut self.attachment + } + + #[allow(clippy::result_large_err)] + #[zenoh_macros::unstable] + pub fn with_attachment(mut self, attachment: Attachment) -> Self { + self.attachment = Some(attachment); + self + } } impl std::ops::Deref for Sample { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index e59bc83d2a..6609d1361d 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -28,6 +28,8 @@ use crate::prelude::{KeyExpr, Parameters}; use crate::publication::*; use crate::query::*; use crate::queryable::*; +#[cfg(feature = "unstable")] +use crate::sample::Attachment; use crate::sample::DataInfo; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; @@ -721,6 +723,8 @@ impl Session { publisher: self.declare_publisher(key_expr), value: value.into(), kind: SampleKind::Put, + #[cfg(feature = "unstable")] + attachment: None, } } @@ -752,6 +756,8 @@ impl Session { publisher: self.declare_publisher(key_expr), value: Value::empty(), kind: SampleKind::Delete, + #[cfg(feature = "unstable")] + attachment: None, } } /// Query data from the matching queryables in the system. @@ -794,6 +800,8 @@ impl Session { destination: Locality::default(), timeout: Duration::from_millis(unwrap_or_default!(conf.queries_default_timeout())), value: None, + #[cfg(feature = "unstable")] + attachment: None, handler: DefaultHandler, } } @@ -1606,6 +1614,7 @@ impl Session { key_expr: &WireExpr, info: Option, payload: ZBuf, + #[cfg(feature = "unstable")] attachment: Option, ) { let mut callbacks = SingleOrVec::default(); let state = zread!(self.state); @@ -1706,10 +1715,22 @@ impl Session { drop(state); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - cb(Sample::with_info(key_expr, payload.clone(), info.clone())); + #[allow(unused_mut)] + let mut sample = Sample::with_info(key_expr, payload.clone(), info.clone()); + #[cfg(feature = "unstable")] + { + sample.attachment = attachment.clone(); + } + cb(sample); } if let Some((cb, key_expr)) = last { - cb(Sample::with_info(key_expr, payload, info)); + #[allow(unused_mut)] + let mut sample = Sample::with_info(key_expr, payload, info); + #[cfg(feature = "unstable")] + { + sample.attachment = attachment; + } + cb(sample); } } @@ -1746,6 +1767,7 @@ impl Session { destination: Locality, timeout: Duration, value: Option, + #[cfg(feature = "unstable")] attachment: Option, callback: Callback<'static, Reply>, ) -> ZResult<()> { log::trace!("get({}, {:?}, {:?})", selector, target, consolidation); @@ -1813,6 +1835,14 @@ impl Session { drop(state); if destination != Locality::SessionLocal { + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + if let Some(attachment) = attachment.clone() { + ext_attachment = Some(attachment.into()); + } + } primitives.send_request(Request { id: qid, wire_expr: wexpr.clone(), @@ -1832,7 +1862,7 @@ impl Session { encoding: v.encoding.clone(), payload: v.payload.clone(), }), - ext_attachment: None, // @TODO: expose it in the API + ext_attachment, ext_unknown: vec![], }), }); @@ -1851,6 +1881,8 @@ impl Session { encoding: v.encoding.clone(), payload: v.payload.clone(), }), + #[cfg(feature = "unstable")] + attachment, ); } Ok(()) @@ -1866,6 +1898,7 @@ impl Session { _target: TargetType, _consolidation: ConsolidationType, body: Option, + #[cfg(feature = "unstable")] attachment: Option, ) { let (primitives, key_expr, callbacks) = { let state = zread!(self.state); @@ -1926,6 +1959,8 @@ impl Session { } else { primitives }, + #[cfg(feature = "unstable")] + attachment, }), }; for callback in callbacks.iter() { @@ -2124,7 +2159,14 @@ impl Primitives for Session { .starts_with(crate::liveliness::PREFIX_LIVELINESS) { drop(state); - self.handle_data(false, &m.wire_expr, None, ZBuf::default()); + self.handle_data( + false, + &m.wire_expr, + None, + ZBuf::default(), + #[cfg(feature = "unstable")] + None, + ); } } Err(err) => { @@ -2156,6 +2198,8 @@ impl Primitives for Session { &m.ext_wire_expr.wire_expr, Some(data_info), ZBuf::default(), + #[cfg(feature = "unstable")] + None, ); } } @@ -2190,7 +2234,14 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.handle_data(false, &msg.wire_expr, Some(info), m.payload) + self.handle_data( + false, + &msg.wire_expr, + Some(info), + m.payload, + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), + ) } PushBody::Del(m) => { let info = DataInfo { @@ -2200,7 +2251,14 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - self.handle_data(false, &msg.wire_expr, Some(info), ZBuf::empty()) + self.handle_data( + false, + &msg.wire_expr, + Some(info), + ZBuf::empty(), + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), + ) } } } @@ -2216,6 +2274,8 @@ impl Primitives for Session { msg.ext_target, m.ext_consolidation, m.ext_body, + #[cfg(feature = "unstable")] + m.ext_attachment.map(Into::into), ), RequestBody::Put(_) => (), RequestBody::Del(_) => (), @@ -2327,12 +2387,15 @@ impl Primitives for Session { source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; + #[allow(unused_mut)] + let mut sample = + Sample::with_info(key_expr.into_owned(), m.payload, Some(info)); + #[cfg(feature = "unstable")] + { + sample.attachment = m.ext_attachment.map(Into::into); + } let new_reply = Reply { - sample: Ok(Sample::with_info( - key_expr.into_owned(), - m.payload, - Some(info), - )), + sample: Ok(sample), replier_id: ZenohId::rand(), // TODO }; let callback = diff --git a/zenoh/tests/attachments.rs b/zenoh/tests/attachments.rs new file mode 100644 index 0000000000..d1fbd1086a --- /dev/null +++ b/zenoh/tests/attachments.rs @@ -0,0 +1,112 @@ +#[cfg(feature = "unstable")] +#[test] +fn pubsub() { + use zenoh::prelude::sync::*; + + let zenoh = zenoh::open(Config::default()).res().unwrap(); + let _sub = zenoh + .declare_subscriber("test/attachment") + .callback(|sample| { + println!( + "{}", + std::str::from_utf8(&sample.payload.contiguous()).unwrap() + ); + for (k, v) in &sample.attachment.unwrap() { + assert!(k.iter().rev().zip(v.as_slice()).all(|(k, v)| k == v)) + } + }) + .res() + .unwrap(); + let publisher = zenoh.declare_publisher("test/attachment").res().unwrap(); + for i in 0..10 { + let mut backer = [( + [0; std::mem::size_of::()], + [0; std::mem::size_of::()], + ); 10]; + for (j, backer) in backer.iter_mut().enumerate() { + *backer = ((i * 10 + j).to_le_bytes(), (i * 10 + j).to_be_bytes()) + } + zenoh + .put("test/attachment", "put") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + publisher + .put("publisher") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + } +} +#[cfg(feature = "unstable")] +#[test] +fn queries() { + use zenoh::{prelude::sync::*, sample::Attachment}; + + let zenoh = zenoh::open(Config::default()).res().unwrap(); + let _sub = zenoh + .declare_queryable("test/attachment") + .callback(|query| { + println!( + "{}", + std::str::from_utf8( + &query + .value() + .map(|q| q.payload.contiguous()) + .unwrap_or_default() + ) + .unwrap() + ); + let mut attachment = Attachment::new(); + for (k, v) in query.attachment().unwrap() { + assert!(k.iter().rev().zip(v.as_slice()).all(|(k, v)| k == v)); + attachment.insert(&k, &k); + } + query + .reply(Ok(Sample::new( + query.key_expr().clone(), + query.value().unwrap().clone(), + ) + .with_attachment(attachment))) + .res() + .unwrap(); + }) + .res() + .unwrap(); + for i in 0..10 { + let mut backer = [( + [0; std::mem::size_of::()], + [0; std::mem::size_of::()], + ); 10]; + for (j, backer) in backer.iter_mut().enumerate() { + *backer = ((i * 10 + j).to_le_bytes(), (i * 10 + j).to_be_bytes()) + } + let get = zenoh + .get("test/attachment") + .with_value("query") + .with_attachment( + backer + .iter() + .map(|b| (b.0.as_slice(), b.1.as_slice())) + .collect(), + ) + .res() + .unwrap(); + while let Ok(reply) = get.recv() { + let response = reply.sample.as_ref().unwrap(); + for (k, v) in response.attachment().unwrap() { + assert_eq!(k, v) + } + } + } +}