diff --git a/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/launch_file_analyse.py b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/launch_file_analyse.py new file mode 100644 index 00000000..af6f7db3 --- /dev/null +++ b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/launch_file_analyse.py @@ -0,0 +1,628 @@ +# Total Structure +"""String Utils -> Launch Tree -> Launch Node Utils -> Launch Analyzer.""" + +# String Utils + +from copy import deepcopy +import json +import os +import re +from typing import List +from typing import Optional +import xml.etree.ElementTree as ET + +import yaml + +patterns = { + "var": r"\$\((var) ([^\)]+)\)", + "env": r"\$\((env) ([^\s]+)(?:\s+([^\)]+))?\)", + "eval": r"\$\((eval) ([^\)]+)\)", + "find-pkg-share": r"\$\((find-pkg-share) ([^\)]+)\)", +} + +BASE_PROJECT_MAPPING = {} +FLAG_CHECKING_SYSTEM_PROJECTS = False + + +def find_package(package_name) -> str: + """Return the share directory of the given package.""" + if package_name in BASE_PROJECT_MAPPING: + return BASE_PROJECT_MAPPING[package_name] + else: + if FLAG_CHECKING_SYSTEM_PROJECTS: + from ament_index_python.packages import get_package_share_directory + + BASE_PROJECT_MAPPING[package_name] = get_package_share_directory(package_name) + else: + BASE_PROJECT_MAPPING[ + package_name + ] = f"/opt/ros/humble/share/{package_name}" # use this for temporal solution; + return BASE_PROJECT_MAPPING[package_name] + + +def clean_eval_variables(string: str) -> str: + """Remove quotes and spaces from a string, to obtain the 'value' of a variable.""" + string = string.replace("\\", "") + if string.startswith('"') and string.endswith('"'): + return string[1:-1] + elif string.startswith("'") and string.endswith("'"): + return string[1:-1] + else: + return string + + +def analyze_eval_string(input_string: str) -> str: + """Evaluate the expression in the $(eval ...) tag.""" + list_of_strings = input_string.split(" ") + if list_of_strings[0] == "$(eval": + expression = " ".join(list_of_strings[1:])[:-1] # remove the last ')' + expression = clean_eval_variables(expression) + result = str(eval(expression)) # remove the outer quotes + else: + result = input_string + return result + + +def analyze_string( + input_string: str, context: dict, local_context: dict, base_namespace: str +) -> str: + """Resolve substitutions recursively in a given string. + + Args: + context: The arguments and variables context of the current XML file, which is defined by the arg tag and will be passed to the included file + local_context: The local variable context of the current XML file, which is defined by the let tag + base_namespace: The current namespace of the XML file + + Returns: + The string with all substitutions resolved. 
+ """ + + def replace_match(match): + # Determine type and execute corresponding logic + if match.group(1) == "var": + variable_name = analyze_string( + match.group(2), context, local_context, base_namespace + ) # Recursively resolve inner substitutions + # Check if the variable is in the local context + var_value = local_context.get(variable_name, None) + if var_value is None: + # Check if the variable is in the global context + var_value = context.get(variable_name) + return var_value + elif match.group(1) == "env": + var_name = analyze_string( + match.group(2), context, local_context, base_namespace + ) # Recursively resolve inner substitutions + default_value = analyze_string( + match.group(3) if match.group(3) is not None else "", + context, + local_context, + base_namespace, + ) + return os.getenv(var_name, default_value) + + elif match.group(1) == "find-pkg-share": + package_name = analyze_string( + match.group(2), context, local_context, base_namespace + ) # Recursively resolve inner substitutions + package_dir = find_package(package_name) + return package_dir + + return "" + + # Loop to ensure all substitutions are resolved + for key, pattern in patterns.items(): + """ + 1. Solve all variables. + 2. Solve all environment variables. + 3. Solve all eval expressions. + 4. Solve all find-pkg-share expressions. + """ + if key == "eval": + input_string = analyze_eval_string(input_string) + else: + while True: + old_string = input_string + input_string = re.sub(pattern, replace_match, input_string) + # Stop if no more changes are made + if input_string == old_string: + break + # solve for "\" in the string + input_string = input_string.replace("\\", "") + + return input_string + + +def find_linked_path(path: str) -> str: + """Find the linked path of a given path. If the path is not a link, return the path itself.""" + if os.path.islink(path): + linked_path = os.readlink(path) + return linked_path + else: + return path + + +# Launch Tree + + +class LaunchTreeNode: + """Each node in the launch tree is a LaunchTreeNode. 
It represents a launch file or a ros node.""" + + def __init__(self, name: str, **kwargs): + self.name = name + self.children: List[LaunchTreeNode] = [] + self.parameters = kwargs + + def add_child(self, child: "LaunchTreeNode"): + self.children.append(child) + + def jsonify(self): + return { + "name": self.name, + "children": [child.jsonify() for child in self.children], + "parameters": self.parameters, + } + + +class LaunchTree: + """Tree Structure to store the launch file structure.""" + + def __init__(self): + self.root = None + self.edges_manager = [] + self.nodes_manager = {} + + def get_node(self, node_name): + return self.nodes_manager[node_name] + + def add_root(self, root_name, **kwargs): + if self.root is None: + self.root = LaunchTreeNode(root_name) + self.nodes_manager[root_name] = self.root + else: + print("Root already exists") + + def add_child(self, parent_name, child_name, **kwargs): + if self.root is None: + self.root = LaunchTreeNode(parent_name) + self.nodes_manager[parent_name] = self.root + + if parent_name not in self.nodes_manager: + # print(f"Parent node {parent_name} not found") + return + + if child_name in self.nodes_manager: + # print(f"Child node {child_name} already exists") + return + + child = LaunchTreeNode(child_name, **kwargs) + self.nodes_manager[child_name] = child + self.nodes_manager[parent_name].add_child(child) + self.edges_manager.append((parent_name, child_name)) + + def add_argument(self, node_name, argument_name, argument_value): + if node_name not in self.nodes_manager: + print(f"Node {node_name} not found") + return + + self.nodes_manager[node_name].arguments[argument_name] = argument_value + + def jsonify(self): + json_object = self.root.jsonify() + return json_object + + def __repr__(self) -> str: + json_object = self.jsonify() + return json.dumps(json_object, indent=4) + + +def find_unset_parameters(tree: LaunchTree): + """Find all unset parameters in the launch tree.""" + unset_parameters = [] + for node_name, node in tree.nodes_manager.items(): + for parameter_name, parameter_value in node.parameters.items(): + if parameter_value == "ERROR_PARAMETER_NOT_SET": + unset_parameters.append((node_name, parameter_name)) + return unset_parameters + + +# Launch Node Utils + + +def read_ros_yaml(file_path: str) -> dict: + """Read and return the contents of a YAML file.""" + with open(file_path, "r") as file: + # Using safe_load() to avoid potential security risks + data = yaml.safe_load(file) + + data = data["/**"]["ros__parameters"] + return data + + +def include_ros_yaml(file_path: str) -> dict: + """Read and return the contents of a YAML file.""" + with open(file_path, "r") as file: + # Using safe_load() to avoid potential security risks + data = yaml.safe_load(file) + + data = data["launch"] + parameters = {} + for argument_dict in data: + name = argument_dict["arg"]["name"] + default = argument_dict["arg"]["default"] + parameters[name] = default + return parameters + + +def parse_node_tag(node_tag: ET.Element, base_namespace: str, context: dict, local_context: dict): + try: + pkg = analyze_string(node_tag.get("pkg"), context, local_context, base_namespace) + executable = analyze_string(node_tag.get("exec"), context, local_context, base_namespace) + except Exception as e: + print(f"Error in parsing node tag: {e}") + print( + node_tag.get("pkg"), + node_tag.get("exec"), + base_namespace, + context["__current_launch_name_"], + ) + raise Exception(f"Error in parsing node tag: {e}") + local_parameters = {} + local_parameters["__param_files"] = [] 
+ # print(context, base_namespace) + for child in node_tag: + if child.tag == "param": + if child.get("name") is not None: + local_parameters[child.get("name")] = analyze_string( + child.get("value"), context, local_context, base_namespace + ) + if child.get("from") is not None: + path = analyze_string(child.get("from"), context, local_context, base_namespace) + path = find_linked_path(path) + if path.endswith("_empty.param.yaml"): + continue + # print(path, child.get("from")) + local_parameters["__param_files"].append(path) + if path == "": + print("-----Node Parameter not Found------") + print(f"----package: {pkg}-----") + print(f"----exec: {executable}-----") + print(f"----parameter string: {child.get('from')}-----") + else: + data = read_ros_yaml(path) + for key in data: + if isinstance(data[key], str) and data[key].startswith("$(var "): + local_parameters[key] = analyze_string( + data[key], context, local_context, base_namespace + ) + else: + local_parameters[key] = data[key] + if child.tag == "remap": + from_topic = analyze_string(child.get("from"), context, local_context, base_namespace) + to_topic = analyze_string(child.get("to"), context, local_context, base_namespace) + if "__remapping__" not in local_parameters: + local_parameters["__remapping__"] = {} + local_parameters["__remapping__"][from_topic] = to_topic + + context["__tree__"].add_child( + context["__current_launch_name_"], f"{pkg}/{executable}", **local_parameters + ) + + +def parse_load_composable_node( + load_composable_node_tag: ET.Element, base_namespace: str, context: dict, local_context: dict +): + container_target = load_composable_node_tag.get("target") + local_parameters = local_context.copy() + local_parameters["__container_target__"] = container_target + for child in load_composable_node_tag: + process_tag( + child, + base_namespace, + context, + local_parameters, + ) + return context + + +def parse_composable_node( + composable_node_tag: ET.Element, base_namespace: str, context: dict, local_context: dict +): + pkg = analyze_string(composable_node_tag.get("pkg"), context, local_context, base_namespace) + executable = analyze_string( + composable_node_tag.get("plugin"), context, local_context, base_namespace + ) + local_parameters = {} + local_parameters["__param_files"] = [] + # print(context, base_namespace) + for child in composable_node_tag: + if child.tag == "param": + if child.get("name") is not None: + local_parameters[child.get("name")] = analyze_string( + child.get("value"), context, local_context, base_namespace + ) + if child.get("from") is not None: + path = analyze_string(child.get("from"), context, local_context, base_namespace) + path = find_linked_path(path) + if path.endswith("_empty.param.yaml"): + continue + # print(path, child.get("from")) + local_parameters["__param_files"].append(path) + if path == "": + print("-----Node Parameter not Found------") + print(f"----package: {pkg}-----") + print(f"----exec: {executable}-----") + print(f"----parameter string: {child.get('from')}-----") + else: + data = read_ros_yaml(path) + for key in data: + if isinstance(data[key], str) and data[key].startswith("$(var "): + local_parameters[key] = analyze_string( + data[key], context, local_context, base_namespace + ) + else: + local_parameters[key] = data[key] + if child.tag == "remap": + from_topic = analyze_string(child.get("from"), context, local_context, base_namespace) + to_topic = analyze_string(child.get("to"), context, local_context, base_namespace) + if "__remapping__" not in local_parameters: + 
local_parameters["__remapping__"] = {} + local_parameters["__remapping__"][from_topic] = to_topic + + context["__tree__"].add_child( + context["__current_launch_name_"], f"{pkg}/{executable}", **local_parameters + ) + + +# Launch Analyzer + + +def find_cmake_projects(root_dir): + for dirpath, _, filenames in os.walk(root_dir): + if "CMakeLists.txt" in filenames: + cmake_file_path = os.path.join(dirpath, "CMakeLists.txt") + with open(cmake_file_path, "r") as file: + for line in file: + if line.startswith("project("): + # Extract the project name + project_name = line.split("(")[1].split(")")[0].strip() + BASE_PROJECT_MAPPING[project_name] = dirpath + break + + +def check_if_run(tag: ET.Element, base_name: dict, context: dict, local_context: dict): + """Many tag has a if and unless attribute, this function checks if the tag should be run or not.""" + if tag.get("if"): + if_value = analyze_string(tag.get("if"), context, local_context, base_name) + if_value = if_value.lower() == "true" + if not if_value: + return False + if tag.get("unless"): + unless_value = analyze_string(tag.get("unless"), context, local_context, base_name) + unless_value = unless_value.lower() == "true" + if unless_value: + return False + return True + + +def copy_context(context: dict): + new_context = {} + for key in context: + new_context[key] = context[key] + return new_context + + +def process_include_tag( + include_tag: ET.Element, + context: dict, + local_context: dict, + base_namespace: str, + group_base_namespace: Optional[str] = None, +): + """Process the include tag, which includes another XML file. + + include_tag: The include XML node tag to process + context: The arguments and variables context of the current XML file, which is defined by the arg tag and will be passed to the included file + local_context: The local variable context of the current XML file, which is defined by the let tag + base_namespace: The current namespace of the XML file + group_base_namespace: The namespace of the current group tag (affect the namespace of the included file) + """ + if group_base_namespace is None: + group_base_namespace = base_namespace + included_file = include_tag.get("file") + included_file = analyze_string(included_file, context, local_context, base_namespace) + included_file = find_linked_path(included_file) + + if included_file.endswith(".yaml"): + # this is a yaml file for parameters + data = include_ros_yaml(included_file) + for key in data: + if key not in context: + context[key] = data[key] + return context + + temp_context = copy_context(context) + argument_dict = {} + for child in include_tag: + if child.tag == "arg": + value = analyze_string(child.get("value"), temp_context, local_context, base_namespace) + name = analyze_string( + child.get("name"), + temp_context, + local_context, + base_namespace, + ) + temp_context[ + name + ] = value # temp_context is used to pass arguments to the included file and updated on the fly for each argument + for key in argument_dict: + temp_context[key] = argument_dict[key] + if included_file: + context["__tree__"].add_child( + context["__current_launch_name_"], + os.path.basename(included_file), + path=included_file, + ) + if included_file.startswith("/opt/ros/humble") and (not FLAG_CHECKING_SYSTEM_PROJECTS): + # not checking system projects if the file is in /opt/ros/humble and FLAG_CHECKING_SYSTEM_PROJECTS is False + pass + elif included_file.endswith(".launch.xml"): + # check if the file is a launch file + return parse_xml(included_file, group_base_namespace, 
temp_context) + return context + + +def parse_argument_tag( + argument_tag: ET.Element, base_namespace: str, context: dict, local_context: dict +): + # argument_name = os.path.join(base_namespace, argument_tag.get("name")) + argument_name = argument_tag.get("name") + if argument_tag.get("default"): + if argument_name not in context: + value = analyze_string( + argument_tag.get("default"), context, local_context, base_namespace + ) + context["__tree__"].get_node(context["__current_launch_name_"]).parameters[ + argument_name + ] = value + context[argument_name] = value + else: + if argument_name not in context: + context["__tree__"].get_node(context["__current_launch_name_"]).parameters[ + argument_name + ] = "ERROR_PARAMETER_NOT_SET" + # print(f"Argument {argument_name} has no default value and is not set in context") + else: + context["__tree__"].get_node(context["__current_launch_name_"]).parameters[ + argument_name + ] = context[argument_name] + return context + + +def parse_let_tag(let_tag: ET.Element, base_namespace: str, context: dict, local_context: dict): + argument_name = let_tag.get("name") + if let_tag.get("value"): + local_context[argument_name] = analyze_string( + let_tag.get("value"), context, local_context, base_namespace + ) + return context + + +def parse_group_tag( + group_tag: ET.Element, + base_namespace: str, + context: dict, + local_context: dict, + parent_file_space: Optional[str] = None, +): + if parent_file_space is None: + parent_file_space = base_namespace + # find the push-ros-namespace tag inside the children + group_base_namespace = deepcopy(base_namespace) + for child in group_tag: + if child.tag == "push-ros-namespace": + if child.get("namespace").strip() == "/": + continue + group_base_namespace = f"{base_namespace}/{child.get('namespace').strip('/')}" + # print(f"Setting ROS namespace to {group_base_namespace} inside group") + + # find all other children + for child in group_tag: + process_tag( + child, + base_namespace, + context, + local_context, + group_base_namespace=group_base_namespace, + ) + + # if group_base_namespace != base_namespace: + # print(f"Exiting group with namespace {group_base_namespace}") + return context + + +def process_tag( + tag: ET.Element, + base_namespace: str, + context: dict, + local_context: dict, + group_base_namespace: Optional[str] = None, +): + if group_base_namespace is None: + group_base_namespace = base_namespace + if not check_if_run(tag, base_namespace, context, local_context): + return context + + if tag.tag == "arg": + context = parse_argument_tag(tag, base_namespace, context, local_context) + elif tag.tag == "let": + context = parse_let_tag(tag, base_namespace, context, local_context) + elif tag.tag == "group": + context = parse_group_tag(tag, base_namespace, context, local_context) + elif tag.tag == "include": + context = process_include_tag( + tag, context, local_context, base_namespace, group_base_namespace + ) + elif tag.tag == "node": + context = parse_node_tag(tag, base_namespace, context, local_context) + elif tag.tag == "load_composable_node": + context = parse_load_composable_node(tag, base_namespace, context, local_context) + elif tag.tag == "composable_node": + context = parse_composable_node(tag, base_namespace, context, local_context) + return context + + +def parse_xml(file_path: str, namespace: str = "", context: dict = {}): + """Recursively parse XML files, handling tags. 
For each file, the namespace should be the same.""" + full_path = os.path.join(file_path) + context["__current_launch_file__"] = full_path + context["__current_launch_name_"] = os.path.basename(full_path) + if "__tree__" not in context: + context["__tree__"] = LaunchTree() + if context["__tree__"].root is None: + context["__tree__"].add_root(context["__current_launch_name_"], path=full_path) + tree = ET.parse(full_path) + root = tree.getroot() + + # Process each node in the XML + local_context = {} + for tag in root: + process_tag(tag, namespace, context, local_context) + return context + + +def launch_file_analyse_main(launch_file, context={}, src_dir=None): + if src_dir: + find_cmake_projects(src_dir) + context = parse_xml(launch_file, context=context) + # with open("output.json", "w") as f: + # f.write(str(context["__tree__"])) + # print unused parameters + unset_parameters = find_unset_parameters(context["__tree__"]) + if len(unset_parameters) > 0: + for unset_parameter in unset_parameters: + print(unset_parameter) + raise Exception(f"Some parameters are not set; {unset_parameters}") + return context + + +if __name__ == "__main__": + import argparse + + args = argparse.ArgumentParser() + args.add_argument("--src_dir", type=str, default="src") + args.add_argument("--launch_file", type=str, required=True) + args.add_argument("--flag_check_system_file", action="store_true") + # context dictionary + args.add_argument("--context", "--parameters", nargs="+", help="Key=value pairs") + + args = args.parse_args() + src_dir = args.src_dir + launch_file = args.launch_file + FLAG_CHECKING_SYSTEM_PROJECTS = args.flag_check_system_file + context = {} + for param in args.context: + key, value = param.split("=") + context[key] = value + + launch_file_analyse_main(launch_file, context=context, src_dir=src_dir) diff --git a/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/localize_topic.py b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/localize_topic.py new file mode 100644 index 00000000..2dc01f24 --- /dev/null +++ b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/localize_topic.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +import argparse +import os +from typing import Dict +from typing import List + +from autoware_debug_tools.topic_connection_checker.launch_file_analyse import ( + launch_file_analyse_main, +) +from autoware_debug_tools.topic_connection_checker.launch_file_analyse import LaunchTree +from autoware_debug_tools.topic_connection_checker.launch_file_analyse import LaunchTreeNode +from autoware_debug_tools.topic_connection_checker.launch_file_analyse import find_cmake_projects +from autoware_debug_tools.topic_connection_checker.launch_file_analyse import find_package + + +def find_topics_in_file(file_path: str, topics: List[str]) -> Dict[str, List[int]]: + """Search for topics in a given file and return their line numbers.""" + topic_lines = {topic: [] for topic in topics} + + with open(file_path, "r", encoding="utf-8") as file: + for line_num, line in enumerate(file, 1): + for topic in topics: + if topic in line: + topic_lines[topic].append(line_num) + + return {topic: lines for topic, lines in topic_lines.items() if lines} + + +def search_topics_in_directory( + directory: str, topics: List[str] +) -> Dict[str, Dict[str, List[int]]]: + """Search for topics in all relevant files within the given directory and its subdirectories.""" + results = {} + + for root, _, files in os.walk(directory): + for file in 
files: + if file.endswith((".cpp", ".hpp", ".h", "launch.py")): + file_path = os.path.join(root, file) + file_results = find_topics_in_file(file_path, topics) + if file_results: + results[file_path] = file_results + + return results + + +def search_topics_in_launch_tree(tree: LaunchTree, topics: List[str]) -> Dict[str, List[str]]: + results = {} + + for node_name in tree.nodes_manager: + node: LaunchTreeNode = tree.nodes_manager[node_name] + name = node.name + results[name] = {topic: [] for topic in topics} + value_found = False + + if "__remapping__" in node.parameters: + for remap in node.parameters["__remapping__"]: + value = node.parameters["__remapping__"][remap] + if value in topics: + if "input" in remap: + # it means that the topic is a subscriber; we aim to find the publisher + continue + results[name][value] = remap + value_found = True + + for param in node.parameters: + value = node.parameters[param] + if value in topics: + if "input" in param: + # it means that the topic is a subscriber; we aim to find the publisher + continue + results[name][value] = param + value_found = True + + if value_found: + if name.endswith(".launch.xml"): + # topic name found in launch parameters + print(f"\nFile: {node.parameters['path']}") + for topic, param in results[name].items(): + if len(param) == 0: + continue + print(f" Topic '{topic}' found as parameter(s): {param}") + + else: + # topic name found in node's parameters or remapping + print(f"\nNode: {name}") + for topic, param in results[name].items(): + if len(param) == 0: + continue + print(f" Topic '{topic}' found as parameter(s): {param}") + + +def print_results(results: Dict[str, Dict[str, List[int]]]): + """Print the search results in a formatted manner.""" + for file_path, file_results in results.items(): + print(f"\nFile: {file_path}") + for topic, lines in file_results.items(): + print(f" Topic '{topic}' found on line(s): {', '.join(map(str, lines))}") + + +def main(): + parser = argparse.ArgumentParser(description="Search for ROS topics in source files.") + parser.add_argument("directory", help="The directory to search in", default=".") + parser.add_argument("topics", help="Comma-separated list of topics to search for") + + args = parser.parse_args() + + directory = args.directory + topics = [topic.strip() for topic in args.topics.split(",")] + + print(f"Searching for topics: {', '.join(topics)}") + + find_cmake_projects(directory) + autoware_launch_directory = find_package("autoware_launch") + + autoware_launch_context_tree: LaunchTree = launch_file_analyse_main( + os.path.join(autoware_launch_directory, "launch", "autoware.launch.xml"), + context={"map_path": "dummy_map"}, + )["__tree__"] + + # Search in CPP, HPP, H, and python launch files + results = search_topics_in_directory(directory, topics) + + if results: + print("\nResults:") + print_results(results) + else: + print("\nNo matching topics found in the codes in the specified directory.") + + # Search in launch files + search_topics_in_launch_tree(autoware_launch_context_tree, topics) + + +if __name__ == "__main__": + main() diff --git a/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/node.py b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/topic_connection_checker_node.py similarity index 76% rename from common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/node.py rename to common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/topic_connection_checker_node.py index 
eff9714c..f5d80233 100644
--- a/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/node.py
+++ b/common/autoware_debug_tools/autoware_debug_tools/topic_connection_checker/topic_connection_checker_node.py
@@ -2,7 +2,10 @@
 import importlib
 import threading
+import time
 
+from diagnostic_msgs.msg import DiagnosticArray
+from diagnostic_msgs.msg import DiagnosticStatus
 import rclpy
 from rclpy.callback_groups import ReentrantCallbackGroup
 from rclpy.executors import MultiThreadedExecutor
 
@@ -20,18 +23,9 @@ def __init__(self):
 
         self.callback_group = ReentrantCallbackGroup()
 
-        # List of important topics to check
-        self.important_topics = [
-            "/control/command/control_cmd",
-            "/control/trajectory_follower/control_cmd",
-            "/control/shift_decider/gear_cmd",
-            "/planning/scenario_planning/trajectory",
-            "/planning/turn_indicators_cmd",
-            "/planning/mission_planning/route",
-            "/perception/traffic_light_recognition/traffic_signals",
-            "/perception/object_recognition/objects",
-            # Add more important topics here
-        ]
+        # List of important topics to check; seed it only with topics known to be important.
+        # We also listen to diagnostic messages to discover additional topics that are causing problems.
+        self.important_topics = {"/control/command/control_cmd"}
 
         self.ignore_topics = [
             "/rosout",
@@ -42,11 +36,25 @@ def __init__(self):
 
         self.lock = threading.Lock()
         self.check_completed = threading.Event()
         self.topics_to_check_next_round = set()
+        self.problematic_topics_without_publishers = set()
         self.checked_topics = set()  # New set to keep track of checked topics
         self.reported_topics = set()  # New set to keep track of reported topics
         self.timer = None  # Timer object
 
+        self.diag_sub = self.create_subscription(
+            DiagnosticArray,
+            "/diagnostics",
+            self.diagnostic_callback,
+            QoSProfile(depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT),
+            callback_group=self.callback_group,
+        )
+        self.start_time = time.time()
+        self.diagnostic_collected = False
+
+        # New: Dictionary to store problematic topics from diagnostics
+        self.diagnostic_problematic_topics = {}
+
         # Default QoS profile (used if unable to determine publisher's QoS)
         self.default_qos_profile = QoSProfile(
             durability=QoSDurabilityPolicy.VOLATILE,
@@ -55,6 +63,34 @@
             depth=1,
         )
 
+    def diagnostic_callback(self, msg: DiagnosticArray):
+        passed_time = time.time() - self.start_time
+        if passed_time > 3:
+            self.destroy_subscription(self.diag_sub)
+            self.diagnostic_collected = True
+            self.get_logger().info("Diagnostic data collected")
+            print(self.important_topics)
+            return
+        for status in msg.status:
+            isinstance(status, DiagnosticStatus)
+            if status.hardware_id == "topic_state_monitor":
+                status_is_ok = True
+                for key_value in status.values:
+                    key = key_value.key
+                    value = key_value.value
+                    if key == "topic":
+                        topic_name = value
+                    if key == "status":
+                        if value != "OK":
+                            status_is_ok = False
+
+                if not status_is_ok:
+                    # print(topic_name, status_is_ok)
+                    if topic_name not in self.important_topics:
+                        self.important_topics.add(topic_name)
+                        self.get_logger().warn(f"Diagnostic reports stuck topic: {topic_name}")
+                        # self.analyze_topic_connections(topic_name)
+
     def check_topics(self):
         self.check_completed.clear()
         for topic in self.important_topics:
@@ -165,7 +201,8 @@ def analyze_results(self):
             else:
                 self.get_logger().warn(f"Topic {topic} has unexpected state")
 
-        for topic in stuck_topics:
+        all_stuck_topics = set(stuck_topics + list(self.diagnostic_problematic_topics.keys()))
+        for topic in all_stuck_topics:
             self.analyze_topic_connections(topic)
 
         if self.topics_to_check_next_round:
@@ -189,6 +226,7 @@
                 continue
             publishers_info = self.get_publishers_info_by_topic(topic)
             if len(publishers_info) == 0:
+                self.problematic_topics_without_publishers.add(topic)
                 self.get_logger().error(
                     f"  Node {node_name} is subscribing to topic {topic} but there are no publishers"
                 )
@@ -214,6 +252,7 @@ def main(args=None):
     thread.start()
 
     try:
+        time.sleep(3)  # Wait for diagnostic data to be collected
        checker.check_topics()
     except KeyboardInterrupt:
         pass
diff --git a/common/autoware_debug_tools/setup.py b/common/autoware_debug_tools/setup.py
index b79b4f85..b61d4a8e 100644
--- a/common/autoware_debug_tools/setup.py
+++ b/common/autoware_debug_tools/setup.py
@@ -25,7 +25,8 @@
             "processing_time_plotter = autoware_debug_tools.system_performance_plotter.processing_time_plotter:main",
             "cpu_usage_plotter = autoware_debug_tools.system_performance_plotter.cpu_usage_plotter:main",
             "memory_usage_plotter = autoware_debug_tools.system_performance_plotter.memory_usage_plotter:main",
-            "topic_connection_checker = autoware_debug_tools.topic_connection_checker.node:main",
+            "topic_connection_checker = autoware_debug_tools.topic_connection_checker.topic_connection_checker_node:main",
+            "topic_localizer = autoware_debug_tools.topic_connection_checker.localize_topic:main",
         ],
     },
 )
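For reference, a minimal usage sketch of the launch-file analyzer introduced above; the launch-file path is a placeholder for your workspace, and the map_path argument mirrors the dummy value that localize_topic.py passes:

from autoware_debug_tools.topic_connection_checker.launch_file_analyse import (
    launch_file_analyse_main,
)

# Placeholder path: point this at the autoware.launch.xml in your workspace.
context = launch_file_analyse_main(
    "/path/to/autoware_launch/launch/autoware.launch.xml",
    context={"map_path": "dummy_map"},  # same dummy launch argument localize_topic.py uses
    src_dir="src",  # scanned for CMakeLists.txt so packages resolve to source directories
)
tree = context["__tree__"]  # LaunchTree built while the XML was parsed
print(tree)  # __repr__ dumps the tree of included files, nodes, and parameters as indented JSON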