Skip to content

Commit

Permalink
Add a little progress_jsonl module
Browse files Browse the repository at this point in the history
- Use `if let Some()` to deconstruct vs `is_some()` and `unwrap()`
- Use RawFd over i32 for clarity
- We don't need the Arc<Mutex<>>
- Use a `BufWriter` (forgetting this is a big performance footgun in Rust)
- Add a unit test of the basic code

Signed-off-by: Colin Walters <[email protected]>
  • Loading branch information
cgwalters committed Nov 26, 2024
1 parent 8d2a95e commit 3242366
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 29 deletions.
25 changes: 6 additions & 19 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
use std::ffi::CString;
use std::ffi::OsString;
use std::io::Seek;
use std::os::unix::io::FromRawFd;
use std::os::fd::RawFd;
use std::os::unix::process::CommandExt;
use std::process::Command;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand All @@ -27,6 +26,7 @@ use serde::{Deserialize, Serialize};

use crate::deploy::RequiredHostSpec;
use crate::lints;
use crate::progress_jsonl;
use crate::spec::Host;
use crate::spec::ImageReference;
use crate::utils::sigpolicy_from_opts;
Expand Down Expand Up @@ -57,7 +57,7 @@ pub(crate) struct UpgradeOpts {

/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json_fd: Option<i32>,
pub(crate) json_fd: Option<RawFd>,
}

/// Perform an switch operation
Expand Down Expand Up @@ -110,7 +110,7 @@ pub(crate) struct SwitchOpts {

/// Pipe download progress to this fd in a jsonl format.
#[clap(long)]
pub(crate) json_fd: Option<i32>,
pub(crate) json_fd: Option<RawFd>,
}

/// Options controlling rollback
Expand Down Expand Up @@ -624,7 +624,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let imgref = host.spec.image.as_ref();
let jsonw = unwrap_fd(opts.json_fd);
let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd);

// If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
if imgref.is_none() {
Expand Down Expand Up @@ -727,19 +727,6 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
Ok(())
}

#[allow(unsafe_code)]
fn unwrap_fd(fd: Option<i32>) -> Option<Arc<Mutex<dyn std::io::Write + Send>>> {
unsafe {
if !fd.is_none() {
return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd(
fd.unwrap(),
))));
} else {
return None;
};
}
}

/// Implementation of the `bootc switch` CLI command.
#[context("Switching")]
async fn switch(opts: SwitchOpts) -> Result<()> {
Expand All @@ -754,7 +741,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
);
let target = ostree_container::OstreeImageReference { sigverify, imgref };
let target = ImageReference::from(target);
let jsonw = unwrap_fd(opts.json_fd);
let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd);

// If we're doing an in-place mutation, we shortcut most of the rest of the work here
if opts.mutate_in_place {
Expand Down
19 changes: 9 additions & 10 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::collections::HashSet;
use std::io::{BufRead, Write};
use std::sync::{Arc, Mutex};

use anyhow::Ok;
use anyhow::{anyhow, Context, Result};
Expand All @@ -22,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot};
use ostree_ext::sysroot::SysrootLock;
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;

