From 8b627fa53bc31545560593e841df12134ca58477 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 13 Dec 2023 22:42:22 +0800 Subject: [PATCH 1/5] Add functions to_serialized_bytes and from_serialized_bytes --- r2r/src/error.rs | 2 +- r2r/src/msg_types.rs | 143 +++++++++++++++++++++++++++++++++++++++++++ r2r_rcl/build.rs | 1 + 3 files changed, 145 insertions(+), 1 deletion(-) diff --git a/r2r/src/error.rs b/r2r/src/error.rs index 649b8fdf2..178baa6ea 100644 --- a/r2r/src/error.rs +++ b/r2r/src/error.rs @@ -11,7 +11,7 @@ pub type Result = std::result::Result; /// These values are mostly copied straight from the RCL headers, but /// some are specific to r2r, such as `GoalCancelRejected` which does /// not have an analogue in the rcl. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum Error { #[error("RCL_RET_OK")] RCL_RET_OK, diff --git a/r2r/src/msg_types.rs b/r2r/src/msg_types.rs index 49b8a639d..ed6802167 100644 --- a/r2r/src/msg_types.rs +++ b/r2r/src/msg_types.rs @@ -2,12 +2,14 @@ use crate::error::*; use r2r_msg_gen::*; use r2r_rcl::{ rosidl_action_type_support_t, rosidl_message_type_support_t, rosidl_service_type_support_t, + rcl_serialized_message_t, }; use serde::{Deserialize, Serialize}; use std::boxed::Box; use std::convert::TryInto; use std::fmt::Debug; use std::ops::{Deref, DerefMut}; +use std::cell::RefCell; pub mod generated_msgs { #![allow(clippy::all)] @@ -32,6 +34,30 @@ pub(crate) fn uuid_msg_to_uuid(msg: &unique_identifier_msgs::msg::UUID) -> uuid: uuid::Uuid::from_bytes(bytes) } +// TODO where is the best place for this? +thread_local! { + pub static SERIALIZED_MESSAGE_CACHE: Result> = { + use r2r_rcl::*; + + let mut msg_buf: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() }; + + let ret = unsafe { + rcutils_uint8_array_init( + &mut msg_buf as *mut rcl_serialized_message_t, + 0, + &rcutils_get_default_allocator(), + ) + }; + + if ret != RCL_RET_OK as i32 { + Err(Error::from_rcl_error(ret)) + } else { + Ok(RefCell::new(msg_buf)) + } + + }; +} + pub trait WrappedTypesupport: Serialize + for<'de> Deserialize<'de> + Default + Debug + Clone { @@ -42,6 +68,84 @@ pub trait WrappedTypesupport: fn destroy_msg(msg: *mut Self::CStruct); fn from_native(msg: &Self::CStruct) -> Self; fn copy_to_native(&self, msg: &mut Self::CStruct); + + fn to_serialized_bytes(&self) -> Result> { + use r2r_rcl::*; + + let msg = Self::create_msg(); + + self.copy_to_native(unsafe { msg.as_mut().expect("not null") }); + + // let mut msg_buf: rcl_serialized_message_t = + // unsafe { rcutils_get_zero_initialized_uint8_array() }; + + // let ret = unsafe { + // rcutils_uint8_array_init( + // &mut msg_buf as *mut rcl_serialized_message_t, + // 0, + // &rcutils_get_default_allocator(), + // ) + // }; + + // if ret != RCL_RET_OK as i32 { + // return Err(Error::from_rcl_error(ret)); + // } + + SERIALIZED_MESSAGE_CACHE.with(|msg_buf| { + + let msg_buf: &mut rcl_serialized_message_t = &mut *msg_buf.as_ref().map_err(|err| err.clone())?.borrow_mut(); + + let result = unsafe { + rmw_serialize( + msg as *const ::std::os::raw::c_void, + Self::get_ts(), + msg_buf as *mut rcl_serialized_message_t, + ) + }; + + let data_bytes = unsafe { + std::slice::from_raw_parts(msg_buf.buffer, msg_buf.buffer_length).to_vec() + }; + + if result == RCL_RET_OK as i32 { + Ok(data_bytes) + } else { + Err(Error::from_rcl_error(result)) + } + }) + } + + fn from_serialized_bytes(data: &[u8]) -> Result { + use r2r_rcl::*; + + let msg = Self::create_msg(); + + let msg_buf = rcl_serialized_message_t { + buffer: data.as_ptr() as *mut u8, + buffer_length: data.len(), + buffer_capacity: data.len(), + + // Since its read only, this should never be used .. + allocator: unsafe { rcutils_get_default_allocator() } + }; + + // Note From the docs of rmw_deserialize, its not clear whether this reuses + // any part of msg_buf. However it shouldn't matter since from_native + // clones everything again anyway .. + let result = unsafe { + rmw_deserialize( + &msg_buf as *const rcl_serialized_message_t, + Self::get_ts(), + msg as *mut std::os::raw::c_void, + ) + }; + + if result == RCL_RET_OK as i32 { + Ok(Self::from_native(unsafe{ msg.as_ref().expect("not null") })) + } else { + Err(Error::from_rcl_error(result)) + } + } } pub trait WrappedServiceTypeSupport: Debug + Clone { @@ -605,6 +709,45 @@ mod tests { assert!(native.void_ptr() == borrowed_msg as *mut core::ffi::c_void); } + #[test] + fn test_serialization_fixed_size() { + let message = std_msgs::msg::Int32 { data: 10}; + + let bytes = message.to_serialized_bytes().unwrap(); + + let message_2 = std_msgs::msg::Int32::from_serialized_bytes(&bytes).unwrap(); + + assert_eq!(message.data, message_2.data); + + let bytes_2 = message_2.to_serialized_bytes().unwrap(); + let bytes_3 = message_2.to_serialized_bytes().unwrap(); + + assert_eq!(bytes, bytes_2); + assert_eq!(bytes, bytes_3); + + } + + #[test] + fn test_serialization_dynamic_size() { + let message = std_msgs::msg::Int32MultiArray { + layout: std_msgs::msg::MultiArrayLayout::default(), + data: vec![10, 20, 30] + }; + + let bytes = message.to_serialized_bytes().unwrap(); + + let message_2 = std_msgs::msg::Int32MultiArray::from_serialized_bytes(&bytes).unwrap(); + + assert_eq!(message.data, message_2.data); + + let bytes_2 = message_2.to_serialized_bytes().unwrap(); + let bytes_3 = message_2.to_serialized_bytes().unwrap(); + + assert_eq!(bytes, bytes_2); + assert_eq!(bytes, bytes_3); + + } + #[cfg(r2r__test_msgs__msg__Defaults)] #[test] fn test_untyped_json_default() { diff --git a/r2r_rcl/build.rs b/r2r_rcl/build.rs index f5ea061a6..50a5621a9 100644 --- a/r2r_rcl/build.rs +++ b/r2r_rcl/build.rs @@ -63,6 +63,7 @@ fn run_dynlink() { println!("cargo:rustc-link-lib=dylib=rcl_yaml_param_parser"); println!("cargo:rustc-link-lib=dylib=rcutils"); println!("cargo:rustc-link-lib=dylib=rmw"); + println!("cargo:rustc-link-lib=dylib=rmw_implementation"); println!("cargo:rustc-link-lib=dylib=rosidl_typesupport_c"); println!("cargo:rustc-link-lib=dylib=rosidl_runtime_c"); } From 2e17b8ed40c42fbfffc0224ed633d7ec656879b0 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 13 Dec 2023 22:44:45 +0800 Subject: [PATCH 2/5] Add comment --- r2r/src/msg_types.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/r2r/src/msg_types.rs b/r2r/src/msg_types.rs index ed6802167..f309d269f 100644 --- a/r2r/src/msg_types.rs +++ b/r2r/src/msg_types.rs @@ -69,6 +69,7 @@ pub trait WrappedTypesupport: fn from_native(msg: &Self::CStruct) -> Self; fn copy_to_native(&self, msg: &mut Self::CStruct); + /// This serializes the message using ROS2 methods. fn to_serialized_bytes(&self) -> Result> { use r2r_rcl::*; @@ -115,6 +116,7 @@ pub trait WrappedTypesupport: }) } + /// This deserializes the message using ROS2 methods. fn from_serialized_bytes(data: &[u8]) -> Result { use r2r_rcl::*; From f5ba556e2e4aff9a2b92dcf53683c4abce502b8b Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 13 Dec 2023 22:51:57 +0800 Subject: [PATCH 3/5] Move location of from_rcl_error to avoid the need for r2r::Error to be Clone --- r2r/src/error.rs | 2 +- r2r/src/msg_types.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/r2r/src/error.rs b/r2r/src/error.rs index 178baa6ea..649b8fdf2 100644 --- a/r2r/src/error.rs +++ b/r2r/src/error.rs @@ -11,7 +11,7 @@ pub type Result = std::result::Result; /// These values are mostly copied straight from the RCL headers, but /// some are specific to r2r, such as `GoalCancelRejected` which does /// not have an analogue in the rcl. -#[derive(Error, Debug, Clone)] +#[derive(Error, Debug)] pub enum Error { #[error("RCL_RET_OK")] RCL_RET_OK, diff --git a/r2r/src/msg_types.rs b/r2r/src/msg_types.rs index f309d269f..831153f73 100644 --- a/r2r/src/msg_types.rs +++ b/r2r/src/msg_types.rs @@ -36,7 +36,7 @@ pub(crate) fn uuid_msg_to_uuid(msg: &unique_identifier_msgs::msg::UUID) -> uuid: // TODO where is the best place for this? thread_local! { - pub static SERIALIZED_MESSAGE_CACHE: Result> = { + pub static SERIALIZED_MESSAGE_CACHE: std::result::Result, i32> = { use r2r_rcl::*; let mut msg_buf: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() }; @@ -50,7 +50,7 @@ thread_local! { }; if ret != RCL_RET_OK as i32 { - Err(Error::from_rcl_error(ret)) + Err(ret) } else { Ok(RefCell::new(msg_buf)) } @@ -94,7 +94,7 @@ pub trait WrappedTypesupport: SERIALIZED_MESSAGE_CACHE.with(|msg_buf| { - let msg_buf: &mut rcl_serialized_message_t = &mut *msg_buf.as_ref().map_err(|err| err.clone())?.borrow_mut(); + let msg_buf: &mut rcl_serialized_message_t = &mut *msg_buf.as_ref().map_err(|err| Error::from_rcl_error(*err))?.borrow_mut(); let result = unsafe { rmw_serialize( From eb1f461943a396d74aadfdfab280889bb201a4c7 Mon Sep 17 00:00:00 2001 From: Michael Hoy Date: Wed, 13 Dec 2023 22:59:16 +0800 Subject: [PATCH 4/5] Add missing descructors --- r2r/src/msg_types.rs | 33 ++++++++++++--------------------- 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/r2r/src/msg_types.rs b/r2r/src/msg_types.rs index 831153f73..989a3f8a6 100644 --- a/r2r/src/msg_types.rs +++ b/r2r/src/msg_types.rs @@ -73,26 +73,10 @@ pub trait WrappedTypesupport: fn to_serialized_bytes(&self) -> Result> { use r2r_rcl::*; - let msg = Self::create_msg(); - - self.copy_to_native(unsafe { msg.as_mut().expect("not null") }); - - // let mut msg_buf: rcl_serialized_message_t = - // unsafe { rcutils_get_zero_initialized_uint8_array() }; - - // let ret = unsafe { - // rcutils_uint8_array_init( - // &mut msg_buf as *mut rcl_serialized_message_t, - // 0, - // &rcutils_get_default_allocator(), - // ) - // }; - - // if ret != RCL_RET_OK as i32 { - // return Err(Error::from_rcl_error(ret)); - // } - SERIALIZED_MESSAGE_CACHE.with(|msg_buf| { + let msg = Self::create_msg(); + + self.copy_to_native(unsafe { msg.as_mut().expect("not null") }); let msg_buf: &mut rcl_serialized_message_t = &mut *msg_buf.as_ref().map_err(|err| Error::from_rcl_error(*err))?.borrow_mut(); @@ -108,6 +92,8 @@ pub trait WrappedTypesupport: std::slice::from_raw_parts(msg_buf.buffer, msg_buf.buffer_length).to_vec() }; + Self::destroy_msg(msg); + if result == RCL_RET_OK as i32 { Ok(data_bytes) } else { @@ -142,11 +128,16 @@ pub trait WrappedTypesupport: ) }; - if result == RCL_RET_OK as i32 { + let ret_val = if result == RCL_RET_OK as i32 { Ok(Self::from_native(unsafe{ msg.as_ref().expect("not null") })) } else { Err(Error::from_rcl_error(result)) - } + }; + + Self::destroy_msg(msg); + + ret_val + } } From fb08e1f94b33849333b61bfd22a31b947d69847e Mon Sep 17 00:00:00 2001 From: Martin Dahl Date: Wed, 13 Dec 2023 17:05:12 +0100 Subject: [PATCH 5/5] Add deserialization benchmark. --- r2r/Cargo.toml | 5 +++ r2r/benches/deserialization.rs | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 r2r/benches/deserialization.rs diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index 39baf8ba6..76f9838cb 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -34,6 +34,7 @@ futures = "0.3.25" tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] } rand = "0.8.5" cdr = "0.2.4" +criterion = "0.5.1" [build-dependencies] r2r_common = { path = "../r2r_common", version = "0.8.2" } @@ -51,3 +52,7 @@ doc-only = ["r2r_common/doc-only", "r2r_rcl/doc-only", "r2r_msg_gen/doc-only", " [package.metadata.docs.rs] features = ["doc-only"] + +[[bench]] +name = "deserialization" +harness = false diff --git a/r2r/benches/deserialization.rs b/r2r/benches/deserialization.rs new file mode 100644 index 000000000..f81342b08 --- /dev/null +++ b/r2r/benches/deserialization.rs @@ -0,0 +1,57 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use r2r::*; +use rand::Rng; +use rand::thread_rng; + +const NUM_ELEMENTS: usize = 10_000; +const NUM_TIMES: usize = 1_000; + +fn bench_ros_deserialization() { + let mut rng = thread_rng(); + let mut numbers = Vec::::with_capacity(NUM_ELEMENTS); + + for _ in 0..NUM_ELEMENTS { + numbers.push(rng.gen_range(0..i32::MAX)); + } + let message = std_msgs::msg::Int32MultiArray { + layout: std_msgs::msg::MultiArrayLayout::default(), + data: numbers + }; + + let bytes = message.to_serialized_bytes().unwrap(); + + for _ in 0..NUM_TIMES { + let _ = std_msgs::msg::Int32MultiArray::from_serialized_bytes(&bytes).unwrap(); + } +} + +fn bench_cdr_deserialization() { + let mut rng = thread_rng(); + let mut numbers = Vec::::with_capacity(NUM_ELEMENTS); + + for _ in 0..NUM_ELEMENTS { + numbers.push(rng.gen_range(0..i32::MAX)); + } + let message = std_msgs::msg::Int32MultiArray { + layout: std_msgs::msg::MultiArrayLayout::default(), + data: numbers + }; + + let bytes = message.to_serialized_bytes().unwrap(); + + for _ in 0..NUM_TIMES { + let _msg1 = cdr::deserialize::(&bytes).unwrap(); + // just for testing that we get the same result. + // let msg2 = std_msgs::msg::Int32MultiArray::from_serialized_bytes(&bytes).unwrap(); + // assert_eq!(msg1, msg2); + // assert_eq!(msg2, message); + } +} + +pub fn criterion_benchmark(c: &mut Criterion) { + c.bench_function("ros_deserialization", |b| b.iter(|| bench_ros_deserialization())); + c.bench_function("cdr_deserialization", |b| b.iter(|| bench_cdr_deserialization())); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches);