Skip to content

Commit

Permalink
Adds types to TypeDescriptionService. (#1329)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Carlstrom <[email protected]>
  • Loading branch information
InvincibleRMC authored Aug 8, 2024
1 parent a485f90 commit b25877f
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions rclpy/rclpy/type_description_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING
import weakref

from rcl_interfaces.msg import ParameterDescriptor
Expand All @@ -29,6 +30,10 @@
START_TYPE_DESCRIPTION_SERVICE_PARAM = 'start_type_description_service'


if TYPE_CHECKING:
from rclpy.node import Node


class TypeDescriptionService:
"""
Optionally initializes and contains the ~/get_type_description service.
Expand All @@ -39,7 +44,7 @@ class TypeDescriptionService:
This is not intended for use by end users, rather it is a component to be used by Node.
"""

def __init__(self, node):
def __init__(self, node: 'Node'):
"""Initialize the service, if the parameter is set to true."""
self._node_weak_ref = weakref.ref(node)
node_name = node.get_name()
Expand Down Expand Up @@ -73,13 +78,13 @@ def __init__(self, node):
if self.enabled:
self._start_service()

def destroy(self):
def destroy(self) -> None:
# Required manual destruction because this is not managed by rclpy.Service
if self._type_description_srv is not None:
self._type_description_srv.destroy_when_not_in_use()
self._type_description_srv = None

def _start_service(self):
def _start_service(self) -> None:
node = self._get_node()
self._type_description_srv = _rclpy.TypeDescriptionService(node.handle)
# Because we are creating our own service wrapper, must manually add the service
Expand All @@ -96,11 +101,15 @@ def _start_service(self):
node._services.append(service)
node._wake_executor()

def _service_callback(self, request, response):
def _service_callback(
self,
request: GetTypeDescription.Request,
response: GetTypeDescription.Response
) -> GetTypeDescription.Response:
return self._type_description_srv.handle_request(
request, GetTypeDescription.Response, self._get_node().handle)

def _get_node(self):
def _get_node(self) -> 'Node':
node = self._node_weak_ref()
if node is None:
raise ReferenceError('Expected valid node weak reference')
Expand Down

0 comments on commit b25877f

Please sign in to comment.