-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* video player working * Refactor video player functionality and improve seek handling * Fix Timer FPS calculation to avoid division by zero on the first frame * formatting * remove old filename * debug message uses logger * formatting * line too long * add clock utest
- Loading branch information
Showing
3 changed files
with
220 additions
and
115 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,193 +1,280 @@ | ||
""" | ||
Relies on the PyAV library to decode video frames and display them using a texture. | ||
pip install av | ||
Video player implementation using PyAV for decoding and ModernGL for rendering. | ||
Requires: pip install av | ||
""" | ||
|
||
import math | ||
import logging | ||
from pathlib import Path | ||
from typing import Union | ||
from typing import Union, Literal | ||
|
||
import av | ||
import moderngl | ||
|
||
import moderngl_window | ||
from moderngl_window import geometry | ||
|
||
logger = logging.getLogger(__name__) | ||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
|
||
class VideoDecoder: | ||
"""Handles video decoding using PyAV.""" | ||
|
||
class Decoder: | ||
def __init__(self, path: Union[str, Path]): | ||
self.container = av.open(str(path)) | ||
self.video = self.container.streams[0] | ||
self._path = Path(path) | ||
if not self._path.exists(): | ||
raise FileNotFoundError(f"Video file not found: {self._path}") | ||
|
||
self.container = av.open(str(self._path)) | ||
self.video = self.container.streams.video[0] | ||
self.video.thread_type = "AUTO" | ||
self._last_packet = None | ||
self._frame_step = float(self.video.time_base) | ||
self.video.codec_context.pix_fmt = "yuv420p" | ||
|
||
self._current_frame = 0 | ||
|
||
@property | ||
def duration(self) -> float: | ||
"""float: Number of frames in the video""" | ||
"""Video duration in seconds.""" | ||
if self.video.duration is None: | ||
return -1 | ||
return self.video.duration * self.video.time_base | ||
|
||
@property | ||
def end_time(self): | ||
return self.video.end_time | ||
return float(self.container.duration * float(self.container.time_base)) | ||
return float(self.video.duration * float(self.video.time_base)) | ||
|
||
@property | ||
def average_rate(self) -> float: | ||
"""The average framerate as a float""" | ||
def framerate(self) -> float: | ||
"""Average framerate.""" | ||
rate = self.video.average_rate | ||
return rate.numerator / rate.denominator | ||
|
||
@property | ||
def frames(self) -> int: | ||
"""int: Number of frames in the video""" | ||
return self.video.frames | ||
|
||
@property | ||
def video_size(self) -> tuple[int, int]: | ||
"""tuple[int, int]: The width and height of the video in pixels""" | ||
def size(self) -> tuple[int, int]: | ||
"""Video dimensions (width, height).""" | ||
return self.video.width, self.video.height | ||
|
||
@property | ||
def current_pos(self): | ||
"""The current position in the stream""" | ||
if self._last_packet: | ||
return self._last_packet.pts | ||
return 0 | ||
def seek(self, time_seconds: float) -> None: | ||
"""Seek to specified time position.""" | ||
try: | ||
time_seconds = max(0, min(time_seconds, self.duration)) | ||
timestamp = int(time_seconds / float(self.video.time_base)) | ||
self.container.seek(timestamp, stream=self.video) | ||
self._current_frame = int(time_seconds * self.framerate) | ||
self.container.decode(video=0) | ||
except Exception as e: | ||
logger.error(f"Seek failed: {e}") | ||
self.container.seek(0) | ||
self._current_frame = 0 | ||
|
||
def get_frames(self): | ||
"""Generate video frames from current position.""" | ||
try: | ||
for packet in self.container.demux(video=0): | ||
for frame in packet.decode(): | ||
self._current_frame += 1 | ||
yield frame.to_rgb().planes[0] | ||
except Exception as e: | ||
logger.error(f"Frame generation error: {e}") | ||
|
||
@property | ||
def frame_step(self): | ||
"""Position step for each frame""" | ||
return self._frame_step | ||
def frames(self) -> int: | ||
"""Total number of frames in video.""" | ||
return int(self.duration * self.framerate) | ||
|
||
def time_to_pos(self, time: float) -> int: | ||
"""Converts time to stream position""" | ||
return time * self.average_rate | ||
def __enter__(self): | ||
return self | ||
|
||
def seek(self, position: int): | ||
"""Seek to a position in the stream""" | ||
self.container.seek(position, stream=self.video) | ||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
self.container.close() | ||
|
||
def gen_frames(self): | ||
for packet in self.container.demux(video=0): | ||
if packet.pts is not None: | ||
self._last_packet = packet | ||
for i, frame in enumerate(packet.decode()): | ||
yield frame.to_rgb().planes[0] | ||
def close(self): | ||
"""Explicitly close the video container.""" | ||
self.container.close() | ||
|
||
|
||
class Player: | ||
class VideoPlayer: | ||
"""Handles video playback and rendering using ModernGL.""" | ||
|
||
# Add constants at class level | ||
FRAME_DIFF_THRESHOLD = 5 | ||
MAX_BEHIND_COUNT = 10 | ||
SKIP_OFFSET = 2 | ||
|
||
def __init__(self, ctx: moderngl.Context, path: Union[str, Path]): | ||
self._ctx = ctx | ||
self._path = path | ||
self._decoder = Decoder(self._path) | ||
self._texture = self._ctx.texture(self._decoder.video_size, 3) | ||
self._frames = self._decoder.gen_frames() | ||
self._decoder = VideoDecoder(path) | ||
self._texture = self._ctx.texture(self._decoder.size, 3, dtype="f1") | ||
self._frames = self._decoder.get_frames() | ||
|
||
self._last_time = 0 | ||
self._fps = self._decoder.average_rate | ||
self._current_frame = 0 | ||
self._target_frame = 0 | ||
self._behind_count = 0 | ||
self._paused = False | ||
|
||
@property | ||
def fps(self) -> float: | ||
"""float: Framerate of the video""" | ||
return self._fps | ||
return self._decoder.framerate | ||
|
||
@property | ||
def duration(self) -> float: | ||
"""float: Length of video in seconds""" | ||
return self._decoder.duration | ||
|
||
@property | ||
def frames(self) -> int: | ||
"""int: The number of frames in the video""" | ||
return self._decoder.frames | ||
|
||
@property | ||
def video_size(self) -> tuple[int, int]: | ||
"""tuple[int, int]: Video size in pixels""" | ||
return self._decoder.video_size | ||
def size(self) -> tuple[int, int]: | ||
return self._decoder.size | ||
|
||
@property | ||
def texture(self) -> moderngl.Texture: | ||
return self._texture | ||
|
||
def update(self, time: float): | ||
next_pos = self._decoder.time_to_pos(time) | ||
delta = next_pos - self._decoder.current_pos | ||
|
||
print( | ||
( | ||
f"frame_step={self._decoder.frame_step}, " | ||
f"delta={delta}, " | ||
f"next_pos={next_pos}, " | ||
f"current_pos={self._decoder.current_pos}, " | ||
f"time={time}" | ||
) | ||
) | ||
|
||
# Seek we are more than 3 frames off | ||
if abs(delta) > self._decoder.frame_step * 3: | ||
seek_pos = int(next_pos) | ||
print("SEEK", delta, seek_pos) | ||
self._decoder.seek(seek_pos) | ||
# else: | ||
# if delta < self._decoder.frame_step: | ||
# print("SKIP") | ||
# return | ||
def update(self, time: float) -> bool: | ||
"""Update video playback state.""" | ||
if self._paused: | ||
return False | ||
|
||
self._target_frame = int(time * self.fps) | ||
frame_diff = self._target_frame - self._current_frame | ||
|
||
# Check if we've reached the end | ||
if self._current_frame >= self.frames: | ||
self.seek(0) | ||
return True | ||
|
||
# Handle falling behind | ||
if frame_diff > self.FRAME_DIFF_THRESHOLD: | ||
self._behind_count += 1 | ||
skip_to = min(self._target_frame - self.SKIP_OFFSET, self.frames - 1) | ||
|
||
if self._behind_count > self.MAX_BEHIND_COUNT: | ||
self._decoder.seek(skip_to / self.fps) | ||
self._frames = self._decoder.get_frames() | ||
self._current_frame = skip_to | ||
self._behind_count = 0 | ||
else: | ||
try: | ||
while self._current_frame < skip_to: | ||
next(self._frames) | ||
self._current_frame += 1 | ||
except StopIteration: | ||
self.seek(0) | ||
return True | ||
else: | ||
self._behind_count = max(0, self._behind_count - 1) | ||
if frame_diff > 0: | ||
try: | ||
data = next(self._frames) | ||
self._texture.write(data) | ||
self._current_frame += 1 | ||
except StopIteration: | ||
self.seek(0) | ||
return True | ||
|
||
return False | ||
|
||
def seek(self, time: float) -> None: | ||
"""Seek to specified time position.""" | ||
time = max(0, min(time, self.duration)) | ||
self._decoder.seek(time) | ||
self._frames = self._decoder.get_frames() | ||
self._current_frame = int(time * self.fps) | ||
self._target_frame = self._current_frame | ||
self._behind_count = 0 | ||
|
||
def toggle_pause(self) -> None: | ||
"""Toggle pause state.""" | ||
self._paused = not self._paused | ||
|
||
try: | ||
data = next(self._frames) | ||
except StopIteration: | ||
return | ||
self._texture.write(data) | ||
@property | ||
def current_frame(self) -> int: | ||
return self._current_frame | ||
|
||
@property | ||
def target_frame(self) -> int: | ||
return self._target_frame | ||
|
||
def next_frame(self) -> av.plane.Plane: | ||
"""Get RGB data for the next frame. | ||
A VideoPlane is returned containing the RGB data. | ||
This objects supports the buffer protocol and can be written to a texture directly. | ||
""" | ||
return next(self._frames) | ||
@property | ||
def is_paused(self) -> bool: | ||
return self._paused | ||
|
||
|
||
class VideoTest(moderngl_window.WindowConfig): | ||
class VideoPlayerWindow(moderngl_window.WindowConfig): | ||
"""ModernGL window configuration for video playback.""" | ||
|
||
gl_version = (3, 3) | ||
title = "Video Player" | ||
resource_dir = Path(__file__).parent.resolve() / "resources" | ||
vsync = True | ||
seek_time = 1.0 # Seconds to seek when using arrow keys | ||
log_level = logging.DEBUG | ||
|
||
def __init__(self, **kwargs): | ||
super().__init__(**kwargs) | ||
|
||
self.player = Player(self.ctx, self.resource_dir / "videos/Lightning - 33049.mp4") | ||
print("duration :", self.player.duration) | ||
print("fps :", self.player.fps) | ||
print("video_size :", self.player.video_size) | ||
print("frames :", self.player.frames) | ||
print("step :", self.player._decoder.frame_step) | ||
# Initialize video player | ||
video_path = self.resource_dir / "videos" / "Lightning - 33049.mp4" | ||
|
||
self.player = VideoPlayer(self.ctx, video_path) | ||
|
||
# Setup rendering | ||
self.quad = geometry.quad_fs() | ||
self.program = self.load_program("programs/texture_flipped.glsl") | ||
|
||
def on_render(self, time: float, frametime: float): | ||
self.player.update(math.fmod(time, 5)) | ||
# Setup stats printing | ||
self._last_print_time = 0 | ||
|
||
def on_render(self, time: float, frametime: float) -> None: | ||
"""Render frame.""" | ||
if self.player.update(time): # Check if video ended | ||
self.timer.time = 0 # Reset timer if video ended | ||
|
||
# Render video | ||
self.player.texture.use(0) | ||
self.quad.render(self.program) | ||
|
||
def on_key_event(self, key, action, modifiers): | ||
# Print debug stats every 0.5 seconds regardless of pause state | ||
if (time - self._last_print_time) >= 0.5 or time < self._last_print_time: | ||
# Get FPS values with safety checks | ||
fps_avg = self.timer.fps_average if self.timer.time > 0 else 0.0 | ||
|
||
logger.debug( | ||
"Movie Target FPS: %.1f | Window FPS: %.1f | Frame: %d/%d | \ | ||
Time: %.2f/%.2f | Frame Diff: %d | Paused: %s", | ||
self.player.fps, | ||
fps_avg, | ||
self.player.current_frame, | ||
self.player.frames, | ||
self.timer.time, | ||
self.player.duration, | ||
self.player.target_frame - self.player.current_frame, | ||
self.player.is_paused, | ||
) | ||
self._last_print_time = time | ||
|
||
def _handle_seek(self, direction: Literal["forward", "backward"]) -> None: | ||
"""Handle seeking in video. direction: 'forward' or 'backward'""" | ||
seek_amount = self.seek_time if direction == "forward" else -self.seek_time | ||
new_time = max(0, min(self.player.duration, self.timer.time + seek_amount)) | ||
|
||
if self.timer.is_paused: | ||
self.player.seek(new_time) | ||
else: | ||
self.timer.time = new_time | ||
self.player.seek(new_time) | ||
|
||
def on_key_event(self, key, action, modifiers) -> None: | ||
"""Handle keyboard input.""" | ||
super().on_key_event(key, action, modifiers) | ||
keys = self.wnd.keys | ||
|
||
# Key presses | ||
if action == keys.ACTION_PRESS: | ||
if key == keys.LEFT: | ||
self.timer.time = self.timer.time - 10 | ||
|
||
if key == keys.RIGHT: | ||
self.timer.time = self.timer.time + 10 | ||
|
||
if key == keys.SPACE: | ||
self._handle_seek("backward") | ||
elif key == keys.RIGHT: | ||
self._handle_seek("forward") | ||
elif key == keys.SPACE: | ||
self.timer.toggle_pause() | ||
self.player.toggle_pause() | ||
|
||
|
||
if __name__ == "__main__": | ||
VideoTest.run() | ||
VideoPlayerWindow.run() |
Oops, something went wrong.