Skip to content

Commit

Permalink
feat(bag2lanelet): add bag2lanelet package
Browse files Browse the repository at this point in the history
Signed-off-by: Takamasa Horibe <[email protected]>
  • Loading branch information
TakaHoribe committed Feb 16, 2024
1 parent bd786f4 commit 550528d
Show file tree
Hide file tree
Showing 12 changed files with 2,081 additions and 0 deletions.
95 changes: 95 additions & 0 deletions bag2lanelet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# bag2lanelet

This package generates a lanelet map necessary for Autoware's autonomous driving from rosbag data containing information about Localization (`/tf`). This enables autonomous driving based on manual driving information.

The provided functionalities are as follows:

- bag2lanelet.py: Generates lanelet (.osm) from a rosbag based on the position of `base_link`.
- bag2trajectory.py: Generates trajectory information (.csv) for vector_map_builder from a rosbag.

## Example


As an example, the process of lanelet generation based on driving trajectories from the planning simulator is performed as follows. Typically, the expectation is to use rosbag data from manual driving, rather than from the planning simulator.


Firstly, you need to run the planning_simulator following the [planning_simulator tutorial in Autoware Documentation](https://autowarefoundation.github.io/autoware-documentation/main/tutorials/ad-hoc-simulation/planning-simulation/). The process would be, install Autoawre, download the maps, run the planning_simulator, and start autonomous driving. Make sure to save the rosbag during this driving session using the following command:

Check warning on line 16 in bag2lanelet/README.md

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (Autoawre)

```sh
ros2 bag record /tf -o /tmp/bag2lanelet_sample.bag
```



<p align="center">
<img src="./image/psim-kashiwanoha-sample-map.png" alt="kashiwa" width="800">
</p>



After completing the drive, you can run the `bag2lanelet.py` script. This requires specifying the output directory, lane width and MGRS coordinates:

```sh
./bag2lanelet.py /tmp/bag2lanelet_sample.bag /tmp/bag2lanelet_sample -l 3.0 -m 54SUE
```

The map will be saved in the specified directory, following the naming convention `<date>-lanelet2_map.osm`. The map generated will appear like this. You can see the example result in [./example/lanelet2_map.osm](./example/lanelet2_map.osm).

When you relaunch the planning_simulator with the new lanelet2 map, you will see the following.

<p align="center">
<img src="./image/psim-kashiwanoha-bag2lanelet-original-map.png" alt="kashiwa" width="800">
</p>


Please note that at this stage, although this map works with Autoware, the shape of the lanes will appear jagged. (Refer to the 'Limitations' section for more details.) While this is an issue that should be addressed in the future, it can currently be resolved by loading it in [Vector Map Builder](https://autowarefoundation.github.io/autoware-documentation/main/how-to-guides/integrating-autoware/creating-maps/creating-vector-map/#vector-map-builder) as follows.


Following the documentation of the [Vector Map Builder](https://autowarefoundation.github.io/autoware-documentation/main/how-to-guides/integrating-autoware/creating-maps/creating-vector-map/#vector-map-builder), import the generated Lanelet2 map. You can see the refined lane on the application.

<p align="center">
<img src="./image/vmb-kashiwanoha-bag2lanelet-map.png" alt="kashiwa" width="800">
</p>

Then, Export the map. You can run the planning_simulator with the refined lanelet2 map and see how it goes on the Rviz.

<p align="center">
<img src="./image/psim-kashiwanoha-bag2lanelet-vmb-map.png" alt="kashiwa" width="800">
</p>


## Requirements

```
sudo apt update
sudo apt install ros-humble-tf-transformations ros-humble-tf-transformations
pip install -r requirements.txt
```

## Usage

Check `./bag2lanelet.py --help`

### generate lanelet2 file

```
./bag2lanelet.py /home/autoware/rosbag/sample . --width=3.0
```
or with MGRS code at Monza Track

Check warning on line 78 in bag2lanelet/README.md

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (Monza)

```
./bag2lanelet.py /home/autoware/rosbag/sample . --width=3.0 --mgrs 32TNR219517
```

### generate trajectory file for Vector Map Builder

```
./bag2trajectory.py /home/autoware/rosbag/sample sample.csv
```


## Limitations

Here is the limitations of this package. Contributions to further improvements are more than welcome.

- Due to the low conversion accuracy from MGRS to latitude and longitude in this script, the lanes in the output lanelet.osm appear jagged. Importing and then exporting through vector_map_builder corrects these values with high accuracy.
56 changes: 56 additions & 0 deletions bag2lanelet/bag2lanelet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/env python3
import argparse
import os
import pathlib
from datetime import datetime

from bag2way import bag2pose, pose2line
from lanelet_xml import LaneletMap


def genarate(input_path, output, width, mgrs, interval, offset, use_centerline=False):

Check warning on line 11 in bag2lanelet/bag2lanelet.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)
pose_array = bag2pose(input_path, interval)
pose_array = pose_array[::10]
left, right, center = pose2line(pose_array, width=width, offset=offset)

m = LaneletMap(mgrs=mgrs)
left_nodes = [m.add_node(*node) for node in left]
right_nodes = [m.add_node(*node) for node in right]
center_nodes = [m.add_node(*node) for node in center]

left_line = m.add_way(left_nodes)
right_line = m.add_way(right_nodes)
center_line = m.add_way(center_nodes) if use_centerline else None

m.add_relation(left_line, right_line, center_line)
os.makedirs(output, exist_ok=True) if not os.path.isdir(output) else None
m.save(
str(output) + "/" + datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "-" + "lanelet2_map.osm"
)


def main():
parser = argparse.ArgumentParser(description="Create lanelet2 file from rosbag2")
parser.add_argument("input_bag", help="input bag stored path")
parser.add_argument("output_lanelet", help="output lanelet2 save path")
parser.add_argument("-l", "--width", type=float, default=2.0, help="lane width[m]")
parser.add_argument("-m", "--mgrs", default="54SUE", help="MGRS code")
parser.add_argument(
"--interval", type=float, nargs=2, default=[0.1, 2.0], help="min and max interval between tf position")
parser.add_argument(
"--offset", type=float, nargs=3, default=[0.0, 0.0, 0.0], help="offset[m] from base_link"
)
parser.add_argument("--center", action="store_true", help="add centerline to lanelet")

args = parser.parse_args()
input_path = pathlib.Path(args.input_bag)
if not input_path.exists():
raise FileNotFoundError("Input bag folder '{}' is not found.".format(input_path))
output_path = pathlib.Path(args.output_lanelet)

print(args)
genarate(input_path, output_path, args.width, args.mgrs, args.interval, args.offset, args.center)

Check warning on line 52 in bag2lanelet/bag2lanelet.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)


if __name__ == "__main__":
main()
47 changes: 47 additions & 0 deletions bag2lanelet/bag2map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/env python3
import argparse
import os
import pathlib
from datetime import datetime

import folium
from bag2way import bag2point_stamped, bag2pose, pose2line
from lanelet_xml import LaneletMap


def genarate(input_path, output, mgrs):

Check warning on line 12 in bag2lanelet/bag2map.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)
point_array = bag2point_stamped(input_path, 40.0, 500.0)
m = LaneletMap(mgrs=mgrs)
latlon = [{'lat': lat, 'lon': lon} for lat, lon in [m.get_latlon(*node) for node in point_array[:, 1:4]]]
t = point_array[:, 0]

