Skip to content

Commit

Permalink
fix: switch to --json-fd, cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
antheas committed Nov 26, 2024
1 parent 29a7959 commit c491b24
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 24 deletions.
32 changes: 25 additions & 7 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
use std::ffi::CString;
use std::ffi::OsString;
use std::io::Seek;
use std::os::unix::io::FromRawFd;
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand Down Expand Up @@ -53,9 +55,9 @@ pub(crate) struct UpgradeOpts {
#[clap(long, conflicts_with = "check")]
pub(crate) apply: bool,

/// Pipe download progress to stderr in a jsonl format.
/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json: bool,
pub(crate) json_fd: Option<i32>,
}

/// Perform an switch operation
Expand Down Expand Up @@ -106,9 +108,9 @@ pub(crate) struct SwitchOpts {
/// Target image to use for the next boot.
pub(crate) target: String,

/// Pipe download progress to stderr in a jsonl format.
/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json: bool,
pub(crate) json_fd: Option<i32>,
}

/// Options controlling rollback
Expand Down Expand Up @@ -622,6 +624,8 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let imgref = host.spec.image.as_ref();
let jsonw = unwrap_fd(opts.json_fd);

// If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
if imgref.is_none() {
let booted_incompatible = host
Expand Down Expand Up @@ -678,7 +682,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?;
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, jsonw).await?;
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
let fetched_digest = &fetched.manifest_digest;
tracing::debug!("staged: {staged_digest:?}");
Expand Down Expand Up @@ -723,6 +727,19 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
Ok(())
}

#[allow(unsafe_code)]
fn unwrap_fd(fd: Option<i32>) -> Option<Arc<Mutex<dyn std::io::Write + Send>>> {
unsafe {
if !fd.is_none() {
return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd(
fd.unwrap(),
))));
} else {
return None;
};
}
}

/// Implementation of the `bootc switch` CLI command.
#[context("Switching")]
async fn switch(opts: SwitchOpts) -> Result<()> {
Expand All @@ -737,6 +754,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
);
let target = ostree_container::OstreeImageReference { sigverify, imgref };
let target = ImageReference::from(target);
let jsonw = unwrap_fd(opts.json_fd);

// If we're doing an in-place mutation, we shortcut most of the rest of the work here
if opts.mutate_in_place {
Expand Down Expand Up @@ -772,7 +790,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?;
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, jsonw).await?;

if !opts.retain {
// By default, we prune the previous ostree ref so it will go away after later upgrades
Expand Down Expand Up @@ -834,7 +852,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?;
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, None).await?;

// TODO gc old layers here

Expand Down
45 changes: 29 additions & 16 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashSet;
use std::io::{BufRead, Write};
use std::sync::{Arc, Mutex};

use anyhow::Ok;
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -46,7 +47,7 @@ pub(crate) struct ImageState {
}

/// Download information
#[derive(Debug,serde::Serialize)]
#[derive(Debug, serde::Serialize)]
pub struct JsonProgress {
pub done_bytes: u64,
pub download_bytes: u64,
Expand Down Expand Up @@ -174,11 +175,9 @@ async fn handle_layer_progress_print(
layers_bar.set_message("");
byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template(
" └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})",
)
.unwrap()
);
.template(" └ {bar:20} {msg} ({binary_bytes}/{binary_total_bytes})")
.unwrap(),
);
total_byte_bar.set_prefix("Total");
total_byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
Expand Down Expand Up @@ -250,6 +249,7 @@ async fn handle_layer_progress_print_jsonl(
n_layers_to_fetch: usize,
download_bytes: u64,
image_bytes: u64,
jsonw: Arc<Mutex<dyn std::io::Write + Send>>,
) {
let mut total_read = 0u64;
let mut layers_done: usize = 0;
Expand Down Expand Up @@ -291,8 +291,10 @@ async fn handle_layer_progress_print_jsonl(
n_layers_done: layers_done,
};
let json = serde_json::to_string(&json).unwrap();
// Write to stderr so that consumer can filter this
eprintln!("{}", json);
if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) {
eprintln!("Failed to write JSON progress: {}", e);
break;
}
last_json_written = curr;
}
}
Expand All @@ -301,15 +303,14 @@ async fn handle_layer_progress_print_jsonl(
}
}


/// Wrapper for pulling a container image, wiring up status output.
#[context("Pulling")]
pub(crate) async fn pull(
repo: &ostree::Repo,
imgref: &ImageReference,
target_imgref: Option<&OstreeImageReference>,
quiet: bool,
json: bool,
jsonw: Option<Arc<Mutex<dyn std::io::Write + Send>>>,
) -> Result<Box<ImageState>> {
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
Expand All @@ -333,18 +334,30 @@ pub(crate) async fn pull(
let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();

let printer = (!quiet || json).then(|| {
let printer = (!quiet || jsonw.is_some()).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
if json {
if jsonw.is_some() {
tokio::task::spawn(async move {
handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes)
.await
handle_layer_progress_print_jsonl(
layer_progress,
layer_byte_progress,
n_layers_to_fetch,
download_bytes,
image_bytes,
jsonw.unwrap(),
)
.await
})
} else {
tokio::task::spawn(async move {
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
.await
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
n_layers_to_fetch,
download_bytes,
)
.await
})
}
});
Expand Down
3 changes: 2 additions & 1 deletion lib/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ async fn install_container(
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?;
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, None)
.await?;
repo.set_disable_fsync(false);
r
};
Expand Down

0 comments on commit c491b24

Please sign in to comment.