From 34fe5795dacb07398e1606f436a77d17b1ed5ae7 Mon Sep 17 00:00:00 2001 From: Michael Carlstrom Date: Sun, 4 Aug 2024 23:35:45 -0400 Subject: [PATCH] Add types to rcutils_logger.py (#1249) * Types to rcutils_logger.py Signed-off-by: Michael Carlstrom * remove unused types Signed-off-by: Michael Carlstrom * Add back empty dictionary Signed-off-by: Michael Carlstrom * rerun ci Signed-off-by: Michael Carlstrom * update Signed-off-by: Michael Carlstrom * Add ClassVar Signed-off-by: Michael Carlstrom * re-run Signed-off-by: Michael Carlstrom * re-run Signed-off-by: Michael Carlstrom * Remove explicit TypeAlias for older python versions Signed-off-by: Michael Carlstrom * Fix error suppresion Signed-off-by: Michael Carlstrom * Switch to ModuleNotFoundError Signed-off-by: Michael Carlstrom * Use typing.OrderedDict for python 3.8 support Signed-off-by: Michael Carlstrom --------- Signed-off-by: Michael Carlstrom Co-authored-by: Shane Loretz --- rclpy/rclpy/impl/rcutils_logger.py | 190 +++++++++++++++++++++-------- 1 file changed, 137 insertions(+), 53 deletions(-) diff --git a/rclpy/rclpy/impl/rcutils_logger.py b/rclpy/rclpy/impl/rcutils_logger.py index c14cd793c..041566397 100644 --- a/rclpy/rclpy/impl/rcutils_logger.py +++ b/rclpy/rclpy/impl/rcutils_logger.py @@ -12,23 +12,44 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from collections import namedtuple -from collections import OrderedDict +from contextlib import suppress import inspect import os +import sys +from types import FrameType +from typing import cast +from typing import ClassVar +from typing import Dict +from typing import List +from typing import Literal +from typing import NamedTuple +from typing import Optional +from typing import OrderedDict +from typing import Tuple +from typing import Type +from typing import TypedDict +from typing import Union from rclpy.clock import Clock from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy from rclpy.impl.logging_severity import LoggingSeverity +if sys.version_info >= (3, 12): + from typing import Unpack +else: + with suppress(ModuleNotFoundError): + from typing_extensions import Unpack + + +SupportedFiltersKeys = Literal['throttle', 'skip_first', 'once'] + # Known filenames from which logging methods can be called (will be ignored in `_find_caller`). -_internal_callers = [] +_internal_callers: List[str] = [] # This will cause rclpy filenames to be registered in `_internal_callers` on first logging call. _populate_internal_callers = True -def _find_caller(frame): +def _find_caller(frame: Optional[FrameType]) -> FrameType: """Get the first calling frame that is outside of rclpy.""" global _populate_internal_callers global _internal_callers @@ -44,17 +65,28 @@ def _find_caller(frame): ]) _populate_internal_callers = False + if frame is None: + raise ValueError('Cannot get frame info if frame is None') + file_path = os.path.realpath(inspect.getframeinfo(frame).filename) while any(f in file_path for f in _internal_callers): frame = frame.f_back + + if frame is None: + raise ValueError('Cannot get frame info if frame is None') + file_path = os.path.realpath(inspect.getframeinfo(frame).filename) return frame class CallerId( - namedtuple('CallerId', ['function_name', 'file_path', 'line_number', 'last_index'])): + NamedTuple('CallerId', + [('function_name', str), + ('file_path', str), + ('line_number', int), + ('last_index', int)])): - def __new__(cls, frame=None): + def __new__(cls, frame: Optional[FrameType] = None) -> 'CallerId': if not frame: frame = _find_caller(inspect.currentframe()) return super(CallerId, cls).__new__( @@ -66,6 +98,44 @@ def __new__(cls, frame=None): ) +class RcutilsLoggerContext(TypedDict): + name: str + severity: LoggingSeverity + filters: List[SupportedFiltersKeys] + + +class OnceContext(RcutilsLoggerContext): + has_been_logged_once: bool + + +class ThrottleContext(RcutilsLoggerContext): + throttle_duration_sec: float + throttle_time_source_type: Clock + throttle_last_logged: int + + +class SkipFirstContext(RcutilsLoggerContext): + first_has_been_skipped: bool + + +class LoggingFilterArgs(TypedDict, total=False): + once: bool + throttle_duration_sec: float + throttle_time_source_type: Clock + skip_first: bool + + +class LoggingFilterParams(TypedDict, total=False): + once: Optional[bool] + throttle_duration_sec: Optional[float] + throttle_time_source_type: Clock + skip_first: Optional[bool] + + +class LoggingArgs(LoggingFilterArgs, total=False): + name: str + + class LoggingFilter: """Base class for logging filters.""" @@ -74,44 +144,49 @@ class LoggingFilter: A default value of None makes a parameter required. """ - params = {} + params: ClassVar[LoggingFilterParams] = {} """ Initialize the context of a logging call, e.g. declare variables needed for determining the log condition and add them to the context. """ @classmethod - def initialize_context(cls, context, **kwargs): + def initialize_context(cls, context: RcutilsLoggerContext, + **kwargs: 'Unpack[LoggingFilterArgs]') -> None: # Store all parameters in the context so we can check that users never try to change them. - for param in cls.params: - context[param] = kwargs.get(param, cls.params[param]) - if context[param] is None: + for param_name, param_value in cls.params.items(): + kwargs_value = kwargs.get(param_name, param_value) + if kwargs_value is None: raise TypeError( 'Required parameter "{0}" was not specified for logging filter "{1}"' - .format(param, cls.__name__)) + .format(param_name, cls.__name__)) + context[param_name] = kwargs_value # type: ignore[literal-required] """ Decide if it's appropriate to log given a context, and update the context accordingly. """ @staticmethod - def should_log(context): + def should_log(context: RcutilsLoggerContext) -> bool: return True class Once(LoggingFilter): """Ignore all log calls except the first one.""" - params = { + params: ClassVar[LoggingFilterParams] = { 'once': None, } @classmethod - def initialize_context(cls, context, **kwargs): + def initialize_context(cls, context: RcutilsLoggerContext, + **kwargs: 'Unpack[LoggingFilterArgs]') -> None: + context = cast(OnceContext, context) super(Once, cls).initialize_context(context, **kwargs) context['has_been_logged_once'] = False @staticmethod - def should_log(context): + def should_log(context: RcutilsLoggerContext) -> bool: + context = cast(OnceContext, context) logging_condition = False if not context['has_been_logged_once']: logging_condition = True @@ -122,13 +197,15 @@ def should_log(context): class Throttle(LoggingFilter): """Ignore log calls if the last call is not longer ago than the specified duration.""" - params = { + params: ClassVar[LoggingFilterParams] = { 'throttle_duration_sec': None, 'throttle_time_source_type': Clock(), } @classmethod - def initialize_context(cls, context, **kwargs): + def initialize_context(cls, context: RcutilsLoggerContext, + **kwargs: 'Unpack[LoggingFilterArgs]') -> None: + context = cast(ThrottleContext, context) super(Throttle, cls).initialize_context(context, **kwargs) context['throttle_last_logged'] = 0 if not isinstance(context['throttle_time_source_type'], Clock): @@ -138,7 +215,8 @@ def initialize_context(cls, context, **kwargs): .format(context['throttle_time_source_type'])) @staticmethod - def should_log(context): + def should_log(context: RcutilsLoggerContext) -> bool: + context = cast(ThrottleContext, context) logging_condition = True now = context['throttle_time_source_type'].now().nanoseconds next_log_time = context['throttle_last_logged'] + (context['throttle_duration_sec'] * 1e+9) @@ -151,17 +229,20 @@ def should_log(context): class SkipFirst(LoggingFilter): """Ignore the first log call but process all subsequent calls.""" - params = { + params: ClassVar[LoggingFilterParams] = { 'skip_first': None, } @classmethod - def initialize_context(cls, context, **kwargs): + def initialize_context(cls, context: RcutilsLoggerContext, + **kwargs: 'Unpack[LoggingFilterArgs]') -> None: + context = cast(SkipFirstContext, context) super(SkipFirst, cls).initialize_context(context, **kwargs) context['first_has_been_skipped'] = False @staticmethod - def should_log(context): + def should_log(context: RcutilsLoggerContext) -> bool: + context = cast(SkipFirstContext, context) logging_condition = True if not context['first_has_been_skipped']: logging_condition = False @@ -170,20 +251,20 @@ def should_log(context): # The ordering of this dictionary defines the order in which filters will be processed. -supported_filters = OrderedDict() +supported_filters: OrderedDict[SupportedFiltersKeys, Type[LoggingFilter]] = OrderedDict() supported_filters['throttle'] = Throttle supported_filters['skip_first'] = SkipFirst supported_filters['once'] = Once -def get_filters_from_kwargs(**kwargs): +def get_filters_from_kwargs(**kwargs: 'Unpack[LoggingFilterArgs]') -> List[SupportedFiltersKeys]: """ Determine which filters have had parameters specified in the given keyword arguments. Returns the list of filters using the order specified by `supported_filters`. """ - detected_filters = [] - all_supported_params = [] + detected_filters: List[SupportedFiltersKeys] = [] + all_supported_params: List[str] = [] for supported_filter, filter_class in supported_filters.items(): filter_params = filter_class.params.keys() all_supported_params.extend(filter_params) @@ -201,7 +282,7 @@ def get_filters_from_kwargs(**kwargs): 'required parameter "{0}" not specified ' 'but is required for the the logging filter "{1}"'.format( param_name, detected_filter)) - kwargs[param_name] = default_value + kwargs[param_name] = default_value # type: ignore for kwarg in kwargs: if kwarg not in all_supported_params: raise TypeError( @@ -213,18 +294,18 @@ def get_filters_from_kwargs(**kwargs): class RcutilsLogger: - def __init__(self, name=''): + def __init__(self, name: str = '') -> None: self.name = name - self.logger_sublogger_namepair = None - self.contexts = {} + self.logger_sublogger_namepair: Optional[Tuple[str, str]] = None + self.contexts: Dict[CallerId, RcutilsLoggerContext] = {} - def __del__(self): + def __del__(self) -> None: if self.logger_sublogger_namepair: _rclpy.rclpy_logging_rosout_remove_sublogger( self.logger_sublogger_namepair[0], self.logger_sublogger_namepair[1]) self.logger_sublogger_namepair = None - def get_child(self, name): + def get_child(self, name: str) -> 'RcutilsLogger': if not name: raise ValueError('Child logger name must not be empty.') @@ -239,20 +320,22 @@ def get_child(self, name): logger.logger_sublogger_namepair = (self.name, name) return logger - def set_level(self, level): + def set_level(self, level: Union[int, LoggingSeverity]) -> None: level = LoggingSeverity(level) - return _rclpy.rclpy_logging_set_logger_level(self.name, level) + _rclpy.rclpy_logging_set_logger_level(self.name, level) - def get_effective_level(self): + def get_effective_level(self) -> LoggingSeverity: level = LoggingSeverity( _rclpy.rclpy_logging_get_logger_effective_level(self.name)) return level - def is_enabled_for(self, severity): + def is_enabled_for(self, severity: Union[int, LoggingSeverity]) -> bool: severity = LoggingSeverity(severity) - return _rclpy.rclpy_logging_logger_is_enabled_for(self.name, severity) + result: bool = _rclpy.rclpy_logging_logger_is_enabled_for(self.name, severity) + return result - def log(self, message, severity, **kwargs): + def log(self, message: str, severity: Union[int, LoggingSeverity], + name: Optional[str] = None, **kwargs: 'Unpack[LoggingFilterArgs]') -> bool: r""" Log a message with the specified severity. @@ -265,10 +348,9 @@ def log(self, message, severity, **kwargs): Logging filters will only be evaluated if the logger is enabled for the message's severity. - :param message str: message to log. + :param message: message to log. :param severity: severity of the message. - :type severity: :py:class:LoggingSeverity - :keyword name str: name of the logger to use. + :keyword name: name of the logger to use. :param \**kwargs: optional parameters for logging filters (see below). :Keyword Arguments: @@ -284,7 +366,6 @@ def log(self, message, severity, **kwargs): :returns: False if a filter caused the message to not be logged; True otherwise. :raises: TypeError on invalid filter parameter combinations. :raises: ValueError on invalid parameters values. - :rtype: bool """ # Gather context info and check filters only if the severity is appropriate. if not self.is_enabled_for(severity): @@ -292,7 +373,8 @@ def log(self, message, severity, **kwargs): severity = LoggingSeverity(severity) - name = kwargs.pop('name', self.name) + if name is None: + name = self.name # Infer the requested log filters from the keyword arguments detected_filters = get_filters_from_kwargs(**kwargs) @@ -300,11 +382,11 @@ def log(self, message, severity, **kwargs): # Get/prepare the context corresponding to the caller. caller_id = CallerId() if caller_id not in self.contexts: - context = {'name': name, 'severity': severity} + context: RcutilsLoggerContext = {'name': name, 'severity': severity, + 'filters': detected_filters} for detected_filter in detected_filters: if detected_filter in supported_filters: supported_filters[detected_filter].initialize_context(context, **kwargs) - context['filters'] = detected_filters self.contexts[caller_id] = context else: context = self.contexts[caller_id] @@ -317,7 +399,9 @@ def log(self, message, severity, **kwargs): raise ValueError('Requested logging filters cannot be changed between calls.') for detected_filter in detected_filters: filter_params = supported_filters[detected_filter].params - if any(context[p] != kwargs.get(p, filter_params[p]) for p in filter_params): + if any( + context[p] != kwargs.get(p, filter_params[p]) # type: ignore + for p in filter_params): raise ValueError( 'Logging filter parameters cannot be changed between calls.') @@ -334,19 +418,19 @@ def log(self, message, severity, **kwargs): caller_id.function_name, caller_id.file_path, caller_id.line_number) return True - def debug(self, message, **kwargs): + def debug(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """Log a message with `DEBUG` severity via :py:classmethod:RcutilsLogger.log:.""" return self.log(message, LoggingSeverity.DEBUG, **kwargs) - def info(self, message, **kwargs): + def info(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """Log a message with `INFO` severity via :py:classmethod:RcutilsLogger.log:.""" return self.log(message, LoggingSeverity.INFO, **kwargs) - def warning(self, message, **kwargs): + def warning(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """Log a message with `WARN` severity via :py:classmethod:RcutilsLogger.log:.""" return self.log(message, LoggingSeverity.WARN, **kwargs) - def warn(self, message, **kwargs): + def warn(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """ Log a message with `WARN` severity via :py:classmethod:RcutilsLogger.log:. @@ -354,10 +438,10 @@ def warn(self, message, **kwargs): """ return self.warning(message, **kwargs) - def error(self, message, **kwargs): + def error(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """Log a message with `ERROR` severity via :py:classmethod:RcutilsLogger.log:.""" return self.log(message, LoggingSeverity.ERROR, **kwargs) - def fatal(self, message, **kwargs): + def fatal(self, message: str, **kwargs: 'Unpack[LoggingArgs]') -> bool: """Log a message with `FATAL` severity via :py:classmethod:RcutilsLogger.log:.""" return self.log(message, LoggingSeverity.FATAL, **kwargs)