From b25877f97dd16ffd7d24f72ac560e0f128985a92 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Thu, 8 Aug 2024 07:53:23 -0400 Subject: [PATCH] Adds types to TypeDescriptionService. (#1329) Signed-off-by: Michael Carlstrom --- rclpy/rclpy/type_description_service.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/rclpy/rclpy/type_description_service.py b/rclpy/rclpy/type_description_service.py index 0bdc640a1..0b7dcc39b 100644 --- a/rclpy/rclpy/type_description_service.py +++ b/rclpy/rclpy/type_description_service.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING import weakref from rcl_interfaces.msg import ParameterDescriptor @@ -29,6 +30,10 @@ START_TYPE_DESCRIPTION_SERVICE_PARAM = 'start_type_description_service' +if TYPE_CHECKING: + from rclpy.node import Node + + class TypeDescriptionService: """ Optionally initializes and contains the ~/get_type_description service. @@ -39,7 +44,7 @@ class TypeDescriptionService: This is not intended for use by end users, rather it is a component to be used by Node. """ - def __init__(self, node): + def __init__(self, node: 'Node'): """Initialize the service, if the parameter is set to true.""" self._node_weak_ref = weakref.ref(node) node_name = node.get_name() @@ -73,13 +78,13 @@ def __init__(self, node): if self.enabled: self._start_service() - def destroy(self): + def destroy(self) -> None: # Required manual destruction because this is not managed by rclpy.Service if self._type_description_srv is not None: self._type_description_srv.destroy_when_not_in_use() self._type_description_srv = None - def _start_service(self): + def _start_service(self) -> None: node = self._get_node() self._type_description_srv = _rclpy.TypeDescriptionService(node.handle) # Because we are creating our own service wrapper, must manually add the service @@ -96,11 +101,15 @@ def _start_service(self): node._services.append(service) node._wake_executor() - def _service_callback(self, request, response): + def _service_callback( + self, + request: GetTypeDescription.Request, + response: GetTypeDescription.Response + ) -> GetTypeDescription.Response: return self._type_description_srv.handle_request( request, GetTypeDescription.Response, self._get_node().handle) - def _get_node(self): + def _get_node(self) -> 'Node': node = self._node_weak_ref() if node is None: raise ReferenceError('Expected valid node weak reference')