diff --git a/lib/src/cli.rs b/lib/src/cli.rs index 9f19a6a6..3e6395d9 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts { /// a userspace-only restart. #[clap(long, conflicts_with = "check")] pub(crate) apply: bool, + + /// Pipe download progress to stderr in a jsonl format. + #[clap(long)] + pub(crate) json: bool, } /// Perform an switch operation @@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts { /// Target image to use for the next boot. pub(crate) target: String, + + /// Pipe download progress to stderr in a jsonl format. + #[clap(long)] + pub(crate) json: bool, } /// Options controlling rollback @@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { } } } else { - let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?; let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); let fetched_digest = &fetched.manifest_digest; tracing::debug!("staged: {staged_digest:?}"); @@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let new_spec = RequiredHostSpec::from_spec(&new_spec)?; - let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?; if !opts.retain { // By default, we prune the previous ostree ref so it will go away after later upgrades @@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> { return crate::deploy::rollback(sysroot).await; } - let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?; + let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?; // TODO gc old layers here diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs index a6e4d499..6f214ed1 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs @@ -45,6 +45,16 @@ pub(crate) struct ImageState { pub(crate) ostree_commit: String, } +/// Download information +#[derive(Debug,serde::Serialize)] +pub struct JsonProgress { + pub done_bytes: u64, + pub download_bytes: u64, + pub image_bytes: u64, + pub n_layers: usize, + pub n_layers_done: usize, +} + impl<'a> RequiredHostSpec<'a> { /// Given a (borrowed) host specification, "unwrap" its internal /// options, giving a spec that is required to have a base container image. @@ -234,6 +244,65 @@ async fn handle_layer_progress_print( } } +/// Write container fetch progress to standard output. +async fn handle_layer_progress_print_jsonl( + mut layers: tokio::sync::mpsc::Receiver, + mut layer_bytes: tokio::sync::watch::Receiver>, + n_layers_to_fetch: usize, + download_bytes: u64, + image_bytes: u64, +) { + let mut total_read = 0u64; + let mut layers_done: usize = 0; + let mut last_json_written = std::time::Instant::now(); + loop { + tokio::select! { + // Always handle layer changes first. + biased; + layer = layers.recv() => { + if let Some(l) = layer { + if !l.is_starting() { + let layer = descriptor_of_progress(&l); + layers_done += 1; + total_read += total_read.saturating_add(layer.size()); + } + } else { + // If the receiver is disconnected, then we're done + break + }; + }, + r = layer_bytes.changed() => { + if r.is_err() { + // If the receiver is disconnected, then we're done + break + } + let bytes = layer_bytes.borrow(); + if let Some(bytes) = &*bytes { + let done_bytes = total_read + bytes.fetched; + + // Lets update the json output only on bytes fetched + // They are common enough, anyhow. Debounce on time. + let curr = std::time::Instant::now(); + if curr.duration_since(last_json_written).as_secs_f64() > 0.2 { + let json = JsonProgress { + done_bytes, + download_bytes, + image_bytes, + n_layers: n_layers_to_fetch, + n_layers_done: layers_done, + }; + let json = serde_json::to_string(&json).unwrap(); + // Write to stderr so that consumer can filter this + eprintln!("{}", json); + last_json_written = curr; + } + } + } + } + } +} + + /// Wrapper for pulling a container image, wiring up status output. #[context("Pulling")] pub(crate) async fn pull( @@ -241,6 +310,7 @@ pub(crate) async fn pull( imgref: &ImageReference, target_imgref: Option<&OstreeImageReference>, quiet: bool, + json: bool, ) -> Result> { let ostree_imgref = &OstreeImageReference::from(imgref.clone()); let mut imp = new_importer(repo, ostree_imgref).await?; @@ -262,14 +332,22 @@ pub(crate) async fn pull( let layers_to_fetch = prep.layers_to_fetch().collect::>>()?; let n_layers_to_fetch = layers_to_fetch.len(); let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum(); + let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum(); - let printer = (!quiet).then(|| { + let printer = (!quiet || json).then(|| { let layer_progress = imp.request_progress(); let layer_byte_progress = imp.request_layer_progress(); - tokio::task::spawn(async move { - handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes) - .await - }) + if json { + tokio::task::spawn(async move { + handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes) + .await + }) + } else { + tokio::task::spawn(async move { + handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes) + .await + }) + } }); let import = imp.import(prep).await; if let Some(printer) = printer { diff --git a/lib/src/install.rs b/lib/src/install.rs index 98857edf..69b5fab5 100644 --- a/lib/src/install.rs +++ b/lib/src/install.rs @@ -744,7 +744,7 @@ async fn install_container( let spec_imgref = ImageReference::from(src_imageref.clone()); let repo = &sysroot.repo(); repo.set_disable_fsync(true); - let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?; + let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?; repo.set_disable_fsync(false); r };