diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
new file mode 100644
index 0000000..7e0c995
--- /dev/null
+++ b/.devcontainer/Dockerfile
@@ -0,0 +1,34 @@
+FROM osrf/ros:galactic-desktop-focal
+
+SHELL ["/bin/bash", "-c"]
+
+# create a non-root user
+# https://code.visualstudio.com/remote/advancedcontainers/add-nonroot-user#_creating-a-nonroot-user
+ARG USERNAME=vscode
+ARG USER_UID=1000
+ARG USER_GID=$USER_UID
+
+RUN groupadd --gid $USER_GID $USERNAME \
+ && useradd -s /bin/bash --uid $USER_UID --gid $USER_GID -m $USERNAME \
+ && apt-get update \
+ && apt-get install -y sudo \
+ && echo $USERNAME ALL=\(root\) NOPASSWD:ALL > /etc/sudoers.d/$USERNAME \
+ && chmod 0440 /etc/sudoers.d/$USERNAME
+
+USER $USERNAME
+
+# update all packages
+RUN sudo apt-get update && sudo apt-get upgrade -y
+
+# install packages
+RUN sudo apt-get install -y python3-pip git
+
+# install fast rtps
+RUN sudo apt install -y ros-galactic-rmw-fastrtps-cpp
+
+# update ros dependencies
+RUN rosdep update --rosdistro galactic
+
+# ros configuration
+RUN echo "source /opt/ros/$ROS_DISTRO/setup.sh" >> ~/.bashrc
+RUN echo "export RMW_IMPLEMENTATION=rmw_fastrtps_cpp" >> ~/.bashrc
\ No newline at end of file
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
new file mode 100644
index 0000000..fc9001c
--- /dev/null
+++ b/.devcontainer/devcontainer.json
@@ -0,0 +1,42 @@
+// For format details, see https://aka.ms/devcontainer.json. For config options, see the
+// README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu
+{
+
+ "name": "galactic-desktop-focal",
+ // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
+ "build": {
+ "dockerfile": "Dockerfile"
+ },
+
+ // Features to add to the dev container. More info: https://containers.dev/features.
+ "features": {
+ "ghcr.io/devcontainers/features/python:1": {}
+ },
+
+ // Use 'forwardPorts' to make a list of ports inside the container available locally.
+ // "forwardPorts": [],
+
+ "runArgs": ["--network=host"],
+
+ // Use 'postCreateCommand' to run commands after the container is created.
+ "postCreateCommand": "bash .devcontainer/installation.sh",
+ "customizations": {
+ "vscode": {
+ "settings": {
+ "python.defaultInterpreterPath": "/usr/bin/python3"
+ },
+ "extensions": [
+ "mhutchie.git-graph",
+ "ms-azuretools.vscode-docker",
+ "ms-vscode.live-server",
+ "mechatroner.rainbow-csv"
+ ]
+ }
+ }
+
+ // Configure tool-specific properties.
+ // "customizations": {},
+
+ // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
+ // "remoteUser": "root"
+}
diff --git a/.devcontainer/installation.sh b/.devcontainer/installation.sh
new file mode 100644
index 0000000..98b93b1
--- /dev/null
+++ b/.devcontainer/installation.sh
@@ -0,0 +1,4 @@
+sudo apt-get update && sudo apt-get upgrade -y \
+&& pip3 install --user -r .devcontainer/requirements.txt \
+&& colcon build \
+&& echo "source install/setup.bash" >> ~/.bashrc
\ No newline at end of file
diff --git a/.devcontainer/requirements.txt b/.devcontainer/requirements.txt
new file mode 100644
index 0000000..2f702d8
--- /dev/null
+++ b/.devcontainer/requirements.txt
@@ -0,0 +1,6 @@
+pyyaml==5.3.1
+rosbags==0.9.22
+pathlib==1.0.1
+plotly==5.18.0
+pandas==2.0.3
+numpy==1.24.4
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a6335b4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+data/*
+src/__pycache__/*
+src/*.html
+log/*
+build/*
+install/*
+plot/*
+csv/*
\ No newline at end of file
diff --git a/LICENSE b/LICENSE.md
similarity index 100%
rename from LICENSE
rename to LICENSE.md
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..284148e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,71 @@
+# ROS2 Rosbag Fault Injection
+
+![Docker](https://img.shields.io/badge/docker-%230db7ed.svg?style=for-the-badge&logo=docker&logoColor=white)
+![Visual Studio Code](https://img.shields.io/badge/Visual%20Studio%20Code-0078d7.svg?style=for-the-badge&logo=visual-studio-code&logoColor=white)
+![ROS](https://img.shields.io/badge/ros-%230A0FF9.svg?style=for-the-badge&logo=ros&logoColor=white)
+![Python](https://img.shields.io/badge/python-3670A0?style=for-the-badge&logo=python&logoColor=ffdd54)
+![Shell Script](https://img.shields.io/badge/shell_script-%23121011.svg?style=for-the-badge&logo=gnu-bash&logoColor=white)
+
+Fault injection for [ROS2](https://github.com/ros2) rosbags that supports recording, modifying, and plotting the fault injected rosbags.
+
+## Getting Started
+
+### Prerequisite (at host)
+
+* Visual Code
+* Docker
+ * If you have a Docker Desktop license, Docker Desktop >= 4.27.2
+ * If you don't have a Docker Desktop license, [Docker Engine on WSL2](https://www.youtube.com/redirect?event=video_description&redir_token=QUFFLUhqbU8zY0gzaUpIak1nWWM5NExPNlhDYjdYRGh1QXxBQ3Jtc0trMjgzWVZzeGxXSnMtSEpUNlBxeXAwdWg3a29iaVlJcVVIZEdMbzVaeVhOQ3NNSllSa3JnVXlDVVZMLUY2ZzJxSGZQdWNmSDhjRVJWVTYwbFFmRzhzSnJoQ05MNktGRUlCRjdqTDI3OXJ2T2NLVVBhcw&q=https%3A%2F%2Fdocs.docker.com%2Fengine%2Finstall%2Fubuntu%2F%23install-using-the-convenience-script&v=SDk3pqFXgs8) (Docker Engine != Docker Desktop)
+ * In case of another installation, make sure you have deleted the previous Docker completely
+ * After installation, check if Docker Container Engine is running well
+ ```
+ # to check if docker container engine is running
+ sudo service docker status
+ ```
+* Python >= 3.8.10
+* The user must be added in the `docker` group with sudo previledge
+ ```
+ # add a group named docker
+ sudo gropuadd docker
+
+ # add the current user to the docker group with sudo privilege
+ sudo gpasswd -a $USER docker
+
+ # change the current user's primary group to docker
+ newgrp docker
+ ```
+
+### Setup
+
+To clone the repo:
+```
+git clone https://github.com/boschresearch/rosbag-fault-injection.git
+```
+
+To open the Visual Code:
+```
+# change directory to the project
+cd rosbag-fault-injection
+
+# open visual code
+code .
+```
+
+The project contains `devcontainer` that runs [ROS2 Galactic](https://docs.ros.org/en/galactic/index.html) as a container with all required dependencies and initial setup.
+
+To open the `devcontainer`, press `Ctrl`+`Shift`+`P` from Visual Code and select `Reopen in Container`. See [Visual Studio Blog](https://code.visualstudio.com/blogs/2020/07/01/containers-wsl#_getting-started) for visual guidelines.
+
+After `devcontainer` is up and running successfully, you can open a `bash` terminal with `vscode` user name. And `ros2` commands are usable right away. :)
+
+## Fault injection
+
+WIP
+
+## Trouble Shooting
+Q1. Opening the container fails with `... "docker-credential-desktop.exe": executable file not found in $PATH, out: ...`.
+
+A1. Delete the line with `credsStore` from `~/.docker/config.json` on WSL2.
+
+## License
+
+This project is licenced under the Apache-2.0 license. See the [LICENSE](LICENSE) file for more details.
diff --git a/app/interfaces/CMakeLists.txt b/app/interfaces/CMakeLists.txt
new file mode 100644
index 0000000..bf5886c
--- /dev/null
+++ b/app/interfaces/CMakeLists.txt
@@ -0,0 +1,21 @@
+cmake_minimum_required(VERSION 3.8)
+project(interfaces)
+
+if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ add_compile_options(-Wall -Wextra -Wpedantic)
+endif()
+
+# find dependencies
+find_package(ament_cmake REQUIRED)
+find_package(std_msgs REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/BoolWithHeader.msg"
+ "msg/IntWithHeader.msg"
+ DEPENDENCIES std_msgs
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+
+ament_package()
diff --git a/app/interfaces/msg/BoolWithHeader.msg b/app/interfaces/msg/BoolWithHeader.msg
new file mode 100644
index 0000000..9a9b960
--- /dev/null
+++ b/app/interfaces/msg/BoolWithHeader.msg
@@ -0,0 +1,2 @@
+std_msgs/Header header
+bool data
\ No newline at end of file
diff --git a/app/interfaces/msg/IntWithHeader.msg b/app/interfaces/msg/IntWithHeader.msg
new file mode 100644
index 0000000..b4e7bf3
--- /dev/null
+++ b/app/interfaces/msg/IntWithHeader.msg
@@ -0,0 +1,2 @@
+std_msgs/Header header
+int32 data
\ No newline at end of file
diff --git a/app/interfaces/package.xml b/app/interfaces/package.xml
new file mode 100644
index 0000000..ce1dbbe
--- /dev/null
+++ b/app/interfaces/package.xml
@@ -0,0 +1,26 @@
+
+
+
+ interfaces
+ 0.0.0
+ Interfaces
+ vscode
+ Apache License 2.0
+
+ ament_cmake
+
+ rosidl_default_generators
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+ std_msgs
+
+ ament_lint_auto
+ ament_lint_common
+
+
+ ament_cmake
+
+
diff --git a/app/num_generator/launch/all_launch.py b/app/num_generator/launch/all_launch.py
new file mode 100644
index 0000000..92b69cf
--- /dev/null
+++ b/app/num_generator/launch/all_launch.py
@@ -0,0 +1,27 @@
+from launch import LaunchDescription
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ return LaunchDescription([
+ Node(
+ package='num_generator',
+ executable='generator',
+ name='generator',
+ ),
+ Node(
+ package='num_generator',
+ executable='multiplier',
+ name='multiplier',
+ ),
+ Node(
+ package='num_generator',
+ executable='feedback',
+ name='feedback',
+ ),
+ Node(
+ package='num_generator',
+ executable='initialiser',
+ name='initialiser',
+ )
+ ])
diff --git a/app/num_generator/num_generator/__init__.py b/app/num_generator/num_generator/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/app/num_generator/num_generator/feedback.py b/app/num_generator/num_generator/feedback.py
new file mode 100644
index 0000000..dfd408a
--- /dev/null
+++ b/app/num_generator/num_generator/feedback.py
@@ -0,0 +1,67 @@
+"""" This node is used to provide feedback to the generator
+if a condition is fulfilled"""
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+import rclpy
+import time
+from rclpy.node import Node
+from std_msgs.msg import Header
+from interfaces.msg import BoolWithHeader, IntWithHeader
+
+
+class Feedback(Node):
+
+ def __init__(self):
+ super().__init__('Feedback')
+ self.reset_signal = True
+ self.subscription = self.create_subscription(
+ IntWithHeader, 'multiplier2feedback', self.num_feedback, 1)
+ self.publisher = self.create_publisher(BoolWithHeader, 'feedback2generator', 1)
+
+ def num_feedback(self, msg):
+ # create message
+ bool_msg = BoolWithHeader()
+
+ # header
+ header = Header()
+ header.stamp = self.get_clock().now().to_msg()
+ bool_msg.header = header
+
+ # data
+ if msg.data < 10:
+ bool_msg.data = self.reset_signal = False # if condition not met set to False
+ else:
+ bool_msg.data = self.reset_signal = True # if condition met set to True
+
+ print(bool_msg.data)
+
+ # publish
+ self.publisher.publish(bool_msg)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = Feedback()
+ try:
+ # sleep statement to spin the node after 0.2 seconds
+ time.sleep(0.2)
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/app/num_generator/num_generator/generator.py b/app/num_generator/num_generator/generator.py
new file mode 100644
index 0000000..b889159
--- /dev/null
+++ b/app/num_generator/num_generator/generator.py
@@ -0,0 +1,81 @@
+"""" This node is used to publish the numbers on two different topics """
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+import rclpy
+import time
+from rclpy.node import Node
+from std_msgs.msg import Header
+from interfaces.msg import BoolWithHeader, IntWithHeader
+
+
+class NumberGenerator(Node):
+
+ def __init__(self):
+ super().__init__('NumberGenerator')
+ self.publisher_multiplier = self.create_publisher(IntWithHeader, 'generator2multiplier', 1)
+ self.publisher_initialiser = self.create_publisher(IntWithHeader, 'generator2initializer', 1)
+ self.subscription = self.create_subscription(
+ BoolWithHeader, 'feedback2generator', self.num_feedback, 1)
+ self.subscription = self.create_subscription(
+ IntWithHeader, 'initializer2generator', self.num_y_generator, 1)
+ self.timer = self.create_timer(0.05, self.timer_callback) # 0.05 seconds can be changed by user
+ # initialising variables
+ self.x = 1
+ self.initialiser2generator = 0
+
+ def timer_callback(self): # Timer callback for publishing x values
+ # create message
+ msg = IntWithHeader()
+
+ # header
+ header = Header()
+ header.stamp = self.get_clock().now().to_msg()
+ msg.header = header
+
+ # data
+ msg.data = self.x
+
+ print(msg.data)
+
+ # publish
+ self.publisher_multiplier.publish(msg)
+ self.publisher_initialiser.publish(msg)
+
+ # update
+ self.x += 1
+
+ def num_feedback(self, msg): # Bool output
+ if msg.data is True:
+ self.x = self.initialiser2generator
+ print("The value of x is:", self.x)
+
+ def num_y_generator(self, msg_y): # Int32 Output
+ self.initialiser2generator = msg_y.data
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = NumberGenerator()
+ try:
+ # sleep statement to spin the node after 0.5 seconds
+ time.sleep(0.5)
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/app/num_generator/num_generator/initialiser.py b/app/num_generator/num_generator/initialiser.py
new file mode 100644
index 0000000..7d93d62
--- /dev/null
+++ b/app/num_generator/num_generator/initialiser.py
@@ -0,0 +1,68 @@
+""" This node initliases the value of y and publishes
+it depending on the condition"""
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+import rclpy
+import time
+from rclpy.node import Node
+from std_msgs.msg import Header
+from interfaces.msg import IntWithHeader
+
+
+class Initialiser(Node):
+
+ def __init__(self):
+ super().__init__('Initialiser')
+ self.publisher_generator = self.create_publisher(IntWithHeader, 'initializer2generator', 1)
+ self.subscription = self.create_subscription(
+ IntWithHeader, 'generator2initializer', self.num_initialiser, 1)
+ # initialising variables
+ self.y = 1
+
+ def num_initialiser(self, msg):
+ # create message
+ y_value = IntWithHeader()
+
+ # header
+ header = Header()
+ header.stamp = self.get_clock().now().to_msg()
+ y_value.header = header
+
+ # data
+ if msg.data < 0:
+ y_value.data = self.y = 1
+ else:
+ y_value.data = self.y = self.y - msg.data
+
+ print(self.y)
+
+ # publish
+ self.publisher_generator.publish(y_value)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = Initialiser()
+ try:
+ # sleep statement to spin the node after 0.2 seconds
+ time.sleep(0.2)
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/app/num_generator/num_generator/multiplier.py b/app/num_generator/num_generator/multiplier.py
new file mode 100644
index 0000000..a0327ef
--- /dev/null
+++ b/app/num_generator/num_generator/multiplier.py
@@ -0,0 +1,62 @@
+"""" This node is used to multiply the number by a factor of 2 """
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+import rclpy
+import time
+from rclpy.node import Node
+from std_msgs.msg import Header
+from interfaces.msg import IntWithHeader
+
+
+class NumberMultiplier(Node):
+
+ def __init__(self):
+ super().__init__('NumberMultiplier')
+ self.subscription = self.create_subscription(
+ IntWithHeader, 'generator2multiplier', self.num_multiplier, 1)
+ self.publisher = self.create_publisher(IntWithHeader, 'multiplier2feedback', 1)
+
+ def num_multiplier(self, msg):
+ # create message
+ new_msg = IntWithHeader()
+
+ # header
+ header = Header()
+ header.stamp = self.get_clock().now().to_msg()
+ new_msg.header = header
+
+ # data
+ new_msg.data = self.z = msg.data * 2
+
+ print(new_msg.data)
+
+ # publish
+ self.publisher.publish(new_msg)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = NumberMultiplier()
+ try:
+ # sleep statement to spin the node after 0.2 seconds
+ time.sleep(0.2)
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/app/num_generator/package.xml b/app/num_generator/package.xml
new file mode 100644
index 0000000..cd96da7
--- /dev/null
+++ b/app/num_generator/package.xml
@@ -0,0 +1,23 @@
+
+
+
+ num_generator
+ 0.0.0
+ Number generator
+ vscode
+ Apache License 2.0
+
+ rclpy
+ std_msgs
+ ros2launch
+ interfaces
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/app/num_generator/resource/num_generator b/app/num_generator/resource/num_generator
new file mode 100644
index 0000000..e69de29
diff --git a/app/num_generator/setup.cfg b/app/num_generator/setup.cfg
new file mode 100644
index 0000000..393c43f
--- /dev/null
+++ b/app/num_generator/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/num_generator
+[install]
+install_scripts=$base/lib/num_generator
diff --git a/app/num_generator/setup.py b/app/num_generator/setup.py
new file mode 100644
index 0000000..ca49529
--- /dev/null
+++ b/app/num_generator/setup.py
@@ -0,0 +1,32 @@
+import os
+from glob import glob
+from setuptools import setup
+
+package_name = 'num_generator'
+
+setup(
+ name=package_name,
+ version='0.0.0',
+ packages=[package_name],
+ data_files=[
+ ('share/ament_index/resource_index/packages',
+ ['resource/' + package_name]),
+ ('share/' + package_name, ['package.xml']),
+ (os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*launch.[pxy][yma]*')))
+ ],
+ install_requires=['setuptools'],
+ zip_safe=True,
+ maintainer='vscode',
+ maintainer_email='MinHee.Jo@de.bosch.com',
+ description='Number Generator',
+ license='Apache License 2.0',
+ tests_require=['pytest'],
+ entry_points={
+ 'console_scripts': [
+ 'generator = num_generator.generator:main',
+ 'multiplier = num_generator.multiplier:main',
+ 'feedback = num_generator.feedback:main',
+ 'initialiser = num_generator.initialiser:main',
+ ],
+ },
+)
diff --git a/app/num_generator/test/test_copyright.py b/app/num_generator/test/test_copyright.py
new file mode 100644
index 0000000..cc8ff03
--- /dev/null
+++ b/app/num_generator/test/test_copyright.py
@@ -0,0 +1,23 @@
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_copyright.main import main
+import pytest
+
+
+@pytest.mark.copyright
+@pytest.mark.linter
+def test_copyright():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found errors'
diff --git a/app/num_generator/test/test_flake8.py b/app/num_generator/test/test_flake8.py
new file mode 100644
index 0000000..27ee107
--- /dev/null
+++ b/app/num_generator/test/test_flake8.py
@@ -0,0 +1,25 @@
+# Copyright 2017 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_flake8.main import main_with_errors
+import pytest
+
+
+@pytest.mark.flake8
+@pytest.mark.linter
+def test_flake8():
+ rc, errors = main_with_errors(argv=[])
+ assert rc == 0, \
+ 'Found %d code style errors / warnings:\n' % len(errors) + \
+ '\n'.join(errors)
diff --git a/app/num_generator/test/test_pep257.py b/app/num_generator/test/test_pep257.py
new file mode 100644
index 0000000..b234a38
--- /dev/null
+++ b/app/num_generator/test/test_pep257.py
@@ -0,0 +1,23 @@
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_pep257.main import main
+import pytest
+
+
+@pytest.mark.linter
+@pytest.mark.pep257
+def test_pep257():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found code style errors / warnings'
diff --git a/src/config.yaml b/src/config.yaml
new file mode 100644
index 0000000..806ef18
--- /dev/null
+++ b/src/config.yaml
@@ -0,0 +1,45 @@
+# fault type | data type | description
+# ----------------------------------------------------------------------------------------------------------------
+# add | int / float | add a constant value x
+# replace | int / float | replace with a constant value x
+# random_add | int / float | add random values in [x,y] for each timestamp
+# random_replace | int / float | replace with random values in [x,y] for each timestamp
+
+# filter type | description
+# ----------------------------------------------------------------------------------------------------------------
+# show_amplitude_greater_than_or_equal_to | show only amplitude greater than or equal to a constant value x
+# show_amplitude_less_than_or_equal_to | show only amplitude smaller than or equal to a constant value x
+# show_amplitude_between | show only amplitude in between [x,y]
+
+seed: 100
+
+custom_message_path:
+ - /app/interfaces/msg/BoolWithHeader.msg
+ - /app/interfaces/msg/IntWithHeader.msg
+
+parameters:
+ - input: "./data/input/"
+ topic_mask_filter: [generator2multiplier, multiplier2feedback]
+ topics:
+ - topic: generator2multiplier
+ type: .data
+ fault_type: add
+ fault_value: 10
+ start_after_sec: 1.5
+ duration_sec: 7
+ - topic: multiplier2feedback
+ type: .data
+ fault_type: add
+ fault_value: 3
+ filters:
+ - topic: generator2multiplier
+ type: data
+ filter_type: show_amplitude_between
+ filter_value: 5, 13
+ filter_window_size: 3
+ pass_size: 2
+ - topic: multiplier2feedback
+ type: data
+ filter_type: show_amplitude_greater_than_or_equal_to
+ filter_value: 5
+ filter_window_size: 0
\ No newline at end of file
diff --git a/src/fault_injection.py b/src/fault_injection.py
new file mode 100644
index 0000000..a7ee9c3
--- /dev/null
+++ b/src/fault_injection.py
@@ -0,0 +1,213 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+from rosbags.interfaces import ConnectionExtRosbag2
+from rosbags.rosbag2 import Reader, Writer
+from rosbags.typesys import Stores, get_types_from_msg, get_typestore
+from typing import cast
+from pathlib import Path
+import fault_types
+import read_yaml
+import random
+
+
+class Fault_Injection:
+ """
+ Inject faults to rosbags according to the parameters configured in
+ YAML file.
+ """
+ def __init__(self, config_path: str):
+ """
+ Initialize Fault_Injection class and its variables.
+
+ Keyword arguments:
+ config_path -- relative path to configuration YAML file
+ """
+ self.cur_dir = Path.cwd()
+
+ self.yaml = read_yaml.Yaml(self.cur_dir, config_path)
+
+ # read config.yaml
+ self.yaml.read_yaml()
+
+ # read seed
+ seed = self.yaml.get_seed()
+ # set random seed
+ random.seed(seed)
+
+ # read parameters
+ self.parameters = self.yaml.get_params()
+
+
+ def inject(self):
+ """
+ Inject faults as configured in YAML file.
+ """
+ # input from the first parameter set
+ first_input = self.yaml.get_input(self.parameters[0])
+ # subdirectories from the input from the first parameter set
+ input_as_parent_dir = self.yaml.get_subdirectory(first_input)
+
+ # inject faults to all rosbags in directory
+ if input_as_parent_dir:
+ for rosbag in input_as_parent_dir:
+ # read each parameters for the input
+ self.yaml.set_param(self.parameters[0])
+
+ # input rosbag
+ input = self.cur_dir / rosbag
+ # read write rosbag
+ self.read_write_rosbag(input)
+ # inject faults to specified rosbag
+ else:
+ for parameter in self.parameters:
+ # read each parameters for the input
+ self.yaml.set_param(parameter)
+
+ # input rosbag
+ input = self.cur_dir / self.yaml.get_input(parameter)
+ # read write rosbag
+ self.read_write_rosbag(input)
+
+
+ def read_write_rosbag(self, input: Path):
+ """
+ Read the original rosbag and write a new rosbag with faults.
+
+ Keyword arguments:
+ input -- path to rosbag input
+ """
+ print("[Injection]:", input)
+
+ # temporary directory
+ temp = Path(str(input).replace("input", "temp"))
+
+ if temp.exists():
+ raise Exception("Temporary directory already exists in /data/temp")
+
+ print("\t -> ", temp)
+
+ # error handling of wrong config.yaml
+ self.yaml.no_topic()
+ self.yaml.undefined_topics()
+
+ # writing rosbag
+ writer_connections = {}
+ # the first timestap for each type
+ start_timestamps = {}
+
+ # create type store to use if the bag has no message definitions
+ typestore = get_typestore(Stores.ROS2_GALACTIC)
+ add_types = {}
+
+ for pathstr in self.yaml.get_custom_message_path():
+ msgpath = self.cur_dir.joinpath(pathstr)
+ msgdef = msgpath.read_text(encoding='utf-8')
+ msgtype = get_types_from_msg(
+ msgdef,
+ self.yaml.guess_msgtype(msgpath)
+ )
+ add_types.update(msgtype)
+
+ typestore.register(add_types)
+
+ # read input rosbag and write output rosbag
+ with Reader(input) as reader, Writer(temp) as writer:
+ for connection in reader.connections:
+ ext = cast(ConnectionExtRosbag2, connection.ext)
+
+ writer_connections[connection.id] = writer.add_connection(
+ topic=connection.topic,
+ msgtype=connection.msgtype,
+ typestore=typestore,
+ serialization_format=ext.serialization_format,
+ offered_qos_profiles=ext.offered_qos_profiles,
+ )
+
+ # initialize timestamp dict
+ start_timestamps[connection.id] = -1
+
+ for topic in self.yaml.topics:
+ # extract parameters
+ name, type_, fault_type, fault_values, start_after_sec, duration_sec = self.yaml.get_topic_params(topic)
+
+ # skip topics not in topic mask filter
+ if name not in self.yaml.topic_mask_filter:
+ continue
+ # skip if no values are changed
+ elif fault_types.is_empty_operation(fault_type, fault_values):
+ continue
+
+ # message type
+ msg_type = 'msg' + type_
+
+ for connection, timestamp, rawdata in reader.messages():
+ # deserialize the message
+ msg = typestore.deserialize_cdr(rawdata, connection.msgtype)
+
+ if connection.topic == name:
+ # message timestamp in sec (recording timestamp)
+ timestamp_sec = timestamp / 1e9
+
+ # the first message
+ if start_timestamps[connection.id] == -1:
+ # first timestamp
+ start_timestamps[connection.id] = timestamp_sec
+
+ # start and stop time of fault injection
+ start = timestamp_sec + start_after_sec
+ if duration_sec != 0:
+ stop = start + duration_sec
+ else:
+ stop = None
+
+ # original value
+ original_value = eval(msg_type)
+
+ # fault value data type should not contradict message type
+ self.yaml.mismatching_datatype(
+ original_value, fault_values
+ )
+
+ # inject only in between start and stop timestamp
+ if (start <= timestamp_sec and
+ (stop is None or timestamp_sec <= stop)):
+ # operation result
+ result = fault_types.operation(
+ fault_type,
+ original_value,
+ fault_values
+ )
+
+ # update message type value
+ exec(msg_type + '= result')
+
+ # write rosbag
+ writer.write(
+ writer_connections[connection.id],
+ timestamp,
+ typestore.serialize_cdr(msg, connection.msgtype)
+ )
+
+
+if __name__ == "__main__":
+ import sys
+
+ config_path = sys.argv[1]
+
+ fault_inject = Fault_Injection(config_path)
+
+ fault_inject.inject()
+
+ del fault_inject
\ No newline at end of file
diff --git a/src/fault_types.py b/src/fault_types.py
new file mode 100644
index 0000000..c59ece3
--- /dev/null
+++ b/src/fault_types.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+from typing import Union
+import random
+
+
+def random_value(range: tuple) -> Union[int, float]:
+ """
+ Return random values according to the data type.
+
+ Keyword arguments:
+ range -- range with minimum and maximum value
+ """
+ if isinstance(range[0], float) or isinstance(range[1], float):
+ return random.uniform(range[0], range[1])
+ elif isinstance(range[0], int) and isinstance(range[1], int):
+ return random.randint(range[0], range[1])
+
+
+def add(base: Union[int, float],
+ offset: Union[int, float]) -> Union[int, float]:
+ """
+ Return added value of base and offset.
+
+ Keyword arguments:
+ base -- base value
+ offset -- offset to add
+ """
+ return base + offset
+
+
+def random_add(base: Union[int, float], range: tuple) -> Union[int, float]:
+ """
+ Return added value of base and random offset.
+ The random value is determined according to the data types of the
+ input range values.
+
+ Keyword arguments:
+ base -- base value
+ range -- range with minimum and maximum value
+ """
+ random_offset = random_value(range)
+
+ return add(base, random_offset)
+
+
+def is_empty_operation(fault_type: str, fault_values: tuple) -> bool:
+ """
+ Check an empty operation.
+
+ Keyword arguments:
+ fault_type -- fault type
+ fault_values -- fault values
+ """
+ # fault types
+ if fault_type == "add" and fault_values[0] == 0:
+ return True
+ elif (fault_type == "random_add" and
+ fault_values[0] == 0 and
+ fault_values[1] == 0):
+ return True
+ else:
+ return False
+
+
+def operation(fault_type: str,
+ base: Union[int, float],
+ fault_values: tuple) -> Union[int, float]:
+ """
+ Return the operation results according to the fault type.
+
+ Keyword arguments:
+ fault_type -- fault type
+ base -- base value
+ fault_values -- fault values
+ """
+ # fault types
+ if fault_type == "add":
+ result = add(base, fault_values[0])
+ elif fault_type == "replace":
+ result = fault_values[0]
+ elif fault_type == "random_add":
+ result = random_add(base, fault_values)
+ elif fault_type == "random_replace":
+ result = random_value(fault_values)
+ else:
+ raise ValueError("Undefined fault_type")
+
+ return result
\ No newline at end of file
diff --git a/src/inject.sh b/src/inject.sh
new file mode 100644
index 0000000..7fb08a7
--- /dev/null
+++ b/src/inject.sh
@@ -0,0 +1,130 @@
+#!/bin/bash
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+#
+# Author : Min Hee Jo
+# Email : minhee.jo@de.bosch.com
+# Usage : bash inject.sh -h
+# Description : Inject faults to rosbags and plot the results according to user options and config.yaml
+
+# define usage function
+usage() {
+ echo "Usage: $(basename "$0") [-c str] [-e str] [-h] [-p str] [-n]"
+ echo "Options:"
+ echo " -c str The filename of configuration YAML [Default: config.yaml]"
+ echo " !!! The YAML file should be located in ${PWD}/$(dirname "$0")"
+ echo " -e Executable name to re-record the rosbags to see the fault injection effect"
+ echo " -h Print manual"
+ echo " -p str ROS2 package name [Default: num_generator]"
+ echo " -n Disable rosbag play and plotting after the fault injection [Default: Deactivated]"
+}
+
+# default values for options
+YAML_FILENAME="config.yaml"
+PACKAGE="num_generator"
+NO_PLOT=false
+
+# parameter inputs
+while getopts "c:e:hp:n" opt; do
+ case $opt in
+ c)
+ YAML_FILENAME=$OPTARG
+ ;;
+ e)
+ EXECUTABLE_NAME=$OPTARG
+ ;;
+ h)
+ usage
+ exit 1
+ ;;
+ p)
+ PACKAGE=$OPTARG
+ ;;
+ n)
+ NO_PLOT=true
+ ;;
+ \?)
+ echo "Invalid option: -$OPTARG" >&2
+ usage
+ exit 1
+ ;;
+ esac
+done
+
+# remove processed parameters
+shift $((OPTIND - 1))
+
+# always execute at working directory
+cd "$(dirname "$0")/.."
+
+BASE_PATH="${PWD}/$(dirname "$0")"
+YAML_PATH="${BASE_PATH}/${YAML_FILENAME}"
+
+# config file does not exist
+if [ ! -f "${YAML_PATH}" ]; then
+ echo "Config file ${YAML_PATH} do not exist"
+ exit 1
+fi
+
+echo "Injecting faults as specified in ${YAML_PATH}"
+
+# inject fault
+python_result=$(python3 src/fault_injection.py "$YAML_PATH" 2>&1)
+
+if [ $? -ne 0 ]; then
+ echo "Injecting fault encourtered an error: Check if the input rosbag exists"
+ echo "$python_result"
+ exit 1
+else
+ echo "$python_result"
+fi
+
+if [[ "${NO_PLOT}" == "false" ]]; then
+ # executable name is necessary
+ if [ -z "$EXECUTABLE_NAME" ]; then
+ echo "Executable name not given for plotting the result"
+ exit 1
+ fi
+
+ echo "Playing the fault injected rosbag"
+
+ TEMP_FILES=($(find data/temp/* -type d))
+
+ for FILE in "${TEMP_FILES[@]}"; do
+ OUTPUT_FILE=${FILE/"temp"/"output"}
+
+ # record ros2 bag in the background
+ ros2 bag record -o $OUTPUT_FILE -a & sleep 5
+
+ # run specific node
+ ros2 run $PACKAGE $EXECUTABLE_NAME > /dev/null 2>&1 &
+
+ # play fault injected rosbags
+ ros2 bag play $FILE &
+ wait $!
+
+ echo "Stopping the command"
+
+ pkill $EXECUTABLE_NAME
+ pkill ros2
+ done
+
+ echo "Plotting the rosbag"
+
+ # plot rosbag
+ python_result=$(python3 src/plot.py "$YAML_PATH" 2>&1)
+
+ rm -rf data/temp data/output
+
+ if [ $? -ne 0 ]; then
+ echo "Plotting encourtered an error"
+ echo "$python_result"
+ exit 1
+ else
+ echo "$python_result"
+ fi
+fi
\ No newline at end of file
diff --git a/src/plot.py b/src/plot.py
new file mode 100644
index 0000000..88de0e1
--- /dev/null
+++ b/src/plot.py
@@ -0,0 +1,450 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+from rosbags.rosbag2 import Reader
+from rosbags.typesys import Stores, get_types_from_msg, get_typestore
+from pathlib import Path
+from datetime import datetime
+from functools import reduce
+from typing import Tuple
+import plotly.graph_objs as go
+import pandas as pd
+import read_yaml
+import random
+
+
+class Rosbag_Plot:
+ """
+ Create and template a pandas dataframe for original and fault
+ injected rosbags.
+ """
+ def __init__(self):
+ """
+ Initialize Rosbag_Plot class and create figure object.
+ """
+ # figure object
+ self.fig = go.Figure()
+
+
+ def add_scatter_plot(self,
+ x: pd.DataFrame,
+ y: pd.DataFrame,
+ name: str,
+ marker_symbol: str,
+ marker_color: str,
+ marker_size: float = 3.5,
+ maker_opacity: float = 0.8):
+ """
+ Add a scatter plot trace.
+
+ Keyword arguments:
+ x -- x axis data frame
+ y -- y axis data frame
+ name -- trace name
+ marker_symbol -- marker symbol
+ marker_color -- marker color
+ marker_size -- marker size [default: 3.5]
+ maker_opacity -- marker opacity [default: 0.8]
+ """
+ self.fig.add_trace(
+ go.Scatter(
+ x=x, y=y,
+ name=name,
+ mode='markers',
+ marker=dict(
+ symbol=marker_symbol,
+ color=marker_color,
+ size=marker_size,
+ opacity=maker_opacity
+ )
+ )
+ )
+
+
+ def update_layout(self,
+ title: str,
+ xaxis_title: str,
+ yaxis_title: str,
+ title_font_size: int = 25):
+ """
+ Update the layout of the plots.
+
+ Keyword arguments:
+ title -- plot title
+ title_font_size -- title font size [default: 25]
+ xaxis_title -- x axis title
+ yaxis_title -- y axis title
+ """
+ self.fig.update_layout(
+ plot_bgcolor='white',
+ title_text=title,
+ title_font_size=title_font_size,
+ xaxis=dict(
+ title=xaxis_title,
+ tickangle=45,
+ tickformat='%10d'
+ ),
+ yaxis=dict(
+ title=yaxis_title
+ )
+ )
+ self.fig.update_xaxes(
+ mirror=True,
+ ticks='outside',
+ showline=True,
+ linecolor='black',
+ gridcolor='lightgrey'
+ )
+ self.fig.update_yaxes(
+ mirror=True,
+ ticks='outside',
+ showline=True,
+ linecolor='black',
+ gridcolor='lightgrey'
+ )
+
+
+ def write_html(self, path: str):
+ """
+ Write the plots in html file.
+
+ Keyword arguments:
+ path -- path where to save html file
+ """
+ self.fig.write_html(path)
+
+
+class Dataframe_For_Plot:
+ """
+ Extract dataframes from rosbags.
+ """
+ def __init__(self, config_path: str):
+ """
+ Initialize Dataframe_For_Plot class and its variables.
+
+ Keyword arguments:
+ config_path -- relative path to configuration YAML file
+ """
+ self.cur_dir = Path.cwd()
+
+ self.yaml = read_yaml.Yaml(self.cur_dir, config_path)
+
+ # read config.yaml
+ self.yaml.read_yaml()
+
+ # read parameters
+ self.parameters = self.yaml.get_params()
+
+ # plot data
+ plot_df_columns = [
+ 'rosbag',
+ 'topic',
+ 'type',
+ 'timestamp',
+ 'value'
+ ]
+ self.data = [plot_df_columns]
+
+ # filter data
+ filter_df_columns = [
+ 'rosbag',
+ 'topic',
+ 'type',
+ 'timestamp',
+ 'value',
+ 'window_start_timestamp',
+ 'window_end_timestamp'
+ ]
+ self.filtered_df = pd.DataFrame(columns=filter_df_columns)
+
+
+ def get_df(self) -> pd.DataFrame:
+ """
+ Return a dataframe from the original rosbags and fault injected
+ rosbags.
+ """
+ # input from the first parameter set
+ first_input = self.yaml.get_input(self.parameters[0])
+ # subdirectories from the input from the first parameter set
+ input_as_parent_dir = self.yaml.get_subdirectory(first_input)
+
+ # all rosbags in directory
+ if input_as_parent_dir:
+ for rosbag in input_as_parent_dir:
+ # read each parameters for the rosbag
+ self.yaml.set_param(self.parameters[0])
+
+ # input rosbag
+ input = self.cur_dir / rosbag
+ # read every message
+ self.read_rosbag(input, "before")
+
+ # output rosbag
+ output = Path(str(input).replace("input", "output"))
+ # read every message
+ self.read_rosbag(output, "after")
+ # specific rosbag
+ else:
+ for parameter in self.parameters:
+ # read each parameters for the rosbag
+ self.yaml.set_param(parameter)
+
+ # input rosbag
+ input = self.cur_dir / self.yaml.get_input(parameter)
+ # read every message
+ self.read_rosbag(input, "before")
+
+ # output rosbag
+ output = Path(str(input).replace("input", "output"))
+ # read every message
+ self.read_rosbag(output, "after")
+
+ return pd.DataFrame(self.data[1:], columns=self.data[0])
+
+
+ def read_rosbag(self, rosbag: Path, suffix: str):
+ """
+ Read the original rosbag and the fault injected rosbag.
+
+ Keyword arguments:
+ rosbag -- path to the input rosbag
+ suffix -- suffix to differentiate the original and fault
+ injected rosbags
+ """
+ # initialize timestamp offset
+ timestamp_offset = -1
+
+ # create type store to use if the bag has no message definitions
+ typestore = get_typestore(Stores.ROS2_GALACTIC)
+ add_types = {}
+
+ for pathstr in self.yaml.get_custom_message_path():
+ msgpath = self.cur_dir.joinpath(pathstr)
+ msgdef = msgpath.read_text(encoding='utf-8')
+ msgtype = get_types_from_msg(
+ msgdef,
+ self.yaml.guess_msgtype(msgpath)
+ )
+ add_types.update(msgtype)
+
+ typestore.register(add_types)
+
+ # read rosbag
+ with Reader(rosbag) as reader:
+ for topic in self.yaml.topics:
+ # extract parameters
+ name, type_, _, _, _, _ = self.yaml.get_topic_params(topic)
+
+ # skip topics not in topic mask filter
+ if name not in self.yaml.topic_mask_filter:
+ continue
+
+ # message type
+ msg_type = 'msg' + type_
+
+ for connection, timestamp, rawdata in reader.messages():
+ if connection.topic == name:
+ # deserialize the message
+ msg = typestore.deserialize_cdr(
+ rawdata, connection.msgtype
+ )
+
+ if timestamp_offset == -1:
+ timestamp_offset = timestamp
+
+ # data -> ['rosbag', 'topic', 'type', 'timestamp', 'value']
+ self.data.append([
+ rosbag.name + "_" + suffix,
+ name,
+ type_,
+ (timestamp - timestamp_offset) / 1e9,
+ eval(msg_type)
+ ])
+
+
+ def filter_df(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
+ """
+ Return the filtered dataframe and the dataframe for what has
+ been filtered according to the parameters configured in YAML
+ file.
+
+ Keyword arguments:
+ df -- pandas dataframe
+ """
+ # input from the first parameter set
+ first_input = self.yaml.get_input(self.parameters[0])
+ # subdirectories from the input from the first parameter set
+ input_as_parent_dir = self.yaml.get_subdirectory(first_input)
+
+ # all rosbags in directory
+ if input_as_parent_dir:
+ for rosbag in input_as_parent_dir:
+ # read each parameters for the rosbag
+ self.yaml.set_param(self.parameters[0])
+
+ # filter
+ if self.yaml.filters:
+ df = self.filter_rosbag(df, rosbag.name + "_after")
+ # specific rosbag
+ else:
+ for parameter in self.parameters:
+ # read each parameters for the rosbag
+ self.yaml.set_param(parameter)
+
+ # rosbag
+ rosbag = self.yaml.get_input(parameter).rsplit('/', 1)
+
+ # filter
+ if self.yaml.filters:
+ df = self.filter_rosbag(df, rosbag[-1] + "_after")
+
+ return df, self.filtered_df
+
+
+ def filter_rosbag(self, df: pd.DataFrame, rosbag: str) -> pd.DataFrame:
+ """
+ Filter the dataframe according to the parameters configured in
+ YAML file.
+
+ Keyword arguments:
+ df -- pandas dataframe
+ rosbag -- rosbag name
+ """
+ for filter in self.yaml.filters:
+ # extract filter parameters
+ topic, type_, filter_type, filter_values, filter_window_size, pass_size = self.yaml.get_filter_params(filter)
+
+ conditions = [
+ df['rosbag'] == rosbag,
+ df['topic'] == topic,
+ df['type'] == type_
+ ]
+
+ # filter types
+ if filter_type == "show_amplitude_greater_than_or_equal_to":
+ # drop values smaller than or equal as filter_values
+ conditions.append(df['value'] < filter_values[0])
+ elif filter_type == "show_amplitude_less_than_or_equal_to":
+ # drop values bigger than or equal as filter_values
+ conditions.append(df['value'] > filter_values[0])
+ elif filter_type == "show_amplitude_between":
+ # drop values out of range of filter_values
+ conditions.append(
+ (df['value'] < filter_values[0]) |
+ (filter_values[1] < df['value'])
+ )
+ else:
+ raise ValueError("Undefined filter_type")
+
+ combined_condition = reduce(lambda x, y: x & y, conditions)
+
+ # window size given
+ if filter_window_size > 0:
+ # filters on the sliding window
+ for group in df.rolling(window = filter_window_size):
+ # did not meet the pass size
+ if len(group.loc[~combined_condition]) < pass_size:
+ unsatisfied = group.loc[combined_condition].copy()
+
+ unsatisfied['window_start_timestamp'] = group.iloc[0,:]['timestamp']
+ unsatisfied['window_end_timestamp'] = group.iloc[-1,:]['timestamp']
+
+ self.filtered_df = pd.concat([
+ self.filtered_df,
+ unsatisfied
+ ])
+
+ # filter globally
+ df = df[~combined_condition]
+
+ return df
+
+
+if __name__ == '__main__':
+ import sys
+
+ config_path = sys.argv[1]
+
+ dataframe_plot = Dataframe_For_Plot(config_path)
+ df = dataframe_plot.get_df()
+ df, filtered_df = dataframe_plot.filter_df(df)
+
+ # rosbag list
+ rosbags = df['rosbag'].unique()
+
+ # for all rosbags
+ plot_all_result = Rosbag_Plot()
+
+ for rosbag in rosbags:
+ # filter by rosbag
+ rosbag_filtered_df = df[df['rosbag'] == rosbag]
+
+ # topic list
+ topics = rosbag_filtered_df['topic'].unique()
+
+ for topic in topics:
+ # filter again by topic
+ topic_filtered_df = rosbag_filtered_df[
+ (rosbag_filtered_df['topic'] == topic)
+ ]
+
+ # type list
+ types = topic_filtered_df['type'].unique()
+
+ for type_ in types:
+ # filter again by type
+ type_filtered_df = topic_filtered_df[
+ (topic_filtered_df['type'] == type_)
+ ]
+
+ # different color for before and after
+ if rosbag.endswith("before"):
+ r, g, b = random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)
+ symbol = 'diamond'
+ else:
+ r, g, b = random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)
+ symbol = 'circle'
+
+ # recolor when low contrast
+ while any(60 < value < 190 for value in (r, g, b)):
+ r, g, b = random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)
+
+ color = f"rgb({r},{g},{b})"
+
+ plot_all_result.add_scatter_plot(
+ x=type_filtered_df['timestamp'],
+ y=type_filtered_df['value'],
+ name=rosbag + "
" + topic + "
" + type_,
+ marker_symbol=symbol,
+ marker_color=color
+ )
+
+ # all rosbags
+ plot_all_result.update_layout(
+ title="Comparison Between Original and Fault-Injected Topics for all rosbags",
+ xaxis_title="Timestamp (sec)",
+ yaxis_title="Values"
+ )
+
+ # write plot as html
+ date_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ plot_all_file_name = "./plot/plot_" + date_time + ".html"
+ plot_all_result.write_html(plot_all_file_name)
+ print("Plot file written at " + plot_all_file_name)
+
+ csv_file_name = "./csv/filtered_" + date_time + ".csv"
+ filtered_df.to_csv(csv_file_name, index=False)
+
+ del plot_all_result
+ del dataframe_plot
\ No newline at end of file
diff --git a/src/read_yaml.py b/src/read_yaml.py
new file mode 100644
index 0000000..6e86377
--- /dev/null
+++ b/src/read_yaml.py
@@ -0,0 +1,409 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+
+__author__ = "Min Hee Jo"
+__copyright__ = "Copyright 2024, Robert Bosch GmbH"
+__license__ = "BIOSL"
+__version__ = "4.0"
+__email__ = "minhee.jo@de.bosch.com"
+
+from pathlib import Path
+from typing import Tuple, Union
+import yaml
+
+
+class Yaml():
+ """
+ Load, read, format, set the parameters configured in YAML file.
+ """
+ def __init__(self, cur_dir, config_path):
+ """
+ Initialize Yaml class and its variables.
+
+ Keyword arguments:
+ cur_dir -- current working directory
+ config_path -- relative path to configuration YAML file
+ """
+ try:
+ self.cur_dir = cur_dir.resolve(strict=True)
+ except FileNotFoundError:
+ print("Current directory not valid")
+
+ if not isinstance(config_path, str):
+ raise ValueError("Configuration file path is not a string type: {}".format(config_path))
+ self.config_path = config_path
+
+ self.topic_mask_filter = []
+ self.topics = []
+ self.filters = []
+
+
+ @property
+ def DEFAULT_SEED(self):
+ """
+ Return the default value of the seed.
+ """
+ return 1
+
+
+ def read_yaml(self):
+ """
+ Read and load the configuration YAML file.
+ """
+ with open (self.cur_dir / self.config_path, "r") as f:
+ try:
+ self.config = yaml.safe_load(f)
+ except yaml.YAMLError as exc:
+ if hasattr(exc, 'problem_mark'):
+ mark = exc.problem_mark
+ print ("Could not load YAML file due to error (See position: line {} column {})".format(mark.line + 1, mark.column + 1))
+
+
+ def get_seed(self) -> int:
+ """
+ Return the seed parameter.
+ Return the default seed if no seed is configured.
+ """
+ if 'seed' in self.config:
+ seed = self.config['seed']
+ else:
+ seed = self.DEFAULT_SEED()
+
+ if not isinstance(seed, int):
+ raise ValueError("seed parameter is not an integer type: {}".format(seed))
+
+ return seed
+
+
+ def get_custom_message_path(self) -> list:
+ """
+ Return a list of relative paths to custom message files.
+ Return a empty list if no custom message path is configured.
+ """
+ custom_message_path = []
+
+ # optional custom message path parameter
+ if 'custom_message_path' in self.config:
+ custom_message_path = self.config['custom_message_path']
+
+ if not isinstance(custom_message_path, list):
+ raise ValueError("custom_message_path parameter is not a list type: {}".format(custom_message_path))
+
+ # delete the first character if it starts with /
+ return [s[1:] if s.startswith('/') else s for s in custom_message_path]
+ else:
+ return []
+
+
+ def get_params(self) -> list:
+ """
+ Return a list of parameters.
+ """
+ return self.config['parameters']
+
+
+ def get_input(self, parameter: list) -> str:
+ """
+ Return the input parameter
+
+ Keyword arguments:
+ parameter -- list of one parameter set
+ """
+ return parameter['input']
+
+
+ def set_param(self, parameter: list):
+ """
+ Set the subparameters of the input parameter list.
+ Post-process the format of each subparameter.
+
+ Keyword arguments:
+ parameter -- list of one parameter set
+ """
+ # topics masking list
+ self.topic_mask_filter = [x if x.startswith('/') else '/' + x for x in parameter['topic_mask_filter']]
+
+ if not isinstance(self.topic_mask_filter, list):
+ raise ValueError("topic_mask_filter parameter is not a list type: {}".format(self.topic_mask_filter))
+
+ # list of topics
+ self.topics = parameter['topics']
+
+ if not isinstance(self.topics, list):
+ raise ValueError("topics parameter is not a list type: {}".format(self.topics))
+
+ for topic in self.topics:
+ if not isinstance(topic, dict):
+ raise ValueError("topic parameter is not a dict type: {}".format(topic))
+
+ topic['topic'] = topic['topic'] if topic['topic'].startswith('/') else '/' + topic['topic']
+ topic['type'] = topic['type'] if topic['type'].startswith('.') else '.' + topic['type']
+
+ # optional filter parameter
+ if 'filters' in parameter:
+ # list of filters
+ self.filters = parameter['filters']
+
+ if not isinstance(self.filters, list):
+ raise ValueError("filters parameter is not a list type: {}".format(self.filters))
+
+ for filter in self.filters:
+ if not isinstance(filter, dict):
+ raise ValueError("filter parameter is not a dict type: {}".format(filter))
+
+ filter['topic'] = filter['topic'] if filter['topic'].startswith('/') else '/' + filter['topic']
+ filter['type'] = filter['type'] if filter['type'].startswith('.') else '.' + filter['type']
+ else:
+ self.filters = []
+
+
+ def get_topic_params(self, topic: dict) -> Tuple[str, str, str, Tuple[Union[int, float], Union[int, float]], Union[int, float], Union[int, float]]:
+ """
+ Return subparameters of the input topic dictionary.
+
+ Keyword arguments:
+ topic -- dictionary of one topic parameter set
+ """
+ # topic name
+ name = topic['topic']
+
+ if not isinstance(name, str):
+ raise ValueError("topic parameter is not a string type: {}".format(name))
+
+ # message type
+ msg_type = topic['type']
+
+ if not isinstance(msg_type, str):
+ raise ValueError("type parameter is not a string type: {}".format(msg_type))
+
+ # fault type
+ fault_type = topic['fault_type']
+
+ if not isinstance(fault_type, str):
+ raise ValueError("fault_type parameter is not a string type: {}".format(fault_type))
+
+ # fault values
+ fault_value = topic['fault_value']
+
+ # post process fault values
+ if fault_type.startswith('random_'):
+ if not isinstance(fault_value, str):
+ raise ValueError("fault_value parameter is not a range: {}".format(fault_value))
+
+ fault_value = [self.string_to_num(x.strip()) for x in fault_value.split(',')]
+
+ if fault_value[0] > fault_value[1]:
+ raise ValueError("Minimum value of fault_value is bigger than its maximum value in topic {}".format(name))
+
+ fault_values = (fault_value[0], fault_value[1])
+ else:
+ if not isinstance(fault_value, (int, float)):
+ raise ValueError("fault_value parameter is not an integer or a float type: {}".format(fault_value))
+
+ fault_values = (self.string_to_num(fault_value), None)
+
+ # start fault injection after time in seconds
+ if 'start_after_sec' in topic:
+ start_after_sec = topic['start_after_sec']
+
+ if not isinstance(start_after_sec, (int, float)):
+ raise ValueError("start_after_sec parameter is not an integer or a float type: {}".format(start_after_sec))
+ elif start_after_sec < 0:
+ raise ValueError("start_after_sec parameter is a negative value: {}".format(start_after_sec))
+ else:
+ start_after_sec = 0
+
+ # fault injection duration in seconds
+ if 'duration_sec' in topic:
+ duration_sec = topic['duration_sec']
+
+ if not isinstance(duration_sec, (int, float)):
+ raise ValueError("duration_sec parameter is not an integer or a float type: {}".format(duration_sec))
+ elif duration_sec < 0:
+ raise ValueError("duration_sec parameter is a negative value: {}".format(duration_sec))
+ else:
+ duration_sec = 0
+
+ return name, msg_type, fault_type, fault_values, start_after_sec, duration_sec
+
+
+ def get_filter_params(self, filter: list) -> Tuple[str, str, str, Tuple[Union[int, float], Union[int, float]], int, int]:
+ """
+ Return subparameters of the input filter list.
+
+ Keyword arguments:
+ filter -- list of filter parameter set
+ """
+ # topic name
+ name = filter['topic']
+
+ if not isinstance(name, str):
+ raise ValueError("topic parameter is not a string type: {}".format(name))
+
+ # message type
+ msg_type = filter['type']
+
+ if not isinstance(msg_type, str):
+ raise ValueError("type parameter is not a string type: {}".format(msg_type))
+
+ # filter type
+ filter_type = filter['filter_type']
+
+ if not isinstance(filter_type, str):
+ raise ValueError("filter_type parameter is not a string type: {}".format(filter_type))
+
+ # filter values
+ filter_value = filter['filter_value']
+
+ # post process filter values
+ if filter_type.endswith('_between'):
+ if not isinstance(filter_value, str):
+ raise ValueError("filter_value parameter is not a range: {}".format(filter_value))
+
+ filter_value = [self.string_to_num(x.strip()) for x in filter_value.split(',')]
+
+ if filter_value[0] > filter_value[1]:
+ raise ValueError("Minimum value of filter_value is bigger than its maximum value in topic {}".format(name))
+
+ filter_values = (filter_value[0], filter_value[1])
+ else:
+ if not isinstance(filter_value, (int, float)):
+ raise ValueError("filter_value parameter is not an integer or a float type: {}".format(filter_value))
+
+ filter_values = (self.string_to_num(filter_value), None)
+
+ # window size for filtering
+ filter_window_size = filter['filter_window_size']
+
+ if not isinstance(filter_window_size, int):
+ raise ValueError("filter_window_size parameter is not an integer type: {}".format(filter_window_size))
+ elif filter_window_size < 0:
+ raise ValueError("filter_window_size parameter is a negative value: {}".format(filter_window_size))
+
+ # pass criteria within window size
+ if 0 < filter_window_size:
+ pass_size = filter['pass_size']
+ else:
+ pass_size = 0
+
+ if not isinstance(pass_size, int):
+ raise ValueError("pass_size parameter is not an integer type: {}".format(pass_size))
+ elif pass_size < 0:
+ raise ValueError("pass_size parameter is a negative value: {}".format(pass_size))
+ elif 0 < filter_window_size < pass_size:
+ raise ValueError("pass_size parameter is bigger than the filter_window_size: {} > {}".format(pass_size, filter_window_size))
+
+ return name, msg_type, filter_type, filter_values, filter_window_size, pass_size
+
+
+ def string_to_num(self, s):
+ """
+ Return the string as integer or float.
+
+ Keyword arguments:
+ s -- string to convert
+ """
+ if isinstance(s, (int, float)):
+ return s
+ elif s.isdigit():
+ return int(s)
+ else:
+ return float(s)
+
+
+ def get_subdirectory(self, path: str) -> list:
+ """
+ Return a list of subdirectories of rosbags under the input path.
+
+ Keyword arguments:
+ path -- parent path
+ """
+ path = Path(path)
+
+ subdirectories = list(path.glob('./*/*.db3'))
+ subdirectories = [x.parent for x in subdirectories]
+
+ return subdirectories
+
+
+ def no_topic(self):
+ """
+ Check if there is no topics to inject faults.
+ """
+ if not self.topic_mask_filter:
+ raise Exception("No faults to inject")
+
+
+ def undefined_topics(self):
+ """
+ Check if there are undefined topics to inject faults.
+ """
+ # get only topic names from topics
+ topic_names = self.get_values_list_of_dict(self.topics, "topic")
+
+ # check undefined topics in topic mask filter
+ undefined = [x for x in self.topic_mask_filter if x not in set(topic_names)]
+
+ if undefined:
+ raise Exception("Undefined topics is in topic_mask_filter")
+
+
+ def get_values_list_of_dict(self, list_of_dicts: list, key) -> list:
+ """
+ Return a list of all values in a dictionary that matches with
+ the input key.
+
+ Keyword arguments:
+ list_of_dicts -- list of dictionaries
+ key -- key to search in a dictionary
+ """
+ nth_values = []
+ for dict in list_of_dicts:
+ nth_values.append(dict[key])
+
+ return nth_values
+
+
+ def mismatching_datatype(self, s: str, fault_values: tuple):
+ """
+ Check if fault value types are mismaching the message definition
+ data types.
+
+ Keyword arguments:
+ s -- base string
+ fault_values -- fault values
+ """
+ string_in_num = self.string_to_num(s)
+
+ # fault_value given as range
+ if all(fault_values):
+ if type(fault_values[0]) == type(fault_values[1]) == type(string_in_num):
+ pass
+ else:
+ raise ValueError("The data type of fault_value parameter {} should be: {}".format(fault_values, type(string_in_num)))
+ # fault_value given as value
+ else:
+ if type(fault_values[0]) == type(string_in_num):
+ pass
+ else:
+ raise ValueError("The data type of fault_value parameter {} should be: {}".format(fault_values, type(string_in_num)))
+
+
+ def guess_msgtype(self, path: Path) -> str:
+ """
+ Guess the message type from a message file path.
+
+ Keyword arguments:
+ path -- path to message file path
+ """
+ name = path.relative_to(path.parents[2]).with_suffix('')
+
+ if 'msg' not in name.parts:
+ name = name.parent / 'msg' / name.name
+
+ return str(name)
\ No newline at end of file
diff --git a/src/record.sh b/src/record.sh
new file mode 100644
index 0000000..124d38f
--- /dev/null
+++ b/src/record.sh
@@ -0,0 +1,119 @@
+#!/bin/bash
+#
+# Copyright (c) 2024 Robert Bosch GmbH and its subsidiaries.
+# This program and the accompanying materials are made available under
+# the terms of the Bosch Internal Open Source License v4
+# which accompanies this distribution, and is available at
+# http://bios.intranet.bosch.com/bioslv4.txt
+#
+# Author : Min Hee Jo
+# Email : minhee.jo@de.bosch.com
+# Usage : bash record.sh -h
+# Description : Record a new rosbag with user options
+
+# define usage function
+usage() {
+ echo "Usage: $(basename "$0") [-d value] [-h] [-l str] [-p str] [-y]"
+ echo "Options:"
+ echo " -d value The duration for recording the rosbag (unit: seconds) [Default: 20]"
+ echo " -h Print manual"
+ echo " -l str ROS2 launch file name [Default: all_launch.py]"
+ echo " -p str ROS2 package name [Default: num_generator]"
+ echo " -y Auto-confirm the parameters [Default: Deactivated]"
+}
+
+# default values for options
+DURATION=20
+LAUNCH_FILE="all_launch.py"
+PACKAGE="num_generator"
+
+# parameter inputs
+while getopts "d:hl:p:y" opt; do
+ case $opt in
+ d)
+ DURATION=$OPTARG
+ ;;
+ h)
+ usage
+ exit 1
+ ;;
+ l)
+ LAUNCH_FILE=$OPTARG
+ ;;
+ p)
+ PACKAGE=$OPTARG
+ ;;
+ y)
+ CONFIRM="Y"
+ ;;
+ \?)
+ echo "Invalid option: -$OPTARG" >&2
+ usage
+ exit 1
+ ;;
+ esac
+done
+
+# remove processed parameters
+shift $((OPTIND - 1))
+
+# always execute at working directory
+cd "$(dirname "$0")/.."
+
+# bag file directory
+BAG_DIR="${PWD}/data/input"
+
+echo "Approximate recording duration: ${DURATION} seconds"
+echo "Launch file: ${LAUNCH_FILE}"
+echo "Package name: ${PACKAGE}"
+echo "Recording the rosbags in $BAG_DIR"
+
+while true; do
+ # if not defined
+ if [ -z ${CONFIRM} ]; then
+ # check the parameter
+ read -p "Are the parameters correct? (Y/N) " CONFIRM
+ fi
+
+ case "$CONFIRM" in
+ [Yy])
+ # make directory if not exists
+ if [[ ! -e $BAG_DIR ]]; then
+ mkdir -p $BAG_DIR
+ fi
+ cd $BAG_DIR
+
+ # record ros2 bag in the background
+ ros2 bag record --max-cache-size 0 -a & sleep 5
+ cd -
+
+ # launch ros2 nodes in the background
+ ros2 launch $PACKAGE $LAUNCH_FILE & sleep $DURATION
+
+ echo "Stopping the command"
+
+ kill %1 # kill the ROS2 launch process
+ kill %2 # kill the ROS2 bag record process
+
+ NODE_LIST=$(ros2 node list)
+
+ for NODE in $NODE_LIST;
+ do
+ # remove slash in front of the node name
+ # kill running node
+ pkill ${NODE:1}
+ done
+
+ echo "Rosbag recording terminated"
+ break
+ ;;
+ [Nn])
+ echo "Exiting the shell"
+ break
+ ;;
+ *)
+ echo "Invalid input: Please enter Y or N"
+ read -p "Are the parameters correct? (Y/N) " CONFIRM
+ ;;
+ esac
+done
\ No newline at end of file