From e1d5574ee7da5d0e6bff40c13756515dc56b193f Mon Sep 17 00:00:00 2001 From: Thomas Erlang Date: Sat, 9 Dec 2023 03:02:11 +0100 Subject: [PATCH] Hls transcoder can now generate the whole file index and handles seeking in native clients --- seplis_play_server/main.py | 3 + seplis_play_server/routes/hls.py | 92 ++++++++++++ seplis_play_server/routes/request_media.py | 10 +- seplis_play_server/transcoders/hls.py | 140 ++++++++++++++---- seplis_play_server/transcoders/subtitle.py | 8 +- seplis_play_server/transcoders/video.py | 82 ++++++---- .../transcoders/test_hls.py | 89 +++++++++++ 7 files changed, 355 insertions(+), 69 deletions(-) create mode 100644 seplis_play_server/routes/hls.py create mode 100644 tests/seplis_play_server/transcoders/test_hls.py diff --git a/seplis_play_server/main.py b/seplis_play_server/main.py index 5502a70..cfad27d 100644 --- a/seplis_play_server/main.py +++ b/seplis_play_server/main.py @@ -16,6 +16,7 @@ close_session, download_source, request_media, + hls, ) app = FastAPI( @@ -37,9 +38,11 @@ app.include_router(close_session.router) app.include_router(download_source.router) app.include_router(request_media.router) +app.include_router(hls.router) # The media.m3u8 gets updated too fast and the browser gets an old version StaticFiles.is_not_modified = lambda *args, **kwargs: False +app.include_router(hls.router) app.mount('/files', StaticFiles(directory=config.transcode_folder), name='files') @app.on_event('startup') diff --git a/seplis_play_server/routes/hls.py b/seplis_play_server/routes/hls.py new file mode 100644 index 0000000..7998585 --- /dev/null +++ b/seplis_play_server/routes/hls.py @@ -0,0 +1,92 @@ +import asyncio +import os.path +import anyio +from fastapi import APIRouter, HTTPException, Depends, Response +from fastapi.responses import FileResponse + +from seplis_play_server import logger + +from .. import config +from ..dependencies import get_metadata +from ..transcoders.video import Transcode_settings, close_transcoder, sessions +from ..transcoders.hls import Hls_transcoder + +router = APIRouter() + +@router.get('/hls/media.m3u8') +async def start_media( + settings: Transcode_settings = Depends(), + metadata = Depends(get_metadata), +): + if not metadata or settings.source_index > len(metadata): + raise HTTPException(404, 'No metadata') + + if settings.session in sessions: + transcoder = Hls_transcoder(settings=settings, metadata=metadata[settings.source_index]) + else: + transcoder = await start_transcode(settings) + return Response( + content=transcoder.generate_hls_playlist(), + media_type='application/x-mpegURL', + ) + +@router.get('/hls/media{segment}.m4s') +async def get_media( + segment: int, + settings: Transcode_settings = Depends(), +): + if settings.session in sessions: + folder = sessions[settings.session].transcode_folder + + if await Hls_transcoder.is_segment_ready(folder, segment): + return FileResponse(Hls_transcoder.get_segment_path(folder, segment)) + + # If the segment is within 15 segments of the last transcoded segment + # then wait for the segment to be transcoded. + first_transcoded_segment, last_transcoded_segment = await Hls_transcoder.first_last_transcoded_segment(folder) + if first_transcoded_segment <= segment and (last_transcoded_segment + 15) >= segment: + logger.debug(f'Requested segment {segment} is within the range {first_transcoded_segment}-{last_transcoded_segment+15} to wait for transcoding') + if await Hls_transcoder.wait_for_segment(folder, segment): + return FileResponse(Hls_transcoder.get_segment_path(folder, segment)) + + logger.debug(f'Requested segment {segment} is not within the range {first_transcoded_segment}-{last_transcoded_segment+15} to wait for transcoding, start a new transcoder') + else: + logger.debug(f'Start new transcoder since the session does not exist') + + await start_transcode(settings, segment) + + folder = sessions[settings.session].transcode_folder + if await Hls_transcoder.wait_for_segment(folder, segment): + return FileResponse(Hls_transcoder.get_segment_path(folder, segment)) + + raise HTTPException(404, 'No media') + +@router.get('/hls/init.mp4') +def get_init( + settings: Transcode_settings = Depends(), +): + try: + return FileResponse(os.path.join( + config.transcode_folder, + settings.session, + 'init.mp4', + )) + except: + raise HTTPException(404, 'No init file') + +async def start_transcode(settings: Transcode_settings, start_segment: int = -1): + metadata = await get_metadata(settings.play_id) + if not metadata or settings.source_index > len(metadata): + raise HTTPException(404, 'No metadata') + transcode = Hls_transcoder(settings=settings, metadata=metadata[settings.source_index]) + if start_segment == -1: + transcode.settings.start_segment = transcode.start_segment_from_start_time(settings.start_time) + transcode.settings.start_time = transcode.start_time_from_segment(transcode.settings.start_segment) + else: + transcode.settings.start_time = transcode.start_time_from_segment(start_segment) + transcode.settings.start_segment = start_segment + + ready = await transcode.start() + if ready == False: + raise HTTPException(500, 'Transcode failed to start') + return transcode \ No newline at end of file diff --git a/seplis_play_server/routes/request_media.py b/seplis_play_server/routes/request_media.py index 8ad196c..39e7c01 100644 --- a/seplis_play_server/routes/request_media.py +++ b/seplis_play_server/routes/request_media.py @@ -1,5 +1,5 @@ from fastapi import APIRouter, HTTPException, Depends -from pydantic import BaseModel, RootModel +from pydantic import BaseModel from urllib.parse import urlencode from ..transcoders.video import Transcode_settings, Transcoder from ..dependencies import get_metadata @@ -10,7 +10,6 @@ class Request_media(BaseModel): direct_play_url: str can_direct_play: bool transcode_url: str - transcode_start_time: float @router.get('/request-media', response_model=Request_media) async def request_media( @@ -23,16 +22,11 @@ async def request_media( t = Transcoder(settings=settings, metadata=metadata[source_index]) - settings_dict = RootModel[Transcode_settings](settings).model_dump(exclude_none=True, exclude_unset=True) - for key in settings_dict: - if isinstance(settings_dict[key], list): - settings_dict[key] = ','.join(settings_dict[key]) can_device_direct_play = t.can_device_direct_play() format_supported = any(fmt in settings.supported_video_containers \ for fmt in metadata[source_index]['format']['format_name'].split(',')) return Request_media( direct_play_url=f'/source?play_id={settings.play_id}&source_index={source_index}', can_direct_play=format_supported and can_device_direct_play and t.can_copy_audio(), - transcode_url=f'/transcode?source_index={source_index}&{urlencode(settings_dict)}', - transcode_start_time=t.closest_keyframe_time(settings.start_time), + transcode_url=f'/hls/media.m3u8?{urlencode(settings.to_args_dict())}', ) \ No newline at end of file diff --git a/seplis_play_server/transcoders/hls.py b/seplis_play_server/transcoders/hls.py index 68cd3d3..32a58ce 100644 --- a/seplis_play_server/transcoders/hls.py +++ b/seplis_play_server/transcoders/hls.py @@ -1,24 +1,33 @@ import asyncio, os +import math +import re +from urllib.parse import urlencode +from decimal import Decimal from aiofile import async_open +import anyio from seplis_play_server import logger from . import video class Hls_transcoder(video.Transcoder): + media_name: str = 'media.m3u8' + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - # For now force h264 for hls hevc breaks in safari for some reason + # For now force h264 since hls hevc breaks in safari for some reason self.settings.transcode_video_codec = 'h264' self.settings.supported_video_codecs = ['h264'] def ffmpeg_extend_args(self) -> None: self.ffmpeg_args.extend([ + *self.keyframe_params(), {'-f': 'hls'}, {'-hls_playlist_type': 'event'}, {'-hls_segment_type': 'fmp4'}, {'-hls_time': str(self.segment_time())}, {'-hls_list_size': '0'}, + {'-start_number': str(self.settings.start_segment or 0)}, {self.media_path: None}, ]) @@ -26,41 +35,117 @@ def ffmpeg_extend_args(self) -> None: def media_path(self) -> str: return os.path.join(self.transcode_folder, self.media_name) - @property - def media_name(self) -> str: - return 'media.m3u8' - async def wait_for_media(self): - files = 0 + await self.wait_for_segment( + self.transcode_folder, + self.settings.start_segment or 0, + ) + + @classmethod + async def wait_for_segment(cls, transcode_folder: str, segment: str | int): + async def wait_for(): + while True: + if await cls.is_segment_ready(transcode_folder, segment): + return True + await asyncio.sleep(0.1) + try: + return await asyncio.wait_for(wait_for(), timeout=10) + except asyncio.TimeoutError: + logger.error(f'[{transcode_folder}] Timeout waiting for segment {segment}') + return False - while True: - if os.path.exists(self.media_path): - async with async_open(self.media_path, "r") as afp: - async for line in afp: - if not '#' in line: - files += 1 - if files >= 1: - return True - await asyncio.sleep(0.5) + @classmethod + async def first_last_transcoded_segment(cls, transcode_folder: str): + f = os.path.join(transcode_folder, cls.media_name) + first, last = (0, 0) + if await anyio.to_thread.run_sync(os.path.exists, f): + async with async_open(f, "r") as afp: + async for line in afp: + if not '#' in line: + m = re.search(r'(\d+)\.m4s', line) + last = int(m.group(1)) + if not first: + first = last + else: + logger.debug(f'No media file {f}') + return (first, last) + + @classmethod + async def is_segment_ready(cls, transcode_folder: str, segment: int): + return await anyio.to_thread.run_sync( + os.path.exists, + cls.get_segment_path(transcode_folder, segment) + ) + + @staticmethod + def get_segment_path(transcode_folder: str, segment: int): + return os.path.join(transcode_folder, f'media{segment}.m4s') - async def write_hls_playlist(self) -> None: + def generate_hls_playlist(self): + settings_dict = self.settings.to_args_dict() + url_settings = urlencode(settings_dict) + segments = self.get_segments() l = [] l.append('#EXTM3U') l.append('#EXT-X-VERSION:7') l.append('#EXT-X-PLAYLIST-TYPE:VOD') - l.append(f'#EXT-X-TARGETDURATION:{str(self.segment_time())}') + l.append(f'#EXT-X-TARGETDURATION:{round(max(segments)) if len(segments) > 0 else str(self.segment_time())}') l.append('#EXT-X-MEDIA-SEQUENCE:0') + l.append(f'#EXT-X-MAP:URI="/hls/init.mp4?{url_settings}"') - # Keyframes is in self.metadata['keyframes'] - - # Make the EXTINF lines - prev = 0.0 - for i, t in enumerate(self.metadata['keyframes']): - l.append(f'#EXTINF:{str(t-prev)},') - l.append(f'media{i}.m4s') + for i, segment_time in enumerate(segments): + l.append(f'#EXTINF:{str(segment_time)}, nodesc') + l.append(f'/hls/media{i}.m4s?{url_settings}') + l.append('#EXT-X-ENDLIST') + return '\n'.join(l) + + def get_segments(self): + if self.can_copy_video(): + return self.calculate_keyframe_segments() + else: + return self.calculate_equal_segments() - logger.info(l) + def calculate_keyframe_segments(self): + result: list[Decimal] = [] + target_duration = Decimal(self.segment_time()) + keyframes = [Decimal(t) for t in self.metadata['keyframes']] + break_time = target_duration + prev_keyframe = Decimal(0) + for keyframe in keyframes: + if keyframe >= break_time: + result.append(keyframe - prev_keyframe) + prev_keyframe = keyframe + break_time += target_duration + result.append(Decimal(self.metadata['format']['duration']) - prev_keyframe) + return result + def calculate_equal_segments(self): + target_duration = Decimal(self.segment_time()) + duration = Decimal(self.metadata['format']['duration']) + segments = duration / target_duration + left_over = duration % target_duration + result = [target_duration for _ in range(int(segments))] + if left_over: + result.append(left_over) + return result + + def start_time_from_segment(self, segment: int) -> Decimal: + segments = self.get_segments() + if segment >= len(segments) or segment < 1: + return Decimal(0) + return sum(segments[:segment]) + + def start_segment_from_start_time(self, start_time: Decimal) -> int: + if start_time <= 0: + return 0 + segments = self.get_segments() + time = Decimal(0) + for i, t in enumerate(segments): + time += t + if time > start_time: + return i + return 0 + def keyframe_params(self) -> list[dict]: if self.output_codec_lib == 'copy': return [] @@ -69,12 +154,11 @@ def keyframe_params(self) -> list[dict]: keyframe_args = [ {'-force_key_frames:0': f'expr:gte(t,n_forced*{self.segment_time()})'}, ] - if self.video_stream.get('r_frame_rate'): r_frame_rate = self.video_stream['r_frame_rate'].split('/') - r_frame_rate = int(r_frame_rate[0]) / int(r_frame_rate[1]) + r_frame_rate = Decimal(r_frame_rate[0]) / Decimal(r_frame_rate[1]) - v = self.segment_time() * r_frame_rate + v = math.ceil(Decimal(self.segment_time()) * r_frame_rate) go_args.extend([ {'-g:v:0': str(v)}, {'-keyint_min:v:0': str(v)}, diff --git a/seplis_play_server/transcoders/subtitle.py b/seplis_play_server/transcoders/subtitle.py index 34f3d11..a171677 100644 --- a/seplis_play_server/transcoders/subtitle.py +++ b/seplis_play_server/transcoders/subtitle.py @@ -12,8 +12,8 @@ async def get_subtitle_file(metadata: Dict, lang: str, start_time: int): if not sub_index: return args = [ - {'-analyzeduration': '20000000'}, - {'-probesize': '20000000'}, + {'-analyzeduration': '200M'}, + {'-probesize': '200M'}, {'-ss': str(start_time)}, {'-i': metadata['format']['filename']}, {'-y': None}, @@ -57,8 +57,8 @@ async def get_subtitle_file_from_external(id_: int, start_time: int): return None args = [ - {'-analyzeduration': '20000000'}, - {'-probesize': '20000000'}, + {'-analyzeduration': '200M'}, + {'-probesize': '200M'}, {'-ss': str(start_time)}, {'-i': sub_metadata.path}, {'-y': None}, diff --git a/seplis_play_server/transcoders/video.py b/seplis_play_server/transcoders/video.py index a23318e..6f80d3b 100644 --- a/seplis_play_server/transcoders/video.py +++ b/seplis_play_server/transcoders/video.py @@ -1,3 +1,4 @@ +from decimal import Decimal import os, asyncio, sys import shutil from fastapi import Query @@ -8,8 +9,10 @@ @dataclass class Transcode_settings: + source_index: int play_id: constr(min_length=1) session: constr(min_length=1) + source_index: int supported_video_codecs: Annotated[list[constr(min_length=1)], Query()] supported_audio_codecs: Annotated[list[constr(min_length=1)], Query()] format: Literal['pipe', 'hls', 'dash'] @@ -18,7 +21,8 @@ class Transcode_settings: supported_hdr_formats: list[Literal['hdr10', 'hlg', 'dovi']] = Query(default=[]) supported_video_color_bit_depth: conint(ge=8) = 10 - start_time: Optional[float] | constr(max_length=0) = 0 + start_time: Optional[Decimal] | constr(max_length=0) = 0 + start_segment: Optional[int] | constr(max_length=0) = None audio_lang: Optional[str] = None audio_channels: Optional[int] | constr(max_length=0) = None width: Optional[int] | constr(max_length=0) = None @@ -39,6 +43,14 @@ def comma_string(cls, v): l.extend([s.strip() for s in a.split(',')]) return l + def to_args_dict(self): + from pydantic import RootModel + settings_dict = RootModel[Transcode_settings](self).model_dump(exclude_none=True, exclude_unset=True) + for key in settings_dict: + if isinstance(settings_dict[key], list): + settings_dict[key] = ','.join(settings_dict[key]) + return settings_dict + class Video_color(BaseModel): range: str @@ -86,11 +98,7 @@ def __init__(self, settings: Transcode_settings, metadata: Dict): async def start(self, send_data_callback=None) -> bool | bytes: self.transcode_folder = self.create_transcode_folder() - if self.settings.session in sessions: - try: - return await asyncio.wait_for(self.wait_for_media(), timeout=5) - except asyncio.TimeoutError: - return False + await self.set_ffmpeg_args() args = to_subprocess_arguments(self.ffmpeg_args) @@ -140,16 +148,27 @@ def media_name(self) -> str: def register_session(self): loop = asyncio.get_event_loop() - logger.info(f'[{self.settings.session}] Registered') - sessions[self.settings.session] = Session_model( - process=self.process, - transcode_folder=self.transcode_folder, - call_later=loop.call_later( + if self.settings.session in sessions: + close_transcoder(self.settings.session) + logger.info(f'[{self.settings.session}] Reregistered') + sessions[self.settings.session].process = self.process + sessions[self.settings.session].call_later.cancel() + sessions[self.settings.session].call_later = loop.call_later( config.session_timeout, close_session_callback, self.settings.session - ), - ) + ) + else: + logger.info(f'[{self.settings.session}] Registered') + sessions[self.settings.session] = Session_model( + process=self.process, + transcode_folder=self.transcode_folder, + call_later=loop.call_later( + config.session_timeout, + close_session_callback, + self.settings.session + ), + ) async def set_ffmpeg_args(self): self.ffmpeg_args = [ @@ -159,13 +178,16 @@ async def set_ffmpeg_args(self): if self.settings.start_time: self.ffmpeg_args.append({'-ss': str(self.settings.start_time)}) self.ffmpeg_args.extend([ - {'-autorotate': '0'}, - {'-i': self.metadata['format']['filename']}, - {'-y': None}, - {'-copyts': None}, + {'-i': f"file:{self.metadata['format']['filename']}"}, + {'-map_metadata': '-1'}, + {'-map_chapters': '-1'}, + {'-threads': '0'}, {'-start_at_zero': None}, + #{'-copyts': None}, {'-avoid_negative_ts': 'disabled'}, {'-muxdelay': '0'}, + {'-max_delay': '5000000'}, + {'-max_muxing_queue_size': '2048'}, ]) self.set_video() self.set_audio() @@ -178,7 +200,6 @@ def closest_keyframe_time(self, time: float): keyframes = [float(r) for r in self.metadata['keyframes']] corrected_time = 0 for t in keyframes: - logger.info(f'[{self.settings.session}] Keyframe: {t}') if t > time: break corrected_time = t @@ -208,10 +229,6 @@ def set_hardware_decoder(self): def set_video(self): codec = codecs_to_library.get(self.settings.transcode_video_codec, self.settings.transcode_video_codec) - self.ffmpeg_args.append({'-map_metadata': '-1'}) - self.ffmpeg_args.append({'-map_chapters': '-1'}) - self.ffmpeg_args.append({'-threads': '0'}) - if self.can_copy_video(): codec = 'copy' self.ffmpeg_args.insert(0, {'-noaccurate_seek': None}) @@ -411,6 +428,7 @@ def set_audio(self): # Audio goes out of sync audio copy is used while the video is being transcoded if self.can_copy_video() and self.can_copy_audio(stream): codec = 'copy' + self.ffmpeg_args.insert(5, {'-noaccurate_seek': None}) else: if not codec or codec not in self.settings.supported_audio_codecs: codec = codecs_to_library.get(self.settings.transcode_audio_codec, '') @@ -530,7 +548,7 @@ def create_transcode_folder(self): return transcode_folder def segment_time(self): - return 6 if self.output_codec_lib == 'copy' else 3 + return 6 if self.can_copy_video() else 3 def subprocess_env(session, type_): @@ -626,18 +644,17 @@ def stream_index_by_lang(metadata: Dict, codec_type:str, lang: str): def close_session_callback(session): + logger.debug(f'[{session}] Session timeout reached') close_session(session) def close_session(session): - logger.info(f'[{session}] Closing') if session not in sessions: + logger.info(f'[{session}] Already closed') return + logger.info(f'[{session}] Closing') + close_transcoder(session) s = sessions[session] - try: - s.process.kill() - except: - pass try: if s.transcode_folder: if os.path.exists(s.transcode_folder): @@ -647,4 +664,11 @@ def close_session(session): except: pass s.call_later.cancel() - del sessions[session] \ No newline at end of file + del sessions[session] + + +def close_transcoder(session): + try: + sessions[session].process.kill() + except: + pass \ No newline at end of file diff --git a/tests/seplis_play_server/transcoders/test_hls.py b/tests/seplis_play_server/transcoders/test_hls.py new file mode 100644 index 0000000..30f825f --- /dev/null +++ b/tests/seplis_play_server/transcoders/test_hls.py @@ -0,0 +1,89 @@ +from seplis_play_server.testbase import run_file +from seplis_play_server.transcoders.hls import Hls_transcoder +from seplis_play_server.transcoders.video import Transcode_settings + +def test_hls(): + settings = Transcode_settings( + play_id='a', + session='123', + supported_video_codecs=['h264'], + supported_audio_codecs=['aac'], + transcode_video_codec='h264', + transcode_audio_codec='aac', + format='hls', + ) + metadata = { + "streams": [ + { + "index": 0, + "codec_name": "hevc", + "codec_long_name": "H.265 / HEVC (High Efficiency Video Coding)", + "profile": "Main 10", + "codec_type": "video", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "width": 1920, + "height": 1080, + "coded_width": 1920, + "coded_height": 1080, + "has_b_frames": 2, + "sample_aspect_ratio": "1:1", + "display_aspect_ratio": "16:9", + "pix_fmt": "yuv420p10le", + "level": 120, + "color_range": "tv", + "chroma_location": "left", + "r_frame_rate": "24000/1001", + "avg_frame_rate": "24000/1001", + }, + { + "index": 1, + "codec_name": "aac", + "codec_type": "audio", + "sample_rate": "48000", + "channels": 6, + } + ], + "format": { + "format_name": "matroska,webm", + "format_long_name": "Matroska / WebM", + "start_time": "0.000000", + "duration": "3486.590000", + "size": "2743430123", + "bit_rate": "6294815", + "probe_score": 100, + }, + "keyframes": [ + "0.000000", + "6.715000", + "10.761000", + "14.473000", + "24.900000", + "25.984000", + "27.819000", + "30.489000", + "31.865000", + "33.200000", + "36.787000", + "38.455000", + "41.208000", + "44.002000", + "46.505000", + "48.757000", + "50.634000", + "56.723000", + "60.352000", + "62.562000", + "68.527000", + "75.909000", + "86.336000", + "90.882000", + "92.384000", + "96.221000", + ] + } + + hls = Hls_transcoder(settings, metadata) + +if __name__ == '__main__': + run_file(__file__) \ No newline at end of file