Skip to content

Commit

Permalink
feat: add total progress bar
Browse files Browse the repository at this point in the history
Signed-off-by: Colin Walters <[email protected]>
  • Loading branch information
antheas authored and cgwalters committed Nov 26, 2024
1 parent d42710e commit 8d2a95e
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 17 deletions.
32 changes: 29 additions & 3 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
use std::ffi::CString;
use std::ffi::OsString;
use std::io::Seek;
use std::os::unix::io::FromRawFd;
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand Down Expand Up @@ -52,6 +54,10 @@ pub(crate) struct UpgradeOpts {
/// a userspace-only restart.
#[clap(long, conflicts_with = "check")]
pub(crate) apply: bool,

/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json_fd: Option<i32>,
}

/// Perform an switch operation
Expand Down Expand Up @@ -101,6 +107,10 @@ pub(crate) struct SwitchOpts {

/// Target image to use for the next boot.
pub(crate) target: String,

/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json_fd: Option<i32>,
}

/// Options controlling rollback
Expand Down Expand Up @@ -614,6 +624,8 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let imgref = host.spec.image.as_ref();
let jsonw = unwrap_fd(opts.json_fd);

// If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
if imgref.is_none() {
let booted_incompatible = host
Expand Down Expand Up @@ -670,7 +682,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, jsonw).await?;
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
let fetched_digest = &fetched.manifest_digest;
tracing::debug!("staged: {staged_digest:?}");
Expand Down Expand Up @@ -715,6 +727,19 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
Ok(())
}

#[allow(unsafe_code)]
fn unwrap_fd(fd: Option<i32>) -> Option<Arc<Mutex<dyn std::io::Write + Send>>> {
unsafe {
if !fd.is_none() {
return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd(
fd.unwrap(),
))));
} else {
return None;
};
}
}

/// Implementation of the `bootc switch` CLI command.
#[context("Switching")]
async fn switch(opts: SwitchOpts) -> Result<()> {
Expand All @@ -729,6 +754,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
);
let target = ostree_container::OstreeImageReference { sigverify, imgref };
let target = ImageReference::from(target);
let jsonw = unwrap_fd(opts.json_fd);

// If we're doing an in-place mutation, we shortcut most of the rest of the work here
if opts.mutate_in_place {
Expand Down Expand Up @@ -764,7 +790,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, jsonw).await?;

if !opts.retain {
// By default, we prune the previous ostree ref so it will go away after later upgrades
Expand Down Expand Up @@ -826,7 +852,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, None).await?;

// TODO gc old layers here

Expand Down
129 changes: 117 additions & 12 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashSet;
use std::io::{BufRead, Write};
use std::sync::{Arc, Mutex};

use anyhow::Ok;
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -45,6 +46,17 @@ pub(crate) struct ImageState {
pub(crate) ostree_commit: String,
}

/// Download information
#[derive(Debug, serde::Serialize)]
pub struct JsonProgress {
pub stage: String,
pub done_bytes: u64,
pub download_bytes: u64,
pub image_bytes: u64,
pub n_layers: usize,
pub n_layers_done: usize,
}

