Skip to content

Commit

Permalink
Optimize SensorFusion.next_obs method
Browse files Browse the repository at this point in the history
- Use deque for efficient cache management
- Pre-allocate numpy arrays for stacked states
- Cache normalization factors for improved performance
- Streamline state updates with np.roll
  • Loading branch information
ll7 committed Aug 27, 2024
1 parent 09747e0 commit c17f338
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions robot_sf/sensor/sensor_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import numpy as np
from gymnasium import spaces
from collections import deque

PolarVec2D = Tuple[float, float]

Expand Down Expand Up @@ -111,6 +112,10 @@ class SensorFusion:
def __post_init__(self):
# Initialize the number of steps to cache based on the LiDAR observation space
self.cache_steps = self.unnormed_obs_space[OBS_RAYS].shape[0]
self.stacked_drive_state = np.zeros((self.cache_steps, 5), dtype=np.float32)
self.stacked_lidar_state = np.zeros((self.cache_steps, len(self.lidar_sensor())), dtype=np.float32)
self.drive_state_cache = deque(maxlen=self.cache_steps)
self.lidar_state_cache = deque(maxlen=self.cache_steps)

def next_obs(self) -> Dict[str, np.ndarray]:
"""
Expand All @@ -135,7 +140,13 @@ def next_obs(self) -> Dict[str, np.ndarray]:
next_target_angle = next_target_angle if self.use_next_goal else 0.0

# Combine the robot speed and target sensor data into the drive state
drive_state = np.array([speed_x, speed_rot, target_distance, target_angle, next_target_angle])
drive_state = np.array([
speed_x,
speed_rot,
target_distance,
target_angle,
next_target_angle
])

# info: populate cache with same states -> no movement
# If the caches are empty, fill them with the current states
Expand All @@ -147,19 +158,17 @@ def next_obs(self) -> Dict[str, np.ndarray]:
# Add the current states to the caches and remove the oldest states
self.drive_state_cache.append(drive_state)
self.lidar_state_cache.append(lidar_state)
self.drive_state_cache.pop(0)
self.lidar_state_cache.pop(0)

# Stack the cached states into arrays
stacked_drive_state = np.array(self.drive_state_cache, dtype=np.float32)
stacked_lidar_state = np.array(self.lidar_state_cache, dtype=np.float32)
self.stacked_drive_state = np.roll(self.stacked_drive_state, -1, axis=0)
self.stacked_drive_state[-1] = drive_state
self.stacked_lidar_state = np.roll(self.stacked_lidar_state, -1, axis=0)
self.stacked_lidar_state[-1] = lidar_state

# Normalize the stacked states by the maximum values in the observation space
max_drive = self.unnormed_obs_space[OBS_DRIVE_STATE].high
max_lidar = self.unnormed_obs_space[OBS_RAYS].high
return {
OBS_DRIVE_STATE: stacked_drive_state / max_drive,
OBS_RAYS: stacked_lidar_state / max_lidar
OBS_DRIVE_STATE: self.stacked_drive_state / max_drive,
OBS_RAYS: self.stacked_lidar_state / max_lidar
}

def reset_cache(self):
Expand Down

0 comments on commit c17f338

Please sign in to comment.