Skip to content

Commit

Permalink
feat(autoware_debug_tools): add processing time visualizer
Browse files Browse the repository at this point in the history
Signed-off-by: Y.Hisaki <[email protected]>
  • Loading branch information
yhisaki committed Jul 11, 2024
1 parent 5652129 commit 59306fc
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 0 deletions.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import curses
import time
import uuid

import rclpy
import rclpy.executors
from rclpy.node import Node
from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg

from .print_tree import print_tree
from .topic_selector import select_topic
from .tree import ProcessingTimeTree
from .utils import exit_curses
from .utils import init_curses


class ProcessingTimeVisualizer(Node):
def __init__(self):
super().__init__("processing_time_visualizer" + str(uuid.uuid4()).replace("-", "_"))

self.subscriber = self.subscribe_processing_time_tree()
self.tree = ProcessingTimeTree()
self.worst_case_tree = self.tree
self.stdcscr = init_curses()

Check warning on line 24 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (stdcscr)
print_tree("🌲 Processing Time Tree 🌲", self.topic_name, self.tree, self.stdcscr)

Check warning on line 25 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (stdcscr)

def subscribe_processing_time_tree(self):
topics = []

s = time.time()
while True:
for topic_name, topic_types in self.get_topic_names_and_types():
for topic_type in topic_types:
if (
topic_type == "tier4_debug_msgs/msg/ProcessingTimeTree"
and topic_name not in topics
):
topics.append(topic_name)

if time.time() - s > 1.0:
break

if len(topics) == 0:
self.get_logger().info("No ProcessingTimeTree topic found")
self.get_logger().info("Exiting...")
exit(1)
else:
self.topic_name = curses.wrapper(select_topic, topics)
subscriber = self.create_subscription(
ProcessingTimeTreeMsg,
self.topic_name,
self.callback,
10,
)

return subscriber

def callback(self, msg: ProcessingTimeTreeMsg):
self.tree = ProcessingTimeTree.from_msg(msg)
if self.tree.processing_time > self.worst_case_tree.processing_time:
self.worst_case_tree = self.tree
print_tree("🌲 Processing Time Tree 🌲", self.topic_name, self.tree, self.stdcscr)

Check warning on line 62 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/node.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (stdcscr)


def main(args=None):
rclpy.init(args=args)
try:
node = ProcessingTimeVisualizer()
except KeyboardInterrupt:
exit_curses()
return
try:
rclpy.spin(node)
except (KeyboardInterrupt, rclpy.executors.ExternalShutdownException):
node.destroy_node()
exit_curses()
print("⏰ Worst Case Execution Time ⏰")
print(node.worst_case_tree)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import curses

from .tree import ProcessingTimeTree
from .utils import abbreviate_topic
from .utils import wrap_lines


def print_tree(prefix: str, topic_name: str, tree: ProcessingTimeTree, stdscr: curses.window):
stdscr.clear()
height, width = stdscr.getmaxyx()
stdscr.addstr(0, 0, prefix[: width - 2], curses.color_pair(2))
topic_showing = (abbreviate_topic(topic_name) if len(topic_name) > width else topic_name)[
: width - 2
]
stdscr.addstr(1, 0, topic_showing, curses.color_pair(1))
tree_lines = wrap_lines(tree.to_lines(), width, height - 2)
for i, line in enumerate(tree_lines):
stdscr.addstr(i + 2, 1, line)
stdscr.refresh()
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import curses
from typing import List
from typing import Optional

from .utils import abbreviate_topic
from .utils import wrap_topic_name


def select_topic(stdscr: curses.window, topics: List[str]) -> Optional[str]:
curses.curs_set(0) # Hide the cursor
curses.start_color() # Enable color support
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_WHITE) # Define color pair
curses.init_pair(2, curses.COLOR_RED, curses.COLOR_BLACK) # Define red color pair

current_topic = 0
start_index = 0
max_topics = 8

while True:
stdscr.clear()
height, width = stdscr.getmaxyx()

