We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
launch_testing
It would be nice to have a way to check if a topic is publishing messages (with a timeout). THis suggestion came up in #263
One way to do that can be :
from threading import Event, Thread import unittest import launch import launch.actions import launch_ros.actions import launch_testing.actions import pytest import rclpy from rclpy.node import Node from std_msgs.msg import String @pytest.mark.launch_test def generate_test_description(): return launch.LaunchDescription([ launch_ros.actions.Node( package='demo_nodes_cpp', executable='talker', name='demo_node_1', ), launch_testing.actions.ReadyToTest() ]) class TestFixture(unittest.TestCase): def test_check_if_msgs_published(self, proc_output): rclpy.init() node = MakeTestNode('test_node') node.start_subscriber() msgs_received_flag = node.msg_event_object.wait(timeout=5.0) assert msgs_received_flag, 'Did not receive msgs !' rclpy.shutdown() class MakeTestNode(Node): def __init__(self, name='test_node'): super().__init__(name) self.msg_event_object = Event() def start_subscriber(self): # Create a subscriber self.subscription = self.create_subscription( String, 'chatter', self.subscriber_callback, 10 ) # Add a spin thread self.ros_spin_thread = Thread(target=lambda node: rclpy.spin(node), args=(self,)) self.ros_spin_thread.start() def subscriber_callback(self, data): self.msg_event_object.set()
The text was updated successfully, but these errors were encountered:
IMO, ideally syntax would look like this:
def test_foo(self): wait_for_topics = WaitForTopics(['foo', 'bar', 'baz']) assert wait_for_topics.wait(timeout=5.0) ...
However, we need an instance to a ROS node (and context). Maybe we can use the existing ROSAdapter class, e.g.:
ROSAdapter
class WaitForTopics: def __init__(self, *, topics): self.__ros_adapter = ROSAdapter() self.__ros_adapter.start() def __del__(self): self.__ros_adapter.shutdown()
@ivanpauno @wjwwood, thoughts?
Sorry, something went wrong.
Relevant PR : #274
adityapande-1995
Successfully merging a pull request may close this issue.
Feature request
Feature description
It would be nice to have a way to check if a topic is publishing messages (with a timeout).
THis suggestion came up in #263
Implementation considerations
One way to do that can be :
The text was updated successfully, but these errors were encountered: