Skip to content

Commit

Permalink
support ros2 topic pub yaml file input (#925)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoya Fujita <[email protected]>
  • Loading branch information
fujitatomoya authored Feb 7, 2025
1 parent a91bbc1 commit 4e51c02
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 9 deletions.
61 changes: 52 additions & 9 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def add_arguments(self, parser, cli_name):
group.add_argument(
'--stdin', action='store_true',
help='Read values from standard input')
group.add_argument(
'--yaml-file', type=str, default=None,
help='YAML file that has message contents, '
'e.g STDOUT from ros2 topic echo <topic>')
parser.add_argument(
'-r', '--rate', metavar='N', type=positive_float, default=1.0,
help='Publishing rate in Hz (default: 1)')
Expand Down Expand Up @@ -131,6 +135,7 @@ def main(args):
args.message_type,
args.topic_name,
values,
args.yaml_file,
1. / args.rate,
args.print,
times,
Expand All @@ -146,6 +151,7 @@ def publisher(
message_type: MsgType,
topic_name: str,
values: dict,
yaml_file: str,
period: float,
print_nth: int,
times: int,
Expand All @@ -159,9 +165,14 @@ def publisher(
msg_module = get_message(message_type)
except (AttributeError, ModuleNotFoundError, ValueError):
raise RuntimeError('The passed message type is invalid')
values_dictionary = yaml.safe_load(values)
if not isinstance(values_dictionary, dict):
return 'The passed value needs to be a dictionary in YAML format'

msg_reader = None
if yaml_file:
msg_reader = read_msg_from_yaml(yaml_file)
else:
values_dictionary = yaml.safe_load(values)
if not isinstance(values_dictionary, dict):
return 'The passed value needs to be a dictionary in YAML format'

pub = node.create_publisher(msg_module, topic_name, qos_profile)

Expand All @@ -184,15 +195,38 @@ def publisher(
total_wait_time += DEFAULT_WAIT_TIME

msg = msg_module()
try:
timestamp_fields = set_message_fields(
msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)
timestamp_fields = None

if not msg_reader:
# Set the static message from specified values once
try:
timestamp_fields = set_message_fields(
msg, values_dictionary, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)

print('publisher: beginning loop')
count = 0
more_message = True

def timer_callback():
if msg_reader:
# Try to read out the contents for each message
try:
one_msg = next(msg_reader)
if not isinstance(one_msg, dict):
print('The contents in YAML file need to be a YAML format')
except StopIteration:
nonlocal more_message
more_message = False
return
# Set the message with contents
try:
nonlocal timestamp_fields
timestamp_fields = set_message_fields(
msg, one_msg, expand_header_auto=True, expand_time_now=True)
except Exception as e:
return 'Failed to populate field: {0}'.format(e)
stamp_now = node.get_clock().now().to_msg()
for field_setter in timestamp_fields:
field_setter(stamp_now)
Expand All @@ -205,11 +239,20 @@ def timer_callback():
timer_callback()
if times != 1:
timer = node.create_timer(period, timer_callback)
while times == 0 or count < times:
while (times == 0 or count < times) and more_message:
rclpy.spin_once(node)
# give some time for the messages to reach the wire before exiting
time.sleep(keep_alive)
node.destroy_timer(timer)
else:
# give some time for the messages to reach the wire before exiting
time.sleep(keep_alive)


def read_msg_from_yaml(yaml_file):
with open(yaml_file, 'r') as f:
for document in yaml.load_all(f, Loader=yaml.FullLoader):
if document is None:
continue # Skip if there's no more document

yield document
10 changes: 10 additions & 0 deletions ros2topic/test/resources/chatter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
data: 'Hello ROS Users'
---
---
data: Hello ROS Developers
---
data: Hello ROS Developers
---
---
data: 'Hello ROS Users'
39 changes: 39 additions & 0 deletions ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import functools
import pathlib
import sys
import unittest

Expand Down Expand Up @@ -49,6 +50,8 @@
TEST_NODE = 'cli_echo_pub_test_node'
TEST_NAMESPACE = 'cli_echo_pub'

TEST_RESOURCES_DIR = pathlib.Path(__file__).resolve().parent / 'resources'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
Expand Down Expand Up @@ -289,6 +292,42 @@ def test_pub_times(self, launch_service, proc_info, proc_output):
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_pub_yaml(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
# yaml file prevails to the values 'data: hello'
cmd=(['ros2', 'topic', 'pub', '/clitest/topic/chatter',
'std_msgs/String', '--yaml-file',
str(TEST_RESOURCES_DIR / 'chatter.yaml')]),
additional_env={
'PYTHONUNBUFFERED': '1'
},
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
filtered_rmw_implementation=get_rmw_implementation_identifier()
)
) as command:
assert command.wait_for_shutdown(timeout=10)
assert command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='Hello ROS Users')",
'',
"publishing #2: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #3: std_msgs.msg.String(data='Hello ROS Developers')",
'',
"publishing #4: std_msgs.msg.String(data='Hello ROS Users')",
'',
],
text=command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=5)
def test_echo_basic(self, launch_service, proc_info, proc_output):
params = [
Expand Down

0 comments on commit 4e51c02

Please sign in to comment.