Skip to content

Commit

Permalink
feat(perception_replayer): add a new perception_reproducer. (#44)
Browse files Browse the repository at this point in the history
* fix a bug that perception_replayer/perception_reproducer can't publish old 'autoware_auto_perception_msgs' in old rosbags.

Signed-off-by: temkei.kem <[email protected]>

* hard reset to fix conflict.

Signed-off-by: temkei.kem <[email protected]>

* replace perception_reproducer_v2,py to perception_reproducer.py, update README

Signed-off-by: temkei.kem <[email protected]>

---------

Signed-off-by: temkei.kem <[email protected]>
  • Loading branch information
xtk8532704 authored Jun 11, 2024
1 parent 7a4ff19 commit 10b1ae3
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 36 deletions.
15 changes: 13 additions & 2 deletions planning/planning_debug_tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,16 @@ PlotCurrentVelocity('localization_kinematic_state', '/localization/kinematic_sta

This script can overlay the perception results from the rosbag on the planning simulator synchronized with the simulator's ego pose.

In detail, the ego pose in the rosbag which is closest to the current ego pose in the simulator is calculated.
The perception results at the timestamp of the closest ego pose is extracted, and published.
### How it works

Whenever the ego's position changes, a chronological `reproduce_sequence` queue is generated base on its position with a search radius (default to 2 m).
If the queue is empty, the nearest odom message in the rosbag is added to the queue.
When publishing perception messages, the first element in the `reproduce_sequence` is popped and published.

This design results in the following behavior:

- When ego stops, the perception messages are published in chronological order until queue is empty.
- When the ego moves, a perception message close to ego's position is published.

### How to use

Expand All @@ -215,6 +223,9 @@ ros2 run planning_debug_tools perception_reproducer.py -b <dir-to-bag-files>

Instead of publishing predicted objects, you can publish detected/tracked objects by designating `-d` or `-t`, respectively.

You can use `-r` option to set the search radius in meters for the perception messages. If it is set to 0, the reproducer always publishes the nearest perception message as how did the old perception_reproducer work.
`-c`(`--reproduce-cool-down`) option is to set the cool down time in seconds, aiming to prevent republishing recently published messages.

## Perception replayer

A part of the feature is under development.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

import argparse
from collections import deque
import pickle

import numpy as np
Expand All @@ -24,19 +25,48 @@
from utils import create_empty_pointcloud
from utils import translate_objects_coordinate

dist_eps = 1e-2 # (meters)


class PerceptionReproducer(PerceptionReplayerCommon):
def __init__(self, args):
self.rosbag_ego_odom_search_radius = (
args.search_radius
) # (m) the range of the ego odom to search,
self.ego_odom_search_radius = (
self.rosbag_ego_odom_search_radius
) # it may be set by an individual parameter.

self.reproduce_cool_down = (
args.reproduce_cool_down if args.search_radius != 0.0 else 0.0
) # (sec) the cool down time for republishing published data, please make sure that it's greater than the ego's stopping time.

super().__init__(args, "perception_reproducer")

self.reproduce_sequence_indices = deque() # contains ego_odom_idx
self.cool_down_indices = deque() # contains ego_odom_idx
self.ego_odom_id2last_published_timestamp = {} # for checking last published timestamp.

self.prev_ego_pos = None
self.prev_ego_odom_msg = None
self.perv_objects_msg = None
self.prev_traffic_signals_msg = None

self.stopwatch = StopWatch(self.args.verbose) # for debug

# to make some data to accelerate computation
self.preprocess_data()

# start main timer callback
self.timer = self.create_timer(0.1, self.on_timer)
time_diffs = []
prev_stamp = self.rosbag_ego_odom_data[0][0]
for stamp, _ in self.rosbag_ego_odom_data[1:]:
time_diff = (stamp - prev_stamp) / 1e9
time_diffs.append(time_diff)
prev_stamp = stamp

average_ego_odom_interval = 0.1 # sum(time_diffs) / len(time_diffs)
self.timer = self.create_timer(average_ego_odom_interval, self.on_timer)

# kill perception process to avoid a conflict of the perception topics
self.timer_check_perception_process = self.create_timer(3.0, self.on_timer_kill_perception)
Expand All @@ -60,70 +90,154 @@ def on_timer(self):
print("\n-- on_timer start --")
self.stopwatch.tic("total on_timer")

timestamp = self.get_clock().now().to_msg()
timestamp = self.get_clock().now()
timestamp_msg = timestamp.to_msg()

if self.args.detected_object:
pointcloud_msg = create_empty_pointcloud(timestamp)
pointcloud_msg = create_empty_pointcloud(timestamp_msg)
self.pointcloud_pub.publish(pointcloud_msg)

if not self.ego_pose:
print("No ego pose found.")
return

# find nearest ego odom by simulation observation
self.stopwatch.tic("find_nearest_ego_odom_by_observation")
ego_odom = self.find_nearest_ego_odom_by_observation(self.ego_pose)
pose_timestamp = ego_odom[0]
log_ego_pose = ego_odom[1].pose.pose
self.stopwatch.toc("find_nearest_ego_odom_by_observation")
# Update reproduce list if ego_pos is moved.
if (
self.ego_pose is None
or self.prev_ego_pos is None
or (
(self.ego_pose.position.x - self.prev_ego_pos.position.x) ** 2
+ (self.ego_pose.position.y - self.prev_ego_pos.position.y) ** 2
)
> dist_eps**2
):
# find the nearest ego odom by simulation observation
self.stopwatch.tic("find_nearest_ego_odom_by_observation")
nearby_ego_odom_indies = self.find_nearby_ego_odom_indies(
[self.ego_pose], self.ego_odom_search_radius
)
nearby_ego_odom_indies = [
self.rosbag_ego_odom_data[idx][1].pose.pose for idx in nearby_ego_odom_indies
]
if not nearby_ego_odom_indies:
nearest_ego_odom_ind = self.find_nearest_ego_odom_index(self.ego_pose)
nearby_ego_odom_indies += [
self.rosbag_ego_odom_data[nearest_ego_odom_ind][1].pose.pose
]
self.stopwatch.toc("find_nearest_ego_odom_by_observation")

# find a list of ego odom around the nearest_ego_odom_pos.
self.stopwatch.tic("find_nearby_ego_odom_indies")
ego_odom_indices = self.find_nearby_ego_odom_indies(
nearby_ego_odom_indies, self.rosbag_ego_odom_search_radius
)
self.stopwatch.toc("find_nearby_ego_odom_indies")

# extract message by the nearest ego odom timestamp
self.stopwatch.tic("find_topics_by_timestamp")
msgs_orig = self.find_topics_by_timestamp(pose_timestamp)
self.stopwatch.toc("find_topics_by_timestamp")
# update reproduce_sequence with those data not in cool down list.
while self.cool_down_indices:
last_timestamp = self.ego_odom_id2last_published_timestamp[
self.cool_down_indices[0]
]
if (timestamp - last_timestamp).nanoseconds / 1e9 > self.reproduce_cool_down:
self.cool_down_indices.popleft()
else:
break

# copy the messages
self.stopwatch.tic("message deepcopy")
if self.args.detected_object:
msgs = pickle.loads(pickle.dumps(msgs_orig)) # this is x5 faster than deepcopy
objects_msg = msgs[0]
traffic_signals_msg = msgs[1]
else:
# NOTE: No need to deepcopy since only timestamp will be changed and it will be changed every time correctly.
objects_msg = msgs_orig[0]
traffic_signals_msg = msgs_orig[1]
self.stopwatch.toc("message deepcopy")
self.stopwatch.tic("update reproduce_sequence")
ego_odom_indices = [
idx for idx in ego_odom_indices if idx not in self.cool_down_indices
]
ego_odom_indices = sorted(ego_odom_indices)
self.reproduce_sequence_indices = deque(ego_odom_indices)

self.stopwatch.toc("update reproduce_sequence")

self.prev_ego_pos = self.ego_pose

# get data to publish
repeat_trigger = len(self.reproduce_sequence_indices) == 0
if not repeat_trigger: # pop data from reproduce_sequence if sequence is not empty.
ego_odom_idx = self.reproduce_sequence_indices.popleft()
pose_timestamp, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
# extract message by the nearest ego odom timestamp
self.stopwatch.tic("find_topics_by_timestamp")
objects_msg, traffic_signals_msg = self.find_topics_by_timestamp(pose_timestamp)
self.stopwatch.toc("find_topics_by_timestamp")
# if self.args.verbose:
# print("reproduce_sequence_indices: ", list(self.reproduce_sequence_indices)[:20])

else: # get perv data to publish if reproduce_sequence is empty.
ego_odom_msg = self.prev_ego_odom_msg
objects_msg = self.perv_objects_msg
traffic_signals_msg = self.prev_traffic_signals_msg

# transform and publish current data.
self.stopwatch.tic("transform and publish")

# ego odom
self.recorded_ego_pub.publish(ego_odom_msg)

# objects
if objects_msg:
objects_msg.header.stamp = timestamp
objects_msg.header.stamp = timestamp_msg
if self.args.detected_object:
translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg)
self.objects_pub.publish(objects_msg)
# copy the messages
self.stopwatch.tic("message deepcopy")
objects_msg_copied = pickle.loads(
pickle.dumps(objects_msg)
) # this is x5 faster than deepcopy
self.stopwatch.toc("message deepcopy")

# ego odom
self.recorded_ego_pub.publish(ego_odom[1])
log_ego_pose = ego_odom_msg.pose.pose
translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg_copied)
else:
objects_msg_copied = objects_msg
self.objects_pub.publish(objects_msg_copied)

