Skip to content

Commit

Permalink
style(pre-commit): autofix
Browse files Browse the repository at this point in the history
  • Loading branch information
pre-commit-ci[bot] committed Oct 22, 2024
1 parent d112eaf commit 0bba6f9
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from .launch_tree import LaunchTree, LaunchTreeNode
from .launch_tree import LaunchTree
from .launch_tree import LaunchTreeNode
from .main import launch_file_analyse_main
from .string_utils import find_cmake_projects, find_package
from .string_utils import find_cmake_projects
from .string_utils import find_package
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
from typing import List


class LaunchTreeNode:
"""Each node in the launch tree is a LaunchTreeNode. It represents a launch file or a ros node."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,18 @@
import os
from typing import Optional
import xml.etree.ElementTree as ET
import yaml

from autoware_debug_tools.topic_connection_checker.launch_file_analyse.launch_tree import LaunchTree
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import analyze_string, find_linked_path
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import FLAG_CHECKING_SYSTEM_PROJECTS
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import (
FLAG_CHECKING_SYSTEM_PROJECTS,
)
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import (
analyze_string,
)
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import (
find_linked_path,
)
import yaml


def read_ros_yaml(file_path: str) -> dict:
Expand Down Expand Up @@ -154,10 +161,6 @@ def parse_composable_node(
)






def check_if_run(tag: ET.Element, base_name: dict, context: dict, local_context: dict):
"""Many tag has a if and unless attribute, this function checks if the tag should be run or not."""
if tag.get("if"):
Expand Down Expand Up @@ -356,4 +359,4 @@ def parse_xml(file_path: str, namespace: str = "", context: dict = {}):
local_context = {}
for tag in root:
process_tag(tag, namespace, context, local_context)
return context
return context
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.launch_xml_parser import parse_xml
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import find_cmake_projects
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.launch_tree import find_unset_parameters

from autoware_debug_tools.topic_connection_checker.launch_file_analyse.launch_tree import (
find_unset_parameters,
)
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.launch_xml_parser import (
parse_xml,
)
from autoware_debug_tools.topic_connection_checker.launch_file_analyse.string_utils import (
find_cmake_projects,
)


def launch_file_analyse_main(launch_file, context={}, src_dir=None):
Expand All @@ -18,6 +23,7 @@ def launch_file_analyse_main(launch_file, context={}, src_dir=None):
raise Exception(f"Some parameters are not set; {unset_parameters}")
return context


if __name__ == "__main__":
import argparse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def find_package(package_name) -> str:
] = f"/opt/ros/humble/share/{package_name}" # use this for temporal solution;
return BASE_PROJECT_MAPPING[package_name]


def find_cmake_projects(root_dir):
for dirpath, _, filenames in os.walk(root_dir):
if "CMakeLists.txt" in filenames:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
#!/usr/bin/env python3

import importlib
import threading
import time

from diagnostic_msgs.msg import DiagnosticArray
from diagnostic_msgs.msg import DiagnosticStatus
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy, QoSHistoryPolicy
import threading
import importlib
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
import time
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class TopicConnectionChecker(Node):
def __init__(self):
super().__init__('optimized_topic_connection_checker')
self.get_logger().info('Optimized Topic Connection Checker node started')

self.callback_group = ReentrantCallbackGroup()

# List of important topics to check
self.important_topics = set([
'/control/command/control_cmd',
'/control/trajectory_follower/control_cmd',
'/control/shift_decider/gear_cmd',
super().__init__("optimized_topic_connection_checker")
self.get_logger().info("Optimized Topic Connection Checker node started")

'/planning/scenario_planning/trajectory',
'/planning/turn_indicators_cmd',
'/planning/mission_planning/route',

'/perception/traffic_light_recognition/traffic_signals',
'/perception/object_recognition/objects',
self.callback_group = ReentrantCallbackGroup()

# Add more important topics here
])
# List of important topics to check
self.important_topics = set(
[
"/control/command/control_cmd",
"/control/trajectory_follower/control_cmd",
"/control/shift_decider/gear_cmd",
"/planning/scenario_planning/trajectory",
"/planning/turn_indicators_cmd",
"/planning/mission_planning/route",
"/perception/traffic_light_recognition/traffic_signals",
"/perception/object_recognition/objects",
# Add more important topics here
]
)
self.important_topics = set()

self.ignore_topics = [
"/rosout",
"/parameter_events",
]

self.topic_data = {}
self.lock = threading.Lock()
self.check_completed = threading.Event()
Expand All @@ -51,10 +56,10 @@ def __init__(self):

self.diag_sub = self.create_subscription(
DiagnosticArray,
'/diagnostics',
"/diagnostics",
self.diagnostic_callback,
QoSProfile(depth=10, reliability=QoSReliabilityPolicy.BEST_EFFORT),
callback_group=self.callback_group
callback_group=self.callback_group,
)
self.start_time = time.time()
self.diagnostic_collectted = False
Expand All @@ -67,10 +72,10 @@ def __init__(self):
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
depth=1,
)

def diagnostic_callback(self, msg:DiagnosticArray):
def diagnostic_callback(self, msg: DiagnosticArray):
passed_time = time.time() - self.start_time
if passed_time > 3:
self.destroy_subscription(self.diag_sub)
Expand All @@ -90,7 +95,7 @@ def diagnostic_callback(self, msg:DiagnosticArray):
if key == "status":
if value != "OK":
status_is_ok = False

if not status_is_ok:
# print(topic_name, status_is_ok)
if topic_name not in self.important_topics:
Expand All @@ -104,15 +109,15 @@ def check_topics(self):
if topic not in self.checked_topics:
self.check_topic(topic)
self.checked_topics.add(topic)

# Set a timer for the overall check duration
if self.timer:
self.timer.cancel() # Cancel existing timer if any
self.timer = self.create_timer(5.0, self.finish_check, callback_group=self.callback_group)

# Wait for the check to complete
self.check_completed.wait()

self.analyze_results()

def check_topic(self, topic):
Expand All @@ -123,25 +128,25 @@ def check_topic(self, topic):

try:
# Dynamically import the message type
package_name, _, msg_name = msg_type.split('/')
package_name, _, msg_name = msg_type.split("/")
module = importlib.import_module(f"{package_name}.msg")
msg_class = getattr(module, msg_name)
except (ValueError, ImportError, AttributeError) as e:
self.get_logger().error(f"Failed to import message type for topic {topic}: {e}")
return

self.topic_data[topic] = {'received': False, 'publishers': [], 'last_received': None}
self.topic_data[topic] = {"received": False, "publishers": [], "last_received": None}

# Determine QoS profile
qos_profile = self.get_publisher_qos(topic)

# Create a subscription with the determined QoS profile
self.create_subscription(
msg_class,
topic,
lambda msg: self.topic_callback(topic, msg),
qos_profile,
callback_group=self.callback_group
callback_group=self.callback_group,
)

def get_publisher_qos(self, topic):
Expand All @@ -152,7 +157,7 @@ def get_publisher_qos(self, topic):

# Use the QoS of the first publisher (assuming all publishers use the same QoS)
pub_qos = publishers_info[0].qos_profile

return QoSProfile(
durability=pub_qos.durability,
reliability=pub_qos.reliability,
Expand All @@ -161,23 +166,23 @@ def get_publisher_qos(self, topic):
lifespan=pub_qos.lifespan,
deadline=pub_qos.deadline,
liveliness=pub_qos.liveliness,
liveliness_lease_duration=pub_qos.liveliness_lease_duration
liveliness_lease_duration=pub_qos.liveliness_lease_duration,
)

def topic_callback(self, topic, msg):
with self.lock:
self.topic_data[topic]['received'] = True
self.topic_data[topic]['last_received'] = self.get_clock().now()
self.topic_data[topic]["received"] = True
self.topic_data[topic]["last_received"] = self.get_clock().now()

def finish_check(self):
# Get publisher information for all topics
for topic in self.important_topics:
publishers_info = self.get_publishers_info_by_topic(topic)
with self.lock:
self.topic_data[topic]['publishers'] = [
self.topic_data[topic]["publishers"] = [
(p.node_name, p.node_namespace) for p in publishers_info
]

self.check_completed.set()

def get_topic_type(self, topic):
Expand All @@ -190,18 +195,23 @@ def get_topic_type(self, topic):
def analyze_results(self):
stuck_topics = []
for topic, data in self.topic_data.items():
if not data['received']:
if topic not in self.ignore_topics and len(data['publishers']) == 1 and topic not in self.reported_topics:
self.get_logger().warn(f"Topic {topic} from {data['publishers'][0][0]} is stuck (no messages received)")
if not data["received"]:
if (
topic not in self.ignore_topics
and len(data["publishers"]) == 1
and topic not in self.reported_topics
):
self.get_logger().warn(
f"Topic {topic} from {data['publishers'][0][0]} is stuck (no messages received)"
)
stuck_topics.append(topic)
self.reported_topics.add(topic)

elif data['last_received'] is not None:
elif data["last_received"] is not None:
# we assume that last received time is active because most of them are latch topics
pass
else:
self.get_logger().warn(f"Topic {topic} has unexpected state")


all_stuck_topics = set(stuck_topics + list(self.diagnostic_problematic_topics.keys()))
for topic in all_stuck_topics:
Expand All @@ -216,47 +226,52 @@ def analyze_results(self):
self.check_topics() # Start another round of checks

def analyze_topic_connections(self, stuck_topic):
publishers = self.topic_data[stuck_topic]['publishers']
publishers = self.topic_data[stuck_topic]["publishers"]
for node_name, node_namespace in publishers:

# Get all topics this node is subscribing to
node_subscriptions = self.get_subscriber_names_and_types_by_node(node_name, node_namespace)

node_subscriptions = self.get_subscriber_names_and_types_by_node(
node_name, node_namespace
)

for topic, types in node_subscriptions:
if topic in self.ignore_topics:
continue
publishers_info = self.get_publishers_info_by_topic(topic)
if len(publishers_info) == 0:
self.problematic_topics_without_publishers.add(topic)
self.get_logger().error(f" Node {node_name} is subscribing to topic {topic} but there are no publishers")
self.get_logger().error(
f" Node {node_name} is subscribing to topic {topic} but there are no publishers"
)
elif len(publishers_info) > 2:
# topics with multiple publishers are usually debuggers or loggers and are not a problem
continue
if topic in self.topic_data:
status = "stuck" if not self.topic_data[topic]['received'] else "active"
status = "stuck" if not self.topic_data[topic]["received"] else "active"
self.get_logger().debug(f" Subscribed topic {topic} is {status}")
elif topic not in self.checked_topics:
self.get_logger().debug(f" Subscribed topic {topic} was not checked")
self.topics_to_check_next_round.add(topic)


def main(args=None):
rclpy.init(args=args)
checker = TopicConnectionChecker()

executor = MultiThreadedExecutor()
executor.add_node(checker)

thread = threading.Thread(target=executor.spin, daemon=True)
thread.start()

try:
time.sleep(3) # Wait for diagnostic data to be collected
time.sleep(3) # Wait for diagnostic data to be collected
checker.check_topics()
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
thread.join()

if __name__ == '__main__':
main()

if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Replace `$TOPIC1,$TOPIC2` with the actual topic names you want to localize, sepa
The Topic Localizer employs a two-step approach to find the source of problematic topics:

1. Direct Search:

- Scans all HPP/CPP and launch.py files for code snippets containing the exact names of the target topics.

2. Launch System Analysis:
Expand Down

0 comments on commit 0bba6f9

Please sign in to comment.