Skip to content

Commit

Permalink
make progress writer multi-thread safe, add deploy stages
Browse files Browse the repository at this point in the history
  • Loading branch information
antheas committed Nov 27, 2024
1 parent 9615af7 commit 9b2b401
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 32 deletions.
21 changes: 8 additions & 13 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog).await?;
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?;
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
let fetched_digest = &fetched.manifest_digest;
tracing::debug!("staged: {staged_digest:?}");
Expand All @@ -709,7 +709,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
println!("No update available.")
} else {
let osname = booted_deployment.osname();
crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?;
crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?;
changed = true;
if let Some(prev) = booted_image.as_ref() {
if let Some(fetched_manifest) = fetched.get_manifest(repo)? {
Expand Down Expand Up @@ -784,7 +784,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog).await?;
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?;

if !opts.retain {
// By default, we prune the previous ostree ref so it will go away after later upgrades
Expand All @@ -798,7 +798,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}

let stateroot = booted_deployment.osname();
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?;

if opts.apply {
crate::reboot::reboot()?;
Expand Down Expand Up @@ -840,25 +840,20 @@ async fn edit(opts: EditOpts) -> Result<()> {
host.spec.verify_transition(&new_host.spec)?;
let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?;

let prog = ProgressWriter::from_empty();

// We only support two state transitions right now; switching the image,
// or flipping the bootloader ordering.
if host.spec.boot_order != new_host.spec.boot_order {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(
repo,
new_spec.image,
None,
opts.quiet,
ProgressWriter::from_empty(),
)
.await?;
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?;

// TODO gc old layers here

let stateroot = booted_deployment.osname();
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?;
crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?;

Ok(())
}
Expand Down
30 changes: 23 additions & 7 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async fn handle_layer_progress_print(
layers_total: usize,
bytes_download: u64,
bytes_total: u64,
mut prog: ProgressWriter,
prog: ProgressWriter,
) {
let start = std::time::Instant::now();
let mut total_read = 0u64;
Expand Down Expand Up @@ -219,7 +219,7 @@ async fn handle_layer_progress_print(
// They are common enough, anyhow. Debounce on time.
let curr = std::time::Instant::now();
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
prog.send(ProgressStage::Fetching {
prog.send(ProgressStage::Fetch {
bytes_done,
bytes_download,
bytes_total,
Expand All @@ -243,14 +243,12 @@ async fn handle_layer_progress_print(
let elapsed = end.duration_since(start);
let persec = total_read as f64 / elapsed.as_secs_f64();
let persec = indicatif::HumanBytes(persec as u64);
if let Err(e) = bar.println(&format!(
println!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
)) {
tracing::warn!("writing to stdout: {e}");
}
);
}

/// Wrapper for pulling a container image, wiring up status output.
Expand Down Expand Up @@ -296,7 +294,7 @@ pub(crate) async fn pull(
layers_total,
bytes_download,
bytes_total,
prog,
prog.clone(),
)
.await
})
Expand Down Expand Up @@ -493,7 +491,15 @@ pub(crate) async fn stage(
stateroot: &str,
image: &ImageState,
spec: &RequiredHostSpec<'_>,
prog: ProgressWriter,
) -> Result<()> {
let n_steps = 3;

prog.send(ProgressStage::Deploy {
n_steps,
step: 0,
name: "deploying".to_string(),
});
let merge_deployment = sysroot.merge_deployment(Some(stateroot));
let origin = origin_from_imageref(spec.image)?;
let deployment = crate::deploy::deploy(
Expand All @@ -505,8 +511,18 @@ pub(crate) async fn stage(
)
.await?;

prog.send(ProgressStage::Deploy {
n_steps,
step: 1,
name: "pulling_bound_images".to_string(),
});
crate::boundimage::pull_bound_images(sysroot, &deployment).await?;

prog.send(ProgressStage::Deploy {
n_steps,
step: 2,
name: "cleaning_up".to_string(),
});
crate::deploy::cleanup(sysroot).await?;
println!("Queued for next boot: {:#}", spec.image);
if let Some(version) = image.version.as_deref() {
Expand Down
45 changes: 33 additions & 12 deletions lib/src/progress_jsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use std::fs;
use std::io::{BufWriter, Write};
use std::mem;
use std::os::fd::{FromRawFd, RawFd};
use std::sync::{Arc, Mutex};

use anyhow::Result;
use serde::Serialize;
Expand All @@ -20,8 +22,8 @@ pub struct LayerState {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#[serde(tag = "stage")]
pub enum ProgressStage {
#[serde(rename = "fetching")]
Fetching {
#[serde(rename = "fetch")]
Fetch {
bytes_done: u64,
bytes_download: u64,
bytes_total: u64,
Expand All @@ -30,16 +32,23 @@ pub enum ProgressStage {
layers_total: usize,
layers: Option<Vec<LayerState>>,
},
#[serde(rename = "deploy")]
Deploy {
n_steps: usize,
step: usize,
name: String,
},
}

#[derive(Clone)]
pub(crate) struct ProgressWriter {
fd: Option<BufWriter<fs::File>>,
fd: Arc<Mutex<Option<BufWriter<fs::File>>>>,
}

impl From<fs::File> for ProgressWriter {
fn from(value: fs::File) -> Self {
Self {
fd: Some(BufWriter::new(value)),
fd: Arc::new(Mutex::new(Some(BufWriter::new(value)))),
}
}
}
Expand All @@ -52,15 +61,21 @@ impl ProgressWriter {
}

pub(crate) fn from_empty() -> Self {
Self { fd: None }
Self {
fd: Arc::new(Mutex::new(None)),
}
}

/// Serialize the target object to JSON as a single line
pub(crate) fn send_unchecked<T: Serialize>(&mut self, v: T) -> Result<()> {
if self.fd.is_none() {
pub(crate) fn send_unchecked<T: Serialize>(&self, v: T) -> Result<()> {
let arc = self.fd.clone();
let mut mutex = arc.lock().expect("Could not lock mutex");
let fd_opt = mutex.as_mut();

if fd_opt.is_none() {
return Ok(());
}
let mut fd = self.fd.as_mut().unwrap();
let mut fd = fd_opt.unwrap();

// serde is guaranteed not to output newlines here
serde_json::to_writer(&mut fd, &v)?;
Expand All @@ -71,18 +86,24 @@ impl ProgressWriter {
Ok(())
}

pub(crate) fn send<T: Serialize>(&mut self, v: T) {
pub(crate) fn send<T: Serialize>(&self, v: T) {
if let Err(e) = self.send_unchecked(v) {
eprintln!("Failed to write to jsonl: {}", e);
// Stop writing to fd but let process continue
self.fd = None;
let arc = self.fd.clone();
let mut mutex = arc.lock().expect("Could not lock mutex");
*mutex = None.into();
}
}

/// Flush remaining data and return the underlying file.
#[allow(dead_code)]
pub(crate) fn into_inner(self) -> Result<fs::File> {
if let Some(fd) = self.fd {
let arc = self.fd.clone();
let mut mutex = arc.lock().expect("Could not lock mutex");
let fd_opt = mem::replace(&mut *mutex, None);

if let Some(fd) = fd_opt {
return fd.into_inner().map_err(Into::into);
} else {
return Err(anyhow::anyhow!("File descriptor closed/never existed."));
Expand All @@ -107,7 +128,7 @@ mod test {
#[test]
fn test_jsonl() -> Result<()> {
let tf = tempfile::tempfile()?;
let mut w = ProgressWriter::from(tf);
let w = ProgressWriter::from(tf);
let testvalues = [
S {
s: "foo".into(),
Expand Down

0 comments on commit 9b2b401

Please sign in to comment.