Skip to content

Commit

Permalink
Wait for all child processes to finish before temp file cleanup and e…
Browse files Browse the repository at this point in the history
…xit (#283)
  • Loading branch information
alexheretic authored Mar 1, 2025
1 parent 301eb32 commit 3659c0f
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Unreleased (0.9.2)
* Log crf results, instead of printing, if stderr is not a terminal.
* Wait for all child processes (ffmpeg etc) to finish before temp file cleanup and exit.

# v0.9.1
* Fix xpsnr inf score parsing.
Expand Down
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ humantime = "2.1"
indicatif = "0.17"
infer = { version = "0.19", default-features = false }
log = "0.4.21"
pin-project-lite = "0.2.16"
serde = { version = "1.0.185", features = ["derive"] }
serde_json = "1.0.105"
shell-escape = "0.1.5"
Expand Down
1 change: 1 addition & 0 deletions src/command/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub async fn run(
} => stream_sizes = Some((video, audio, subtitle, other)),
}
}
crate::process::child::add(enc.into());
bar.finish();

// successful encode, so don't delete it!
Expand Down
1 change: 1 addition & 0 deletions src/command/sample_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ pub fn run(
logger.update(sample_duration, time, fps);
}
}
crate::process::child::add(output.into());
let encode_time = b.elapsed();
let encoded_size = fs::metadata(&encoded_sample).await?.len();
let encoded_probe = ffprobe::probe(&encoded_sample);
Expand Down
7 changes: 3 additions & 4 deletions src/ffmpeg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::{
command::args::PixelFormat,
float::TerseF32,
process::{CommandExt, FfmpegOut},
process::{CommandExt, FfmpegOut, FfmpegOutStream},
temporary::{self, TempKind},
};
use anyhow::Context;
Expand All @@ -15,7 +15,6 @@ use std::{
sync::{Arc, LazyLock},
};
use tokio::process::Command;
use tokio_stream::Stream;