impl<'a> RequiredHostSpec<'a> {
/// Given a (borrowed) host specification, "unwrap" its internal
/// options, giving a spec that is required to have a base container image.
Expand Down Expand Up @@ -142,6 +154,7 @@ async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
n_layers_to_fetch: usize,
download_bytes: u64,
) {
let start = std::time::Instant::now();
let mut total_read = 0u64;
Expand All @@ -150,23 +163,28 @@ async fn handle_layer_progress_print(
n_layers_to_fetch.try_into().unwrap(),
));
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
let total_byte_bar = bar.add(indicatif::ProgressBar::new(download_bytes));
// let byte_bar = indicatif::ProgressBar::new(0);
// byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
println!("");
layers_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.template("{prefix} {pos}/{len} {bar:15}")
.unwrap(),
);
layers_bar.set_prefix("Fetching layers");
layers_bar.set_prefix("Fetched Layers");
layers_bar.set_message("");
byte_bar.set_prefix("Fetching");
byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template(
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
)
.unwrap()
);
.template(" └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})")
.unwrap(),
);
total_byte_bar.set_prefix("Total");
total_byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template("\n{prefix} {bar:30} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}, {elapsed}/{duration})")
.unwrap(),
);
loop {
tokio::select! {
// Always handle layer changes first.
Expand All @@ -186,6 +204,7 @@ async fn handle_layer_progress_print(
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
total_byte_bar.set_position(total_read);
}
} else {
// If the receiver is disconnected, then we're done
Expand All @@ -200,6 +219,7 @@ async fn handle_layer_progress_print(
let bytes = layer_bytes.borrow();
if let Some(bytes) = &*bytes {
byte_bar.set_position(bytes.fetched);
total_byte_bar.set_position(total_read + bytes.fetched);
}
}
}
Expand All @@ -223,13 +243,76 @@ async fn handle_layer_progress_print(
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print_jsonl(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
n_layers_to_fetch: usize,
download_bytes: u64,
image_bytes: u64,
jsonw: Arc<Mutex<dyn std::io::Write + Send>>,
) {
let mut total_read = 0u64;
let mut layers_done: usize = 0;
let mut last_json_written = std::time::Instant::now();
loop {
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
if let Some(l) = layer {
if !l.is_starting() {
let layer = descriptor_of_progress(&l);
layers_done += 1;
total_read += total_read.saturating_add(layer.size());
}
} else {
// If the receiver is disconnected, then we're done
break
};
},
r = layer_bytes.changed() => {
if r.is_err() {
// If the receiver is disconnected, then we're done
break
}
let bytes = layer_bytes.borrow();
if let Some(bytes) = &*bytes {
let done_bytes = total_read + bytes.fetched;

// Lets update the json output only on bytes fetched
// They are common enough, anyhow. Debounce on time.
let curr = std::time::Instant::now();
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
let json = JsonProgress {
stage: "fetching".to_string(),
done_bytes,
download_bytes,
image_bytes,
n_layers: n_layers_to_fetch,
n_layers_done: layers_done,
};
let json = serde_json::to_string(&json).unwrap();
if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) {
eprintln!("Failed to write JSON progress: {}", e);
break;
}
last_json_written = curr;
}
}
}
}
}
}

/// Wrapper for pulling a container image, wiring up status output.
#[context("Pulling")]
pub(crate) async fn pull(
repo: &ostree::Repo,
imgref: &ImageReference,
target_imgref: Option<&OstreeImageReference>,
quiet: bool,
jsonw: Option<Arc<Mutex<dyn std::io::Write + Send>>>,
) -> Result<Box<ImageState>> {
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
Expand All @@ -250,13 +333,35 @@ pub(crate) async fn pull(
ostree_ext::cli::print_layer_status(&prep);
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
let printer = (!quiet).then(|| {
let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();

let printer = (!quiet || jsonw.is_some()).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
tokio::task::spawn(async move {
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
if jsonw.is_some() {
tokio::task::spawn(async move {
handle_layer_progress_print_jsonl(
layer_progress,
layer_byte_progress,
n_layers_to_fetch,
download_bytes,
image_bytes,
jsonw.unwrap(),
)
.await
})
})
} else {
tokio::task::spawn(async move {
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
n_layers_to_fetch,
download_bytes,
)
.await
})
}
});
let import = imp.import(prep).await;
if let Some(printer) = printer {
Expand Down
3 changes: 2 additions & 1 deletion lib/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ async fn install_container(
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, None)
.await?;
repo.set_disable_fsync(false);
r
};
Expand Down
2 changes: 1 addition & 1 deletion ostree-ext/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub enum PrepareResult {
#[derive(Debug)]
pub struct ManifestLayerState {
/// The underlying layer descriptor.
pub(crate) layer: oci_image::Descriptor,
pub layer: oci_image::Descriptor,
// TODO semver: Make this readonly via an accessor
/// The ostree ref name for this layer.
pub ostree_ref: String,
Expand Down

0 comments on commit 8d2a95e

Please sign in to comment.