Skip to content

Commit

Permalink
style(pre-commit): autofix
Browse files Browse the repository at this point in the history
  • Loading branch information
pre-commit-ci[bot] committed Jun 7, 2024
1 parent 7ef03e2 commit 2ab4152
Showing 1 changed file with 63 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# limitations under the License.

import argparse
import pickle
from collections import deque
import pickle

import numpy as np
from perception_replayer_common import PerceptionReplayerCommon
Expand All @@ -25,28 +25,33 @@
from utils import create_empty_pointcloud
from utils import translate_objects_coordinate

dist_eps = 1e-2 # (meters)
dist_eps = 1e-2 # (meters)


class PerceptionReproducerV2(PerceptionReplayerCommon):
'''
"""
PerceptionReproducerV2 is used reproduces perception topic data from rosbag, specifically optimized for use with simulators.
'''
"""

def __init__(self, args):
self.rosbag_ego_odom_seach_radius = args.seach_radius # (m) the range of the ego odom to search,
self.reproduce_cooldown = args.reproduce_cooldown # (sec) the cooldown time for republishing published data, please make sure that it's greater than the ego's stopping time.

self.rosbag_ego_odom_seach_radius = (

Check warning on line 37 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (seach)
args.seach_radius

Check warning on line 38 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (seach)
) # (m) the range of the ego odom to search,
self.reproduce_cooldown = (

Check warning on line 40 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (cooldown)
args.reproduce_cooldown

Check warning on line 41 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (cooldown)
) # (sec) the cooldown time for republishing published data, please make sure that it's greater than the ego's stopping time.

Check warning on line 42 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (cooldown)

super().__init__(args, "perception_reproducer_v2")
self.reproduce_sequence_indices = deque()# contains ego_odom_idx
self.cooldown_indices = deque() # contains ego_odom_idx
self.ego_odom_id2last_published_timestamp=dict() # for checking last published timestap.

self.reproduce_sequence_indices = deque() # contains ego_odom_idx
self.cooldown_indices = deque() # contains ego_odom_idx

Check warning on line 47 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (cooldown)
self.ego_odom_id2last_published_timestamp = dict() # for checking last published timestap.

Check warning on line 48 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (timestap)

self.prev_ego_pos = None
self.prev_ego_odom_msg = None
self.perv_objects_msg = None
self.prev_traffic_signals_msg = None


self.stopwatch = StopWatch(self.args.verbose) # for debug

# to make some data to accelerate computation
Expand Down Expand Up @@ -87,53 +92,57 @@ def on_timer(self):
if not self.ego_pose:
print("No ego pose found.")
return

## Update reproduce list if ego_pos is moved.
if self.ego_pose is None \
or self.prev_ego_pos is None or \
((self.ego_pose.position.x - self.prev_ego_pos.position.x)**2 + \
(self.ego_pose.position.y - self.prev_ego_pos.position.y)**2) > dist_eps**2:

## Update reproduce list if ego_pos is moved.
if (
self.ego_pose is None
or self.prev_ego_pos is None
or (
(self.ego_pose.position.x - self.prev_ego_pos.position.x) ** 2
+ (self.ego_pose.position.y - self.prev_ego_pos.position.y) ** 2
)
> dist_eps**2
):
# find the nearest ego odom by simulation observation
self.stopwatch.tic("find_nearest_ego_odom_by_observation")
nearst_ego_odom_ind = self.find_nearest_ego_odom_index(self.ego_pose)

Check warning on line 108 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (nearst)
nearst_ego_odom_pos = self.rosbag_ego_odom_data[nearst_ego_odom_ind][1].pose.pose

Check warning on line 109 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (nearst)

Check warning on line 109 in planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer_v2.py

View workflow job for this annotation

GitHub Actions / spell-check-partial

Unknown word (nearst)
self.stopwatch.toc("find_nearest_ego_odom_by_observation")

# find a list of ego odoms around the nearst_ego_odom_pos.
# find a list of ego odoms around the nearst_ego_odom_pos.
self.stopwatch.tic("find_nearby_ego_odom_indies")
ego_odom_indices = self.find_nearby_ego_odom_indies(nearst_ego_odom_pos,
self.rosbag_ego_odom_seach_radius)
ego_odom_indices = self.find_nearby_ego_odom_indies(
nearst_ego_odom_pos, self.rosbag_ego_odom_seach_radius
)
self.stopwatch.toc("find_nearby_ego_odom_indies")

# update reproduce_sequence with those data not in cooldown list.
while self.cooldown_indices:
last_timestamp = self.ego_odom_id2last_published_timestamp[self.cooldown_indices[0]]
if (timestamp - last_timestamp).nanoseconds/1e9 > self.reproduce_cooldown:
if (timestamp - last_timestamp).nanoseconds / 1e9 > self.reproduce_cooldown:
self.cooldown_indices.popleft()
else:
break


self.stopwatch.tic("update reproduce_sequence")
self.reproduce_sequence_indices = deque(
[idx for idx in ego_odom_indices
if idx not in self.cooldown_indices])
[idx for idx in ego_odom_indices if idx not in self.cooldown_indices]
)
self.stopwatch.toc("update reproduce_sequence")

self.prev_ego_pos = self.ego_pose

## get data to publish
repeat_trigger = len(self.reproduce_sequence_indices) == 0
if not repeat_trigger:# pop data from reproduce_sequence if sequence is not empty.
if not repeat_trigger: # pop data from reproduce_sequence if sequence is not empty.
ego_odom_idx = self.reproduce_sequence_indices.popleft()
pose_timestamp, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
# extract message by the nearest ego odom timestamp
self.stopwatch.tic("find_topics_by_timestamp")
objects_msg, traffic_signals_msg = self.find_topics_by_timestamp(pose_timestamp)
self.stopwatch.toc("find_topics_by_timestamp")

else:#get perv data to publish if reproduce_sequence is empty.
else: # get perv data to publish if reproduce_sequence is empty.
ego_odom_msg = self.prev_ego_odom_msg
objects_msg = self.perv_objects_msg
traffic_signals_msg = self.prev_traffic_signals_msg
Expand All @@ -150,7 +159,9 @@ def on_timer(self):
if self.args.detected_object:
# copy the messages
self.stopwatch.tic("message deepcopy")
objects_msg_copied = pickle.loads(pickle.dumps(objects_msg)) # this is x5 faster than deepcopy
objects_msg_copied = pickle.loads(
pickle.dumps(objects_msg)
) # this is x5 faster than deepcopy
self.stopwatch.toc("message deepcopy")

log_ego_pose = ego_odom_msg.pose.pose
Expand All @@ -159,7 +170,6 @@ def on_timer(self):
objects_msg_copied = objects_msg
self.objects_pub.publish(objects_msg_copied)


# traffic signals
if traffic_signals_msg:
traffic_signals_msg.stamp = timestamp_msg
Expand All @@ -173,8 +183,14 @@ def on_timer(self):
if not repeat_trigger:
# save data for repeate pubulication.
self.prev_ego_odom_msg = ego_odom_msg
self.perv_objects_msg = objects_msg if objects_msg is not None else self.perv_objects_msg
self.prev_traffic_signals_msg = traffic_signals_msg if traffic_signals_msg is not None else self.prev_traffic_signals_msg
self.perv_objects_msg = (
objects_msg if objects_msg is not None else self.perv_objects_msg
)
self.prev_traffic_signals_msg = (
traffic_signals_msg
if traffic_signals_msg is not None
else self.prev_traffic_signals_msg
)

# update cooldown info.
self.ego_odom_id2last_published_timestamp[ego_odom_idx] = timestamp
Expand All @@ -188,18 +204,20 @@ def find_nearest_ego_odom_index(self, ego_pose):
dists_squared = np.sum((self.rosbag_ego_odom_data_numpy - self_pose) ** 2, axis=1)
nearest_idx = np.argmin(dists_squared)
return nearest_idx

def find_nearby_ego_odom_indies(self, ego_pose, search_radius: float):
'''
"""
Find the indices of the ego odoms that are within a certain distance of the ego_pose.
Inputs:
ego_pose: the ego pose to search around.
search_radius: the radius to search around the ego_pose.'''
search_radius: the radius to search around the ego_pose."""
self_pose = np.array([ego_pose.position.x, ego_pose.position.y])
dists_squared = np.sum((self.rosbag_ego_odom_data_numpy - self_pose) ** 2, axis=1)
nearby_indices = np.where(dists_squared <= search_radius**2)[0]

return nearby_indices


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bag", help="rosbag", default=None)
Expand All @@ -216,10 +234,18 @@ def find_nearby_ego_odom_indies(self, ego_pose, search_radius: float):
"-v", "--verbose", help="output debug data", action="store_true", default=False
)
parser.add_argument(
"-r", "--seach-radius", help="the search radius for searching rosbag's ego odom messages around the nearest ego odom pose (default is 1 meters)", type=float, default=1.0
"-r",
"--seach-radius",
help="the search radius for searching rosbag's ego odom messages around the nearest ego odom pose (default is 1 meters)",
type=float,
default=1.0,
)
parser.add_argument(
"-c", "--reproduce-cooldown", help="The cooldown time for republishing published messages (default is 40.0 seconds)", type=float, default=40.0
"-c",
"--reproduce-cooldown",
help="The cooldown time for republishing published messages (default is 40.0 seconds)",
type=float,
default=40.0,
)
args = parser.parse_args()

Expand Down

0 comments on commit 2ab4152

Please sign in to comment.