Skip to content

Commit

Permalink
unfinished
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 31, 2024
1 parent 0776580 commit 7afbc0d
Show file tree
Hide file tree
Showing 23 changed files with 422 additions and 359 deletions.
6 changes: 0 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 13 additions & 7 deletions zenoh-ext/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ unstable = []
default = []

[dependencies]
tokio = { workspace = true, features = ["rt", "sync", "time", "macros", "io-std"] }
tokio = { workspace = true, features = [
"rt",
"sync",
"time",
"macros",
"io-std",
] }
bincode = { workspace = true }
env_logger = { workspace = true }
flume = { workspace = true }
Expand All @@ -42,12 +48,12 @@ serde = { workspace = true, features = ["default"] }
serde_cbor = { workspace = true }
serde_json = { workspace = true }
zenoh = { workspace = true, features = ["unstable"], default-features = false }
zenoh-core = { workspace = true }
zenoh-macros = { workspace = true }
zenoh-result = { workspace = true }
zenoh-sync = { workspace = true }
zenoh-util = { workspace = true }
zenoh-runtime = { workspace = true }
# zenoh-core = { workspace = true }
# zenoh-macros = { workspace = true }
# zenoh-result = { workspace = true }
# zenoh-sync = { workspace = true }
# zenoh-util = { workspace = true }
# zenoh-runtime = { workspace = true }

