Skip to content

Commit

Permalink
feat!: improve serialization efficiency
Browse files Browse the repository at this point in the history
Fixed-size objects are now written directly on the writer,
instead of being wrapped first into a ZBytes.
Slice like objects are also written on the writer,
but obviously prefixed by their size, like ZBytes.
  • Loading branch information
wyfo committed Sep 20, 2024
1 parent 3b6d773 commit a8a4f14
Show file tree
Hide file tree
Showing 46 changed files with 1,031 additions and 2,833 deletions.
39 changes: 1 addition & 38 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ secrecy = { version = "0.8.0", features = ["serde", "alloc"] }
serde = { version = "1.0.210", default-features = false, features = [
"derive",
] } # Default features are disabled due to usage in no_std crates
serde_cbor = "0.11.2"
serde_json = "1.0.128"
serde-pickle = "1.1.1"
serde_yaml = "0.9.34"
static_init = "1.0.3"
stabby = "36.1.1"
Expand Down
2 changes: 1 addition & 1 deletion ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() {
sample.key_expr().as_str(),
sample
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e))
);
})
Expand Down
4 changes: 2 additions & 2 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ async fn main() {
sample.key_expr().as_str(),
sample
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e))
),
Err(err) => println!(
">> Received (ERROR: '{}')",
err.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e))
),
}
Expand Down
5 changes: 5 additions & 0 deletions commons/zenoh-buffers/src/zbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ impl<'a> ZBufWriter<'a> {
self.zslice_writer = zbuf.slices.last_mut().unwrap().writer();
self.zslice_writer.as_mut().unwrap()
}

#[inline]
pub fn reserve(&mut self, additional: usize) {
self.zslice_writer().reserve(additional)
}
}

