Skip to content

Commit

Permalink
Added raw message subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
mchhoy committed Dec 13, 2023
1 parent 6bc1f5f commit 21a6551
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 0 deletions.
24 changes: 24 additions & 0 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,30 @@ impl Node {
Ok(receiver)
}

/// Subscribe to a ROS topic.
///
/// This function returns a `Stream` of ros messages as non-deserialized `Vec<u8>`:s.
/// Useful if you just want to pass the data along to another part of the system.
pub fn subscribe_raw(
&mut self, topic: &str, topic_type: &str, qos_profile: QosProfile,
) -> Result<impl Stream<Item = Vec<u8>> + Unpin> {
// TODO is it possible to handle the raw message without type support?
//
// Passing null ts to rcl_subscription_init throws an error ..
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;

let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);

let ws = RawSubscriber {
rcl_handle: subscription_handle,
sender,
};
self.subscribers.push(Box::new(ws));
Ok(receiver)
}

/// Create a ROS service.
///
/// This function returns a `Stream` of `ServiceRequest`:s. Call
Expand Down
106 changes: 106 additions & 0 deletions r2r/src/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ pub struct UntypedSubscriber {
pub sender: mpsc::Sender<Result<serde_json::Value>>,
}

pub struct RawSubscriber {
pub rcl_handle: rcl_subscription_t,
pub sender: mpsc::Sender<Vec<u8>>,
}


impl<T: 'static> Subscriber_ for TypedSubscriber<T>
where
T: WrappedTypesupport,
Expand Down Expand Up @@ -179,6 +185,105 @@ impl Subscriber_ for UntypedSubscriber {
}
}

impl Subscriber_ for RawSubscriber {
fn handle(&self) -> &rcl_subscription_t {
&self.rcl_handle
}

fn handle_incoming(&mut self) -> bool {

// This code is based on:
//
// https://github.com/josephduchesne/rclpy/blob/502e2135498460dd4c74cf3a6fa543590364a1fe/rclpy/src/rclpy/_rclpy.c#L2612-L2649

let mut msg_info = rmw_message_info_t::default(); // we dont care for now

let mut msg: rcl_serialized_message_t = unsafe { rcutils_get_zero_initialized_uint8_array() };
let allocator: rcutils_allocator_t = unsafe { rcutils_get_default_allocator() };
let ret: rcl_ret_t = unsafe {
rcutils_uint8_array_init(&mut msg as *mut rcl_serialized_message_t, 0, &allocator)
};

if ret != RCL_RET_OK as i32 {
log::error!("Failed to initialize message: {:?}", unsafe { rcutils_get_error_string().str_ });
unsafe { rcutils_reset_error() };

let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};

if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");
}
return false;
}

let ret = unsafe {
rcl_take_serialized_message(
&self.rcl_handle,
&mut msg as *mut rcl_serialized_message_t,
&mut msg_info,
std::ptr::null_mut())
};
if ret != RCL_RET_OK as i32 {
log::error!(
"Failed to take_serialized from a subscription: {:?}",
unsafe { rcutils_get_error_string().str_ });

// rcl_reset_error();
unsafe { rcutils_reset_error() };

let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};

if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");
}

return false;
}

// TODO put rcutils_uint8_array_fini in a message drop guard?
//
// Or is is safe to deallocate with Vec::drop instead of rcutils_uint8_array_fini?

// let data_bytes = unsafe {
// Vec::from_raw_parts(msg.buffer, msg.buffer_length, msg.buffer_capacity)
// };

let data_bytes = unsafe {
std::slice::from_raw_parts(msg.buffer, msg.buffer_length).to_vec()
};

let r_fini: rmw_ret_t = unsafe {
rcutils_uint8_array_fini(&mut msg as *mut rcl_serialized_message_t)
};

if r_fini != RMW_RET_OK as i32 {
log::error!("Failed to deallocate message buffer: {r_fini}");

return false;
}

if let Err(e) = self.sender.try_send(data_bytes) {
if e.is_disconnected() {
// user dropped the handle to the stream, signal removal.
return true;
}
log::debug!("error {:?}", e)
}

false
}

fn destroy(&mut self, node: &mut rcl_node_t) {
unsafe {
rcl_subscription_fini(&mut self.rcl_handle, node);
}
}
}

pub fn create_subscription_helper(
node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t,
qos_profile: QosProfile,
Expand All @@ -203,3 +308,4 @@ pub fn create_subscription_helper(
Err(Error::from_rcl_error(result))
}
}

78 changes: 78 additions & 0 deletions r2r/tests/tokio_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,81 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
assert_eq!(x, 19);
Ok(())
}


#[tokio::test(flavor = "multi_thread")]
async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>> {
let ctx = r2r::Context::create()?;
let mut node = r2r::Node::create(ctx, "testnode", "")?;

let mut sub_int =
node.subscribe_raw("/int", "std_msgs/msg/Int32", QosProfile::default())?;

let mut sub_array =
node.subscribe_raw("/int_array", "std_msgs/msg/Int32MultiArray", QosProfile::default())?;

let pub_int =
node.create_publisher::<r2r::std_msgs::msg::Int32>("/int", QosProfile::default())?;

// Use an array as well since its a variable sized type
let pub_array =
node.create_publisher::<r2r::std_msgs::msg::Int32MultiArray>("/int_array", QosProfile::default())?;


task::spawn(async move {
(0..10).for_each(|i| {
pub_int
.publish(&r2r::std_msgs::msg::Int32 { data: i })
.unwrap();

pub_array.publish(&r2r::std_msgs::msg::Int32MultiArray {
layout: r2r::std_msgs::msg::MultiArrayLayout::default(),
data: vec![i]
})
.unwrap();
});
});


let sub_int_handle = task::spawn(async move {
while let Some(msg) = sub_int.next().await {
println!("Got int msg of len {}", msg.len());

// assert_eq!(msg.len(), 4);
// TODO is there padding or something?
assert_eq!(msg.len(), 8);

// assert_eq!(msg,)

}

panic!("int msg finished");
});

let sub_array_handle = task::spawn(async move {
while let Some(msg) = sub_array.next().await {

println!("Got array msg of len {}", msg.len());
// assert_eq!(msg.data, )

}

panic!("array msg finished");
});

let handle = std::thread::spawn(move || {
for _ in 1..=30 {
node.spin_once(std::time::Duration::from_millis(100));

}

});

handle.join().unwrap();

// This means something panicked ..
assert!(!sub_int_handle.is_finished());
assert!(!sub_array_handle.is_finished());

Ok(())
}

0 comments on commit 21a6551

Please sign in to comment.