Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attachment api #613

Merged
merged 9 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 94 additions & 4 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
ZSlice,
};
use alloc::{sync::Arc, vec::Vec};
use core::{cmp, iter, mem, num::NonZeroUsize, ptr, slice};
use core::{cmp, iter, mem, num::NonZeroUsize, ops::RangeBounds, ptr};
use zenoh_collections::SingleOrVec;

fn get_mut_unchecked<T>(arc: &mut Arc<T>) -> &mut T {
Expand Down Expand Up @@ -55,6 +55,85 @@ impl ZBuf {
self.slices.push(zslice);
}
}

pub fn splice<Range: RangeBounds<usize>>(&mut self, erased: Range, replacement: &[u8]) {
let start = match erased.start_bound() {
core::ops::Bound::Included(n) => *n,
core::ops::Bound::Excluded(n) => n + 1,
core::ops::Bound::Unbounded => 0,
};
let end = match erased.end_bound() {
core::ops::Bound::Included(n) => n + 1,
core::ops::Bound::Excluded(n) => *n,
core::ops::Bound::Unbounded => self.len(),
};
if start != end {
self.remove(start, end);
}
self.insert(start, replacement);
}
fn remove(&mut self, mut start: usize, mut end: usize) {
assert!(start <= end);
assert!(end <= self.len());
let mut start_slice_idx = 0;
let mut start_idx_in_start_slice = 0;
let mut end_slice_idx = 0;
let mut end_idx_in_end_slice = 0;
for (i, slice) in self.slices.as_mut().iter_mut().enumerate() {
if slice.len() > start {
start_slice_idx = i;
start_idx_in_start_slice = start;
}
if slice.len() >= end {
end_slice_idx = i;
end_idx_in_end_slice = end;
break;
}
start -= slice.len();
end -= slice.len();
}
let start_slice = &mut self.slices.as_mut()[start_slice_idx];
start_slice.end = start_slice.start + start_idx_in_start_slice;
let drain_start = start_slice_idx + (start_slice.start < start_slice.end) as usize;
let end_slice = &mut self.slices.as_mut()[end_slice_idx];
end_slice.start += end_idx_in_end_slice;
let drain_end = end_slice_idx + (end_slice.start >= end_slice.end) as usize;
self.slices.drain(drain_start..drain_end);
}
fn insert(&mut self, mut at: usize, slice: &[u8]) {
if slice.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to check if at less than buffer size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, but computing len here is a bit expensive... that check can be done by checking for slice_index == usize::MAX a bit later, adding to the todo list

return;
}
let old_at = at;
let mut slice_index = usize::MAX;
for (i, slice) in self.slices.as_ref().iter().enumerate() {
if at < slice.len() {
slice_index = i;
break;
}
if let Some(new_at) = at.checked_sub(slice.len()) {
at = new_at
} else {
panic!(
"Out of bounds insert attempted: at={old_at}, len={}",
self.len()
)
}
}
if at != 0 {
let split = &self.slices.as_ref()[slice_index];
let (l, r) = (
split.subslice(0, at).unwrap(),
split.subslice(at, split.len()).unwrap(),
);
self.slices.drain(slice_index..(slice_index + 1));
self.slices.insert(slice_index, l);
self.slices.insert(slice_index + 1, Vec::from(slice).into());
self.slices.insert(slice_index + 2, r);
} else {
self.slices.insert(slice_index, Vec::from(slice).into())
}
}
}