[dev-dependencies]
clap = { workspace = true, features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod handlers;
mod info;
pub mod key_expr;
#[cfg(feature = "unstable")]
mod liveliness;
pub mod liveliness;
pub mod payload;
pub mod publication;
pub mod query;
Expand Down
3 changes: 2 additions & 1 deletion zenoh/src/api/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use crate::api::{
key_expr::{KeyExpr, KeyExprInner},
payload::Payload,
sample::{
attachment::Attachment,
builder::{QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait},
Attachment, DataInfo, Locality, QoS, Sample, SampleFields, SampleKind, SourceInfo,
DataInfo, Locality, QoS, Sample, SampleFields, SampleKind, SourceInfo,
},
session::{SessionRef, Undeclarable},
value::Value,
Expand Down
3 changes: 2 additions & 1 deletion zenoh/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use crate::api::{
payload::Payload,
publication::Priority,
sample::{
attachment::Attachment,
builder::{QoSBuilderTrait, SampleBuilderTrait, ValueBuilderTrait},
Attachment, Locality, QoSBuilder, Sample, SourceInfo,
Locality, QoSBuilder, Sample, SourceInfo,
},
selector::Selector,
session::Session,
Expand Down
3 changes: 2 additions & 1 deletion zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use crate::api::{
publication::Priority,
query::ReplyKeyExpr,
sample::{
attachment::Attachment,
builder::{
DeleteSampleBuilder, PutSampleBuilder, QoSBuilderTrait, SampleBuilder,
SampleBuilderTrait, TimestampBuilderTrait, ValueBuilderTrait,
},
Attachment, Locality, Sample, SampleKind, SourceInfo,
Locality, Sample, SampleKind, SourceInfo,
},
selector::{Parameters, Selector},
session::{SessionRef, Undeclarable},
Expand Down
231 changes: 3 additions & 228 deletions zenoh/src/api/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ use zenoh_protocol::core::EntityGlobalId;
use zenoh_protocol::network::declare::ext::QoSType;
use zenoh_protocol::{core::CongestionControl, zenoh};

use self::attachment::Attachment;

pub mod attachment;
pub mod builder;

pub type SourceSn = u64;
Expand Down Expand Up @@ -205,231 +208,6 @@ impl From<Option<DataInfo>> for SourceInfo {
}
}

mod attachment {
#[zenoh_macros::unstable]
use zenoh_buffers::{
reader::{HasReader, Reader},
writer::HasWriter,
ZBuf, ZBufReader, ZSlice,
};
#[zenoh_macros::unstable]
use zenoh_codec::{RCodec, WCodec, Zenoh080};
#[zenoh_macros::unstable]
use zenoh_protocol::zenoh::ext::AttachmentType;

/// A builder for [`Attachment`]
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct AttachmentBuilder {
pub(crate) inner: Vec<u8>,
}
#[zenoh_macros::unstable]
impl Default for AttachmentBuilder {
fn default() -> Self {
Self::new()
}
}
#[zenoh_macros::unstable]
impl AttachmentBuilder {
pub fn new() -> Self {
Self { inner: Vec::new() }
}
fn _insert(&mut self, key: &[u8], value: &[u8]) {
let codec = Zenoh080;
let mut writer = self.inner.writer();
codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure
codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure
}
/// Inserts a key-value pair to the attachment.
///
/// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key.
pub fn insert<Key: AsRef<[u8]> + ?Sized, Value: AsRef<[u8]> + ?Sized>(
&mut self,
key: &Key,
value: &Value,
) {
self._insert(key.as_ref(), value.as_ref())
}
pub fn build(self) -> Attachment {
Attachment {
inner: self.inner.into(),
}
}
}
#[zenoh_macros::unstable]
impl From<AttachmentBuilder> for Attachment {
fn from(value: AttachmentBuilder) -> Self {
Attachment {
inner: value.inner.into(),
}
}
}
#[zenoh_macros::unstable]
impl From<AttachmentBuilder> for Option<Attachment> {
fn from(value: AttachmentBuilder) -> Self {
if value.inner.is_empty() {
None
} else {
Some(value.into())
}
}
}

#[zenoh_macros::unstable]
#[derive(Clone)]
pub struct Attachment {
pub(crate) inner: ZBuf,
}
#[zenoh_macros::unstable]
impl Default for Attachment {
fn default() -> Self {
Self::new()
}
}
#[zenoh_macros::unstable]
impl<const ID: u8> From<Attachment> for AttachmentType<ID> {
fn from(this: Attachment) -> Self {
AttachmentType { buffer: this.inner }
}
}
#[zenoh_macros::unstable]
impl<const ID: u8> From<AttachmentType<ID>> for Attachment {
fn from(this: AttachmentType<ID>) -> Self {
Attachment { inner: this.buffer }
}
}
#[zenoh_macros::unstable]
impl Attachment {
pub fn new() -> Self {
Self {
inner: ZBuf::empty(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> usize {
self.iter().count()
}
pub fn iter(&self) -> AttachmentIterator {
self.into_iter()
}
fn _get(&self, key: &[u8]) -> Option<ZSlice> {
self.iter()
.find_map(|(k, v)| (k.as_slice() == key).then_some(v))
}
pub fn get<Key: AsRef<[u8]>>(&self, key: &Key) -> Option<ZSlice> {
self._get(key.as_ref())
}
fn _insert(&mut self, key: &[u8], value: &[u8]) {
let codec = Zenoh080;
let mut writer = self.inner.writer();
codec.write(&mut writer, key).unwrap(); // Infallible, barring alloc failure
codec.write(&mut writer, value).unwrap(); // Infallible, barring alloc failure
}
/// Inserts a key-value pair to the attachment.
///
/// Note that [`Attachment`] is a list of non-unique key-value pairs: inserting at the same key multiple times leads to both values being transmitted for that key.
///
/// [`Attachment`] is not very efficient at inserting, so if you wish to perform multiple inserts, it's generally better to [`Attachment::extend`] after performing the inserts on an [`AttachmentBuilder`]
pub fn insert<Key: AsRef<[u8]> + ?Sized, Value: AsRef<[u8]> + ?Sized>(
&mut self,
key: &Key,
value: &Value,
) {
self._insert(key.as_ref(), value.as_ref())
}
fn _extend(&mut self, with: Self) -> &mut Self {
for slice in with.inner.zslices().cloned() {
self.inner.push_zslice(slice);
}
self
}
pub fn extend(&mut self, with: impl Into<Self>) -> &mut Self {
let with = with.into();
self._extend(with)
}
}
#[zenoh_macros::unstable]
pub struct AttachmentIterator<'a> {
reader: ZBufReader<'a>,
}
#[zenoh_macros::unstable]
impl<'a> core::iter::IntoIterator for &'a Attachment {
type Item = (ZSlice, ZSlice);
type IntoIter = AttachmentIterator<'a>;
fn into_iter(self) -> Self::IntoIter {
AttachmentIterator {
reader: self.inner.reader(),
}
}
}
#[zenoh_macros::unstable]
impl core::fmt::Debug for Attachment {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{")?;
for (key, value) in self {
let key = key.as_slice();
let value = value.as_slice();
match core::str::from_utf8(key) {
Ok(key) => write!(f, "\"{key}\": ")?,
Err(_) => {
write!(f, "0x")?;
for byte in key {
write!(f, "{byte:02X}")?
}
}
}
match core::str::from_utf8(value) {
Ok(value) => write!(f, "\"{value}\", ")?,
Err(_) => {
write!(f, "0x")?;
for byte in value {
write!(f, "{byte:02X}")?
}
write!(f, ", ")?
}
}
}
write!(f, "}}")
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::Iterator for AttachmentIterator<'a> {
type Item = (ZSlice, ZSlice);
fn next(&mut self) -> Option<Self::Item> {
let key = Zenoh080.read(&mut self.reader).ok()?;
let value = Zenoh080.read(&mut self.reader).ok()?;
Some((key, value))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(
(self.reader.remaining() != 0) as usize,
Some(self.reader.remaining() / 2),
)
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for AttachmentBuilder {
fn from_iter<T: IntoIterator<Item = (&'a [u8], &'a [u8])>>(iter: T) -> Self {
let codec = Zenoh080;
let mut buffer: Vec<u8> = Vec::new();
let mut writer = buffer.writer();
for (key, value) in iter {
codec.write(&mut writer, key).unwrap(); // Infallible, barring allocation failures
codec.write(&mut writer, value).unwrap(); // Infallible, barring allocation failures
}
Self { inner: buffer }
}
}
#[zenoh_macros::unstable]
impl<'a> core::iter::FromIterator<(&'a [u8], &'a [u8])> for Attachment {
fn from_iter<T: IntoIterator<Item = (&'a [u8], &'a [u8])>>(iter: T) -> Self {
AttachmentBuilder::from_iter(iter).into()
}
}
}

/// The kind of a `Sample`.
#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -461,9 +239,6 @@ impl TryFrom<u64> for SampleKind {
}
}

#[zenoh_macros::unstable]
pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator};

/// Structure with public fields for sample. It's convenient if it's necessary to decompose a sample into its fields.
pub struct SampleFields {
pub key_expr: KeyExpr<'static>,
Expand Down
Loading

0 comments on commit 7afbc0d

Please sign in to comment.