From 3659c0fd47c56a04bd166cefd6198caed8fe32c8 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sat, 1 Mar 2025 21:41:52 +0000 Subject: [PATCH] Wait for all child processes to finish before temp file cleanup and exit (#283) --- CHANGELOG.md | 1 + Cargo.lock | 9 ++-- Cargo.toml | 1 + src/command/encode.rs | 1 + src/command/sample_encode.rs | 1 + src/ffmpeg.rs | 7 ++- src/main.rs | 3 ++ src/process.rs | 79 ++++++++++++++++++++------- src/process/child.rs | 100 +++++++++++++++++++++++++++++++++++ src/vmaf.rs | 7 ++- src/xpsnr.rs | 7 ++- 11 files changed, 186 insertions(+), 30 deletions(-) create mode 100644 src/process/child.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index a09cfd6..9c59de1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Unreleased (0.9.2) * Log crf results, instead of printing, if stderr is not a terminal. +* Wait for all child processes (ffmpeg etc) to finish before temp file cleanup and exit. # v0.9.1 * Fix xpsnr inf score parsing. diff --git a/Cargo.lock b/Cargo.lock index 0d362ea..7cbfbf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,7 @@ dependencies = [ "indicatif", "infer", "log", + "pin-project-lite", "serde", "serde_json", "shell-escape", @@ -166,9 +167,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] name = "blake3" @@ -608,7 +609,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.8.0", + "bitflags 2.9.0", "libc", ] @@ -792,7 +793,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.8.0", + "bitflags 2.9.0", "errno", "libc", "linux-raw-sys", diff --git a/Cargo.toml b/Cargo.toml index 4568c1c..b3c1c10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ humantime = "2.1" indicatif = "0.17" infer = { version = "0.19", default-features = false } log = "0.4.21" +pin-project-lite = "0.2.16" serde = { version = "1.0.185", features = ["derive"] } serde_json = "1.0.105" shell-escape = "0.1.5" diff --git a/src/command/encode.rs b/src/command/encode.rs index 16bc67b..23c8e9c 100644 --- a/src/command/encode.rs +++ b/src/command/encode.rs @@ -118,6 +118,7 @@ pub async fn run( } => stream_sizes = Some((video, audio, subtitle, other)), } } + crate::process::child::add(enc.into()); bar.finish(); // successful encode, so don't delete it! diff --git a/src/command/sample_encode.rs b/src/command/sample_encode.rs index 88bc3e4..f30a15b 100644 --- a/src/command/sample_encode.rs +++ b/src/command/sample_encode.rs @@ -281,6 +281,7 @@ pub fn run( logger.update(sample_duration, time, fps); } } + crate::process::child::add(output.into()); let encode_time = b.elapsed(); let encoded_size = fs::metadata(&encoded_sample).await?.len(); let encoded_probe = ffprobe::probe(&encoded_sample); diff --git a/src/ffmpeg.rs b/src/ffmpeg.rs index bb2dcad..ad6b2c3 100644 --- a/src/ffmpeg.rs +++ b/src/ffmpeg.rs @@ -2,7 +2,7 @@ use crate::{ command::args::PixelFormat, float::TerseF32, - process::{CommandExt, FfmpegOut}, + process::{CommandExt, FfmpegOut, FfmpegOutStream}, temporary::{self, TempKind}, }; use anyhow::Context; @@ -15,7 +15,6 @@ use std::{ sync::{Arc, LazyLock}, }; use tokio::process::Command; -use tokio_stream::Stream; /// Exposed ffmpeg encoding args. #[derive(Debug, Clone)] @@ -72,7 +71,7 @@ pub fn encode_sample( }: FfmpegEncodeArgs, temp_dir: Option, dest_ext: &str, -) -> anyhow::Result<(PathBuf, impl Stream>)> { +) -> anyhow::Result<(PathBuf, FfmpegOutStream)> { let pre = pre_extension_name(&vcodec); let crf_str = format!("{}", TerseF32(crf)).replace('.', "_"); let dest_file_name = match &preset { @@ -127,7 +126,7 @@ pub fn encode( has_audio: bool, audio_codec: Option<&str>, downmix_to_stereo: bool, -) -> anyhow::Result>> { +) -> anyhow::Result { let oargs: HashSet<_> = output_args.iter().map(|a| a.as_str()).collect(); let output_ext = output.extension().and_then(|e| e.to_str()); diff --git a/src/main.rs b/src/main.rs index 092320b..41caddb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -60,6 +60,9 @@ async fn main() { r = command => r, _ = signal::ctrl_c() => Err(anyhow!("ctrl_c")), }; + drop(local); + + crate::process::child::wait().await; // Final cleanup. Samples are already deleted (if wished by the user) during `command::sample_encode::run`. temporary::clean(keep).await; diff --git a/src/process.rs b/src/process.rs index 8a9244a..ef9cf8c 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,17 +1,21 @@ +pub mod child; + use anyhow::{anyhow, ensure}; use std::{ borrow::Cow, ffi::OsStr, fmt::Display, io, + pin::Pin, process::{ExitStatus, Output}, sync::Arc, + task::{Context, Poll, ready}, time::Duration, }; use time::macros::format_description; use tokio::process::Child; use tokio_process_stream::{Item, ProcessChunkStream}; -use tokio_stream::{Stream, StreamExt}; +use tokio_stream::Stream; pub fn ensure_success(name: &'static str, out: &Output) -> anyhow::Result<()> { ensure!( @@ -103,23 +107,13 @@ impl FfmpegOut { None } - pub fn stream( - child: Child, - name: &'static str, - cmd_str: String, - ) -> impl Stream> { - let mut chunks = Chunks::default(); - ProcessChunkStream::from(child).filter_map(move |item| match item { - Item::Stderr(chunk) => { - chunks.push(&chunk); - FfmpegOut::try_parse(chunks.last_line()).map(Ok) - } - Item::Stdout(_) => None, - Item::Done(code) => match exit_ok_stderr(name, code, &cmd_str, &chunks) { - Ok(_) => None, - Err(err) => Some(Err(err)), - }, - }) + pub fn stream(child: Child, name: &'static str, cmd_str: String) -> FfmpegOutStream { + FfmpegOutStream { + chunk_stream: ProcessChunkStream::from(child), + chunks: <_>::default(), + name, + cmd_str, + } } } @@ -231,6 +225,55 @@ impl Chunks { } } +pin_project_lite::pin_project! { + #[must_use = "streams do nothing unless polled"] + pub struct FfmpegOutStream { + #[pin] + chunk_stream: ProcessChunkStream, + name: &'static str, + cmd_str: String, + chunks: Chunks, + } +} + +impl From for ProcessChunkStream { + fn from(stream: FfmpegOutStream) -> Self { + stream.chunk_stream + } +} + +impl Stream for FfmpegOutStream { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match ready!(self.as_mut().project().chunk_stream.poll_next(cx)) { + Some(item) => match item { + Item::Stderr(chunk) => { + self.chunks.push(&chunk); + if let Some(out) = FfmpegOut::try_parse(self.chunks.last_line()) { + return Poll::Ready(Some(Ok(out))); + } + } + Item::Stdout(_) => {} + Item::Done(code) => { + if let Err(err) = + exit_ok_stderr(self.name, code, &self.cmd_str, &self.chunks) + { + return Poll::Ready(Some(Err(err))); + } + } + }, + None => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, self.chunk_stream.size_hint().1) + } +} + #[test] fn parse_ffmpeg_progress_chunk() { let out = "frame= 288 fps= 94 q=-0.0 size=N/A time=01:23:12.34 bitrate=N/A speed=3.94x \r"; diff --git a/src/process/child.rs b/src/process/child.rs new file mode 100644 index 0000000..4b23c91 --- /dev/null +++ b/src/process/child.rs @@ -0,0 +1,100 @@ +use log::info; +use std::{ + io::IsTerminal, + mem, + ops::{Deref, DerefMut}, + pin::pin, + sync::{LazyLock, Mutex}, + time::Duration, +}; +use tokio::{ + signal, + time::{Instant, timeout_at}, +}; +use tokio_process_stream::ProcessChunkStream; + +static RUNNING: LazyLock>> = LazyLock::new(<_>::default); + +/// Add a child process so it may be waited on before exiting. +pub fn add(mut child: ProcessChunkStream) { + let mut running = RUNNING.lock().unwrap(); + + // remove any that have exited already + running.retain_mut(|c| c.child_mut().is_some_and(|c| c.try_wait().is_err())); + + if child.child_mut().is_some_and(|c| c.try_wait().is_err()) { + running.push(child); + } +} + +/// Wait for all child processes, that were added with [`add`], to exit. +pub async fn wait() { + // if waiting takes >500ms log what's happening + let mut log_deadline = Some(Instant::now() + Duration::from_millis(500)); + let procs = mem::take(&mut *RUNNING.lock().unwrap()); + let mut ctrl_c = pin!(signal::ctrl_c()); + + for mut proc in procs { + if let Some(child) = proc.child_mut() { + if let Some(deadline) = log_deadline { + if timeout_at(deadline, child.wait()).await.is_err() { + log_waiting(); + log_deadline = None; + } + } + tokio::select! { + _ = &mut ctrl_c => { + log_abort_wait(); + return; + } + _ = child.wait() => {} + } + } + } +} + +fn log_waiting() { + match std::io::stderr().is_terminal() { + true => eprintln!("Waiting for child processes to exit..."), + _ => info!("Waiting for child processes to exit"), + } +} + +fn log_abort_wait() { + match std::io::stderr().is_terminal() { + true => eprintln!("Aborting wait for child processes"), + _ => info!("Aborting wait for child processes"), + } +} + +/// Wrapper that [`add`]s the inner on drop. +#[derive(Debug)] +pub struct AddOnDropChunkStream(Option); + +impl From for AddOnDropChunkStream { + fn from(v: ProcessChunkStream) -> Self { + Self(Some(v)) + } +} + +impl Drop for AddOnDropChunkStream { + fn drop(&mut self) { + if let Some(child) = self.0.take() { + add(child); + } + } +} + +impl Deref for AddOnDropChunkStream { + type Target = ProcessChunkStream; + + fn deref(&self) -> &Self::Target { + self.0.as_ref().unwrap() // only none after drop + } +} + +impl DerefMut for AddOnDropChunkStream { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_mut().unwrap() // only none after drop + } +} diff --git a/src/vmaf.rs b/src/vmaf.rs index 2d7bc80..8c734b2 100644 --- a/src/vmaf.rs +++ b/src/vmaf.rs @@ -21,7 +21,8 @@ pub fn run( ); let mut cmd = Command::new("ffmpeg"); - cmd.arg2_opt("-r", fps) + cmd.kill_on_drop(true) + .arg2_opt("-r", fps) .arg2("-i", distorted) .arg2_opt("-r", fps) .arg2("-i", reference) @@ -37,7 +38,9 @@ pub fn run( let cmd_str = cmd.to_cmd_str(); debug!("cmd `{cmd_str}`"); - let mut vmaf = ProcessChunkStream::try_from(cmd).context("ffmpeg vmaf")?; + let mut vmaf = crate::process::child::AddOnDropChunkStream::from( + ProcessChunkStream::try_from(cmd).context("ffmpeg vmaf")?, + ); Ok(async_stream::stream! { let mut chunks = Chunks::default(); diff --git a/src/xpsnr.rs b/src/xpsnr.rs index 6e549fa..df9e66f 100644 --- a/src/xpsnr.rs +++ b/src/xpsnr.rs @@ -21,7 +21,8 @@ pub fn run( ); let mut cmd = Command::new("ffmpeg"); - cmd.arg2_opt("-r", fps) + cmd.kill_on_drop(true) + .arg2_opt("-r", fps) .arg2("-i", reference) .arg2_opt("-r", fps) .arg2("-i", distorted) @@ -32,7 +33,9 @@ pub fn run( let cmd_str = cmd.to_cmd_str(); debug!("cmd `{cmd_str}`"); - let mut xpsnr = ProcessChunkStream::try_from(cmd).context("ffmpeg xpsnr")?; + let mut xpsnr = crate::process::child::AddOnDropChunkStream::from( + ProcessChunkStream::try_from(cmd).context("ffmpeg xpsnr")?, + ); Ok(async_stream::stream! { let mut chunks = Chunks::default();