diff --git a/robot_sf/render/playback_recording.py b/robot_sf/render/playback_recording.py index ae72c5c..555e0f1 100644 --- a/robot_sf/render/playback_recording.py +++ b/robot_sf/render/playback_recording.py @@ -1,5 +1,19 @@ -""" -playback a recorded list of states +"""Robot Simulation State Playback Module + +This module provides functionality to replay and visualize recorded robot simulation states. +It supports both interactive visualization and video recording of simulation playbacks. + +Key Features: + - Load simulation states from pickle files + - Validate simulation state data + - Visualize states interactively + - Record simulation playback as video + - Support for map definitions and robot states + +Notes: + The pickle files should contain a tuple of (states, map_def) where: + - states: List[VisualizableSimState] - Sequence of simulation states + - map_def: MapDefinition - Configuration of the simulation environment """ import os @@ -14,7 +28,27 @@ def load_states(filename: str) -> List[VisualizableSimState]: """ - load a list of states from a file with pickle + Load a list of states from a pickle file. + + This function reads a pickle file containing simulation states and map definition, + performs validation checks, and returns them if valid. + + Args: + filename (str): Path to the pickle file containing the states + + Returns: + Tuple[List[VisualizableSimState], MapDefinition]: A tuple containing: + - List of VisualizableSimState objects representing simulation states + - MapDefinition object containing the map information + + Raises: + TypeError: If loaded states are not VisualizableSimState objects or map_def + is not MapDefinition + + Notes: + The pickle file must contain a tuple of (states, map_def) where: + - states is a list of VisualizableSimState objects + - map_def is a MapDefinition object """ # Check if the file is empty if os.path.getsize(filename) == 0: @@ -25,6 +59,18 @@ def load_states(filename: str) -> List[VisualizableSimState]: with open(filename, "rb") as f: # rb = read binary states, map_def = pickle.load(f) logger.info(f"Loaded {len(states)} states") + + # Verify `states` is a list of VisualizableSimState + if not all(isinstance(state, VisualizableSimState) for state in states): + logger.error(f"Invalid states loaded from {filename}") + raise TypeError(f"Invalid states loaded from {filename}") + + # Verify `map_def` is a MapDefinition + if not isinstance(map_def, MapDefinition): + logger.error(f"Invalid map definition loaded from {filename}") + logger.error(f"map_def: {type(map_def)}") + raise TypeError(f"Invalid map definition loaded from {filename}") + return states, map_def @@ -46,3 +92,43 @@ def load_states_and_visualize(filename: str): """ states, map_def = load_states(filename) visualize_states(states, map_def) + + +def load_states_and_record_video( + state_file: str, video_save_path: str, video_fps: float = 10 +): + """ + Load robot states from a file and create a video recording of the simulation. + + This function reads saved robot states from a file, initializes a simulation view, + and records each state to create a video visualization of the robot's movement. + + Args: + state_file (str): Path to the file containing saved robot states and map definition + video_save_path (str): Path where the output video file should be saved + video_fps (float, optional): Frames per second for the output video. Defaults to 10. + + Returns: + None + + Note: + The states file should contain both the robot states and map definition in a + compatible format. + The video will be written when the simulation view is closed via exit_simulation(). + + Example: + >>> load_states_and_record_video("states.pkl", "output.mp4", video_fps=30) + """ + logger.info(f"Loading states from {state_file}") + states, map_def = load_states(state_file) + sim_view = SimulationView( + map_def=map_def, + caption="RobotSF Recording", + record_video=True, + video_path=video_save_path, + video_fps=video_fps, + ) + for state in states: + sim_view.render(state) + + sim_view.exit_simulation() # to write the video file diff --git a/robot_sf/render/sim_view.py b/robot_sf/render/sim_view.py index 776cc9b..732fb49 100644 --- a/robot_sf/render/sim_view.py +++ b/robot_sf/render/sim_view.py @@ -257,11 +257,9 @@ def render(self, state: VisualizableSimState, sleep_time: float = 0.01): if self.record_video: # Capture frame - logger.debug("trying to record a frame") frame_data = pygame.surfarray.array3d(self.screen) frame_data = frame_data.swapaxes(0, 1) self.frames.append(frame_data) - logger.debug(f"Recorded frames {len(self.frames)}") if len(self.frames) > 2000: logger.warning("Too many frames recorded. Stopping video recording.") else: diff --git a/tests/test_load_states_and_record_video.py b/tests/test_load_states_and_record_video.py new file mode 100644 index 0000000..cc9da2f --- /dev/null +++ b/tests/test_load_states_and_record_video.py @@ -0,0 +1,43 @@ +"""pytest for load_states_and_record_video.py""" + +import pytest +import datetime +from robot_sf.render.playback_recording import load_states_and_record_video +from robot_sf.render.sim_view import MOVIEPY_AVAILABLE +from pathlib import Path + + +@pytest.mark.skipif( + not MOVIEPY_AVAILABLE, reason="MoviePy/ffmpeg not available for video recording" +) +def test_load_states_and_record_video(delete_video: bool = True): + """Test loading simulation states and recording them as video. + + Args: + delete_video: Whether to delete the video file after test. Default True. + """ + # Create recordings directory if it doesn't exist + recordings_dir = Path("recordings") + recordings_dir.mkdir(exist_ok=True) + + # create a unique video name + video_name = "playback_test_" + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".mp4" + + output_path = recordings_dir / video_name + + try: + load_states_and_record_video( + "test_pygame/recordings/2024-06-04_08-39-59.pkl", str(output_path) + ) + + assert output_path.exists(), "Video file was not created" + assert output_path.stat().st_size > 0, "Video file is empty" + finally: + # Clean up + if output_path.exists() and delete_video: + output_path.unlink() + + + +if __name__ == "__main__": + test_load_states_and_record_video(delete_video=False)