Skip to content

Commit

Permalink
Revert "Revert "launch testing : Wait for topics to publish (#274)" (#…
Browse files Browse the repository at this point in the history
…276)"

This reverts commit 06a47b0.

Signed-off-by: Shane Loretz <[email protected]>
  • Loading branch information
sloretz committed Oct 11, 2021
1 parent e18a829 commit 29a8bf2
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 0 deletions.
2 changes: 2 additions & 0 deletions launch_testing_ros/launch_testing_ros/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
from .data_republisher import DataRepublisher
from .message_pump import MessagePump
from .test_runner import LaunchTestRunner
from . wait_for_topics import WaitForTopics

__all__ = [
'DataRepublisher',
'LaunchTestRunner',
'MessagePump',
'tools',
'WaitForTopics',
]
138 changes: 138 additions & 0 deletions launch_testing_ros/launch_testing_ros/wait_for_topics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import string
from threading import Event
from threading import Thread

import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node


class WaitForTopics:
"""
Wait to receive messages on supplied topics.
Example usage:
--------------
from std_msgs.msg import String
# Method 1, using the 'with' keyword
def method_1():
topic_list = [('topic_1', String), ('topic_2', String)]
with WaitForTopics(topic_list, timeout=5.0):
# 'topic_1' and 'topic_2' received at least one message each
print('Given topics are receiving messages !')
# Method 2, calling wait() and shutdown() manually
def method_2():
topic_list = [('topic_1', String), ('topic_2', String)]
wait_for_topics = WaitForTopics(topic_list, timeout=5.0)
assert wait_for_topics.wait()
print('Given topics are receiving messages !')
print(wait_for_topics.topics_not_received()) # Should be an empty set
print(wait_for_topics.topics_received()) # Should be {'topic_1', 'topic_2'}
wait_for_topics.shutdown()
"""

def __init__(self, topic_tuples, timeout=5.0):
self.topic_tuples = topic_tuples
self.timeout = timeout
self.__ros_context = rclpy.Context()
rclpy.init(context=self.__ros_context)
self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)

self._prepare_ros_node()

# Start spinning
self.__running = True
self.__ros_spin_thread = Thread(target=self._spin_function)
self.__ros_spin_thread.start()

def _prepare_ros_node(self):
node_name = '_test_node_' +\
''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
self.__ros_node = _WaitForTopicsNode(name=node_name, node_context=self.__ros_context)
self.__ros_executor.add_node(self.__ros_node)

def _spin_function(self):
while self.__running:
self.__ros_executor.spin_once(1.0)

def wait(self):
self.__ros_node.start_subscribers(self.topic_tuples)
return self.__ros_node.msg_event_object.wait(self.timeout)

def shutdown(self):
self.__running = False
self.__ros_spin_thread.join()
self.__ros_node.destroy_node()
rclpy.shutdown(context=self.__ros_context)

def topics_received(self):
"""Topics that received at least one message."""
return self.__ros_node.received_topics

def topics_not_received(self):
"""Topics that did not receive any messages."""
return self.__ros_node.expected_topics - self.__ros_node.received_topics

def __enter__(self):
if not self.wait():
raise RuntimeError('Did not receive messages on these topics: ',
self.topics_not_received())
return self

def __exit__(self, exep_type, exep_value, trace):
if exep_type is not None:
raise Exception('Exception occured, value: ', exep_value)
self.shutdown()


class _WaitForTopicsNode(Node):
"""Internal node used for subscribing to a set of topics."""

def __init__(self, name='test_node', node_context=None):
super().__init__(node_name=name, context=node_context)
self.msg_event_object = Event()

def start_subscribers(self, topic_tuples):
self.subscriber_list = []
self.expected_topics = {name for name, _ in topic_tuples}
self.received_topics = set()

for topic_name, topic_type in topic_tuples:
# Create a subscriber
self.subscriber_list.append(
self.create_subscription(
topic_type,
topic_name,
self.callback_template(topic_name),
10
)
)

def callback_template(self, topic_name):

def topic_callback(data):
if topic_name not in self.received_topics:
self.get_logger().debug('Message received for ' + topic_name)
self.received_topics.add(topic_name)
if self.received_topics == self.expected_topics:
self.msg_event_object.set()

return topic_callback
88 changes: 88 additions & 0 deletions launch_testing_ros/test/examples/wait_for_topic_launch_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import unittest

import launch
import launch.actions
import launch_ros.actions
import launch_testing.actions
import launch_testing.markers
from launch_testing_ros import WaitForTopics
import pytest
from std_msgs.msg import String


def generate_node(i):
"""Return node and remap the topic based on the index provided."""
path_to_test = os.path.dirname(__file__)
return launch_ros.actions.Node(
executable=sys.executable,
arguments=[os.path.join(path_to_test, 'talker.py')],
name='demo_node_' + str(i),
additional_env={'PYTHONUNBUFFERED': '1'},
remappings=[('chatter', 'chatter_' + str(i))]
)


@pytest.mark.launch_test
@launch_testing.markers.keep_alive
def generate_test_description():
# 'n' changes the number of nodes and topics generated for this test
n = 5
description = [generate_node(i) for i in range(n)] + [launch_testing.actions.ReadyToTest()]
return launch.LaunchDescription(description), {'count': n}


class TestFixture(unittest.TestCase):

def test_topics_successful(self, count):
"""All the supplied topics should be read successfully."""
topic_list = [('chatter_' + str(i), String) for i in range(count)]
expected_topics = {'chatter_' + str(i) for i in range(count)}

# Method 1 : Using the magic methods and 'with' keyword
with WaitForTopics(topic_list, timeout=2.0) as wait_for_node_object_1:
assert wait_for_node_object_1.topics_received() == expected_topics
assert wait_for_node_object_1.topics_not_received() == set()

# Multiple instances of WaitForNode() can be created safely as
# their internal nodes spin in separate contexts
# Method 2 : Manually calling wait() and shutdown()
wait_for_node_object_2 = WaitForTopics(topic_list, timeout=2.0)
assert wait_for_node_object_2.wait()
assert wait_for_node_object_2.topics_received() == expected_topics
assert wait_for_node_object_2.topics_not_received() == set()
wait_for_node_object_2.shutdown()

def test_topics_unsuccessful(self, count):
"""All topics should be read except for the 'invalid_topic'."""
topic_list = [('chatter_' + str(i), String) for i in range(count)]
# Add a topic that will never have anything published on it
topic_list.append(('invalid_topic', String))
expected_topics = {'chatter_' + str(i) for i in range(count)}

# Method 1
with pytest.raises(RuntimeError):
with WaitForTopics(topic_list, timeout=2.0):
pass

# Method 2
wait_for_node_object = WaitForTopics(topic_list, timeout=2.0)
assert not wait_for_node_object.wait()
assert wait_for_node_object.topics_received() == expected_topics
assert wait_for_node_object.topics_not_received() == {'invalid_topic'}
wait_for_node_object.shutdown()

0 comments on commit 29a8bf2

Please sign in to comment.