From 863fea181d5b71b2e38ac9f19a7a313fed64d065 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Fri, 5 Jan 2024 15:54:12 +0100 Subject: [PATCH] Rename RosRequestId to CddsRequestHeader (as specific to rmw_cyclonedds_cpp) --- zenoh-plugin-ros2dds/src/ros2_utils.rs | 75 ++++++++++--------- zenoh-plugin-ros2dds/src/route_service_cli.rs | 10 +-- zenoh-plugin-ros2dds/src/route_service_srv.rs | 18 ++--- 3 files changed, 53 insertions(+), 50 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index d273a88..a88ad10 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -165,37 +165,40 @@ pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String { ) } -const ATTACHMENT_KEY_REQUEST_ID: [u8; 5] = [0x72, 0x65, 0x71, 0x69, 0x64]; // "reqid" +const ATTACHMENT_KEY_REQUEST_HEADER: [u8; 3] = [0x72, 0x71, 0x68]; // "rqh" in ASCII -/// In rmw_cyclonedds_cpp a Request id is used as header in each request and reply payload. +/// In rmw_cyclonedds_cpp a cdds_request_header sent within each request and reply payload. /// See https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 -/// It's a 16 bytes buffer made of a 8 bytes client id and a 8 bytes sequence number (all endianness dependent) +/// Note that it's different from the rmw_request_id_t defined in RMW interfaces in +/// https://github.com/ros2/rmw/blob/9b3d9d0e3021b7a6e75d8886e3e061a53c36c789/rmw/include/rmw/types.h#L360 #[derive(Clone, Copy, Hash, PartialEq, Eq)] -pub struct RosRequestId { - // A ROS 2 request id is a 16 bytes buffer with the client id as 8 first bytes and - id: [u8; 16], +pub struct CddsRequestHeader { + // The header contains a u64 GUID (Client's) and a i64 sequence number. + // Keep those as a single buffer, as it's transfered as such between DDS and Zenoh. + header: [u8; 16], + // the sequence number is subject to endianness, we need to keep a flag for it is_little_endian: bool, } -impl RosRequestId { - pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> RosRequestId { - let mut id = [0u8; 16]; +impl CddsRequestHeader { + pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> CddsRequestHeader { + let mut header = [0u8; 16]; if is_little_endian { - id[..8].copy_from_slice(&client_id.to_le_bytes()); - id[8..].copy_from_slice(&seq_num.to_le_bytes()) + header[..8].copy_from_slice(&client_id.to_le_bytes()); + header[8..].copy_from_slice(&seq_num.to_le_bytes()) } else { - id[..8].copy_from_slice(&client_id.to_be_bytes()); - id[8..].copy_from_slice(&seq_num.to_be_bytes()) + header[..8].copy_from_slice(&client_id.to_be_bytes()); + header[8..].copy_from_slice(&seq_num.to_be_bytes()) } - RosRequestId { - id, + CddsRequestHeader { + header, is_little_endian, } } - pub fn from_slice(id: [u8; 16], is_little_endian: bool) -> RosRequestId { - RosRequestId { - id, + pub fn from_slice(header: [u8; 16], is_little_endian: bool) -> CddsRequestHeader { + CddsRequestHeader { + header, is_little_endian, } } @@ -205,63 +208,63 @@ impl RosRequestId { } pub fn as_slice(&self) -> &[u8] { - &self.id + &self.header } pub fn as_attachment(&self) -> Attachment { let mut attach = Attachment::new(); - // concat id + endianness flag + // concat header + endianness flag let mut buf = [0u8; 17]; - buf[0..16].copy_from_slice(&self.id); + buf[0..16].copy_from_slice(&self.header); buf[16] = self.is_little_endian as u8; - attach.insert(&ATTACHMENT_KEY_REQUEST_ID, &buf); + attach.insert(&ATTACHMENT_KEY_REQUEST_HEADER, &buf); attach } } -impl TryFrom<&Attachment> for RosRequestId { +impl TryFrom<&Attachment> for CddsRequestHeader { type Error = ZError; fn try_from(value: &Attachment) -> Result { - match value.get(&ATTACHMENT_KEY_REQUEST_ID) { + match value.get(&ATTACHMENT_KEY_REQUEST_HEADER) { Some(buf) => { if buf.len() == 17 { - let id: [u8; 16] = buf[0..16] + let header: [u8; 16] = buf[0..16] .try_into() .expect("Shouldn't happen: buf is 17 bytes"); - Ok(RosRequestId { - id, + Ok(CddsRequestHeader { + header, is_little_endian: buf[16] != 0, }) } else { - bail!("Attachment 'reqid' is not 16 bytes: {buf:02x?}") + bail!("Attachment 'header' is not 16 bytes: {buf:02x?}") } } - None => bail!("No 'reqid' key found in Attachment"), + None => bail!("No 'header' key found in Attachment"), } } } -impl std::fmt::Display for RosRequestId { +impl std::fmt::Display for CddsRequestHeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // a request id is made of 8 bytes client id + 8 bytes sequence number + // a request header is made of 8 bytes client guid + 8 bytes sequence number // display as such for easier understanding write!(f, "(")?; - for i in &self.id[0..8] { + for i in &self.header[0..8] { write!(f, "{i:02x}")?; } let seq_num = if self.is_little_endian { u64::from_le_bytes( - self.id[8..] + self.header[8..] .try_into() - .expect("Shouldn't happen: self.id is 16 bytes"), + .expect("Shouldn't happen: self.header is 16 bytes"), ) } else { u64::from_be_bytes( - self.id[8..] + self.header[8..] .try_into() - .expect("Shouldn't happen: self.id is 16 bytes"), + .expect("Shouldn't happen: self.header is 16 bytes"), ) }; write!(f, ",{seq_num})",) diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index a301c4a..ba79266 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -32,7 +32,7 @@ use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, RosRequestId, QOS_DEFAULT_SERVICE, + ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::LOG_PAYLOAD; @@ -280,7 +280,7 @@ fn route_dds_request_to_zenoh( sample: &DDSRawSample, zenoh_key_expr: &OwnedKeyExpr, zsession: &Arc, - queries_timeout: Duration, + query_timeout: Duration, rep_writer: dds_entity_t, ) { // request payload is expected to be the Request type encoded as CDR, including a 4 bytes header, @@ -295,7 +295,7 @@ fn route_dds_request_to_zenoh( let dds_req_buf = zbuf.contiguous(); let is_little_endian = is_cdr_little_endian(&dds_req_buf).expect("Shouldn't happen: sample.len >= 20"); - let request_id = RosRequestId::from_slice( + let request_id = CddsRequestHeader::from_slice( dds_req_buf[4..20] .try_into() .expect("Shouldn't happen: sample.len >= 20"), @@ -325,7 +325,7 @@ fn route_dds_request_to_zenoh( .with_value(zenoh_req_buf) .with_attachment(request_id.as_attachment()) .allowed_destination(Locality::Remote) - .timeout(queries_timeout) + .timeout(query_timeout) .callback(move |reply| { route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer) }) @@ -338,7 +338,7 @@ fn route_dds_request_to_zenoh( fn route_zenoh_reply_to_dds( route_id: String, reply: Reply, - request_id: RosRequestId, + request_id: CddsRequestHeader, rep_writer: dds_entity_t, ) { match reply.sample { diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 34d0c83..8c57746 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -35,7 +35,7 @@ use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; use crate::liveliness_mgt::new_ke_liveliness_service_srv; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, RosRequestId, QOS_DEFAULT_SERVICE, + ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::{serialize_option_as_bool, LOG_PAYLOAD}; @@ -70,7 +70,7 @@ pub struct RouteServiceSrv<'a> { sequence_number: Arc, // queries waiting for a reply #[serde(skip)] - queries_in_progress: Arc>>, + queries_in_progress: Arc>>, // a liveliness token associated to this route, for announcement to other plugins #[serde(skip)] liveliness_token: Option>, @@ -158,7 +158,7 @@ impl RouteServiceSrv<'_> { ); // map of queries in progress - let queries_in_progress: Arc>> = + let queries_in_progress: Arc>> = Arc::new(RwLock::new(HashMap::new())); // create DDS Reader to receive replies and route them to Zenoh @@ -224,7 +224,7 @@ impl RouteServiceSrv<'_> { // create the zenoh Queryable // if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data - let queries_in_progress: Arc>> = + let queries_in_progress: Arc>> = self.queries_in_progress.clone(); let sequence_number: Arc = self.sequence_number.clone(); let route_id: String = self.to_string(); @@ -340,7 +340,7 @@ impl RouteServiceSrv<'_> { fn route_zenoh_request_to_dds( query: Query, - queries_in_progress: &mut HashMap, + queries_in_progress: &mut HashMap, sequence_number: &AtomicU64, route_id: &str, client_guid: u64, @@ -359,9 +359,9 @@ fn route_zenoh_request_to_dds( // Otherwise, create one using client_guid + sequence_number let request_id = query .attachment() - .and_then(|a| RosRequestId::try_from(a).ok()) + .and_then(|a| CddsRequestHeader::try_from(a).ok()) .unwrap_or_else(|| { - RosRequestId::create( + CddsRequestHeader::create( client_guid, sequence_number.fetch_add(1, Ordering::Relaxed), is_little_endian, @@ -423,7 +423,7 @@ fn route_zenoh_request_to_dds( fn route_dds_reply_to_zenoh( sample: &DDSRawSample, zenoh_key_expr: OwnedKeyExpr, - queries_in_progress: &mut HashMap, + queries_in_progress: &mut HashMap, route_id: &str, ) { // reply payload is expected to be the Response type encoded as CDR, including a 4 bytes header, @@ -439,7 +439,7 @@ fn route_dds_reply_to_zenoh( let cdr_header = &dds_rep_buf[..4]; let is_little_endian = is_cdr_little_endian(cdr_header).expect("Shouldn't happen: cdr_header is 4 bytes"); - let request_id = RosRequestId::from_slice( + let request_id = CddsRequestHeader::from_slice( dds_rep_buf[4..20] .try_into() .expect("Shouldn't happen: slice is 16 bytes"),