f = folium.Figure(width=800, height=500)
map = folium.Map(location=[latlon[0]["lat"], latlon[0]["lon"]], zoom_start=20).add_to(f)
idx = 0
# Note: latlon and t should be same index size
for ll in latlon:
print(ll["lat"], ll["lon"], t[idx])
folium.Marker(location=[ll["lat"], ll["lon"]], popup=t[idx]).add_to(map)
idx += 1
os.makedirs(output, exist_ok=True) if not os.path.isdir(output) else None
f.save(str(output) + "/" + datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "-" + "map.html")
map.show_in_browser()


def main():
parser = argparse.ArgumentParser(description="Create lanelet2 file from rosbag2")
parser.add_argument("input_bag", help="input bag stored path")
parser.add_argument("output_dir", default="", help="output html store dir")
parser.add_argument("-m", "--mgrs", default="54SUE", help="MGRS code")

args = parser.parse_args()
input_path = pathlib.Path(args.input_bag)
if not input_path.exists():
raise FileNotFoundError("Input bag folder '{}' is not found.".format(input_path))

print(input_path)
genarate(input_path, args.output_dir, args.mgrs)

Check warning on line 43 in bag2lanelet/bag2map.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)


if __name__ == "__main__":
main()
39 changes: 39 additions & 0 deletions bag2lanelet/bag2trajectory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#!/bin/env python3
import argparse
import pathlib

import numpy as np
import tf_transformations
from bag2way import bag2pose


def genarate(input_path, output_path):

Check warning on line 10 in bag2lanelet/bag2trajectory.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)
pose_array = bag2pose(input_path)
pose_array = pose_array[::50]

