Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add get_publishers_info_by_topic #80

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.git
.github
target
95 changes: 94 additions & 1 deletion r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,7 @@ impl Node {
}

/// Create a ROS publisher with a type given at runtime, where the data may either be
/// supplied as JSON (using the `publish` method) or a pre-serialized ROS message
/// supplied as JSON (using the `publish` method) or a pre-serialized ROS message
/// (i.e. &[u8], using the `publish_raw` method).
pub fn create_publisher_untyped(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
Expand Down Expand Up @@ -1169,6 +1169,46 @@ impl Node {
Ok(res)
}

pub fn get_publishers_info_by_topic(
&self, topic_name: &str, no_mangle: bool,
) -> Result<Vec<TopicEndpointInfo>> {
let node = self.node_handle.as_ref();

let topic_c_string =
CString::new(topic_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;

let mut allocator = unsafe { rcutils_get_default_allocator() };

let mut info_array: rcl_topic_endpoint_info_array_t =
unsafe { rmw_get_zero_initialized_topic_endpoint_info_array() };

let result = unsafe {
rcl_get_publishers_info_by_topic(
node,
&mut allocator,
topic_c_string.as_ptr(),
no_mangle,
&mut info_array,
)
};

if result != RCL_RET_OK as i32 {
unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };
return Err(Error::from_rcl_error(result));
}

// Convert info_array to Vec<TopicEndpointInfo>
let topic_info_list = convert_info_array_to_vec(&info_array);

let result = unsafe { rmw_topic_endpoint_info_array_fini(&mut info_array, &mut allocator) };

if result != RCL_RET_OK as i32 {
return Err(Error::from_rcl_error(result));
}

Ok(topic_info_list)
}

/// Create a ROS wall timer.
///
/// Create a ROS timer that is woken up by spin every `period`.
Expand Down Expand Up @@ -1324,3 +1364,56 @@ impl Drop for Node {
pub trait IsAvailablePollable {
fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()>;
}

pub struct TopicEndpointInfo {
pub node_name: String,
pub node_namespace: String,
pub topic_type: String,
pub endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize],
pub qos_profile: QosProfile,
}

impl From<rmw_topic_endpoint_info_t> for TopicEndpointInfo {
fn from(info: rmw_topic_endpoint_info_t) -> Self {
// Convert C strings to Rust String
let node_name = unsafe { CStr::from_ptr(info.node_name) }
.to_string_lossy()
.into_owned();
let node_namespace = unsafe { CStr::from_ptr(info.node_namespace) }
.to_string_lossy()
.into_owned();
let topic_type = unsafe { CStr::from_ptr(info.topic_type) }
.to_string_lossy()
.into_owned();

// Copy the endpoint_gid array
let endpoint_gid: [u8; RMW_GID_STORAGE_SIZE as usize] = info.endpoint_gid;

// Convert qos_profile
let qos_profile = QosProfile::from(info.qos_profile); // Adjust this line based on how QosProfile is defined

TopicEndpointInfo {
node_name,
node_namespace,
topic_type,
endpoint_gid,
qos_profile,
}
}
}

fn convert_info_array_to_vec(
info_array: &rcl_topic_endpoint_info_array_t,
) -> Vec<TopicEndpointInfo> {
let mut topic_info_list = Vec::with_capacity(info_array.size);

unsafe {
let infos = std::slice::from_raw_parts(info_array.info_array, info_array.size);
for &info in infos {
let endpoint_info = TopicEndpointInfo::from(info);
topic_info_list.push(endpoint_info);
}
}

topic_info_list
}
117 changes: 117 additions & 0 deletions r2r/src/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,19 @@ impl From<HistoryPolicy> for rmw_qos_history_policy_t {
}
}

