diff --git a/worker/transcribee_worker/reencode.py b/worker/transcribee_worker/reencode.py index 8ca9e18c..09d51503 100644 --- a/worker/transcribee_worker/reencode.py +++ b/worker/transcribee_worker/reencode.py @@ -1,6 +1,8 @@ -import select +import logging import subprocess from pathlib import Path +from threading import Thread +from typing import IO import ffmpeg from transcribee_worker.types import ProgressCallbackType @@ -11,20 +13,6 @@ def get_duration(input_path: Path): return float(ffmpeg.probe(input_path)["format"]["duration"]) -def readlines(pipes): - while True: - fdsin, _, _ = select.select([x.fileno() for x in pipes], [], []) - for fd in fdsin: - pipe = next(x for x in pipes if x.fileno() == fd) - line = pipe.readline() - if len(line) == 0: - pipes.remove(pipe) - continue - yield pipe, line - if pipes == []: - break - - async def reencode( input_path: Path, output_path: Path, @@ -33,6 +21,17 @@ async def reencode( duration: float, include_video: bool, ): + def _read_progress(stdout: IO): + for raw_line in stdout: + key, value = raw_line.decode().strip().split("=", maxsplit=1) + + if key == "out_time_ms": + out_time_ms = int(value) + out_time_s = out_time_ms / 1e6 + progress_callback( + progress=out_time_s / duration, + ) + def work(_): pipeline = ffmpeg.input(input_path) streams = [pipeline["a:0"]] # use only first audio stream @@ -46,31 +45,18 @@ def work(_): progress="-", map_metadata="-1", **output_params, - ).run_async(pipe_stdout=True, pipe_stderr=True) + ).run_async(pipe_stderr=True, pipe_stdout=True) + assert cmd.stderr assert cmd.stdout - raw_line: bytes - progress_dict = {} - stderr_data = [] + Thread(target=_read_progress, args=(cmd.stdout,)).start() - for pipe, raw_line in readlines([cmd.stdout, cmd.stderr]): - if pipe == cmd.stderr: - stderr_data.append(raw_line.decode()) - continue - if b"=" not in raw_line: - continue - key, value = raw_line.decode().strip().split("=", maxsplit=1) - progress_dict[key] = value.strip() + stderr_data = [] + for raw_line in cmd.stderr: + line = raw_line.decode().rstrip() + logging.info(line) + stderr_data.append(line) - if key == "progress": - if "out_time_ms" in progress_dict: - out_time_ms = int(progress_dict["out_time_ms"]) - out_time_s = out_time_ms / 1e6 - progress_callback( - progress=out_time_s / duration, - extra_data=progress_dict, - ) - progress_dict = {} returncode = cmd.wait() progress_callback( progress=1,