def readlines(pipes):
    """Yield ``(pipe, line)`` pairs from several binary pipes as lines arrive.

    The pipes are multiplexed with ``select.select`` so that output on one
    pipe is delivered even while another pipe is idle or in the middle of a
    line.  Iteration ends once every pipe has reached end-of-file.

    Args:
        pipes: Iterable of binary file objects exposing ``fileno()`` (for
            example the ``stdout``/``stderr`` of a ``subprocess.Popen``).
            The iterable is not modified.  Data is read straight from the
            file descriptors, so the pipes should not have been read through
            their file objects beforehand.

    Yields:
        Tuples ``(pipe, line)`` where ``pipe`` is the object passed in and
        ``line`` is a ``bytes`` line including its trailing ``b"\\n"``.  A
        final unterminated line is yielded as-is when its pipe hits EOF.
    """
    import os  # function-scope import: only this block needs it

    # Private bookkeeping keyed by file descriptor; the caller's container
    # is left untouched.
    open_pipes = {pipe.fileno(): pipe for pipe in pipes}
    partial = {fd: b"" for fd in open_pipes}

    # Looping on the dict (instead of `while True`) avoids calling select()
    # with three empty lists, which would block forever.
    while open_pipes:
        ready, _, _ = select.select(list(open_pipes), [], [])
        for fd in ready:
            pipe = open_pipes[fd]
            # os.read returns whatever is available without waiting for a
            # newline, so a half-written line on one pipe cannot stall the
            # draining of the others.
            chunk = os.read(fd, 65536)
            if not chunk:
                # EOF: stop watching this pipe and flush any unterminated tail.
                del open_pipes[fd]
                leftover = partial.pop(fd)
                if leftover:
                    yield pipe, leftover
                continue
            # Everything before the last newline is complete; the remainder
            # is kept until more data (or EOF) arrives.
            *complete, partial[fd] = (partial[fd] + chunk).split(b"\n")
            for line in complete:
                yield pipe, line + b"\n"