diff --git a/ros2service/ros2service/api/__init__.py b/ros2service/ros2service/api/__init__.py index 9143ca163..1584171e5 100644 --- a/ros2service/ros2service/api/__init__.py +++ b/ros2service/ros2service/api/__init__.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from time import sleep from rclpy.topic_or_service_is_hidden import topic_or_service_is_hidden from ros2cli.node.strategy import NodeStrategy from rosidl_runtime_py import get_service_interfaces from rosidl_runtime_py import message_to_yaml from rosidl_runtime_py.utilities import get_service +import rclpy + def get_service_names_and_types(*, node, include_hidden_services=False): service_names_and_types = node.get_service_names_and_types() @@ -34,6 +37,57 @@ def get_service_names(*, node, include_hidden_services=False): return [n for (n, t) in service_names_and_types] +def get_service_class(node, service, blocking=False, include_hidden_services=False): + """ + Load service type module for the given service. + + The service should be running for this function to find the service type. + :param node: The node object of rclpy Node class. + :param service: The name of the service. + :param blocking: If blocking is True this function will wait for the service to start. + :param include_hidden_services: Whether to include hidden services while finding the + list of currently running services. + :return: + """ + srv_class = _get_service_class(node, service, include_hidden_services) + if srv_class is not None: + return srv_class + elif blocking: + print(f'WARNING: service [{service}] does not appear to be started yet') + while rclpy.ok(): + srv_class = _get_service_class(node, service, include_hidden_services) + if srv_class is not None: + return srv_class + sleep(0.1) + else: + print(f'WARNING: service [{service}] does not appear to be started yet') + return None + + +def _get_service_class(node, service, include_hidden_services): + service_names_and_types = get_service_names_and_types( + node=node, + include_hidden_services=include_hidden_services) + + service_type = None + for (service_name, service_types) in service_names_and_types: + if service == service_name: + if len(service_types) > 1: + raise RuntimeError( + f"Cannot echo service '{service}', as it contains more than one " + f"type: [{', '.join(service_types)}]") + service_type = service_types[0] + break + + if service_type is None: + return None + + try: + return get_service(service_type) + except (AttributeError, ModuleNotFoundError, ValueError): + raise RuntimeError(f"The service type '{service_type}' is invalid") + + def service_type_completer(**kwargs): """Callable returning a list of service types.""" service_types = [] diff --git a/ros2service/ros2service/verb/echo.py b/ros2service/ros2service/verb/echo.py new file mode 100644 index 000000000..ef223a0b0 --- /dev/null +++ b/ros2service/ros2service/verb/echo.py @@ -0,0 +1,208 @@ +# Copyright 2022 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +from typing import TypeVar + +from collections import OrderedDict +import sys +from typing import Optional, TypeVar + +import uuid +import rclpy +from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy +from rclpy.qos import QoSPresetProfiles +from rosidl_runtime_py import message_to_csv +from rosidl_runtime_py import message_to_ordereddict +from rosidl_runtime_py.utilities import get_service +from service_msgs.msg import ServiceEventInfo +import yaml + +from ros2cli.node.strategy import NodeStrategy +from ros2service.api import get_service_class +from ros2service.api import ServiceNameCompleter +from ros2service.api import ServiceTypeCompleter +from ros2service.verb import VerbExtension +from ros2topic.api import unsigned_int + +DEFAULT_TRUNCATE_LENGTH = 128 +MsgType = TypeVar('MsgType') + +def represent_ordereddict(dumper, data): + items = [] + for k, v in data.items(): + items.append((dumper.represent_data(k), dumper.represent_data(v))) + return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', items) + +class EchoVerb(VerbExtension): + """Echo a service.""" + + def __init__(self): + super().__init__() + self.no_str = None + self.no_arr = None + self.csv = None + self.flow_style = None + self.client_only = None + self.server_only = None + self.truncate_length = None + self.exclude_message_info = None + self.srv_module = None + self.event_enum = None + self.qos_profile = QoSPresetProfiles.get_from_short_key("services_default") + self.__yaml_representer_registered = False + self.event_type_map = dict( + (v, k) for k, v in ServiceEventInfo._Metaclass_ServiceEventInfo__constants.items()) + + def add_arguments(self, parser, cli_name): + arg = parser.add_argument( + 'service_name', + help="Name of the ROS service to echo (e.g. '/add_two_ints')") + arg.completer = ServiceNameCompleter( + include_hidden_services_key='include_hidden_services') + arg = parser.add_argument( + 'service_type', nargs='?', + help="Type of the ROS service (e.g. 'example_interfaces/srv/AddTwoInts')") + arg.completer = ServiceTypeCompleter(service_name_key='service_name') + parser.add_argument( + '--full-length', '-f', action='store_true', + help='Output all elements for arrays, bytes, and string with a ' + "length > '--truncate-length', by default they are truncated " + "after '--truncate-length' elements with '...'") + parser.add_argument( + '--truncate-length', '-l', type=unsigned_int, default=DEFAULT_TRUNCATE_LENGTH, + help='The length to truncate arrays, bytes, and string to ' + f'(default: {DEFAULT_TRUNCATE_LENGTH})') + parser.add_argument( + '--no-arr', action='store_true', help="Don't print array fields of messages") + parser.add_argument( + '--no-str', action='store_true', help="Don't print string fields of messages") + parser.add_argument( + '--csv', action='store_true', + help=( + 'Output all recursive fields separated by commas (e.g. for plotting).' + )) + parser.add_argument( + '--exclude-message-info', action='store_true', help='Hide associated message info.') + parser.add_argument( + '--client-only', action='store_true', help='Echo only request sent or response received by service client') + parser.add_argument( + '--server-only', action='store_true', help='Echo only request received or response sent by service server') + parser.add_argument( + '--uuid-list', + action='store_true', help='Print client_id as uint8 list UUID instead of string UUID') + + def main(self, *, args): + self.truncate_length = args.truncate_length if not args.full_length else None + self.no_arr = args.no_arr + self.no_str = args.no_str + self.csv = args.csv + self.exclude_message_info = args.exclude_message_info + self.client_only = args.client_only + self.server_only = args.server_only + self.uuid_list = args.uuid_list + event_topic_name = args.service_name + \ + _rclpy.service_introspection.RCL_SERVICE_INTROSPECTION_TOPIC_POSTFIX + + if self.server_only and self.client_only: + raise RuntimeError("--client-only and --server-only are mutually exclusive") + + if args.service_type is None: + with NodeStrategy(args) as node: + try: + self.srv_module = get_service_class( + node, args.service_name, blocking=False, include_hidden_services=True) + self.event_msg_type = self.srv_module.Event + except (AttributeError, ModuleNotFoundError, ValueError): + raise RuntimeError("The service name '%s' is invalid" % args.service_name) + else: + try: + self.srv_module = get_service(args.service_type) + self.event_msg_type = self.srv_module.Event + except (AttributeError, ModuleNotFoundError, ValueError): + raise RuntimeError(f"The service type '{args.service_type}' is invalid") + + if self.srv_module is None: + raise RuntimeError('Could not load the type for the passed service') + + with NodeStrategy(args) as node: + self.subscribe_and_spin( + node, + event_topic_name, + self.event_msg_type) + + def subscribe_and_spin(self, node, event_topic_name: str, event_msg_type: MsgType) -> Optional[str]: + """Initialize a node with a single subscription and spin.""" + node.create_subscription( + event_msg_type, + event_topic_name, + self._subscriber_callback, + self.qos_profile) + rclpy.spin(node) + + def _subscriber_callback(self, msg): + if self.csv: + print(self.format_csv_output(msg)) + else: + print(self.format_yaml_output(msg)) + print('---------------------------') + + def format_csv_output(self, msg: MsgType): + """Convert a message to a CSV string.""" + if self.exclude_message_info: + msg.info = ServiceEventInfo() + to_print = message_to_csv( + msg, + truncate_length=self.truncate_length, + no_arr=self.no_arr, + no_str=self.no_str) + return to_print + + def format_yaml_output(self, msg: MsgType): + """Pretty-format a service event message.""" + event_dict = message_to_ordereddict( + msg, + truncate_length=self.truncate_length, + no_arr=self.no_arr, + no_str=self.no_str) + + event_dict['info']['event_type'] = \ + self.event_type_map[event_dict['info']['event_type']] + + if not self.uuid_list: + uuid_hex_str = "".join([f'{i:02x}' for i in event_dict['info']['client_id']['uuid']]) + event_dict['info']['client_id']['uuid'] = str(uuid.UUID(uuid_hex_str)) + + if self.exclude_message_info: + del event_dict['info'] + + # unpack Request, Response sequences + if len(event_dict['request']) == 0: + del event_dict['request'] + else: + event_dict['request'] = event_dict['request'][0] + + if len(event_dict['response']) == 0: + del event_dict['response'] + else: + event_dict['response'] = event_dict['response'][0] + + # Register custom representer for YAML output + if not self.__yaml_representer_registered: + yaml.add_representer(OrderedDict, represent_ordereddict) + self.__yaml_representer_registered = True + + return yaml.dump(event_dict, + allow_unicode=True, + width=sys.maxsize, + default_flow_style=self.flow_style) diff --git a/ros2service/setup.py b/ros2service/setup.py index 2cdc3f5a3..30dc011be 100644 --- a/ros2service/setup.py +++ b/ros2service/setup.py @@ -41,6 +41,7 @@ ], 'ros2service.verb': [ 'call = ros2service.verb.call:CallVerb', + 'echo = ros2service.verb.echo:EchoVerb', 'find = ros2service.verb.find:FindVerb', 'list = ros2service.verb.list:ListVerb', 'type = ros2service.verb.type:TypeVerb', diff --git a/ros2service/test/fixtures/echo_client.py b/ros2service/test/fixtures/echo_client.py new file mode 100644 index 000000000..63a0e58e4 --- /dev/null +++ b/ros2service/test/fixtures/echo_client.py @@ -0,0 +1,49 @@ +# Copyright 2022 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import rclpy +from rclpy.node import Node + +from test_msgs.srv import BasicTypes + + +class EchoClient(Node): + + def __init__(self): + super().__init__('echo_client') + self.future = None + self.client = self.create_client(BasicTypes, 'echo') + while not self.client.wait_for_service(timeout_sec=1.0): + self.get_logger().info('echo service not available, waiting again...') + self.req = BasicTypes.Request() + + def send_request(self): + print("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") + print("send_request") + self.req.string_value = "test" + self.future = self.client.call_async(self.req) + rclpy.spin_until_future_complete(self, self.future) + return self.future.result() + + +def main(args=None): + rclpy.init(args=args) + node = EchoClient() + node.send_request() + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/ros2service/test/test_cli.py b/ros2service/test/test_cli.py index 67967472e..f5d3ec1a4 100644 --- a/ros2service/test/test_cli.py +++ b/ros2service/test/test_cli.py @@ -65,10 +65,29 @@ def get_echo_call_output(**kwargs): @pytest.mark.rostest @launch_testing.parametrize('rmw_implementation', get_available_rmw_implementations()) def generate_test_description(rmw_implementation): - path_to_echo_server_script = os.path.join( - os.path.dirname(__file__), 'fixtures', 'echo_server.py' - ) + path_to_fixtures = os.path.join(os.path.dirname(__file__), 'fixtures') + path_to_echo_server_script = os.path.join(path_to_fixtures, 'echo_server.py') + path_to_echo_client_script = os.path.join(path_to_fixtures, 'echo_client.py') + additional_env = {'RMW_IMPLEMENTATION': rmw_implementation} + + echo_service_server_action = Node( + executable=sys.executable, + arguments=[path_to_echo_server_script], + name='echo_server', + namespace='my_ns', + additional_env=additional_env + ) + + echo_hidden_service_server_action = Node( + executable=sys.executable, + arguments=[path_to_echo_server_script], + name='_hidden_echo_server', + namespace='my_ns', + remappings=[('echo', '_echo')], + additional_env=additional_env, + ) + return LaunchDescription([ # Always restart daemon to isolate tests. ExecuteProcess( @@ -80,28 +99,15 @@ def generate_test_description(rmw_implementation): name='daemon-start', on_exit=[ # Add test fixture actions. - Node( - executable=sys.executable, - arguments=[path_to_echo_server_script], - name='echo_server', - namespace='my_ns', - additional_env=additional_env, - ), - Node( - executable=sys.executable, - arguments=[path_to_echo_server_script], - name='_hidden_echo_server', - namespace='my_ns', - remappings=[('echo', '_echo')], - additional_env=additional_env, - ), + echo_service_server_action, + echo_hidden_service_server_action, launch_testing.actions.ReadyToTest() ], additional_env=additional_env ) ] ), - ]) + ]), locals() class TestROS2ServiceCLI(unittest.TestCase): @@ -138,6 +144,31 @@ def launch_service_command(self, arguments): yield service_command cls.launch_service_command = launch_service_command + def test_service_echo(self, launch_service, proc_info, proc_output, path_to_echo_client_script, additional_env): + echo_service_client_action = Node( + executable=sys.executable, + arguments=[path_to_echo_client_script], + name='echo_client', + namespace='my_ns', + additional_env=additional_env + ) + + with self.launch_service_command( + arguments=['echo', '/my_ns/echo'] + ) as service_command: + with launch_testing.tools.launch_process( + launch_service, echo_service_client_action, + proc_info, proc_output + ) as client_node: + assert client_node.wait_for_shutdown(10) + + @launch_testing.markers.retry_on_failure(times=5, delay=1) + def test_echo_service_not_running(self): + with self.launch_service_command( + arguments=['echo', '/service_not_running'] + ) as service_command: + assert service_command.wait_for_output(timeout=20) + @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_list_services(self): with self.launch_service_command(arguments=['list']) as service_command: @@ -215,6 +246,7 @@ def test_find(self): strict=True ) + @launch_testing.markers.retry_on_failure(times=5, delay=1) def test_find_hidden(self): with self.launch_service_command( diff --git a/ros2topic/ros2topic/api/__init__.py b/ros2topic/ros2topic/api/__init__.py index c9688071c..f05237504 100644 --- a/ros2topic/ros2topic/api/__init__.py +++ b/ros2topic/ros2topic/api/__init__.py @@ -142,8 +142,9 @@ def _get_msg_class(node, topic, include_hidden_topics): try: return get_message(message_type) - except (AttributeError, ModuleNotFoundError, ValueError): - raise RuntimeError("The message type '%s' is invalid" % message_type) + except (AttributeError, ModuleNotFoundError, ValueError) as e: + raise e + # raise RuntimeError("The message type '%s' is invalid" % message_type) class TopicMessagePrototypeCompleter: