diff --git a/lib/src/cli.rs b/lib/src/cli.rs index 9f19a6a67..f14dbb26b 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -5,8 +5,10 @@ use std::ffi::CString; use std::ffi::OsString; use std::io::Seek; +use std::os::unix::io::FromRawFd; use std::os::unix::process::CommandExt; use std::process::Command; +use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use camino::Utf8PathBuf; @@ -52,6 +54,10 @@ pub(crate) struct UpgradeOpts { /// a userspace-only restart. #[clap(long, conflicts_with = "check")] pub(crate) apply: bool, + + /// Pipe download progress to this fd in a jsonl format. + #[clap(long)] + pub(crate) json_fd: Option, } /// Perform an switch operation @@ -101,6 +107,10 @@ pub(crate) struct SwitchOpts { /// Target image to use for the next boot. pub(crate) target: String, + + /// Pipe download progress to this fd in a jsonl format. + #[clap(long)] + pub(crate) json_fd: Option, } /// Options controlling rollback @@ -614,6 +624,8 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { let (booted_deployment, _deployments, host) = crate::status::get_status_require_booted(sysroot)?; let imgref = host.spec.image.as_ref(); + let jsonw = unwrap_fd(opts.json_fd); + // If there's no specified image, let's be nice and check if the booted system is using rpm-ostree if imgref.is_none() { let booted_incompatible = host @@ -670,7 +682,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { } } } else { - let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, jsonw).await?; let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); let fetched_digest = &fetched.manifest_digest; tracing::debug!("staged: {staged_digest:?}"); @@ -715,6 +727,19 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { Ok(()) } +#[allow(unsafe_code)] +fn unwrap_fd(fd: Option) -> Option>> { + unsafe { + if !fd.is_none() { + return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd( + fd.unwrap(), + )))); + } else { + return None; + }; + } +} + /// Implementation of the `bootc switch` CLI command. #[context("Switching")] async fn switch(opts: SwitchOpts) -> Result<()> { @@ -729,6 +754,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { ); let target = ostree_container::OstreeImageReference { sigverify, imgref }; let target = ImageReference::from(target); + let jsonw = unwrap_fd(opts.json_fd); // If we're doing an in-place mutation, we shortcut most of the rest of the work here if opts.mutate_in_place { @@ -764,7 +790,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let new_spec = RequiredHostSpec::from_spec(&new_spec)?; - let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, jsonw).await?; if !opts.retain { // By default, we prune the previous ostree ref so it will go away after later upgrades @@ -826,7 +852,7 @@ async fn edit(opts: EditOpts) -> Result<()> { return crate::deploy::rollback(sysroot).await; } - let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, None).await?; // TODO gc old layers here diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs index 960c1abde..a6b892909 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; use std::io::{BufRead, Write}; +use std::sync::{Arc, Mutex}; use anyhow::Ok; use anyhow::{anyhow, Context, Result}; @@ -45,6 +46,17 @@ pub(crate) struct ImageState { pub(crate) ostree_commit: String, } +/// Download information +#[derive(Debug, serde::Serialize)] +pub struct JsonProgress { + pub stage: String, + pub done_bytes: u64, + pub download_bytes: u64, + pub image_bytes: u64, + pub n_layers: usize, + pub n_layers_done: usize, +} + impl<'a> RequiredHostSpec<'a> { /// Given a (borrowed) host specification, "unwrap" its internal /// options, giving a spec that is required to have a base container image. @@ -142,6 +154,7 @@ async fn handle_layer_progress_print( mut layers: tokio::sync::mpsc::Receiver, mut layer_bytes: tokio::sync::watch::Receiver>, n_layers_to_fetch: usize, + download_bytes: u64, ) { let start = std::time::Instant::now(); let mut total_read = 0u64; @@ -150,23 +163,28 @@ async fn handle_layer_progress_print( n_layers_to_fetch.try_into().unwrap(), )); let byte_bar = bar.add(indicatif::ProgressBar::new(0)); + let total_byte_bar = bar.add(indicatif::ProgressBar::new(download_bytes)); // let byte_bar = indicatif::ProgressBar::new(0); // byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden()); + println!(""); layers_bar.set_style( indicatif::ProgressStyle::default_bar() - .template("{prefix} {bar} {pos}/{len} {wide_msg}") + .template("{prefix} {pos}/{len} {bar:15}") .unwrap(), ); - layers_bar.set_prefix("Fetching layers"); + layers_bar.set_prefix("Fetched Layers"); layers_bar.set_message(""); - byte_bar.set_prefix("Fetching"); byte_bar.set_style( indicatif::ProgressStyle::default_bar() - .template( - " └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}", - ) - .unwrap() - ); + .template(" └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})") + .unwrap(), + ); + total_byte_bar.set_prefix("Total"); + total_byte_bar.set_style( + indicatif::ProgressStyle::default_bar() + .template("\n{prefix} {bar:30} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}, {elapsed}/{duration})") + .unwrap(), + ); loop { tokio::select! { // Always handle layer changes first. @@ -186,6 +204,7 @@ async fn handle_layer_progress_print( byte_bar.set_position(layer_size); layers_bar.inc(1); total_read = total_read.saturating_add(layer_size); + total_byte_bar.set_position(total_read); } } else { // If the receiver is disconnected, then we're done @@ -200,6 +219,7 @@ async fn handle_layer_progress_print( let bytes = layer_bytes.borrow(); if let Some(bytes) = &*bytes { byte_bar.set_position(bytes.fetched); + total_byte_bar.set_position(total_read + bytes.fetched); } } } @@ -223,6 +243,68 @@ async fn handle_layer_progress_print( } } +/// Write container fetch progress to standard output. +async fn handle_layer_progress_print_jsonl( + mut layers: tokio::sync::mpsc::Receiver, + mut layer_bytes: tokio::sync::watch::Receiver>, + n_layers_to_fetch: usize, + download_bytes: u64, + image_bytes: u64, + jsonw: Arc>, +) { + let mut total_read = 0u64; + let mut layers_done: usize = 0; + let mut last_json_written = std::time::Instant::now(); + loop { + tokio::select! { + // Always handle layer changes first. + biased; + layer = layers.recv() => { + if let Some(l) = layer { + if !l.is_starting() { + let layer = descriptor_of_progress(&l); + layers_done += 1; + total_read += total_read.saturating_add(layer.size()); + } + } else { + // If the receiver is disconnected, then we're done + break + }; + }, + r = layer_bytes.changed() => { + if r.is_err() { + // If the receiver is disconnected, then we're done + break + } + let bytes = layer_bytes.borrow(); + if let Some(bytes) = &*bytes { + let done_bytes = total_read + bytes.fetched; + + // Lets update the json output only on bytes fetched + // They are common enough, anyhow. Debounce on time. + let curr = std::time::Instant::now(); + if curr.duration_since(last_json_written).as_secs_f64() > 0.2 { + let json = JsonProgress { + stage: "fetching".to_string(), + done_bytes, + download_bytes, + image_bytes, + n_layers: n_layers_to_fetch, + n_layers_done: layers_done, + }; + let json = serde_json::to_string(&json).unwrap(); + if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) { + eprintln!("Failed to write JSON progress: {}", e); + break; + } + last_json_written = curr; + } + } + } + } + } +} + /// Wrapper for pulling a container image, wiring up status output. #[context("Pulling")] pub(crate) async fn pull( @@ -230,6 +312,7 @@ pub(crate) async fn pull( imgref: &ImageReference, target_imgref: Option<&OstreeImageReference>, quiet: bool, + jsonw: Option>>, ) -> Result> { let ostree_imgref = &OstreeImageReference::from(imgref.clone()); let mut imp = new_importer(repo, ostree_imgref).await?; @@ -250,13 +333,35 @@ pub(crate) async fn pull( ostree_ext::cli::print_layer_status(&prep); let layers_to_fetch = prep.layers_to_fetch().collect::>>()?; let n_layers_to_fetch = layers_to_fetch.len(); - let printer = (!quiet).then(|| { + let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); + let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum(); + + let printer = (!quiet || jsonw.is_some()).then(|| { let layer_progress = imp.request_progress(); let layer_byte_progress = imp.request_layer_progress(); - tokio::task::spawn(async move { - handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch) + if jsonw.is_some() { + tokio::task::spawn(async move { + handle_layer_progress_print_jsonl( + layer_progress, + layer_byte_progress, + n_layers_to_fetch, + download_bytes, + image_bytes, + jsonw.unwrap(), + ) .await - }) + }) + } else { + tokio::task::spawn(async move { + handle_layer_progress_print( + layer_progress, + layer_byte_progress, + n_layers_to_fetch, + download_bytes, + ) + .await + }) + } }); let import = imp.import(prep).await; if let Some(printer) = printer { diff --git a/lib/src/install.rs b/lib/src/install.rs index 845379a63..ce07e3f45 100644 --- a/lib/src/install.rs +++ b/lib/src/install.rs @@ -744,7 +744,8 @@ async fn install_container( let spec_imgref = ImageReference::from(src_imageref.clone()); let repo = &sysroot.repo(); repo.set_disable_fsync(true); - let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?; + let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, None) + .await?; repo.set_disable_fsync(false); r }; diff --git a/ostree-ext/src/container/store.rs b/ostree-ext/src/container/store.rs index d4e4a4fec..efba13e5e 100644 --- a/ostree-ext/src/container/store.rs +++ b/ostree-ext/src/container/store.rs @@ -193,7 +193,7 @@ pub enum PrepareResult { #[derive(Debug)] pub struct ManifestLayerState { /// The underlying layer descriptor. - pub(crate) layer: oci_image::Descriptor, + pub layer: oci_image::Descriptor, // TODO semver: Make this readonly via an accessor /// The ostree ref name for this layer. pub ostree_ref: String,