// Buffer
Expand All @@ -70,7 +149,7 @@ impl Buffer for ZBuf {

// SplitBuffer
impl SplitBuffer for ZBuf {
type Slices<'a> = iter::Map<slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;
type Slices<'a> = iter::Map<core::slice::Iter<'a, ZSlice>, fn(&'a ZSlice) -> &'a [u8]>;

fn slices(&self) -> Self::Slices<'_> {
self.slices.as_ref().iter().map(ZSlice::as_slice)
Expand Down Expand Up @@ -380,9 +459,20 @@ impl<'a> HasWriter for &'a mut ZBuf {
type Writer = ZBufWriter<'a>;

fn writer(self) -> Self::Writer {
let mut cache = None;
if let Some(ZSlice { buf, end, .. }) = self.slices.last_mut() {
// Verify the ZSlice is actually a Vec<u8>
if let Some(b) = buf.as_any().downcast_ref::<Vec<u8>>() {
// Check for the length
if *end == b.len() {
cache = Some(unsafe { Arc::from_raw(Arc::into_raw(buf.clone()).cast()) })
}
}
}

ZBufWriter {
inner: self,
cache: Arc::new(Vec::new()),
cache: cache.unwrap_or_else(|| Arc::new(Vec::new())),
}
}
}
Expand Down Expand Up @@ -433,7 +523,7 @@ impl Writer for ZBufWriter<'_> {
}

fn write_u8(&mut self, byte: u8) -> Result<(), DidntWrite> {
self.write_exact(slice::from_ref(&byte))
self.write_exact(core::slice::from_ref(&byte))
}

fn remaining(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &NetworkMessage) -> Self::Output {
let NetworkMessage { body } = x;
let NetworkMessage { body, .. } = x;

match body {
NetworkBody::Push(b) => self.write(&mut *writer, b),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/scouting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &ScoutingMessage) -> Self::Output {
let ScoutingMessage { body } = x;
let ScoutingMessage { body, .. } = x;

match body {
ScoutingBody::Scout(s) => self.write(&mut *writer, s),
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-codec/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &TransportMessage) -> Self::Output {
let TransportMessage { body } = x;
let TransportMessage { body, .. } = x;

match body {
TransportBody::Frame(b) => self.write(&mut *writer, b),
Expand Down
65 changes: 64 additions & 1 deletion commons/zenoh-collections/src/single_or_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use alloc::{vec, vec::Vec};
use core::{
cmp::PartialEq,
fmt, iter,
ops::{Index, IndexMut},
ops::{Index, IndexMut, RangeBounds},
ptr, slice,
};

Expand Down Expand Up @@ -112,6 +113,19 @@ impl<T> SingleOrVec<T> {
matches!(&self.0, SingleOrVecInner::Vec(v) if v.is_empty())
}

fn vectorize(&mut self) -> &mut Vec<T> {
if let SingleOrVecInner::Single(v) = &self.0 {
unsafe {
let v = core::ptr::read(v);
core::ptr::write(&mut self.0, SingleOrVecInner::Vec(vec![v]))
};
}
let SingleOrVecInner::Vec(v) = &mut self.0 else {
unsafe { core::hint::unreachable_unchecked() }
};
v
}

pub fn get(&self, index: usize) -> Option<&T> {
match &self.0 {
SingleOrVecInner::Single(v) => (index == 0).then_some(v),
Expand Down Expand Up @@ -139,6 +153,55 @@ impl<T> SingleOrVec<T> {
SingleOrVecInner::Vec(v) => v.last_mut(),
}
}
pub fn drain<Range: RangeBounds<usize>>(&mut self, range: Range) -> Drain<T> {
match &mut self.0 {
this @ SingleOrVecInner::Single(_) if range.contains(&0) => Drain {
inner: DrainInner::Single(this),
},
SingleOrVecInner::Vec(vec) => Drain {
inner: DrainInner::Vec(vec.drain(range)),
},
_ => Drain {
inner: DrainInner::Done,
},
}
}
pub fn insert(&mut self, at: usize, value: T) {
assert!(at <= self.len());
self.vectorize().insert(at, value);
}
}
enum DrainInner<'a, T> {
Vec(alloc::vec::Drain<'a, T>),
Single(&'a mut SingleOrVecInner<T>),
Done,
}
pub struct Drain<'a, T> {
inner: DrainInner<'a, T>,
}
impl<'a, T> Iterator for Drain<'a, T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
match &mut self.inner {
DrainInner::Vec(drain) => drain.next(),
DrainInner::Single(inner) => match unsafe { core::ptr::read(*inner) } {
SingleOrVecInner::Single(value) => unsafe {
core::ptr::write(*inner, SingleOrVecInner::Vec(Vec::new()));
Some(value)
},
SingleOrVecInner::Vec(_) => None,
},
_ => None,
}
}
}
impl<'a, T> Drop for Drain<'a, T> {
fn drop(&mut self) {
if let DrainInner::Single(_) = self.inner {
self.next();
}
}
}

impl<T> Default for SingleOrVec<T> {
Expand Down
4 changes: 3 additions & 1 deletion commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ pub mod ext {
}
}

/// 7 6 5 4 3 2 1 0
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// % num elems %
/// +-------+-+-+---+
Expand All @@ -266,6 +267,7 @@ pub mod ext {
/// ~ val: <u8;z32> ~
/// +---------------+
/// ... -- N times (key, value) tuples
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttachmentType<const ID: u8> {
pub buffer: ZBuf,
Expand Down
22 changes: 18 additions & 4 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value) = parse_args();
let (config, key_expr, value, attachment) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -35,7 +35,16 @@ async fn main() {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Putting Data ('{}': '{}')...", &key_expr, buf);
publisher.put(buf).res().await.unwrap();
let mut put = publisher.put(buf);
if let Some(attachment) = &attachment {
put = put.with_attachment(
attachment
.split('&')
.map(|pair| pair.as_bytes().split_at(pair.find('=').unwrap_or(0)))
.collect(),
)
}
put.res().await.unwrap();
}
}

Expand All @@ -47,11 +56,16 @@ struct Args {
#[arg(short, long, default_value = "Pub from Rust!")]
/// The value to write.
value: String,
#[arg(short, long)]
/// The attachments to add to each put.
///
/// The key-value pairs are &-separated, and = serves as the separator between key and value.
attach: Option<String>,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>, String) {
fn parse_args() -> (Config, KeyExpr<'static>, String, Option<String>) {
let args = Args::parse();
(args.common.into(), args.key, args.value)
(args.common.into(), args.key, args.value, args.attach)
}
Loading
Loading