Skip to content

Commit

Permalink
Rename RosRequestId to CddsRequestHeader (as specific to rmw_cycloned…
Browse files Browse the repository at this point in the history
…ds_cpp)
  • Loading branch information
JEnoch committed Jan 5, 2024
1 parent 83b1dc7 commit 863fea1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 50 deletions.
75 changes: 39 additions & 36 deletions zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,37 +165,40 @@ pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String {
)
}

const ATTACHMENT_KEY_REQUEST_ID: [u8; 5] = [0x72, 0x65, 0x71, 0x69, 0x64]; // "reqid"
const ATTACHMENT_KEY_REQUEST_HEADER: [u8; 3] = [0x72, 0x71, 0x68]; // "rqh" in ASCII

/// In rmw_cyclonedds_cpp a Request id is used as header in each request and reply payload.
/// In rmw_cyclonedds_cpp a cdds_request_header sent within each request and reply payload.
/// See https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
/// It's a 16 bytes buffer made of a 8 bytes client id and a 8 bytes sequence number (all endianness dependent)
/// Note that it's different from the rmw_request_id_t defined in RMW interfaces in
/// https://github.com/ros2/rmw/blob/9b3d9d0e3021b7a6e75d8886e3e061a53c36c789/rmw/include/rmw/types.h#L360
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub struct RosRequestId {
// A ROS 2 request id is a 16 bytes buffer with the client id as 8 first bytes and
id: [u8; 16],
pub struct CddsRequestHeader {
// The header contains a u64 GUID (Client's) and a i64 sequence number.
// Keep those as a single buffer, as it's transfered as such between DDS and Zenoh.
header: [u8; 16],
// the sequence number is subject to endianness, we need to keep a flag for it
is_little_endian: bool,
}

impl RosRequestId {
pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> RosRequestId {
let mut id = [0u8; 16];
impl CddsRequestHeader {
pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> CddsRequestHeader {
let mut header = [0u8; 16];
if is_little_endian {
id[..8].copy_from_slice(&client_id.to_le_bytes());
id[8..].copy_from_slice(&seq_num.to_le_bytes())
header[..8].copy_from_slice(&client_id.to_le_bytes());
header[8..].copy_from_slice(&seq_num.to_le_bytes())
} else {
id[..8].copy_from_slice(&client_id.to_be_bytes());
id[8..].copy_from_slice(&seq_num.to_be_bytes())
header[..8].copy_from_slice(&client_id.to_be_bytes());
header[8..].copy_from_slice(&seq_num.to_be_bytes())
}
RosRequestId {
id,
CddsRequestHeader {
header,
is_little_endian,
}
}

pub fn from_slice(id: [u8; 16], is_little_endian: bool) -> RosRequestId {
RosRequestId {
id,
pub fn from_slice(header: [u8; 16], is_little_endian: bool) -> CddsRequestHeader {
CddsRequestHeader {
header,
is_little_endian,
}
}
Expand All @@ -205,63 +208,63 @@ impl RosRequestId {
}

pub fn as_slice(&self) -> &[u8] {
&self.id
&self.header
}

pub fn as_attachment(&self) -> Attachment {
let mut attach = Attachment::new();

// concat id + endianness flag
// concat header + endianness flag
let mut buf = [0u8; 17];
buf[0..16].copy_from_slice(&self.id);
buf[0..16].copy_from_slice(&self.header);
buf[16] = self.is_little_endian as u8;

attach.insert(&ATTACHMENT_KEY_REQUEST_ID, &buf);
attach.insert(&ATTACHMENT_KEY_REQUEST_HEADER, &buf);
attach
}
}

impl TryFrom<&Attachment> for RosRequestId {
impl TryFrom<&Attachment> for CddsRequestHeader {
type Error = ZError;
fn try_from(value: &Attachment) -> Result<Self, Self::Error> {
match value.get(&ATTACHMENT_KEY_REQUEST_ID) {
match value.get(&ATTACHMENT_KEY_REQUEST_HEADER) {
Some(buf) => {
if buf.len() == 17 {
let id: [u8; 16] = buf[0..16]
let header: [u8; 16] = buf[0..16]
.try_into()
.expect("Shouldn't happen: buf is 17 bytes");
Ok(RosRequestId {
id,
Ok(CddsRequestHeader {
header,
is_little_endian: buf[16] != 0,
})
} else {
bail!("Attachment 'reqid' is not 16 bytes: {buf:02x?}")
bail!("Attachment 'header' is not 16 bytes: {buf:02x?}")
}
}
None => bail!("No 'reqid' key found in Attachment"),
None => bail!("No 'header' key found in Attachment"),
}
}
}

impl std::fmt::Display for RosRequestId {
impl std::fmt::Display for CddsRequestHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// a request id is made of 8 bytes client id + 8 bytes sequence number
// a request header is made of 8 bytes client guid + 8 bytes sequence number
// display as such for easier understanding
write!(f, "(")?;
for i in &self.id[0..8] {
for i in &self.header[0..8] {
write!(f, "{i:02x}")?;
}
let seq_num = if self.is_little_endian {
u64::from_le_bytes(
self.id[8..]
self.header[8..]
.try_into()
.expect("Shouldn't happen: self.id is 16 bytes"),
.expect("Shouldn't happen: self.header is 16 bytes"),
)
} else {
u64::from_be_bytes(
self.id[8..]
self.header[8..]
.try_into()
.expect("Shouldn't happen: self.id is 16 bytes"),
.expect("Shouldn't happen: self.header is 16 bytes"),
)
};
write!(f, ",{seq_num})",)
Expand Down
10 changes: 5 additions & 5 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid};
use crate::liveliness_mgt::new_ke_liveliness_service_cli;
use crate::ros2_utils::{
is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type,
ros2_service_type_to_request_dds_type, RosRequestId, QOS_DEFAULT_SERVICE,
ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE,
};
use crate::routes_mgr::Context;
use crate::LOG_PAYLOAD;
Expand Down Expand Up @@ -280,7 +280,7 @@ fn route_dds_request_to_zenoh(
sample: &DDSRawSample,
zenoh_key_expr: &OwnedKeyExpr,
zsession: &Arc<Session>,
queries_timeout: Duration,
query_timeout: Duration,
rep_writer: dds_entity_t,
) {
// request payload is expected to be the Request type encoded as CDR, including a 4 bytes header,
Expand All @@ -295,7 +295,7 @@ fn route_dds_request_to_zenoh(
let dds_req_buf = zbuf.contiguous();
let is_little_endian =
is_cdr_little_endian(&dds_req_buf).expect("Shouldn't happen: sample.len >= 20");
let request_id = RosRequestId::from_slice(
let request_id = CddsRequestHeader::from_slice(
dds_req_buf[4..20]
.try_into()
.expect("Shouldn't happen: sample.len >= 20"),
Expand Down Expand Up @@ -325,7 +325,7 @@ fn route_dds_request_to_zenoh(
.with_value(zenoh_req_buf)
.with_attachment(request_id.as_attachment())
.allowed_destination(Locality::Remote)
.timeout(queries_timeout)
.timeout(query_timeout)
.callback(move |reply| {
route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer)
})
Expand All @@ -338,7 +338,7 @@ fn route_dds_request_to_zenoh(
fn route_zenoh_reply_to_dds(
route_id: String,
reply: Reply,
request_id: RosRequestId,
request_id: CddsRequestHeader,
rep_writer: dds_entity_t,
) {
match reply.sample {
Expand Down
18 changes: 9 additions & 9 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid};
use crate::liveliness_mgt::new_ke_liveliness_service_srv;
use crate::ros2_utils::{
is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type,
ros2_service_type_to_request_dds_type, RosRequestId, QOS_DEFAULT_SERVICE,
ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE,
};
use crate::routes_mgr::Context;
use crate::{serialize_option_as_bool, LOG_PAYLOAD};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct RouteServiceSrv<'a> {
sequence_number: Arc<AtomicU64>,
// queries waiting for a reply
#[serde(skip)]
queries_in_progress: Arc<RwLock<HashMap<RosRequestId, Query>>>,
queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>>,
// a liveliness token associated to this route, for announcement to other plugins
#[serde(skip)]
liveliness_token: Option<LivelinessToken<'a>>,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl RouteServiceSrv<'_> {
);

// map of queries in progress
let queries_in_progress: Arc<RwLock<HashMap<RosRequestId, Query>>> =
let queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>> =
Arc::new(RwLock::new(HashMap::new()));

// create DDS Reader to receive replies and route them to Zenoh
Expand Down Expand Up @@ -224,7 +224,7 @@ impl RouteServiceSrv<'_> {

// create the zenoh Queryable
// if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data
let queries_in_progress: Arc<RwLock<HashMap<RosRequestId, Query>>> =
let queries_in_progress: Arc<RwLock<HashMap<CddsRequestHeader, Query>>> =
self.queries_in_progress.clone();
let sequence_number: Arc<AtomicU64> = self.sequence_number.clone();
let route_id: String = self.to_string();
Expand Down Expand Up @@ -340,7 +340,7 @@ impl RouteServiceSrv<'_> {

fn route_zenoh_request_to_dds(
query: Query,
queries_in_progress: &mut HashMap<RosRequestId, Query>,
queries_in_progress: &mut HashMap<CddsRequestHeader, Query>,
sequence_number: &AtomicU64,
route_id: &str,
client_guid: u64,
Expand All @@ -359,9 +359,9 @@ fn route_zenoh_request_to_dds(
// Otherwise, create one using client_guid + sequence_number
let request_id = query
.attachment()
.and_then(|a| RosRequestId::try_from(a).ok())
.and_then(|a| CddsRequestHeader::try_from(a).ok())
.unwrap_or_else(|| {
RosRequestId::create(
CddsRequestHeader::create(
client_guid,
sequence_number.fetch_add(1, Ordering::Relaxed),
is_little_endian,
Expand Down Expand Up @@ -423,7 +423,7 @@ fn route_zenoh_request_to_dds(
fn route_dds_reply_to_zenoh(
sample: &DDSRawSample,
zenoh_key_expr: OwnedKeyExpr,
queries_in_progress: &mut HashMap<RosRequestId, Query>,
queries_in_progress: &mut HashMap<CddsRequestHeader, Query>,
route_id: &str,
) {
// reply payload is expected to be the Response type encoded as CDR, including a 4 bytes header,
Expand All @@ -439,7 +439,7 @@ fn route_dds_reply_to_zenoh(
let cdr_header = &dds_rep_buf[..4];
let is_little_endian =
is_cdr_little_endian(cdr_header).expect("Shouldn't happen: cdr_header is 4 bytes");
let request_id = RosRequestId::from_slice(
let request_id = CddsRequestHeader::from_slice(
dds_rep_buf[4..20]
.try_into()
.expect("Shouldn't happen: slice is 16 bytes"),
Expand Down

0 comments on commit 863fea1

Please sign in to comment.