trajectory = []
for pose in pose_array:
euler = tf_transformations.euler_from_quaternion(pose[3:])
trajectory.append([*pose[:3], euler[2]])
np.savetxt(

Check warning on line 18 in bag2lanelet/bag2trajectory.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (savetxt)
str(output_path), trajectory, fmt="%.3f", delimiter=",", header="x,y,z,yaw", comments=""
)


def main():
parser = argparse.ArgumentParser(description="Create lanelet2 file from rosbag2")
parser.add_argument("input_bag", help="input bag stored path")
parser.add_argument("output_csv", help="output trajectory csv path")

args = parser.parse_args()
input_path = pathlib.Path(args.input_bag)
if not input_path.exists():
raise FileNotFoundError("Input bag folder '{}' is not found.".format(input_path))
output_path = pathlib.Path(args.output_csv)

print(input_path, output_path)
genarate(input_path, output_path)

Check warning on line 35 in bag2lanelet/bag2trajectory.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (genarate)


if __name__ == "__main__":
main()
119 changes: 119 additions & 0 deletions bag2lanelet/bag2way.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from datetime import datetime
from typing import Text

import numpy as np
import tf_transformations
from rclpy.serialization import deserialize_message
from rosbag2_py import ConverterOptions, SequentialReader, StorageOptions
from rosidl_runtime_py.utilities import get_message


def create_reader(bag_dir: str) -> SequentialReader:
storage_options = StorageOptions(
uri=bag_dir,
storage_id="sqlite3",
)
converter_options = ConverterOptions(
input_serialization_format="cdr",
output_serialization_format="cdr",
)

reader = SequentialReader()
reader.open(storage_options, converter_options)
return reader

# too close & outlier pose filter
def is_close_pose(p0, p1, eps, thresh):
dist = ((p0.x - p1.x) ** 2 + (p0.y - p1.y) ** 2 + (p0.z - p1.z) ** 2) ** 0.5
if dist < eps or thresh < dist:
return True
else:
return False


def bag2pose(input_path, interval=[0.1, 10000.0]):
reader = create_reader(str(input_path))
type_map = {}
for topic_type in reader.get_all_topics_and_types():
type_map[topic_type.name] = topic_type.type

pose_list = []
is_initial_pose = True
prev_trans = None
# read topic and fix timestamp if lidar, and write
while reader.has_next():
(topic, data, stamp) = reader.read_next()
if topic == "/tf":
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
for transform in msg.transforms:
if transform.child_frame_id != "base_link":
continue
trans = transform.transform.translation
rot = transform.transform.rotation
if is_initial_pose:
is_initial_pose = False
prev_trans = trans
elif is_close_pose(prev_trans, trans, interval[0], interval[1]):
# print("too close or too far")
continue
pose_list.append(np.r_[trans.x, trans.y, trans.z, rot.x, rot.y, rot.z, rot.w])
prev_trans = trans

return np.array(pose_list)


def bag2point_stamped(input_path, too_close, too_far):
reader = create_reader(str(input_path))
type_map = {}
for topic_type in reader.get_all_topics_and_types():
type_map[topic_type.name] = topic_type.type

pose_list = []
is_initial_pose = True
prev_trans = None
# read topic and fix timestamp if lidar, and write
while reader.has_next():
(topic, data, stamp) = reader.read_next()
if topic == "/tf":
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
for transform in msg.transforms:
if transform.child_frame_id != "base_link":
continue
trans = transform.transform.translation
rot = transform.transform.rotation
date_time = datetime.fromtimestamp(stamp * 1e-9)
if is_initial_pose:
is_initial_pose = False
prev_trans = trans
elif is_close_pose(prev_trans, trans, too_close, too_far):
# print("too close or too far")
continue
pose_list.append(np.r_[date_time, trans.x, trans.y, trans.z])
prev_trans = trans

return np.array(pose_list)


def pose2line(pose_array, width=3.0, offset=[0, 0, 0]):
pos = pose_array[:, 0:3]
rot = pose_array[:, 3:7]
left = []
right = []
center = []
for p, r in zip(pos, rot):
lat = tf_transformations.quaternion_multiply(
tf_transformations.quaternion_multiply(r, [0, width / 2.0, 0, 0]),
tf_transformations.quaternion_conjugate(r),
)[:3]

off = tf_transformations.quaternion_multiply(
tf_transformations.quaternion_multiply(r, [*offset, 0]),
tf_transformations.quaternion_conjugate(r),
)[:3]

left.append(p + off + lat)
right.append(p + off - lat)
center.append(p + off)
return left, right, center
Loading

0 comments on commit 550528d

Please sign in to comment.