# Check if the terminal window is too small
if (
width < max(len(abbreviate_topic(topic)) for topic in topics) + 2
or height < max_topics + 2
):
error_msg = "Terminal window too small. Please resize."
stdscr.addstr(height // 2, width // 2 - len(error_msg) // 2, error_msg)
stdscr.refresh()
key = stdscr.getch()
if key in [ord("q"), ord("Q")]:
return None
continue

# Display the full selected topic in red at the top, with wrapping if necessary
full_topic = topics[current_topic]
lines = wrap_topic_name(full_topic, width - 2)

for i, line in enumerate(lines):
stdscr.attron(curses.color_pair(2))

Check warning on line 41 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (attron)
stdscr.addstr(i, 1, line)
stdscr.attroff(curses.color_pair(2))

Check warning on line 43 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (attroff)

# Display the topics
for idx in range(start_index, min(start_index + max_topics, len(topics))):
abbreviated_option = abbreviate_topic(topics[idx])[: width - 2] # Truncate if necessary
x = width // 2 - len(abbreviated_option) // 2
y = height // 2 - max_topics // 2 + idx - start_index + len(lines)
if idx == current_topic:
stdscr.attron(curses.color_pair(1))

Check warning on line 51 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (attron)
stdscr.addstr(y, x, abbreviated_option)
stdscr.attroff(curses.color_pair(1))

Check warning on line 53 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/topic_selector.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (attroff)
else:
stdscr.addstr(y, x, abbreviated_option)

# Display navigation buttons if needed
if start_index + max_topics < len(topics):
string = "Next>"
stdscr.addstr(height - 1, width - len(string) - 1, string)
if start_index > 0:
string = "<Prev"
stdscr.addstr(height - 1, 0, string)

stdscr.refresh()

# Handle user input
key = stdscr.getch()
if key == curses.KEY_UP and current_topic > 0:
current_topic -= 1
if current_topic < start_index:
start_index -= 1
elif key == curses.KEY_DOWN and current_topic < len(topics) - 1:
current_topic += 1
if current_topic >= start_index + max_topics:
start_index += 1
elif key in [curses.KEY_ENTER, 10, 13]:
return topics[current_topic]
elif key == curses.KEY_RIGHT and start_index + max_topics < len(topics):
start_index += max_topics
current_topic = start_index
elif key == curses.KEY_LEFT and start_index > 0:
start_index -= max_topics
current_topic = start_index
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from typing import Dict

from tier4_debug_msgs.msg import ProcessingTimeTree as ProcessingTimeTreeMsg


class ProcessingTimeTree:
def __init__(
self,
name: str = "",
processing_time: float = 0.0,
id: int = 1, # noqa
parent_id: int = 0,
):
self.name = name
self.processing_time = processing_time
self.id = id
self.parent_id = parent_id
self.children = []

@classmethod
def from_msg(cls, msg: ProcessingTimeTreeMsg) -> "ProcessingTimeTree":
# Create a dictionary to map node IDs to ProcessingTimeTree objects
node_dict: Dict[int, ProcessingTimeTree] = {
node.id: ProcessingTimeTree(node.name, node.processing_time, node.id, node.parent_id)
for node in msg.nodes
}

# Build the tree structure
root = node_dict[1]
for node in list(node_dict.values()):
parent = node_dict.get(node.parent_id)
if parent:
parent.children.append(node)

return root

def to_lines(self) -> str:
def construct_string(
node: "ProcessingTimeTree",
lines: list,
prefix: str,
is_last: bool,
is_root: bool,
) -> None:
# If not the root, append the prefix and the node information
line = ""
if not is_root:
line += prefix + ("└── " if is_last else "├── ")
lines.append(line + f"{node.name}: {node.processing_time} [ms]")
# Recur for each child node
for i, child in enumerate(node.children):
construct_string(
child,
lines,
prefix + (" " if is_last else "│ "),
i == len(node.children) - 1,
False,
)

lines = []
# Start the recursive string construction with the root node
construct_string(self, lines, "", True, True)
return lines

def __str__(self) -> str:
return "".join([line + "\n" for line in self.to_lines()])
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import curses
from typing import List


def abbreviate_topic(topic: str) -> str:
parts = topic.split("/")
abbreviated_parts = [part[0] if len(part) > 1 else part for part in parts[:-1]]
return "/".join(abbreviated_parts + [parts[-1]])


def wrap_topic_name(text: str, width: int) -> List[str]:
lines = []
while len(text) > width:
split_point = text.rfind("/", 0, width)
if split_point == -1:
split_point = width
lines.append(text[:split_point])
text = text[split_point:]
lines.append(text)
return lines


def wrap_lines(lines, width, height):
return [line[:width] for line in lines][:height]


def exit_curses():
curses.echo()
curses.nocbreak()

Check warning on line 29 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/utils.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (nocbreak)
curses.endwin()

Check warning on line 30 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/utils.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (endwin)


def init_curses() -> curses.window:
stdscr = curses.initscr()
curses.noecho()

Check warning on line 35 in common/autoware_debug_tools/autoware_debug_tools/processing_time_visualizer/utils.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (noecho)
curses.cbreak()
stdscr.keypad(True)
curses.start_color()
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE)
return stdscr
21 changes: 21 additions & 0 deletions common/autoware_debug_tools/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>autoware_debug_tools</name>
<version>0.0.0</version>
<description>The autoware_debug_tools package</description>
<maintainer email="[email protected]">hisaki</maintainer>
<license>Apache License 2.0</license>

<exec_depend>rclpy</exec_depend>
<exec_depend>tier4_debug_msgs</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions common/autoware_debug_tools/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/autoware_debug_tools
[install]
install_scripts=$base/lib/autoware_debug_tools
26 changes: 26 additions & 0 deletions common/autoware_debug_tools/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from setuptools import find_packages
from setuptools import setup

package_name = "autoware_debug_tools"

setup(
name=package_name,
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="Yukinari Hisaki",
maintainer_email="[email protected]",
description="The autoware_debug_tools package",
license="Apache License 2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"processing_time_visualizer = autoware_debug_tools.processing_time_visualizer.node:main",
],
},
)

0 comments on commit 59306fc

Please sign in to comment.