Skip to content

Commit

Permalink
feat: add json output to stderr of upgrade and switch
Browse files Browse the repository at this point in the history
  • Loading branch information
antheas committed Nov 25, 2024
1 parent 201d358 commit 9b0d697
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 9 deletions.
14 changes: 11 additions & 3 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts {
/// a userspace-only restart.
#[clap(long, conflicts_with = "check")]
pub(crate) apply: bool,

/// Pipe download progress to stderr in a jsonl format.
#[clap(long)]
pub(crate) json: bool,
}

/// Perform an switch operation
Expand Down Expand Up @@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts {

/// Target image to use for the next boot.
pub(crate) target: String,

/// Pipe download progress to stderr in a jsonl format.
#[clap(long)]
pub(crate) json: bool,
}

/// Options controlling rollback
Expand Down Expand Up @@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?;
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
let fetched_digest = &fetched.manifest_digest;
tracing::debug!("staged: {staged_digest:?}");
Expand Down Expand Up @@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?;

if !opts.retain {
// By default, we prune the previous ostree ref so it will go away after later upgrades
Expand Down Expand Up @@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?;

// TODO gc old layers here

Expand Down
88 changes: 83 additions & 5 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ pub(crate) struct ImageState {
pub(crate) ostree_commit: String,
}

/// Download information
#[derive(Debug,serde::Serialize)]
pub struct JsonProgress {
pub done_bytes: u64,
pub download_bytes: u64,
pub image_bytes: u64,
pub n_layers: usize,
pub n_layers_done: usize,
}

impl<'a> RequiredHostSpec<'a> {
/// Given a (borrowed) host specification, "unwrap" its internal
/// options, giving a spec that is required to have a base container image.
Expand Down Expand Up @@ -234,13 +244,73 @@ async fn handle_layer_progress_print(
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print_jsonl(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
n_layers_to_fetch: usize,
download_bytes: u64,
image_bytes: u64,
) {
let mut total_read = 0u64;
let mut layers_done: usize = 0;
let mut last_json_written = std::time::Instant::now();
loop {
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
if let Some(l) = layer {
if !l.is_starting() {
let layer = descriptor_of_progress(&l);
layers_done += 1;
total_read += total_read.saturating_add(layer.size());
}
} else {
// If the receiver is disconnected, then we're done
break
};
},
r = layer_bytes.changed() => {
if r.is_err() {
// If the receiver is disconnected, then we're done
break
}
let bytes = layer_bytes.borrow();
if let Some(bytes) = &*bytes {
let done_bytes = total_read + bytes.fetched;

// Lets update the json output only on bytes fetched
// They are common enough, anyhow. Debounce on time.
let curr = std::time::Instant::now();
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
let json = JsonProgress {
done_bytes,
download_bytes,
image_bytes,
n_layers: n_layers_to_fetch,
n_layers_done: layers_done,
};
let json = serde_json::to_string(&json).unwrap();
// Write to stderr so that consumer can filter this
eprintln!("{}", json);
last_json_written = curr;
}
}
}
}
}
}


/// Wrapper for pulling a container image, wiring up status output.
#[context("Pulling")]
pub(crate) async fn pull(
repo: &ostree::Repo,
imgref: &ImageReference,
target_imgref: Option<&OstreeImageReference>,
quiet: bool,
json: bool,
) -> Result<Box<ImageState>> {
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
Expand All @@ -262,14 +332,22 @@ pub(crate) async fn pull(
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();

let printer = (!quiet).then(|| {
let printer = (!quiet || json).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
tokio::task::spawn(async move {
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
.await
})
if json {
tokio::task::spawn(async move {
handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes)
.await
})
} else {
tokio::task::spawn(async move {
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
.await
})
}
});
let import = imp.import(prep).await;
if let Some(printer) = printer {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ async fn install_container(
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?;
repo.set_disable_fsync(false);
r
};
Expand Down

0 comments on commit 9b0d697

Please sign in to comment.