# traffic signals
if traffic_signals_msg:
traffic_signals_msg.stamp = timestamp
traffic_signals_msg.stamp = timestamp_msg
self.traffic_signals_pub.publish(traffic_signals_msg)
self.prev_traffic_signals_msg = traffic_signals_msg
elif self.prev_traffic_signals_msg:
self.prev_traffic_signals_msg.stamp = timestamp
self.prev_traffic_signals_msg.stamp = timestamp_msg

self.traffic_signals_pub.publish(self.prev_traffic_signals_msg)
self.stopwatch.toc("transform and publish")

self.stopwatch.toc("total on_timer")
if not repeat_trigger:
# save data for repeat publication.
self.prev_ego_odom_msg = ego_odom_msg
self.perv_objects_msg = (
objects_msg if objects_msg is not None else self.perv_objects_msg
)
self.prev_traffic_signals_msg = (
traffic_signals_msg
if traffic_signals_msg is not None
else self.prev_traffic_signals_msg
)

# update cool down info.
self.ego_odom_id2last_published_timestamp[ego_odom_idx] = timestamp
self.cool_down_indices.append(ego_odom_idx)

self.stopwatch.toc("total on_timer")

def find_nearest_ego_odom_by_observation(self, ego_pose):
def find_nearest_ego_odom_index(self, ego_pose):
# nearest search with numpy format is much (~ x100) faster than regular for loop
self_pose = np.array([ego_pose.position.x, ego_pose.position.y])
dists_squared = np.sum((self.rosbag_ego_odom_data_numpy - self_pose) ** 2, axis=1)
nearest_idx = np.argmin(dists_squared)
return nearest_idx

