From 3242366038939a22bd3966f9c22285b9164a14ef Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Tue, 26 Nov 2024 17:07:15 -0500 Subject: [PATCH] Add a little progress_jsonl module - Use `if let Some()` to deconstruct vs `is_some()` and `unwrap()` - Use RawFd over i32 for clarity - We don't need the Arc> - Use a `BufWriter` (forgetting this is a big performance footgun in Rust) - Add a unit test of the basic code Signed-off-by: Colin Walters --- lib/src/cli.rs | 25 +++-------- lib/src/deploy.rs | 19 ++++----- lib/src/lib.rs | 1 + lib/src/progress_jsonl.rs | 90 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 106 insertions(+), 29 deletions(-) create mode 100644 lib/src/progress_jsonl.rs diff --git a/lib/src/cli.rs b/lib/src/cli.rs index f14dbb26b..80d223d74 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -5,10 +5,9 @@ use std::ffi::CString; use std::ffi::OsString; use std::io::Seek; -use std::os::unix::io::FromRawFd; +use std::os::fd::RawFd; use std::os::unix::process::CommandExt; use std::process::Command; -use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; use camino::Utf8PathBuf; @@ -27,6 +26,7 @@ use serde::{Deserialize, Serialize}; use crate::deploy::RequiredHostSpec; use crate::lints; +use crate::progress_jsonl; use crate::spec::Host; use crate::spec::ImageReference; use crate::utils::sigpolicy_from_opts; @@ -57,7 +57,7 @@ pub(crate) struct UpgradeOpts { /// Pipe download progress to this fd in a jsonl format. #[clap(long)] - pub(crate) json_fd: Option, + pub(crate) json_fd: Option, } /// Perform an switch operation @@ -110,7 +110,7 @@ pub(crate) struct SwitchOpts { /// Pipe download progress to this fd in a jsonl format. #[clap(long)] - pub(crate) json_fd: Option, + pub(crate) json_fd: Option, } /// Options controlling rollback @@ -624,7 +624,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { let (booted_deployment, _deployments, host) = crate::status::get_status_require_booted(sysroot)?; let imgref = host.spec.image.as_ref(); - let jsonw = unwrap_fd(opts.json_fd); + let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd); // If there's no specified image, let's be nice and check if the booted system is using rpm-ostree if imgref.is_none() { @@ -727,19 +727,6 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { Ok(()) } -#[allow(unsafe_code)] -fn unwrap_fd(fd: Option) -> Option>> { - unsafe { - if !fd.is_none() { - return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd( - fd.unwrap(), - )))); - } else { - return None; - }; - } -} - /// Implementation of the `bootc switch` CLI command. #[context("Switching")] async fn switch(opts: SwitchOpts) -> Result<()> { @@ -754,7 +741,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { ); let target = ostree_container::OstreeImageReference { sigverify, imgref }; let target = ImageReference::from(target); - let jsonw = unwrap_fd(opts.json_fd); + let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd); // If we're doing an in-place mutation, we shortcut most of the rest of the work here if opts.mutate_in_place { diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs index a6b892909..673f65fc5 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs @@ -4,7 +4,6 @@ use std::collections::HashSet; use std::io::{BufRead, Write}; -use std::sync::{Arc, Mutex}; use anyhow::Ok; use anyhow::{anyhow, Context, Result}; @@ -22,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot}; use ostree_ext::sysroot::SysrootLock; use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten; +use crate::progress_jsonl::JsonlWriter; use crate::spec::ImageReference; use crate::spec::{BootOrder, HostSpec}; use crate::status::labels_of_config; @@ -250,7 +250,7 @@ async fn handle_layer_progress_print_jsonl( n_layers_to_fetch: usize, download_bytes: u64, image_bytes: u64, - jsonw: Arc>, + mut jsonw: JsonlWriter, ) { let mut total_read = 0u64; let mut layers_done: usize = 0; @@ -284,7 +284,7 @@ async fn handle_layer_progress_print_jsonl( // They are common enough, anyhow. Debounce on time. let curr = std::time::Instant::now(); if curr.duration_since(last_json_written).as_secs_f64() > 0.2 { - let json = JsonProgress { + let progress = JsonProgress { stage: "fetching".to_string(), done_bytes, download_bytes, @@ -292,10 +292,9 @@ async fn handle_layer_progress_print_jsonl( n_layers: n_layers_to_fetch, n_layers_done: layers_done, }; - let json = serde_json::to_string(&json).unwrap(); - if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) { - eprintln!("Failed to write JSON progress: {}", e); - break; + if let Err(e) = jsonw.send(&progress) { + tracing::debug!("failed to send progress: {e}"); + break } last_json_written = curr; } @@ -312,7 +311,7 @@ pub(crate) async fn pull( imgref: &ImageReference, target_imgref: Option<&OstreeImageReference>, quiet: bool, - jsonw: Option>>, + jsonw: Option, ) -> Result> { let ostree_imgref = &OstreeImageReference::from(imgref.clone()); let mut imp = new_importer(repo, ostree_imgref).await?; @@ -339,7 +338,7 @@ pub(crate) async fn pull( let printer = (!quiet || jsonw.is_some()).then(|| { let layer_progress = imp.request_progress(); let layer_byte_progress = imp.request_layer_progress(); - if jsonw.is_some() { + if let Some(jsonw) = jsonw { tokio::task::spawn(async move { handle_layer_progress_print_jsonl( layer_progress, @@ -347,7 +346,7 @@ pub(crate) async fn pull( n_layers_to_fetch, download_bytes, image_bytes, - jsonw.unwrap(), + jsonw, ) .await }) diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 1f0c263b5..f90ef3665 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -41,3 +41,4 @@ pub mod spec; mod docgen; mod glyph; mod imgstorage; +mod progress_jsonl; diff --git a/lib/src/progress_jsonl.rs b/lib/src/progress_jsonl.rs new file mode 100644 index 000000000..0eb11b5dd --- /dev/null +++ b/lib/src/progress_jsonl.rs @@ -0,0 +1,90 @@ +//! Output progress data using the json-lines format. For more information +//! see . + +use std::fs; +use std::io::{BufWriter, Write}; +use std::os::fd::{FromRawFd, RawFd}; + +use anyhow::Result; +use serde::Serialize; + +pub(crate) struct JsonlWriter { + fd: BufWriter, +} + +impl From for JsonlWriter { + fn from(value: fs::File) -> Self { + Self { + fd: BufWriter::new(value), + } + } +} + +impl JsonlWriter { + /// Given a raw file descriptor, create an instance of a json-lines writer. + #[allow(unsafe_code)] + pub(crate) fn from_raw_fd(fd: RawFd) -> Self { + unsafe { fs::File::from_raw_fd(fd) }.into() + } + + /// Serialize the target object to JSON as a single line + pub(crate) fn send(&mut self, v: T) -> Result<()> { + // serde is guaranteed not to output newlines here + serde_json::to_writer(&mut self.fd, &v)?; + // We always end in a newline + self.fd.write_all(b"\n")?; + // And flush to ensure the remote side sees updates immediately + self.fd.flush()?; + Ok(()) + } + + /// Flush remaining data and return the underlying file. + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> Result { + self.fd.into_inner().map_err(Into::into) + } +} + +#[cfg(test)] +mod test { + use std::io::{BufRead, BufReader, Seek}; + + use serde::Deserialize; + + use super::*; + + #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)] + struct S { + s: String, + v: u32, + } + + #[test] + fn test_jsonl() -> Result<()> { + let tf = tempfile::tempfile()?; + let mut w = JsonlWriter::from(tf); + let testvalues = [ + S { + s: "foo".into(), + v: 42, + }, + S { + // Test with an embedded newline to sanity check that serde doesn't write it literally + s: "foo\nbar".into(), + v: 0, + }, + ]; + for value in &testvalues { + w.send(value).unwrap(); + } + let mut tf = w.into_inner().unwrap(); + tf.seek(std::io::SeekFrom::Start(0))?; + let tf = BufReader::new(tf); + for (line, expected) in tf.lines().zip(testvalues.iter()) { + let line = line?; + let found: S = serde_json::from_str(&line)?; + assert_eq!(&found, expected); + } + Ok(()) + } +}