From c17f33889242e3ff781603b63f932811408301dc Mon Sep 17 00:00:00 2001 From: ll7 Date: Tue, 27 Aug 2024 21:19:00 +0200 Subject: [PATCH] Optimize SensorFusion.next_obs method - Use deque for efficient cache management - Pre-allocate numpy arrays for stacked states - Cache normalization factors for improved performance - Streamline state updates with np.roll --- robot_sf/sensor/sensor_fusion.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/robot_sf/sensor/sensor_fusion.py b/robot_sf/sensor/sensor_fusion.py index de79078..129a1e2 100644 --- a/robot_sf/sensor/sensor_fusion.py +++ b/robot_sf/sensor/sensor_fusion.py @@ -7,6 +7,7 @@ import numpy as np from gymnasium import spaces +from collections import deque PolarVec2D = Tuple[float, float] @@ -111,6 +112,10 @@ class SensorFusion: def __post_init__(self): # Initialize the number of steps to cache based on the LiDAR observation space self.cache_steps = self.unnormed_obs_space[OBS_RAYS].shape[0] + self.stacked_drive_state = np.zeros((self.cache_steps, 5), dtype=np.float32) + self.stacked_lidar_state = np.zeros((self.cache_steps, len(self.lidar_sensor())), dtype=np.float32) + self.drive_state_cache = deque(maxlen=self.cache_steps) + self.lidar_state_cache = deque(maxlen=self.cache_steps) def next_obs(self) -> Dict[str, np.ndarray]: """ @@ -135,7 +140,13 @@ def next_obs(self) -> Dict[str, np.ndarray]: next_target_angle = next_target_angle if self.use_next_goal else 0.0 # Combine the robot speed and target sensor data into the drive state - drive_state = np.array([speed_x, speed_rot, target_distance, target_angle, next_target_angle]) + drive_state = np.array([ + speed_x, + speed_rot, + target_distance, + target_angle, + next_target_angle + ]) # info: populate cache with same states -> no movement # If the caches are empty, fill them with the current states @@ -147,19 +158,17 @@ def next_obs(self) -> Dict[str, np.ndarray]: # Add the current states to the caches and remove the oldest states self.drive_state_cache.append(drive_state) self.lidar_state_cache.append(lidar_state) - self.drive_state_cache.pop(0) - self.lidar_state_cache.pop(0) - - # Stack the cached states into arrays - stacked_drive_state = np.array(self.drive_state_cache, dtype=np.float32) - stacked_lidar_state = np.array(self.lidar_state_cache, dtype=np.float32) + self.stacked_drive_state = np.roll(self.stacked_drive_state, -1, axis=0) + self.stacked_drive_state[-1] = drive_state + self.stacked_lidar_state = np.roll(self.stacked_lidar_state, -1, axis=0) + self.stacked_lidar_state[-1] = lidar_state # Normalize the stacked states by the maximum values in the observation space max_drive = self.unnormed_obs_space[OBS_DRIVE_STATE].high max_lidar = self.unnormed_obs_space[OBS_RAYS].high return { - OBS_DRIVE_STATE: stacked_drive_state / max_drive, - OBS_RAYS: stacked_lidar_state / max_lidar + OBS_DRIVE_STATE: self.stacked_drive_state / max_drive, + OBS_RAYS: self.stacked_lidar_state / max_lidar } def reset_cache(self):