Skip to content

Commit

Permalink
Attachment api (#613)
Browse files Browse the repository at this point in the history
Co-authored-by: Pierre Avital <[email protected]>
  • Loading branch information
p-avital and Pierre Avital authored Dec 13, 2023
1 parent 08dd81a commit 678cc57
Show file tree
Hide file tree
Showing 18 changed files with 803 additions and 91 deletions.
98 changes: 94 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
Expand Down Expand Up @@ -55,6 +55,85 @@ impl ZBuf {
self.slices.push(zslice);
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
core::ops::Bound::Excluded(n) => n + 1,
core::ops::Bound::Unbounded => 0,
};
let end = match erased.end_bound() {
core::ops::Bound::Included(n) => n + 1,
core::ops::Bound::Excluded(n) => *n,
core::ops::Bound::Unbounded => self.len(),
};
if start != end {
self.remove(start, end);
}
self.insert(start, replacement);
}
fn remove(&mut self, mut start: usize, mut end: usize) {
assert!(start <= end);
assert!(end <= self.len());
let mut start_slice_idx = 0;
let mut start_idx_in_start_slice = 0;
let mut end_slice_idx = 0;
let mut end_idx_in_end_slice = 0;
for (i, slice) in self.slices.as_mut().iter_mut().enumerate() {
if slice.len() > start {
start_slice_idx = i;
start_idx_in_start_slice = start;
}
if slice.len() >= end {
end_slice_idx = i;
end_idx_in_end_slice = end;
break;
}
start -= slice.len();
end -= slice.len();
}
let start_slice = &mut self.slices.as_mut()[start_slice_idx];
start_slice.end = start_slice.start + start_idx_in_start_slice;
let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize;
let end_slice = &mut self.slices.as_mut()[end_slice_idx];
end_slice.start += end_idx_in_end_slice;
let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize;
self.slices.drain(drain_start..drain_end);
}
fn insert(&mut self, mut at: usize, slice: &[u8]) {
if slice.is_empty() {
return;
}
let old_at = at;
let mut slice_index = usize::MAX;
for (i, slice) in self.slices.as_ref().iter().enumerate() {
if at < slice.len() {
slice_index = i;
break;
}
if let Some(new_at) = at.checked_sub(slice.len()) {
at = new_at
} else {
panic!(
"Out of bounds insert attempted: at={old_at}, len={}",
self.len()
)
}
}
if at != 0 {
let split = &self.slices.as_ref()[slice_index];
let (l, r) = (
split.subslice(0, at).unwrap(),
split.subslice(at, split.len()).unwrap(),
);
self.slices.drain(slice_index..(slice_index + 1));
self.slices.insert(slice_index, l);
self.slices.insert(slice_index + 1, Vec::from(slice).into());
self.slices.insert(slice_index + 2, r);
} else {
self.slices.insert(slice_index, Vec::from(slice).into())
}
}
}

// Buffer
Expand All @@ -70,7 +149,7 @@ impl Buffer for ZBuf {

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
type Slices<'a> = iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
Expand Down Expand Up @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf {
type Writer = ZBufWriter<'a>;

fn writer(self) -> Self::Writer {
let mut cache = None;
if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() {
// Verify the ZSlice is actually a Vec<u8>
if let Some(b) = buf.as_any().downcast_ref::<Vec<u8>>() {
// Check for the length
if *end == b.len() {
cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) })
}
}
}