return self.rosbag_ego_odom_data[nearest_idx]
def find_nearby_ego_odom_indies(self, ego_poses: list, search_radius: float):
ego_poses_np = np.array([[pose.position.x, pose.position.y] for pose in ego_poses])
dists_squared = np.sum(
(self.rosbag_ego_odom_data_numpy[:, None] - ego_poses_np) ** 2, axis=2
)
nearby_indices = np.where(np.any(dists_squared <= search_radius**2, axis=1))[0]

return nearby_indices


if __name__ == "__main__":
Expand All @@ -141,6 +255,20 @@ def find_nearest_ego_odom_by_observation(self, ego_pose):
parser.add_argument(
"-v", "--verbose", help="output debug data", action="store_true", default=False
)
parser.add_argument(
"-r",
"--search-radius",
help="the search radius for searching rosbag's ego odom messages around the nearest ego odom pose (default is 2 meters), if the search radius is set to 0, it will always publish the closest message, just as the old reproducer did.",
type=float,
default=2.0,
)
parser.add_argument(
"-c",
"--reproduce-cool-down",
help="The cool down time for republishing published messages (default is 15.0 seconds)",
type=float,
default=15.0,
)
args = parser.parse_args()

rclpy.init()
Expand Down

0 comments on commit 10b1ae3

Please sign in to comment.