Skip to content

Commit

Permalink
Merge pull request #96 from ll7/79-Create-videos-based-on-state-recor…
Browse files Browse the repository at this point in the history
…dings

Enhance load_states function and add video recording capability
  • Loading branch information
ll7 authored Dec 11, 2024
2 parents 4da95da + f63521b commit c975c8b
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 5 deletions.
92 changes: 89 additions & 3 deletions robot_sf/render/playback_recording.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
"""
playback a recorded list of states
"""Robot Simulation State Playback Module
This module provides functionality to replay and visualize recorded robot simulation states.
It supports both interactive visualization and video recording of simulation playbacks.
Key Features:
- Load simulation states from pickle files
- Validate simulation state data
- Visualize states interactively
- Record simulation playback as video
- Support for map definitions and robot states
Notes:
The pickle files should contain a tuple of (states, map_def) where:
- states: List[VisualizableSimState] - Sequence of simulation states
- map_def: MapDefinition - Configuration of the simulation environment
"""

import os
Expand All @@ -14,7 +28,27 @@

def load_states(filename: str) -> List[VisualizableSimState]:
"""
load a list of states from a file with pickle
Load a list of states from a pickle file.
This function reads a pickle file containing simulation states and map definition,
performs validation checks, and returns them if valid.
Args:
filename (str): Path to the pickle file containing the states
Returns:
Tuple[List[VisualizableSimState], MapDefinition]: A tuple containing:
- List of VisualizableSimState objects representing simulation states
- MapDefinition object containing the map information
Raises:
TypeError: If loaded states are not VisualizableSimState objects or map_def
is not MapDefinition
Notes:
The pickle file must contain a tuple of (states, map_def) where:
- states is a list of VisualizableSimState objects
- map_def is a MapDefinition object
"""
# Check if the file is empty
if os.path.getsize(filename) == 0:
Expand All @@ -25,6 +59,18 @@ def load_states(filename: str) -> List[VisualizableSimState]:
with open(filename, "rb") as f: # rb = read binary
states, map_def = pickle.load(f)
logger.info(f"Loaded {len(states)} states")

# Verify `states` is a list of VisualizableSimState
if not all(isinstance(state, VisualizableSimState) for state in states):
logger.error(f"Invalid states loaded from {filename}")
raise TypeError(f"Invalid states loaded from {filename}")

# Verify `map_def` is a MapDefinition
if not isinstance(map_def, MapDefinition):
logger.error(f"Invalid map definition loaded from {filename}")
logger.error(f"map_def: {type(map_def)}")
raise TypeError(f"Invalid map definition loaded from {filename}")

return states, map_def


Expand All @@ -46,3 +92,43 @@ def load_states_and_visualize(filename: str):
"""
states, map_def = load_states(filename)
visualize_states(states, map_def)


def load_states_and_record_video(
state_file: str, video_save_path: str, video_fps: float = 10
):
"""
Load robot states from a file and create a video recording of the simulation.
This function reads saved robot states from a file, initializes a simulation view,
and records each state to create a video visualization of the robot's movement.
Args:
state_file (str): Path to the file containing saved robot states and map definition
video_save_path (str): Path where the output video file should be saved
video_fps (float, optional): Frames per second for the output video. Defaults to 10.
Returns:
None
Note:
The states file should contain both the robot states and map definition in a
compatible format.
The video will be written when the simulation view is closed via exit_simulation().
Example:
>>> load_states_and_record_video("states.pkl", "output.mp4", video_fps=30)
"""
logger.info(f"Loading states from {state_file}")
states, map_def = load_states(state_file)
sim_view = SimulationView(
map_def=map_def,
caption="RobotSF Recording",
record_video=True,
video_path=video_save_path,
video_fps=video_fps,
)
for state in states:
sim_view.render(state)

sim_view.exit_simulation() # to write the video file
2 changes: 0 additions & 2 deletions robot_sf/render/sim_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,9 @@ def render(self, state: VisualizableSimState, sleep_time: float = 0.01):

if self.record_video:
# Capture frame
logger.debug("trying to record a frame")
frame_data = pygame.surfarray.array3d(self.screen)
frame_data = frame_data.swapaxes(0, 1)
self.frames.append(frame_data)
logger.debug(f"Recorded frames {len(self.frames)}")
if len(self.frames) > 2000:
logger.warning("Too many frames recorded. Stopping video recording.")
else:
Expand Down
43 changes: 43 additions & 0 deletions tests/test_load_states_and_record_video.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""pytest for load_states_and_record_video.py"""

import pytest
import datetime
from robot_sf.render.playback_recording import load_states_and_record_video
from robot_sf.render.sim_view import MOVIEPY_AVAILABLE
from pathlib import Path


@pytest.mark.skipif(
not MOVIEPY_AVAILABLE, reason="MoviePy/ffmpeg not available for video recording"
)
def test_load_states_and_record_video(delete_video: bool = True):
"""Test loading simulation states and recording them as video.
Args:
delete_video: Whether to delete the video file after test. Default True.
"""
# Create recordings directory if it doesn't exist
recordings_dir = Path("recordings")
recordings_dir.mkdir(exist_ok=True)

# create a unique video name
video_name = "playback_test_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".mp4"

output_path = recordings_dir / video_name

try:
load_states_and_record_video(
"test_pygame/recordings/2024-06-04_08-39-59.pkl", str(output_path)
)

assert output_path.exists(), "Video file was not created"
assert output_path.stat().st_size > 0, "Video file is empty"
finally:
# Clean up
if output_path.exists() and delete_video:
output_path.unlink()



if __name__ == "__main__":
test_load_states_and_record_video(delete_video=False)

0 comments on commit c975c8b

Please sign in to comment.