impl From<rmw_qos_history_policy_t> for HistoryPolicy {
fn from(rmw_history_policy: rmw_qos_history_policy_t) -> Self {
match rmw_history_policy {
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_ALL => HistoryPolicy::KeepAll,
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_KEEP_LAST => HistoryPolicy::KeepLast,
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT => {
HistoryPolicy::SystemDefault
}
rmw_qos_history_policy_t::RMW_QOS_POLICY_HISTORY_UNKNOWN => HistoryPolicy::Unknown,
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReliabilityPolicy {
BestEffort,
Expand All @@ -259,6 +272,25 @@ impl From<ReliabilityPolicy> for rmw_qos_reliability_policy_t {
}
}

impl From<rmw_qos_reliability_policy_t> for ReliabilityPolicy {
fn from(rmw_reliability_policy: rmw_qos_reliability_policy_t) -> Self {
match rmw_reliability_policy {
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT => {
ReliabilityPolicy::BestEffort
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_RELIABLE => {
ReliabilityPolicy::Reliable
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT => {
ReliabilityPolicy::SystemDefault
}
rmw_qos_reliability_policy_t::RMW_QOS_POLICY_RELIABILITY_UNKNOWN => {
ReliabilityPolicy::Unknown
}
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DurabilityPolicy {
TransientLocal,
Expand Down Expand Up @@ -286,6 +318,25 @@ impl From<DurabilityPolicy> for rmw_qos_durability_policy_t {
}
}

impl From<rmw_qos_durability_policy_t> for DurabilityPolicy {
fn from(rmw_durability_policy: rmw_qos_durability_policy_t) -> Self {
match rmw_durability_policy {
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL => {
DurabilityPolicy::TransientLocal
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_VOLATILE => {
DurabilityPolicy::Volatile
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT => {
DurabilityPolicy::SystemDefault
}
rmw_qos_durability_policy_t::RMW_QOS_POLICY_DURABILITY_UNKNOWN => {
DurabilityPolicy::Unknown
}
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LivelinessPolicy {
Automatic,
Expand Down Expand Up @@ -317,6 +368,28 @@ impl From<LivelinessPolicy> for rmw_qos_liveliness_policy_t {
}
}

impl From<rmw_qos_liveliness_policy_t> for LivelinessPolicy {
fn from(rmw_liveliness_policy: rmw_qos_liveliness_policy_t) -> Self {
match rmw_liveliness_policy {
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_AUTOMATIC => {
LivelinessPolicy::Automatic
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE => {
LivelinessPolicy::ManualByNode
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC => {
LivelinessPolicy::ManualByTopic
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT => {
LivelinessPolicy::SystemDefault
}
rmw_qos_liveliness_policy_t::RMW_QOS_POLICY_LIVELINESS_UNKNOWN => {
LivelinessPolicy::Unknown
}
}
}
}

/// QoS profile
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QosProfile {
Expand Down Expand Up @@ -719,8 +792,28 @@ impl From<QosProfile> for r2r_rcl::rmw_qos_profile_t {
}
}
}

impl From<r2r_rcl::rmw_qos_profile_t> for QosProfile {
fn from(rmw_qos: r2r_rcl::rmw_qos_profile_t) -> Self {
QosProfile {
history: rmw_qos.history.into(),
depth: rmw_qos.depth,
reliability: rmw_qos.reliability.into(),
durability: rmw_qos.durability.into(),
deadline: Duration::from_rmw_time_t(&rmw_qos.deadline),
lifespan: Duration::from_rmw_time_t(&rmw_qos.lifespan),
liveliness: rmw_qos.liveliness.into(),
liveliness_lease_duration: Duration::from_rmw_time_t(
&rmw_qos.liveliness_lease_duration,
),
avoid_ros_namespace_conventions: rmw_qos.avoid_ros_namespace_conventions,
}
}
}

pub(crate) trait RclDurationT {
fn to_rmw_time_t(&self) -> rmw_time_t;
fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self;
}

impl RclDurationT for Duration {
Expand All @@ -730,4 +823,28 @@ impl RclDurationT for Duration {
nsec: self.subsec_nanos().into(),
}
}

fn from_rmw_time_t(rmw_time: &rmw_time_t) -> Self {
#[cfg(not(r2r__ros__distro__foxy))]
{
assert!(
rmw_time.nsec < 1_000_000_000,
"nsec part of rmw_time_t should be less than 1 billion"
);
}

#[cfg(r2r__ros__distro__foxy)]
{
// FIXME: In foxy, duration data obtained from publisher with default qos profile is
// sec: 7FFFFFFF (2147483647), nsec: FFFFFFFF (4294967295)
if rmw_time.nsec == 4294967295 {
// 0s indicates deadline policies are not tracked or enforced in foxy
return Duration::new(0, 0);
} else if rmw_time.nsec > 1_000_000_000 {
panic!("nsec part of rmw_time_t should be less than 1 billion");
}
}

Duration::new(rmw_time.sec, rmw_time.nsec as u32)
}
}
Loading