Skip to content

Commit

Permalink
Implement matched event (#1083)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <[email protected]>
  • Loading branch information
Barry-Xu-2018 authored Mar 22, 2023
1 parent 32f8231 commit f6d652f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 2 deletions.
28 changes: 28 additions & 0 deletions rclpy/rclpy/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@
# Payload type for Subscription Incompatible QoS callback.
QoSRequestedIncompatibleQoSInfo = _rclpy.rmw_requested_qos_incompatible_event_status_t

# Payload type for Subscription matched callback.
QoSSubscriptionMatchedInfo = _rclpy.rmw_matched_status_t

# Payload type for Publisher Deadline callback.
QoSOfferedDeadlineMissedInfo = _rclpy.rmw_offered_deadline_missed_status_t

# Payload type for Publisher Liveliness callback.
QoSLivelinessLostInfo = _rclpy.rmw_liveliness_lost_status_t

# Payload type for Publisher matched callback.
QoSPublisherMatchedInfo = _rclpy.rmw_matched_status_t

"""
Payload type for Publisher Incompatible QoS callback.
Expand Down Expand Up @@ -154,6 +160,7 @@ def __init__(
liveliness: Optional[Callable[[QoSLivelinessChangedInfo], None]] = None,
message_lost: Optional[Callable[[QoSMessageLostInfo], None]] = None,
incompatible_type: Optional[Callable[[IncompatibleTypeInfo], None]] = None,
matched: Optional[Callable[[QoSSubscriptionMatchedInfo], None]] = None,
use_default_callbacks: bool = True,
) -> None:
"""
Expand All @@ -168,6 +175,8 @@ def __init__(
:param message_lost: A user-defined callback that is called when a messages is lost.
:param incompatible_type: A user-defined callback that is called when a topic type
doesn't match.
:param matched: A user-defined callback that is called when a Publisher is connected or
disconnected.
:param use_default_callbacks: Whether or not to use default callbacks when the user
doesn't supply one
"""
Expand All @@ -176,6 +185,7 @@ def __init__(
self.liveliness = liveliness
self.message_lost = message_lost
self.incompatible_type = incompatible_type
self.matched = matched
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
Expand Down Expand Up @@ -250,6 +260,13 @@ def _default_incompatible_type_callback(event):
except UnsupportedEventTypeError:
pass

if self.matched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED,
parent_impl=subscription))

return event_handlers


Expand All @@ -263,6 +280,7 @@ def __init__(
liveliness: Optional[Callable[[QoSLivelinessLostInfo], None]] = None,
incompatible_qos: Optional[Callable[[QoSRequestedIncompatibleQoSInfo], None]] = None,
incompatible_type: Optional[Callable[[IncompatibleTypeInfo], None]] = None,
matched: Optional[Callable[[QoSPublisherMatchedInfo], None]] = None,
use_default_callbacks: bool = True,
) -> None:
"""
Expand All @@ -276,13 +294,16 @@ def __init__(
with incompatible QoS policies is discovered on subscribed topic.
:param incompatible_type: A user-defined callback that is called when a topic type
doesn't match.
:param matched: A user-defined callback that is called when a Subscription is connected or
disconnected.
:param use_default_callbacks: Whether or not to use default callbacks when the user
doesn't supply one
"""
self.deadline = deadline
self.liveliness = liveliness
self.incompatible_qos = incompatible_qos
self.incompatible_type = incompatible_type
self.matched = matched
self.use_default_callbacks = use_default_callbacks

def create_event_handlers(
Expand Down Expand Up @@ -350,4 +371,11 @@ def _default_incompatible_type_callback(event):
except UnsupportedEventTypeError:
pass

if self.matched:
event_handlers.append(QoSEventHandler(
callback_group=callback_group,
callback=self.matched,
event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED,
parent_impl=publisher))

return event_handlers
19 changes: 17 additions & 2 deletions rclpy/src/rclpy/event_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ typedef union event_callback_data {
rmw_liveliness_changed_status_t liveliness_changed;
rmw_message_lost_status_t message_lost;
rmw_requested_qos_incompatible_event_status_t requested_incompatible_qos;
rmw_matched_status_t subscription_matched;
// Publisher events
rmw_offered_deadline_missed_status_t offered_deadline_missed;
rmw_liveliness_lost_status_t liveliness_lost;
rmw_offered_qos_incompatible_event_status_t offered_incompatible_qos;
rmw_matched_status_t publisher_matched;

rmw_incompatible_type_status_t incompatible_type;
} event_callback_data_t;
Expand Down Expand Up @@ -144,6 +146,8 @@ EventHandle::take_event()
return py::cast(data.requested_incompatible_qos);
case RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE:
return py::cast(data.incompatible_type);
case RCL_SUBSCRIPTION_MATCHED:
return py::cast(data.subscription_matched);
default:
// suggests a misalignment between C and Python interfaces
throw py::value_error("event type for subscriptions not understood");
Expand All @@ -158,6 +162,8 @@ EventHandle::take_event()
return py::cast(data.offered_incompatible_qos);
case RCL_PUBLISHER_INCOMPATIBLE_TYPE:
return py::cast(data.incompatible_type);
case RCL_PUBLISHER_MATCHED:
return py::cast(data.publisher_matched);
default:
// suggests a misalignment between C and Python interfaces
throw py::value_error("event type for publishers not understood");
Expand Down Expand Up @@ -186,13 +192,15 @@ define_event_handle(py::module module)
.value("RCL_SUBSCRIPTION_LIVELINESS_CHANGED", RCL_SUBSCRIPTION_LIVELINESS_CHANGED)
.value("RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS", RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)
.value("RCL_SUBSCRIPTION_MESSAGE_LOST", RCL_SUBSCRIPTION_MESSAGE_LOST)
.value("RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE", RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE);
.value("RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE", RCL_SUBSCRIPTION_INCOMPATIBLE_TYPE)
.value("RCL_SUBSCRIPTION_MATCHED", RCL_SUBSCRIPTION_MATCHED);

py::enum_<rcl_publisher_event_type_t>(module, "rcl_publisher_event_type_t")
.value("RCL_PUBLISHER_OFFERED_DEADLINE_MISSED", RCL_PUBLISHER_OFFERED_DEADLINE_MISSED)
.value("RCL_PUBLISHER_LIVELINESS_LOST", RCL_PUBLISHER_LIVELINESS_LOST)
.value("RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS", RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)
.value("RCL_PUBLISHER_INCOMPATIBLE_TYPE", RCL_PUBLISHER_INCOMPATIBLE_TYPE);
.value("RCL_PUBLISHER_INCOMPATIBLE_TYPE", RCL_PUBLISHER_INCOMPATIBLE_TYPE)
.value("RCL_PUBLISHER_MATCHED", RCL_PUBLISHER_MATCHED);

py::class_<rmw_requested_deadline_missed_status_t>(
module, "rmw_requested_deadline_missed_status_t")
Expand Down Expand Up @@ -231,6 +239,13 @@ define_event_handle(py::module module)
.def_readonly("total_count", &rmw_liveliness_lost_status_t::total_count)
.def_readonly("total_count_change", &rmw_liveliness_lost_status_t::total_count_change);

py::class_<rmw_matched_status_t>(module, "rmw_matched_status_t")
.def(py::init<>())
.def_readonly("total_count", &rmw_matched_status_t::total_count)
.def_readonly("total_count_change", &rmw_matched_status_t::total_count_change)
.def_readonly("current_count", &rmw_matched_status_t::current_count)
.def_readonly("current_count_change", &rmw_matched_status_t::current_count_change);

py::enum_<rmw_qos_policy_kind_t>(module, "rmw_qos_policy_kind_t")
.value("RMW_QOS_POLICY_INVALID", RMW_QOS_POLICY_INVALID)
.value("RMW_QOS_POLICY_DURABILITY", RMW_QOS_POLICY_DURABILITY)
Expand Down
102 changes: 102 additions & 0 deletions rclpy/test/test_qos_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
from rclpy.event_handler import QoSOfferedDeadlineMissedInfo
from rclpy.event_handler import QoSOfferedIncompatibleQoSInfo
from rclpy.event_handler import QoSPublisherEventType
from rclpy.event_handler import QoSPublisherMatchedInfo
from rclpy.event_handler import QoSRequestedDeadlineMissedInfo
from rclpy.event_handler import QoSRequestedIncompatibleQoSInfo
from rclpy.event_handler import QoSSubscriptionEventType
from rclpy.event_handler import QoSSubscriptionMatchedInfo
from rclpy.event_handler import SubscriptionEventCallbacks
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
from rclpy.qos import QoSDurabilityPolicy
Expand Down Expand Up @@ -208,6 +210,8 @@ def test_publisher_event_create_destroy(self):
publisher, QoSPublisherEventType.RCL_PUBLISHER_LIVELINESS_LOST)
self._do_create_destroy(
publisher, QoSPublisherEventType.RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS)
self._do_create_destroy(
publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED)
self.node.destroy_publisher(publisher)

def test_subscription_event_create_destroy(self):
Expand All @@ -220,6 +224,8 @@ def test_subscription_event_create_destroy(self):
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED)
self._do_create_destroy(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS)
self._do_create_destroy(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED)
self.node.destroy_subscription(subscription)

def test_call_publisher_rclpy_event_apis(self):
Expand Down Expand Up @@ -354,3 +360,99 @@ def test_call_subscription_rclpy_event_apis(self):
pass

self.node.destroy_subscription(subscription)

def test_call_publisher_rclpy_event_matched(self):
publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10)
with self.context.handle:
wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle)

matched_event_handle = self._create_event_handle(
publisher, QoSPublisherEventType.RCL_PUBLISHER_MATCHED)
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

wait_set.wait(0)
self.assertFalse(wait_set.is_ready('event', matched_event_index))

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

subscription = self.node.create_subscription(EmptyMsg, self.topic_name, Mock(), 10)
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))

matched_status = matched_event_handle.take_event()
self.assertIsInstance(matched_status, QoSPublisherMatchedInfo)
self.assertEqual(matched_status.total_count, 1)
self.assertEqual(matched_status.total_count_change, 1)
self.assertEqual(matched_status.current_count, 1)
self.assertEqual(matched_status.current_count_change, 1)

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

subscription.destroy()
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))

matched_status = matched_event_handle.take_event()
self.assertEqual(matched_status.total_count, 1)
self.assertEqual(matched_status.total_count_change, 0)
self.assertEqual(matched_status.current_count, 0)
self.assertEqual(matched_status.current_count_change, -1)

def test_call_subscription_rclpy_event_matched_unmatched(self):
message_callback = Mock()
subscription = self.node.create_subscription(
EmptyMsg, self.topic_name, message_callback, 10)
with self.context.handle:
wait_set = _rclpy.WaitSet(0, 0, 0, 0, 0, 2, self.context.handle)

matched_event_handle = self._create_event_handle(
subscription, QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED)
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

wait_set.wait(0)
self.assertFalse(wait_set.is_ready('event', matched_event_index))

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

publisher = self.node.create_publisher(EmptyMsg, self.topic_name, 10)
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))

matched_status = matched_event_handle.take_event()
self.assertIsInstance(matched_status, QoSSubscriptionMatchedInfo)
self.assertEqual(matched_status.total_count, 1)
self.assertEqual(matched_status.total_count_change, 1)
self.assertEqual(matched_status.current_count, 1)
self.assertEqual(matched_status.current_count_change, 1)

wait_set.clear_entities()
with matched_event_handle:
matched_event_index = wait_set.add_event(matched_event_handle)
self.assertIsNotNone(matched_event_index)

publisher.destroy()
# wait 500ms
wait_set.wait(500000000)
self.assertTrue(wait_set.is_ready('event', matched_event_index))

matched_status = matched_event_handle.take_event()
self.assertEqual(matched_status.total_count, 1)
self.assertEqual(matched_status.total_count_change, 0)
self.assertEqual(matched_status.current_count, 0)
self.assertEqual(matched_status.current_count_change, -1)

0 comments on commit f6d652f

Please sign in to comment.