Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

launch testing : Wait for topics to publish #274

Merged
merged 11 commits into from
Oct 6, 2021
2 changes: 2 additions & 0 deletions launch_testing_ros/launch_testing_ros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
from .data_republisher import DataRepublisher
from .message_pump import MessagePump
from .test_runner import LaunchTestRunner
from . wait_for_topics import WaitForTopics

__all__ = [
'DataRepublisher',
'LaunchTestRunner',
'MessagePump',
'tools',
'WaitForTopics',
]
122 changes: 122 additions & 0 deletions launch_testing_ros/launch_testing_ros/wait_for_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import string
from threading import Event
from threading import Thread

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node


class WaitForTopics:
"""
Wait to receive messages on supplied topics.

Example usage:
--------------

from std_msgs.msg import String

# Method 1, using the 'with' keyword
def method_1():
topic_list = [('topic_1', String), ('topic_2', String)]
with WaitForTopics(topic_list, timeout=5.0):
# 'topic_1' and 'topic_2' received at least one message each
print('Given topics are receiving messages !')

# Method 2, calling wait() and shutdown() manually
def method_2():
topic_list = [('topic_1', String), ('topic_2', String)]
wait_for_topics = WaitForTopics(topic_list, timeout=5.0)
assert wait_for_topics.wait()
print('Given topics are receiving messages !')
wait_for_topics.shutdown()
"""

def __init__(self, topic_tuples, timeout=5.0):
self.topic_tuples = topic_tuples
self.timeout = timeout
self.__ros_context = rclpy.Context()
rclpy.init(context=self.__ros_context)
self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)
node_name = '_test_node_' +\
''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
self.__ros_node = _WaitForTopicsNode(name=node_name, node_context=self.__ros_context)
self.__ros_executor.add_node(self.__ros_node)

# Start spinning
self.__running = True
self.__ros_spin_thread = Thread(target=self.spin_function)
self.__ros_spin_thread.start()

def spin_function(self):
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
while self.__running:
self.__ros_executor.spin_once(1.0)

def wait(self):
self.__ros_node.start_subscribers(self.topic_tuples)
return self.__ros_node.msg_event_object.wait(self.timeout)

def shutdown(self):
self.__running = False
self.__ros_spin_thread.join()
self.__ros_node.destroy_node()
rclpy.shutdown(context=self.__ros_context)

def __enter__(self):
assert self.wait()
adityapande-1995 marked this conversation as resolved.
Show resolved Hide resolved
return self

def __exit__(self, exep_type, exep_value, trace):
if exep_type is not None:
raise Exception('Exception occured, value: ', exep_value)
self.shutdown()


class _WaitForTopicsNode(Node):
"""Internal dummy node to be used for listening on topics."""

def __init__(self, name='test_node', node_context=None):
super().__init__(node_name=name, context=node_context)
self.msg_event_object = Event()

def start_subscribers(self, topic_tuples):
self.subscriber_list = []
self.expected_topics = {name for name, _ in topic_tuples}
self.received_topics = set()

for topic_name, topic_type in topic_tuples:
# Create a subscriber
self.subscriber_list.append(
self.create_subscription(
topic_type,
topic_name,
self.callback_template(topic_name),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can use functools.partial.

functools.partial(self.topic_callback, topic_name)

and then the callback would look like:

def topic_callback(self, topic_name, data):
    ...

I don't think there's anything wrong with this, so feel free to leave it as-is.

10
)
)

def callback_template(self, topic_name):

def topic_callback(data):
if topic_name not in self.received_topics:
self.get_logger().debug('Message received for ' + topic_name)
self.received_topics.add(topic_name)
if self.received_topics == self.expected_topics:
self.msg_event_object.set()

return topic_callback