ZBufWriter {
inner: self,
cache: Arc::new(Vec::new()),
cache: cache.unwrap_or_else(|| Arc::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> {
}

fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> {
self.write_exact(slice::from_ref(&byte))
self.write_exact(core::slice::from_ref(&byte))
}

fn remaining(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
let NetworkMessage { body } = x;
let NetworkMessage { body, .. } = x;

match body {
NetworkBody::Push(b) => self.write(&mut *writer, b),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/scouting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &ScoutingMessage) -> Self::Output {
let ScoutingMessage { body } = x;
let ScoutingMessage { body, .. } = x;

match body {
ScoutingBody::Scout(s) => self.write(&mut *writer, s),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
let TransportMessage { body } = x;
let TransportMessage { body, .. } = x;

match body {
TransportBody::Frame(b) => self.write(&mut *writer, b),
Expand Down
65 changes: 64 additions & 1 deletion commons/zenoh-collections/src/single_or_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use alloc::{vec, vec::Vec};
use core::{
cmp::PartialEq,
fmt, iter,
ops::{Index, IndexMut},
ops::{Index, IndexMut, RangeBounds},
ptr, slice,
};

Expand Down Expand Up @@ -112,6 +113,19 @@ impl<T> SingleOrVec<T> {
matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty())
}

fn vectorize(&mut self) -> &mut Vec<T> {
if let SingleOrVecInner::Single(v) = &self.0 {
unsafe {
let v = core::ptr::read(v);
core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v]))
};
}
let SingleOrVecInner::Vec(v) = &mut self.0 else {
unsafe { core::hint::unreachable_unchecked() }
};
v
}

pub fn get(&self, index: usize) -> Option<&T> {
match &self.0 {
SingleOrVecInner::Single(v) => (index == 0).then_some(v),
Expand Down Expand Up @@ -139,6 +153,55 @@ impl<T> SingleOrVec<T> {
SingleOrVecInner::Vec(v) => v.last_mut(),
}
}
pub fn drain<Range: RangeBounds<usize>>(&mut self, range: Range) -> Drain<T> {
match &mut self.0 {
this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain {
inner: DrainInner::Single(this),
},
SingleOrVecInner::Vec(vec) => Drain {
inner: DrainInner::Vec(vec.drain(range)),
},
_ => Drain {
inner: DrainInner::Done,
},
}
}
pub fn insert(&mut self, at: usize, value: T) {
assert!(at <= self.len());
self.vectorize().insert(at, value);
}
}
enum DrainInner<'a, T> {
Vec(alloc::vec::Drain<'a, T>),
Single(&'a mut SingleOrVecInner<T>),
Done,
}
pub struct Drain<'a, T> {
inner: DrainInner<'a, T>,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
match &mut self.inner {
DrainInner::Vec(drain) => drain.next(),
DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } {
SingleOrVecInner::Single(value) => unsafe {
core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new()));
Some(value)
},
SingleOrVecInner::Vec(_) => None,
},
_ => None,
}
}
}
impl<'a, T> Drop for Drain<'a, T> {
fn drop(&mut self) {
if let DrainInner::Single(_) = self.inner {
self.next();
}
}
}

impl<T> Default for SingleOrVec<T> {
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ pub mod ext {
}
}

/// 7 6 5 4 3 2 1 0
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// % num elems %
/// +-------+-+-+---+
Expand All @@ -266,6 +267,7 @@ pub mod ext {
/// ~ val: <u8;z32> ~
/// +---------------+
/// ... -- N times (key, value) tuples
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttachmentType<const ID: u8> {
pub buffer: ZBuf,
Expand Down
22 changes: 18 additions & 4 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value) = parse_args();
let (config, key_expr, value, attachment) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -35,7 +35,16 @@ async fn main() {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
publisher.put(buf).res().await.unwrap();
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.with_attachment(
attachment
.split('&')
.map(|pair| pair.as_bytes().split_at(pair.find('=').unwrap_or(0)))
.collect(),
)
}
put.res().await.unwrap();
}
}

Expand All @@ -47,11 +56,16 @@ struct Args {
#[arg(short, long, default_value = "Pub from Rust!")]
/// The value to write.
value: String,
#[arg(short, long)]
/// The attachments to add to each put.
///
/// The key-value pairs are &-separated, and = serves as the separator between key and value.
attach: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String) {
fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.value)
(args.common.into(), args.key, args.value, args.attach)
}
Loading

0 comments on commit 678cc57

Please sign in to comment.