use crate::progress_jsonl::JsonlWriter;
use crate::spec::ImageReference;
use crate::spec::{BootOrder, HostSpec};
use crate::status::labels_of_config;
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn handle_layer_progress_print_jsonl(
n_layers_to_fetch: usize,
download_bytes: u64,
image_bytes: u64,
jsonw: Arc<Mutex<dyn std::io::Write + Send>>,
mut jsonw: JsonlWriter,
) {
let mut total_read = 0u64;
let mut layers_done: usize = 0;
Expand Down Expand Up @@ -284,18 +284,17 @@ async fn handle_layer_progress_print_jsonl(
// They are common enough, anyhow. Debounce on time.
let curr = std::time::Instant::now();
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
let json = JsonProgress {
let progress = JsonProgress {
stage: "fetching".to_string(),
done_bytes,
download_bytes,
image_bytes,
n_layers: n_layers_to_fetch,
n_layers_done: layers_done,
};
let json = serde_json::to_string(&json).unwrap();
if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) {
eprintln!("Failed to write JSON progress: {}", e);
break;
if let Err(e) = jsonw.send(&progress) {
tracing::debug!("failed to send progress: {e}");
break
}
last_json_written = curr;
}
Expand All @@ -312,7 +311,7 @@ pub(crate) async fn pull(
imgref: &ImageReference,
target_imgref: Option<&OstreeImageReference>,
quiet: bool,
jsonw: Option<Arc<Mutex<dyn std::io::Write + Send>>>,
jsonw: Option<crate::progress_jsonl::JsonlWriter>,
) -> Result<Box<ImageState>> {
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
Expand All @@ -339,15 +338,15 @@ pub(crate) async fn pull(
let printer = (!quiet || jsonw.is_some()).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
if jsonw.is_some() {
if let Some(jsonw) = jsonw {
tokio::task::spawn(async move {
handle_layer_progress_print_jsonl(
layer_progress,
layer_byte_progress,
n_layers_to_fetch,
download_bytes,
image_bytes,
jsonw.unwrap(),
jsonw,
)
.await
})
Expand Down
1 change: 1 addition & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ pub mod spec;
mod docgen;
mod glyph;
mod imgstorage;
mod progress_jsonl;
90 changes: 90 additions & 0 deletions lib/src/progress_jsonl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Output progress data using the json-lines format. For more information
//! see <https://jsonlines.org/>.
use std::fs;
use std::io::{BufWriter, Write};
use std::os::fd::{FromRawFd, RawFd};

use anyhow::Result;
use serde::Serialize;

pub(crate) struct JsonlWriter {
fd: BufWriter<fs::File>,
}

impl From<fs::File> for JsonlWriter {
fn from(value: fs::File) -> Self {
Self {
fd: BufWriter::new(value),
}
}
}

impl JsonlWriter {
/// Given a raw file descriptor, create an instance of a json-lines writer.
#[allow(unsafe_code)]
pub(crate) fn from_raw_fd(fd: RawFd) -> Self {
unsafe { fs::File::from_raw_fd(fd) }.into()
}

/// Serialize the target object to JSON as a single line
pub(crate) fn send<T: Serialize>(&mut self, v: T) -> Result<()> {
// serde is guaranteed not to output newlines here
serde_json::to_writer(&mut self.fd, &v)?;
// We always end in a newline
self.fd.write_all(b"\n")?;
// And flush to ensure the remote side sees updates immediately
self.fd.flush()?;
Ok(())
}

/// Flush remaining data and return the underlying file.
#[allow(dead_code)]
pub(crate) fn into_inner(self) -> Result<fs::File> {
self.fd.into_inner().map_err(Into::into)
}
}

#[cfg(test)]
mod test {
use std::io::{BufRead, BufReader, Seek};

use serde::Deserialize;

use super::*;

#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
struct S {
s: String,
v: u32,
}

#[test]
fn test_jsonl() -> Result<()> {
let tf = tempfile::tempfile()?;
let mut w = JsonlWriter::from(tf);
let testvalues = [
S {
s: "foo".into(),
v: 42,
},
S {
// Test with an embedded newline to sanity check that serde doesn't write it literally
s: "foo\nbar".into(),
v: 0,
},
];
for value in &testvalues {
w.send(value).unwrap();
}
let mut tf = w.into_inner().unwrap();
tf.seek(std::io::SeekFrom::Start(0))?;
let tf = BufReader::new(tf);
for (line, expected) in tf.lines().zip(testvalues.iter()) {
let line = line?;
let found: S = serde_json::from_str(&line)?;
assert_eq!(&found, expected);
}
Ok(())
}
}

0 comments on commit 3242366

Please sign in to comment.