diff --git a/source/modulo_components/modulo_components/component.py b/source/modulo_components/modulo_components/component.py index ec5fbbe2..86394cfd 100644 --- a/source/modulo_components/modulo_components/component.py +++ b/source/modulo_components/modulo_components/component.py @@ -30,10 +30,10 @@ def _step(self): Step function that is called periodically. """ try: - self._evaluate_periodic_callbacks() + self._evaluate_periodic_component_callbacks() self.on_step_callback() - self._publish_outputs() - self._publish_predicates() + self._publish_component_outputs() + self._publish_component_predicates() except Exception as e: self.get_logger().error(f"Failed to execute step function: {e}", throttle_duration_sec=1.0) self.raise_error() @@ -88,12 +88,12 @@ def add_output(self, signal_name: str, data: str, message_type: MsgT, :param publish_on_step: If true, the output is published periodically on step """ try: - parsed_signal_name = self._create_output(signal_name, data, message_type, clproto_message_type, - default_topic, fixed_topic, publish_on_step) + parsed_signal_name = self._create_component_output(signal_name, data, message_type, clproto_message_type, + default_topic, fixed_topic, publish_on_step) topic_name = self.get_parameter_value(parsed_signal_name + "_topic") self.get_logger().debug(f"Adding output '{parsed_signal_name}' with topic name '{topic_name}'.") - publisher = self.create_publisher(message_type, topic_name, self._qos) - self._outputs[parsed_signal_name]["publisher"] = publisher + publisher = self.create_publisher(message_type, topic_name, self.get_qos()) + self._set_output_publisher(parsed_signal_name, publisher) except Exception as e: self.get_logger().error(f"Failed to add output '{signal_name}': {e}") @@ -103,4 +103,4 @@ def raise_error(self): """ super().raise_error() self.set_predicate("in_error_state", True) - self._finalize_interfaces() + self._finalize_component_interfaces() diff --git a/source/modulo_components/modulo_components/component_interface.py b/source/modulo_components/modulo_components/component_interface.py index bd47e863..3f9df3d0 100644 --- a/source/modulo_components/modulo_components/component_interface.py +++ b/source/modulo_components/modulo_components/component_interface.py @@ -45,37 +45,37 @@ def __init__(self, node_name: str, *args, **kwargs): node_kwargs = {key: value for key, value in kwargs.items() if key in NODE_KWARGS} super().__init__(node_name, *args, **node_kwargs) self.__step_lock = Lock() - self._parameter_dict: Dict[str, Union[str, sr.Parameter]] = {} + self.__parameter_dict: Dict[str, Union[str, sr.Parameter]] = {} self.__read_only_parameters: Dict[str, bool] = {} self.__pre_set_parameters_callback_called = False self.__set_parameters_result = SetParametersResult() - self._predicates: Dict[str, Predicate] = {} - self._triggers: List[str] = [] - self._periodic_callbacks: Dict[str, Callable[[], None]] = {} - self._inputs = {} - self._outputs = {} - self._periodic_outputs: Dict[str, bool] = {} - self._services_dict: Dict[str, Service] = {} + self.__predicates: Dict[str, Predicate] = {} + self.__triggers: List[str] = [] + self.__periodic_callbacks: Dict[str, Callable[[], None]] = {} + self.__inputs = {} + self.__outputs = {} + self.__periodic_outputs: Dict[str, bool] = {} + self.__services_dict: Dict[str, Service] = {} self.__tf_buffer: Optional[Buffer] = None self.__tf_listener: Optional[TransformListener] = None self.__tf_broadcaster: Optional[TransformBroadcaster] = None self.__static_tf_broadcaster: Optional[StaticTransformBroadcaster] = None - self._qos = QoSProfile(depth=10) + self.__qos = QoSProfile(depth=10) self.add_pre_set_parameters_callback(self.__pre_set_parameters_callback) self.add_on_set_parameters_callback(self.__on_set_parameters_callback) self.add_parameter(sr.Parameter("rate", 10.0, sr.ParameterType.DOUBLE), "The rate in Hertz for all periodic callbacks") - self._predicate_publisher = self.create_publisher(PredicateCollection, "/predicates", self._qos) + self.__predicate_publisher = self.create_publisher(PredicateCollection, "/predicates", self.__qos) self.__predicate_message = PredicateCollection() self.__predicate_message.node = self.get_fully_qualified_name() self.__predicate_message.type = PredicateCollection.COMPONENT - self._rate = self.get_parameter_value("rate") - self._period = 1.0 / self._rate - self.__step_timer = self.create_timer(self._period, self.__step_with_mutex) + self.__rate = self.get_parameter_value("rate") + self.__period = 1.0 / self.__rate + self.__step_timer = self.create_timer(self.__period, self.__step_with_mutex) def __del__(self): self.__step_lock.acquire() @@ -86,7 +86,7 @@ def get_rate(self): :return: The component rate """ - return self._rate + return self.__rate def get_period(self): """ @@ -94,7 +94,7 @@ def get_period(self): :return: The component period """ - return self._period + return self.__period def __step_with_mutex(self): if self.__step_lock.acquire(blocking=False): @@ -144,7 +144,7 @@ def add_parameter(self, parameter: Union[str, sr.Parameter], description: str, r raise ParameterError(f"Failed to add parameter: {e}") if not self.has_parameter(sr_parameter.get_name()): self.get_logger().debug(f"Adding parameter '{sr_parameter.get_name()}'.") - self._parameter_dict[sr_parameter.get_name()] = parameter + self.__parameter_dict[sr_parameter.get_name()] = parameter self.__read_only_parameters[sr_parameter.get_name()] = False try: descriptor = ParameterDescriptor(description=description, read_only=read_only) @@ -162,7 +162,7 @@ def add_parameter(self, parameter: Union[str, sr.Parameter], description: str, r raise ParameterError(result.reason) self.__read_only_parameters[sr_parameter.get_name()] = read_only except Exception as e: - del self._parameter_dict[sr_parameter.get_name()] + del self.__parameter_dict[sr_parameter.get_name()] del self.__read_only_parameters[sr_parameter.get_name()] raise ParameterError(f"Failed to add parameter: {e}") else: @@ -196,13 +196,13 @@ def __get_component_parameter(self, name: str) -> sr.Parameter: :raises ParameterError: if the parameter does not exist :return: The parameter, if it exists """ - if name not in self._parameter_dict.keys(): + if name not in self.__parameter_dict.keys(): raise ParameterError(f"Parameter '{name}' is not in the dict of parameters") try: - if isinstance(self._parameter_dict[name], str): - return self.__getattribute__(self._parameter_dict[name]) + if isinstance(self.__parameter_dict[name], str): + return self.__getattribute__(self.__parameter_dict[name]) else: - return self._parameter_dict[name] + return self.__parameter_dict[name] except AttributeError as e: raise ParameterError(f"{e}") @@ -288,10 +288,10 @@ def __pre_set_parameters_callback(self, ros_parameters: List[Parameter]) -> List result.successful = False result.reason = f"Validation of parameter '{ros_param.name}' returned false!" else: - if isinstance(self._parameter_dict[ros_param.name], str): - self.__setattr__(self._parameter_dict[ros_param.name], new_parameter) + if isinstance(self.__parameter_dict[ros_param.name], str): + self.__setattr__(self.__parameter_dict[ros_param.name], new_parameter) else: - self._parameter_dict[ros_param.name] = new_parameter + self.__parameter_dict[ros_param.name] = new_parameter new_parameters.append(write_parameter(new_parameter)) except Exception as e: result.successful = False @@ -320,15 +320,15 @@ def add_predicate(self, name: str, predicate: Union[bool, Callable[[], bool]]): """ if not name: self.get_logger().error("Failed to add predicate: Provide a non empty string as a name.") - if name in self._predicates.keys(): + if name in self.__predicates.keys(): self.get_logger().warn(f"Predicate with name '{name}' already exists, overwriting.") else: self.get_logger().debug(f"Adding predicate '{name}'.") try: if callable(predicate): - self._predicates[name] = Predicate(predicate) + self.__predicates[name] = Predicate(predicate) else: - self._predicates[name] = Predicate(lambda: predicate) + self.__predicates[name] = Predicate(lambda: predicate) except Exception as e: self.get_logger().error(f"Failed to add predicate '{name}': {e}") @@ -340,13 +340,13 @@ def get_predicate(self, name: str) -> bool: :param name: The name of the predicate to retrieve from the map of predicates :return: The value of the predicate as a boolean """ - if name not in self._predicates.keys(): + if name not in self.__predicates.keys(): self.get_logger().error( f"Failed to get predicate '{name} ': Predicate does not exist, returning false.", throttle_duration_sec=1.0) return False try: - return self._predicates[name].get_value() + return self.__predicates[name].get_value() except Exception as e: self.get_logger().error(f"Failed to evaluate callback of predicate '{ name}', returning false: {e}", throttle_duration_sec=1.0) @@ -361,21 +361,21 @@ def set_predicate(self, name: str, predicate: Union[bool, Callable[[], bool]]): :param name: The name of the predicate to retrieve from the map of predicates :param predicate: The new value of the predicate as a bool or a callable function """ - if name not in self._predicates.keys(): + if name not in self.__predicates.keys(): self.get_logger().error(f"Failed to set predicate '{ name}': Predicate does not exist.", throttle_duration_sec=1.0) return try: if callable(predicate): - self._predicates[name].set_predicate(predicate) + self.__predicates[name].set_predicate(predicate) else: - self._predicates[name].set_predicate(lambda: predicate) + self.__predicates[name].set_predicate(lambda: predicate) except Exception as e: self.get_logger().error(f"Failed to set predicate '{name}': {e}", throttle_duration_sec=1.0) return - new_value = self._predicates[name].query() + new_value = self.__predicates[name].query() if new_value is not None: - self._publish_predicate(name, new_value) + self.__publish_predicate(name, new_value) def add_trigger(self, trigger_name: str): """ @@ -387,13 +387,13 @@ def add_trigger(self, trigger_name: str): if not trigger_name: self.get_logger().error("Failed to add trigger: Provide a non empty string as a name.") return - if trigger_name in self._triggers: + if trigger_name in self.__triggers: self.get_logger().error(f"Failed to add trigger: there is already a trigger with name '{trigger_name}'.") return - if trigger_name in self._predicates.keys(): + if trigger_name in self.__predicates.keys(): self.get_logger().error(f"Failed to add trigger: there is already a predicate with name '{trigger_name}'.") return - self._triggers.append(trigger_name) + self.__triggers.append(trigger_name) self.add_predicate(trigger_name, False) def trigger(self, trigger_name: str): @@ -402,27 +402,28 @@ def trigger(self, trigger_name: str): :param trigger_name: The name of the trigger """ - if trigger_name not in self._triggers: + if trigger_name not in self.__triggers: self.get_logger().error(f"Failed to trigger: could not find trigger with name '{trigger_name}'.") return self.set_predicate(trigger_name, True) # reset the trigger to be published on the next step - self._predicates[trigger_name].set_predicate(lambda: False) + self.__predicates[trigger_name].set_predicate(lambda: False) def remove_output(self, signal_name): - if signal_name not in self._outputs.keys(): + if signal_name not in self.__outputs.keys(): parsed_signal_name = parse_topic_name(signal_name) - if parsed_signal_name not in self._outputs.keys(): + if parsed_signal_name not in self.__outputs.keys(): self.get_logger().debug(f"Unknown output '{signal_name}' (parsed name was '{parsed_signal_name}').") return signal_name = parsed_signal_name - if "publisher" in self._outputs[signal_name].keys(): - self.destroy_publisher(self._outputs[signal_name]["publisher"]) - self._outputs.pop(signal_name) + if "publisher" in self.__outputs[signal_name].keys(): + self.destroy_publisher(self.__outputs[signal_name]["publisher"]) + self.__outputs.pop(signal_name) self.get_logger().debug(f"Removing signal '{signal_name}'.") - def _create_output(self, signal_name: str, data: str, message_type: MsgT, clproto_message_type: clproto.MessageType, - default_topic: str, fixed_topic: bool, publish_on_step: bool) -> str: + def _create_component_output(self, signal_name: str, data: str, message_type: MsgT, + clproto_message_type: clproto.MessageType, default_topic: str, fixed_topic: bool, + publish_on_step: bool) -> str: """ Helper function to parse the signal name and add an output without Publisher to the dict of outputs. @@ -454,19 +455,44 @@ def write_ros_msg(message, data): translator = write_ros_msg else: raise AddSignalError("The provided message type is not supported to create a component output.") - self._outputs[parsed_signal_name] = {"attribute": data, "message_type": message_type, - "translator": translator} - self._periodic_outputs[parsed_signal_name] = publish_on_step + self.__outputs[parsed_signal_name] = {"attribute": data, "message_type": message_type, + "translator": translator} + self.__periodic_outputs[parsed_signal_name] = publish_on_step return parsed_signal_name except AddSignalError: raise except Exception as e: raise AddSignalError(f"{e}") + def _get_output_publisher(self, output_name: str): + """ + Set the publisher object for a component output. + + :param output_name: The name of the output + :raises CoreError: if the output name is invalid + :return: The publisher object of the output + """ + try: + return self.__outputs[output_name]["publisher"] + except Exception as e: + raise CoreError(f"Failed to get output publisher: {e}") + + def _set_output_publisher(self, output_name: str, publisher): + """ + Set the publisher object for a component output. + + :param output_name: The name of the output + :param publisher: The publisher object for the output + """ + try: + self.__outputs[output_name]["publisher"] = publisher + except Exception as e: + self.get_logger().warn(f"Failed to set output publisher: {e}") + def remove_input(self, signal_name: str): - if not self.destroy_subscription(self._inputs.pop(signal_name, None)): + if not self.destroy_subscription(self.__inputs.pop(signal_name, None)): parsed_signal_name = parse_topic_name(signal_name) - if not self.destroy_subscription(self._inputs.pop(parsed_signal_name, None)): + if not self.destroy_subscription(self.__inputs.pop(parsed_signal_name, None)): self.get_logger().debug(f"Unknown input '{signal_name}' (parsed name was '{parsed_signal_name}').") return self.get_logger().debug(f"Removing signal '{parsed_signal_name}'.") @@ -520,9 +546,9 @@ def __declare_signal(self, signal_name: str, signal_type: str, default_topic="", self.get_logger().warn( f"The parsed name for {signal_type} '{signal_name}' is '{parsed_signal_name}'." "Use the parsed name to refer to this {signal_type}.") - if parsed_signal_name in self._inputs.keys(): + if parsed_signal_name in self.__inputs.keys(): raise AddSignalError(f"Signal with name '{parsed_signal_name}' already exists as input.") - if parsed_signal_name in self._outputs.keys(): + if parsed_signal_name in self.__outputs.keys(): raise AddSignalError(f"Signal with name '{parsed_signal_name}' already exists as output.") topic_name = default_topic if default_topic else "~/" + parsed_signal_name parameter_name = parsed_signal_name + "_topic" @@ -578,8 +604,8 @@ def add_input(self, signal_name: str, subscription: Union[str, Callable], messag if user_callback: self.get_logger().warn("Providing a callable for arguments 'subscription' and 'user_callback' is" "not supported. The user callback will be ignored.") - self._inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, subscription, - self._qos) + self.__inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, subscription, + self.__qos) elif isinstance(subscription, str): if callable(user_callback): signature = inspect.signature(user_callback) @@ -597,31 +623,31 @@ def default_callback(): message_type == Float64MultiArray or message_type == Int32 or message_type == String: read_message = partial(self.__read_translated_message, reader=modulo_readers.read_std_message) - self._inputs[parsed_signal_name] = \ + self.__inputs[parsed_signal_name] = \ self.create_subscription(message_type, topic_name, partial(self.__subscription_callback, attribute_name=subscription, read_message=read_message, user_callback=user_callback), - self._qos) + self.__qos) elif message_type == EncodedState: read_message = partial(self.__read_translated_message, reader=modulo_readers.read_clproto_message) - self._inputs[parsed_signal_name] = \ + self.__inputs[parsed_signal_name] = \ self.create_subscription(message_type, topic_name, partial(self.__subscription_callback, attribute_name=subscription, read_message=read_message, user_callback=user_callback), - self._qos) + self.__qos) elif hasattr(message_type, 'get_fields_and_field_types'): - self._inputs[parsed_signal_name] = \ + self.__inputs[parsed_signal_name] = \ self.create_subscription(message_type, topic_name, partial(self.__subscription_callback, attribute_name=subscription, read_message=self.__read_custom_message, user_callback=user_callback), - self._qos) + self.__qos) else: raise TypeError("The provided message type is not supported to create a component input.") else: @@ -673,7 +699,7 @@ def callback_wrapper(request, response, cb): self.get_logger().warn( f"The parsed name for service '{service_name}' is '{parsed_service_name}'." "Use the parsed name to refer to this service.") - if parsed_service_name in self._services_dict.keys(): + if parsed_service_name in self.__services_dict.keys(): raise AddServiceError(f"Service with name '{parsed_service_name}' already exists.") signature = inspect.signature(callback) if len(signature.parameters) == 0: @@ -682,7 +708,7 @@ def callback_wrapper(request, response, cb): else: self.get_logger().debug(f"Adding string service '{parsed_service_name}'.") service_type = StringTrigger - self._services_dict[parsed_service_name] = \ + self.__services_dict[parsed_service_name] = \ self.create_service(service_type, "~/" + parsed_service_name, lambda request, response: callback_wrapper(request, response, callback)) except Exception as e: @@ -781,7 +807,7 @@ def get_qos(self) -> QoSProfile: """ Getter of the Quality of Service attribute. """ - return self._qos + return self.__qos def set_qos(self, qos: QoSProfile): """ @@ -789,7 +815,7 @@ def set_qos(self, qos: QoSProfile): :param qos: The desired Quality of Service """ - self._qos = qos + self.__qos = qos def add_periodic_callback(self, name: str, callback: Callable[[], None]): """ @@ -801,11 +827,11 @@ def add_periodic_callback(self, name: str, callback: Callable[[], None]): if not name: self.get_logger().error("Failed to add periodic function: Provide a non empty string as a name.") return - if name in self._periodic_callbacks.keys(): + if name in self.__periodic_callbacks.keys(): self.get_logger().warn(f"Periodic function '{name}' already exists, overwriting.") else: self.get_logger().debug(f"Adding periodic function '{name}'.") - self._periodic_callbacks[name] = callback + self.__periodic_callbacks[name] = callback def __get_predicate_message(self, name: str, value: bool) -> Predicate: """ @@ -819,7 +845,7 @@ def __get_predicate_message(self, name: str, value: bool) -> Predicate: message.value = value return message - def _publish_predicate(self, name: str, value: bool): + def __publish_predicate(self, name: str, value: bool): """ Helper function to publish a predicate. @@ -828,19 +854,19 @@ def _publish_predicate(self, name: str, value: bool): """ message = copy.copy(self.__predicate_message) message.predicates = [self.__get_predicate_message(name, value)] - self._predicate_publisher.publish(message) + self.__predicate_publisher.publish(message) - def _publish_predicates(self): + def _publish_component_predicates(self): """ Helper function to publish all predicates. """ message = copy.deepcopy(self.__predicate_message) - for name in self._predicates.keys(): - new_value = self._predicates[name].query() + for name in self.__predicates.keys(): + new_value = self.__predicates[name].query() if new_value is not None: message.predicates.append(self.__get_predicate_message(name, new_value)) if len(message.predicates): - self._predicate_publisher.publish(message) + self.__predicate_publisher.publish(message) def __translate_and_publish(self, output_name: str): """ @@ -848,12 +874,12 @@ def __translate_and_publish(self, output_name: str): :param output_name: The name of the output """ - message = self._outputs[output_name]["message_type"]() - data = self.__getattribute__(self._outputs[output_name]["attribute"]) + message = self.__outputs[output_name]["message_type"]() + data = self.__getattribute__(self.__outputs[output_name]["attribute"]) # only publish if the data is not empty if not getattr(data, "is_empty", lambda: False)(): - self._outputs[output_name]["translator"](message, data) - self._outputs[output_name]["publisher"].publish(message) + self.__outputs[output_name]["translator"](message, data) + self.__outputs[output_name]["publisher"].publish(message) def publish_output(self, signal_name: str): """ @@ -863,31 +889,31 @@ def publish_output(self, signal_name: str): :raises CoreError: if the output is being published periodically or if the signal name could not be found """ parsed_signal_name = parse_topic_name(signal_name) - if parsed_signal_name not in self._outputs.keys(): + if parsed_signal_name not in self.__outputs.keys(): raise CoreError(f"Output with name '{signal_name}' doesn't exist") - if self._periodic_outputs[parsed_signal_name]: + if self.__periodic_outputs[parsed_signal_name]: raise CoreError("An output that is published periodically cannot be triggered manually") try: self.__translate_and_publish(parsed_signal_name) except Exception as e: self.get_logger().error(f"Failed to publish output '{parsed_signal_name}': {e}") - def _publish_outputs(self): + def _publish_component_outputs(self): """ Helper function to publish all outputs. """ - for output in self._outputs.keys(): + for output in self.__outputs.keys(): try: - if self._periodic_outputs[output]: + if self.__periodic_outputs[output]: self.__translate_and_publish(output) except Exception as e: self.get_logger().error(f"{e}") - def _evaluate_periodic_callbacks(self): + def _evaluate_periodic_component_callbacks(self): """ Helper function to evaluate all periodic function callbacks. """ - for name, callback in self._periodic_callbacks.items(): + for name, callback in self.__periodic_callbacks.items(): try: callback() except Exception as e: @@ -923,18 +949,18 @@ def raise_error(self): """ self.get_logger().error("An error was raised in the component.") - def _finalize_interfaces(self): + def _finalize_component_interfaces(self): """ Finalize all interfaces. """ - self._inputs = {} - self._outputs = {} - self._services_dict = {} + self.__inputs = {} + self.__outputs = {} + self.__services_dict = {} self.__tf_buffer = None self.__tf_listener = None self.__tf_broadcaster = None self.__static_tf_broadcaster = None - self._predicate_publisher = None + self.__predicate_publisher = None if self.__step_timer: self.__step_timer.cancel() self.__step_timer = None diff --git a/source/modulo_components/modulo_components/lifecycle_component.py b/source/modulo_components/modulo_components/lifecycle_component.py index f4c22d76..ba7c7eb7 100644 --- a/source/modulo_components/modulo_components/lifecycle_component.py +++ b/source/modulo_components/modulo_components/lifecycle_component.py @@ -26,7 +26,8 @@ def __init__(self, node_name: str, *args, **kwargs): lifecycle_node_kwargs = {key: value for key, value in kwargs.items() if key in LIFECYCLE_NODE_MIXIN_KWARGS} ComponentInterface.__init__(self, node_name, *args, **kwargs) LifecycleNodeMixin.__init__(self, *args, **lifecycle_node_kwargs) - self._has_error = False + self.__outputs = {} + self.__has_error = False def get_lifecycle_state(self) -> LifecycleState: """ @@ -226,12 +227,13 @@ def on_shutdown(self, previous_state: LifecycleState) -> TransitionCallbackRetur TRANSITION_CALLBACK_SUCCESS transitions to 'Finalized' TRANSITION_CALLBACK_FAILURE, TRANSITION_CALLBACK_ERROR or any uncaught exceptions to 'ErrorProcessing' """ + def error_processing(self): self.get_logger().error("Entering into the error processing transition state.") return TransitionCallbackReturn.ERROR self.get_logger().debug(f"on_shutdown called from previous state {previous_state.label}.") - if not self._has_error: + if not self.__has_error: if previous_state.state_id == State.PRIMARY_STATE_FINALIZED: return TransitionCallbackReturn.SUCCESS if previous_state.state_id == State.PRIMARY_STATE_ACTIVE: @@ -243,7 +245,7 @@ def error_processing(self): return error_processing(self) if not self.__handle_shutdown(): return error_processing(self) - self._finalize_interfaces() + self._finalize_component_interfaces() return TransitionCallbackReturn.SUCCESS if previous_state.state_id == State.PRIMARY_STATE_INACTIVE: if not self.__handle_cleanup(): @@ -251,12 +253,12 @@ def error_processing(self): return error_processing(self) if not self.__handle_shutdown(): return error_processing(self) - self._finalize_interfaces() + self._finalize_component_interfaces() return TransitionCallbackReturn.SUCCESS if previous_state.state_id == State.PRIMARY_STATE_UNCONFIGURED: if not self.__handle_shutdown(): return error_processing(self) - self._finalize_interfaces() + self._finalize_component_interfaces() return TransitionCallbackReturn.SUCCESS self.get_logger().warn(f"Invalid transition 'shutdown' from state {previous_state.label}.") return error_processing(self) @@ -303,9 +305,9 @@ def on_error(self, previous_state: LifecycleState) -> TransitionCallbackReturn: error_handled = False if not error_handled: self.get_logger().error("Error processing failed! Entering into the finalized state.") - self._finalize_interfaces() + self._finalize_component_interfaces() return TransitionCallbackReturn.ERROR - self._has_error = False + self.__has_error = False return TransitionCallbackReturn.SUCCESS def __handle_error(self) -> bool: @@ -332,10 +334,10 @@ def _step(self): """ try: if self.get_lifecycle_state().state_id == State.PRIMARY_STATE_ACTIVE: - self._evaluate_periodic_callbacks() + self._evaluate_periodic_component_callbacks() self.on_step_callback() - self._publish_outputs() - self._publish_predicates() + self._publish_component_outputs() + self._publish_component_predicates() except Exception as e: self.get_logger().error(f"Failed to execute step function: {e}") self.raise_error() @@ -347,13 +349,13 @@ def __configure_outputs(self) -> bool: :return: True if configuration was successful """ success = True - for signal_name, output_dict in self._outputs.items(): + for signal_name, message_type in self.__outputs.items(): try: topic_name = self.get_parameter_value(signal_name + "_topic") self.get_logger().debug(f"Configuring output '{signal_name}' with topic name '{topic_name}'.") - publisher = self.create_lifecycle_publisher(msg_type=output_dict["message_type"], topic=topic_name, - qos_profile=self._qos) - self._outputs[signal_name]["publisher"] = publisher + publisher = self.create_lifecycle_publisher(msg_type=message_type, topic=topic_name, + qos_profile=self.get_qos()) + self._set_output_publisher(signal_name, publisher) except Exception as e: success = False self.get_logger().debug(f"Failed to configure output '{signal_name}': {e}") @@ -378,19 +380,20 @@ def add_output(self, signal_name: str, data: str, message_type: MsgT, throttle_duration_sec=1.0) return try: - parsed_signal_name = self._create_output(signal_name, data, message_type, clproto_message_type, - default_topic, fixed_topic, publish_on_step) + parsed_signal_name = self._create_component_output(signal_name, data, message_type, clproto_message_type, + default_topic, fixed_topic, publish_on_step) topic_name = self.get_parameter_value(parsed_signal_name + "_topic") self.get_logger().debug(f"Adding output '{parsed_signal_name}' with topic name '{topic_name}'.") + self.__outputs[parsed_signal_name] = message_type except AddSignalError as e: self.get_logger().error(f"Failed to add output '{signal_name}': {e}") def __activate_outputs(self): success = True state = self.get_lifecycle_state().state_id - for signal_name, output_dict in self._outputs.items(): + for signal_name in self.__outputs.keys(): try: - output_dict["publisher"].on_activate(state) + self._get_output_publisher(signal_name).on_activate(state) except Exception as e: success = False self.get_logger().error(f"Failed to activate output '{signal_name}': {e}") @@ -400,19 +403,19 @@ def __activate_outputs(self): def __deactivate_outputs(self): success = True state = self.get_lifecycle_state().state_id - for signal_name, output_dict in self._outputs.items(): + for signal_name in self.__outputs.keys(): try: - output_dict["publisher"].on_deactivate(state) + self._get_output_publisher(signal_name).on_activate(state) except Exception as e: success = False self.get_logger().error(f"Failed to deactivate output '{signal_name}': {e}") self.get_logger().debug("All outputs deactivated.") return success - + def raise_error(self): """ Trigger the shutdown and error transitions. """ super().raise_error() - self._has_error = True + self.__has_error = True self.trigger_shutdown()