impl<'a> HasWriter for &'a mut ZBuf {
Expand Down
9 changes: 8 additions & 1 deletion commons/zenoh-buffers/src/zslice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ impl ZSlice {
// See https://github.com/eclipse-zenoh/zenoh/pull/1289#discussion_r1701796640
#[inline]
pub(crate) fn writer(&mut self) -> Option<ZSliceWriter> {
const MAX_REALLOC_SIZE: usize = 64;
let vec = Arc::get_mut(&mut self.buf)?
.as_any_mut()
.downcast_mut::<Vec<u8>>()?;
if self.end == vec.len() {
if self.end == vec.len() && (vec.len() < vec.capacity() || vec.len() <= MAX_REALLOC_SIZE) {
Some(ZSliceWriter {
vec,
end: &mut self.end,
Expand Down Expand Up @@ -294,6 +295,12 @@ pub(crate) struct ZSliceWriter<'a> {
end: &'a mut usize,
}

impl ZSliceWriter<'_> {
pub(crate) fn reserve(&mut self, additional: usize) {
self.vec.reserve(additional)
}
}

impl Writer for ZSliceWriter<'_> {
fn write(&mut self, bytes: &[u8]) -> Result<NonZeroUsize, DidntWrite> {
let len = self.vec.write(bytes)?;
Expand Down
5 changes: 5 additions & 0 deletions commons/zenoh-protocol/src/core/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ impl<'s> From<Cow<'s, str>> for Parameters<'s> {
}
}
}
impl<'s> From<Parameters<'s>> for Cow<'s, str> {
fn from(value: Parameters<'s>) -> Self {
value.0
}
}

impl<'a> From<Parameters<'a>> for Cow<'_, Parameters<'a>> {
fn from(props: Parameters<'a>) -> Self {
Expand Down
26 changes: 13 additions & 13 deletions examples/examples/z_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ fn main() {
// Numeric: u8, u16, u32, u128, usize, i8, i16, i32, i128, isize, f32, f64
let input = 1234_u32;
let payload = ZBytes::from(input);
let output: u32 = payload.deserialize().unwrap();
let output: u32 = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::ZENOH_UINT32;

// String
let input = String::from("test");
let payload = ZBytes::from(&input);
let output: String = payload.deserialize().unwrap();
let output: String = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::ZENOH_STRING;
Expand All @@ -38,18 +38,18 @@ fn main() {
// See [`zenoh::bytes::ZBytes`] documentation for zero-copy behaviour.
let input = Cow::from("test");
let payload = ZBytes::from(&input);
let output: Cow<str> = payload.deserialize().unwrap();
let output: Cow<str> = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::ZENOH_STRING;

// Vec<u8>: The deserialization should be infallible
let input: Vec<u8> = vec![1, 2, 3, 4];
let payload = ZBytes::from(&input);
let output: Vec<u8> = payload.deserialize().unwrap();
let output: Vec<u8> = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Deserialization of Vec<u8> is infallible. See https://docs.rs/unwrap-infallible/latest/unwrap_infallible/.
let output: Vec<u8> = payload.deserialize().unwrap_infallible();
let output: Vec<u8> = payload.try_deserialize().unwrap_infallible();
assert_eq!(input, output);
// Since the deserialization of `Vec<u8>` is infallible, then `ZBytes` can be infallibly converted into a `Vec<u8>`.
let output: Vec<u8> = payload.into();
Expand All @@ -61,10 +61,10 @@ fn main() {
// See [`zenoh::bytes::ZBytes`] documentation for zero-copy behaviour.
let input = Cow::from(vec![1, 2, 3, 4]);
let payload = ZBytes::from(&input);
let output: Cow<[u8]> = payload.deserialize().unwrap();
let output: Cow<[u8]> = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Deserialization of `Cow<[u8]>` is infallible. See https://docs.rs/unwrap-infallible/latest/unwrap_infallible/.
let output: Cow<[u8]> = payload.deserialize().unwrap_infallible();
let output: Cow<[u8]> = payload.try_deserialize().unwrap_infallible();
assert_eq!(input, output);
// Since the deserialization of `Cow<[u8]>` is infallible, then `ZBytes` can be infallibly converted into a `Cow<[u8]>`.
let output: Vec<u8> = payload.into();
Expand All @@ -74,7 +74,7 @@ fn main() {

// Writer & Reader
// serialization
let mut bytes = ZBytes::empty();
let mut bytes = ZBytes::new();
let mut writer = bytes.writer();
let i1 = 1234_u32;
let i2 = String::from("test");
Expand All @@ -94,7 +94,7 @@ fn main() {
// Tuple
let input = (1234_u32, String::from("test"));
let payload = ZBytes::serialize(input.clone());
let output: (u32, String) = payload.deserialize().unwrap();
let output: (u32, String) = payload.try_deserialize().unwrap();
assert_eq!(input, output);

// Iterator
Expand All @@ -116,7 +116,7 @@ fn main() {
input.insert(0, String::from("abc"));
input.insert(1, String::from("def"));
let payload = ZBytes::from(input.clone());
let output = payload.deserialize::<HashMap<usize, String>>().unwrap();
let output = payload.try_deserialize::<HashMap<usize, String>>().unwrap();
assert_eq!(input, output);

// JSON
Expand All @@ -131,7 +131,7 @@ fn main() {
}"#;
let input: serde_json::Value = serde_json::from_str(data).unwrap();
let payload = ZBytes::try_serialize(input.clone()).unwrap();
let output: serde_json::Value = payload.deserialize().unwrap();
let output: serde_json::Value = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::APPLICATION_JSON;
Expand All @@ -146,7 +146,7 @@ fn main() {
"#;
let input: serde_yaml::Value = serde_yaml::from_str(data).unwrap();
let payload = ZBytes::try_serialize(input.clone()).unwrap();
let output: serde_yaml::Value = payload.deserialize().unwrap();
let output: serde_yaml::Value = payload.try_deserialize().unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::APPLICATION_YAML;
Expand All @@ -166,7 +166,7 @@ fn main() {
};
let payload = ZBytes::from(input.encode_to_vec());
let output =
EntityInfo::decode(Cursor::new(payload.deserialize::<Cow<[u8]>>().unwrap())).unwrap();
EntityInfo::decode(Cursor::new(payload.try_deserialize::<Cow<[u8]>>().unwrap())).unwrap();
assert_eq!(input, output);
// Corresponding encoding to be used in operations like `.put()`, `.reply()`, etc.
// let encoding = Encoding::APPLICATION_PROTOBUF;
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_bytes_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn main() {
// branch to illustrate immutable access to SHM data
{
// deserialize ZBytes as an immutably borrowed zshm (ZBytes -> &zshm)
let borrowed_shm_buf: &zshm = payload.deserialize().unwrap();
let borrowed_shm_buf: &zshm = payload.try_deserialize().unwrap();

// immutable API
let _data: &[u8] = borrowed_shm_buf;
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async fn main() {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> Received ('{}': '{}')",
Expand All @@ -60,7 +60,7 @@ async fn main() {
Err(err) => {
let payload = err
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_get_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() {
Err(err) => {
let payload = err
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_get_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ async fn main() {
match reply.result() {
Ok(sample) => {
print!(">> Received ('{}': ", sample.key_expr().as_str());
match sample.payload().deserialize::<&zshm>() {
match sample.payload().try_deserialize::<&zshm>() {
Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),),
Err(e) => println!("'Not a ShmBufInner: {:?}')", e),
}
}
Err(err) => {
let payload = err
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(">> Received (ERROR: '{}')", payload);
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
Ok(sample) => {
let payload = sample
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Pulled {} ('{}': '{}')... performing a computation of {:#?}",
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() {
Some(query_payload) => {
// Refer to z_bytes.rs to see how to deserialize different types of message
let deserialized_payload = query_payload
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Queryable ] Received Query '{}' with payload '{}'",
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_queryable_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn handle_bytes(bytes: &ZBytes) -> (&str, String) {

// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type
#[cfg(all(feature = "shared-memory", feature = "unstable"))]
match bytes.deserialize::<&zshm>() {
match bytes.try_deserialize::<&zshm>() {
Ok(_) => "SHM",
Err(_) => "RAW",
}
Expand All @@ -157,7 +157,7 @@ fn handle_bytes(bytes: &ZBytes) -> (&str, String) {
//
// Refer to z_bytes.rs to see how to deserialize different types of message
let bytes_string = bytes
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

(bytes_type, bytes_string)
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn main() {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
let payload = sample.payload().deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
let payload = sample.payload().try_deserialize::<String>().unwrap_or_else(|e| format!("{}", e));
println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind(), sample.key_expr().as_str(),payload);
match sample.kind() {
SampleKind::Delete => stored.remove(&sample.key_expr().to_string()),
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn main() {
// Refer to z_bytes.rs to see how to deserialize different types of message
let payload = sample
.payload()
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));

print!(
Expand All @@ -44,7 +44,7 @@ async fn main() {
);
if let Some(att) = sample.attachment() {
let att = att
.deserialize::<String>()
.try_deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
print!(" ({})", att);
}
Expand Down
Loading

0 comments on commit a8a4f14

Please sign in to comment.