Skip to content

Commit

Permalink
feat(planning_debug_tools): add cpu_usage_checker.py (#66)
Browse files Browse the repository at this point in the history
* feat(planning_debug_tools): add cpu_usage_checker.py

Signed-off-by: Takayuki Murooka <[email protected]>

* update README

Signed-off-by: Takayuki Murooka <[email protected]>

---------

Signed-off-by: Takayuki Murooka <[email protected]>
  • Loading branch information
takayuki5168 authored Jun 28, 2024
1 parent 2656150 commit ad0b811
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 0 deletions.
1 change: 1 addition & 0 deletions planning/planning_debug_tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ install(PROGRAMS
scripts/processing_time_checker.py
scripts/trajectory_visualizer.py
scripts/closest_velocity_checker.py
scripts/cpu_usage_checker.py
scripts/perception_replayer/perception_reproducer.py
scripts/perception_replayer/perception_replayer.py
scripts/update_logger_level.sh
Expand Down
13 changes: 13 additions & 0 deletions planning/planning_debug_tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This package contains several planning-related debug tools.
- **Perception reproducer**: generates detected objects from rosbag data in planning simulator environment
- **processing time checker**: displays processing_time of modules on the terminal
- **logging level updater**: updates the logging level of the planning modules.
- **CPU Usage Checker**: displays CPU usage of ROS processes on the terminal

## Trajectory analyzer

Expand Down Expand Up @@ -292,3 +293,15 @@ ros2 run planning_debug_tools update_logger_level.sh <module-name> <logger-level
When you have a typo of the planning module, the script will show the available modules.

![logging_level_updater_typo](image/logging_level_updater_typo.png)

## CPU Usage Checker

The purpose of the CPU Usage Checker is to monitor and visualize the CPU usage of the ROS processes. By providing a real-time terminal-based visualization, users can easily confirm the cpu usage as in the picture below.

![cpu_usage_checker](image/cpu_usage_checker.png)

You can run the program by the following command.

```bash
ros2 run planning_debug_tools cpu_usage_checker.py
```
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
93 changes: 93 additions & 0 deletions planning/planning_debug_tools/scripts/cpu_usage_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env python3

# Copyright 2024 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import threading

import psutil


def get_cpu_usage(pid, cpu_usages, interval):
try:
proc = psutil.Process(pid)
cpu_usage = proc.cpu_percent(interval=interval)
cmdline = proc.cmdline()
process_name = " ".join(cmdline)
component = process_name.split("__ns:=/")[1].split("/")[0].split(" ")[0]
if component == "":
component = "others"
container = process_name.split("__node:=", 1)[1].split(" ")[0]
cpu_usages[pid] = {"component": component, "container": container, "usage": cpu_usage}
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, IndexError):
pass


def print_cpu_usage(cpu_usages):
# clear terminal
os.system("clear")
if not cpu_usages:
print("No processes found with the specified name.")
return

# sort in the order of component name
sorted_cpu_usages = sorted(cpu_usages.items(), key=lambda x: x[1]["component"])

print(" CPU Usage")
print("-" * 185)

last_component = None
for pid, data in sorted_cpu_usages:
component = data["component"]
container = data["container"]
usage = data["usage"]
bar = "#" * int(usage)

if last_component and last_component != component:
print("|" + "-" * 17 + "|" + "-" * 57 + "|" + "-" * 7 + "|" + "-" * 100)

last_component = component
process_info = f"| {component.split('/')[-1].ljust(15)} | {container.ljust(55)} | {str(usage).ljust(4)}% | {bar}"
print(process_info)

print("-" * 185)


def main():
cpu_usages = {}
while True:
# create thread to calculate cpu usage of each process since it takes time
process_name_keyword = "__node:="
threads = []
for proc in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
try:
if process_name_keyword in " ".join(proc.info["cmdline"]):
pid = proc.info["pid"]
thread = threading.Thread(target=get_cpu_usage, args=(pid, cpu_usages, 1.0))
threads.append(thread)
thread.start()
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass

# wait for all the thread to finish
for thread in threads:
thread.join()

# print cpu usage
print_cpu_usage(cpu_usages)


if __name__ == "__main__":
main()

0 comments on commit ad0b811

Please sign in to comment.