/// Exposed ffmpeg encoding args.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -72,7 +71,7 @@ pub fn encode_sample(
}: FfmpegEncodeArgs,
temp_dir: Option<PathBuf>,
dest_ext: &str,
) -> anyhow::Result<(PathBuf, impl Stream<Item = anyhow::Result<FfmpegOut>>)> {
) -> anyhow::Result<(PathBuf, FfmpegOutStream)> {
let pre = pre_extension_name(&vcodec);
let crf_str = format!("{}", TerseF32(crf)).replace('.', "_");
let dest_file_name = match &preset {
Expand Down Expand Up @@ -127,7 +126,7 @@ pub fn encode(
has_audio: bool,
audio_codec: Option<&str>,
downmix_to_stereo: bool,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<FfmpegOut>>> {
) -> anyhow::Result<FfmpegOutStream> {
let oargs: HashSet<_> = output_args.iter().map(|a| a.as_str()).collect();
let output_ext = output.extension().and_then(|e| e.to_str());

Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ async fn main() {
r = command => r,
_ = signal::ctrl_c() => Err(anyhow!("ctrl_c")),
};
drop(local);

crate::process::child::wait().await;

// Final cleanup. Samples are already deleted (if wished by the user) during `command::sample_encode::run`.
temporary::clean(keep).await;
Expand Down
79 changes: 61 additions & 18 deletions src/process.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
pub mod child;

use anyhow::{anyhow, ensure};
use std::{
borrow::Cow,
ffi::OsStr,
fmt::Display,
io,
pin::Pin,
process::{ExitStatus, Output},
sync::Arc,
task::{Context, Poll, ready},
time::Duration,
};
use time::macros::format_description;
use tokio::process::Child;
use tokio_process_stream::{Item, ProcessChunkStream};
use tokio_stream::{Stream, StreamExt};
use tokio_stream::Stream;

pub fn ensure_success(name: &'static str, out: &Output) -> anyhow::Result<()> {
ensure!(
Expand Down Expand Up @@ -103,23 +107,13 @@ impl FfmpegOut {
None
}

pub fn stream(
child: Child,
name: &'static str,
cmd_str: String,
) -> impl Stream<Item = anyhow::Result<FfmpegOut>> {
let mut chunks = Chunks::default();
ProcessChunkStream::from(child).filter_map(move |item| match item {
Item::Stderr(chunk) => {
chunks.push(&chunk);
FfmpegOut::try_parse(chunks.last_line()).map(Ok)
}
Item::Stdout(_) => None,
Item::Done(code) => match exit_ok_stderr(name, code, &cmd_str, &chunks) {
Ok(_) => None,
Err(err) => Some(Err(err)),
},
})
pub fn stream(child: Child, name: &'static str, cmd_str: String) -> FfmpegOutStream {
FfmpegOutStream {
chunk_stream: ProcessChunkStream::from(child),
chunks: <_>::default(),
name,
cmd_str,
}
}
}

Expand Down Expand Up @@ -231,6 +225,55 @@ impl Chunks {
}
}

pin_project_lite::pin_project! {
#[must_use = "streams do nothing unless polled"]
pub struct FfmpegOutStream {
#[pin]
chunk_stream: ProcessChunkStream,
name: &'static str,
cmd_str: String,
chunks: Chunks,
}
}

impl From<FfmpegOutStream> for ProcessChunkStream {
fn from(stream: FfmpegOutStream) -> Self {
stream.chunk_stream
}
}

impl Stream for FfmpegOutStream {
type Item = anyhow::Result<FfmpegOut>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match ready!(self.as_mut().project().chunk_stream.poll_next(cx)) {
Some(item) => match item {
Item::Stderr(chunk) => {
self.chunks.push(&chunk);
if let Some(out) = FfmpegOut::try_parse(self.chunks.last_line()) {
return Poll::Ready(Some(Ok(out)));
}
}
Item::Stdout(_) => {}
Item::Done(code) => {
if let Err(err) =
exit_ok_stderr(self.name, code, &self.cmd_str, &self.chunks)
{
return Poll::Ready(Some(Err(err)));
}
}
},
None => return Poll::Ready(None),
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, self.chunk_stream.size_hint().1)
}
}

#[test]
fn parse_ffmpeg_progress_chunk() {
let out = "frame= 288 fps= 94 q=-0.0 size=N/A time=01:23:12.34 bitrate=N/A speed=3.94x \r";
Expand Down
100 changes: 100 additions & 0 deletions src/process/child.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use log::info;
use std::{
io::IsTerminal,
mem,
ops::{Deref, DerefMut},
pin::pin,
sync::{LazyLock, Mutex},
time::Duration,
};
use tokio::{
signal,
time::{Instant, timeout_at},
};
use tokio_process_stream::ProcessChunkStream;

static RUNNING: LazyLock<Mutex<Vec<ProcessChunkStream>>> = LazyLock::new(<_>::default);

/// Add a child process so it may be waited on before exiting.
pub fn add(mut child: ProcessChunkStream) {
let mut running = RUNNING.lock().unwrap();

// remove any that have exited already
running.retain_mut(|c| c.child_mut().is_some_and(|c| c.try_wait().is_err()));

if child.child_mut().is_some_and(|c| c.try_wait().is_err()) {
running.push(child);
}
}

/// Wait for all child processes, that were added with [`add`], to exit.
pub async fn wait() {
// if waiting takes >500ms log what's happening
let mut log_deadline = Some(Instant::now() + Duration::from_millis(500));
let procs = mem::take(&mut *RUNNING.lock().unwrap());
let mut ctrl_c = pin!(signal::ctrl_c());

for mut proc in procs {
if let Some(child) = proc.child_mut() {
if let Some(deadline) = log_deadline {
if timeout_at(deadline, child.wait()).await.is_err() {
log_waiting();
log_deadline = None;
}
}
tokio::select! {
_ = &mut ctrl_c => {
log_abort_wait();
return;
}
_ = child.wait() => {}
}
}
}
}

fn log_waiting() {
match std::io::stderr().is_terminal() {
true => eprintln!("Waiting for child processes to exit..."),
_ => info!("Waiting for child processes to exit"),
}
}

fn log_abort_wait() {
match std::io::stderr().is_terminal() {
true => eprintln!("Aborting wait for child processes"),
_ => info!("Aborting wait for child processes"),
}
}

/// Wrapper that [`add`]s the inner on drop.
#[derive(Debug)]
pub struct AddOnDropChunkStream(Option<ProcessChunkStream>);

impl From<ProcessChunkStream> for AddOnDropChunkStream {
fn from(v: ProcessChunkStream) -> Self {
Self(Some(v))
}
}

impl Drop for AddOnDropChunkStream {
fn drop(&mut self) {
if let Some(child) = self.0.take() {
add(child);
}
}
}

impl Deref for AddOnDropChunkStream {
type Target = ProcessChunkStream;

fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap() // only none after drop
}
}

impl DerefMut for AddOnDropChunkStream {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().unwrap() // only none after drop
}
}
7 changes: 5 additions & 2 deletions src/vmaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub fn run(
);

let mut cmd = Command::new("ffmpeg");
cmd.arg2_opt("-r", fps)
cmd.kill_on_drop(true)
.arg2_opt("-r", fps)
.arg2("-i", distorted)
.arg2_opt("-r", fps)
.arg2("-i", reference)
Expand All @@ -37,7 +38,9 @@ pub fn run(

let cmd_str = cmd.to_cmd_str();
debug!("cmd `{cmd_str}`");
let mut vmaf = ProcessChunkStream::try_from(cmd).context("ffmpeg vmaf")?;
let mut vmaf = crate::process::child::AddOnDropChunkStream::from(
ProcessChunkStream::try_from(cmd).context("ffmpeg vmaf")?,
);

Ok(async_stream::stream! {
let mut chunks = Chunks::default();
Expand Down
7 changes: 5 additions & 2 deletions src/xpsnr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ pub fn run(
);

let mut cmd = Command::new("ffmpeg");
cmd.arg2_opt("-r", fps)
cmd.kill_on_drop(true)
.arg2_opt("-r", fps)
.arg2("-i", reference)
.arg2_opt("-r", fps)
.arg2("-i", distorted)
Expand All @@ -32,7 +33,9 @@ pub fn run(

let cmd_str = cmd.to_cmd_str();
debug!("cmd `{cmd_str}`");
let mut xpsnr = ProcessChunkStream::try_from(cmd).context("ffmpeg xpsnr")?;
let mut xpsnr = crate::process::child::AddOnDropChunkStream::from(
ProcessChunkStream::try_from(cmd).context("ffmpeg xpsnr")?,
);

Ok(async_stream::stream! {
let mut chunks = Chunks::default();
Expand Down

0 comments on commit 3659c0f

Please sign in to comment.