Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
Merge pull request #527 from cgwalters/debug-join-fetch-more
Browse files Browse the repository at this point in the history
tar: Hold open input stream as long as possible
  • Loading branch information
jmarrero authored Aug 31, 2023
2 parents a45512a + 13455cc commit 5d16ced
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ pub(crate) async fn join_fetch<T: std::fmt::Debug>(
(Err(worker), Err(driver)) => {
let text = driver.root_cause().to_string();
if text.ends_with("broken pipe") {
tracing::trace!("Ignoring broken pipe failure from driver");
Err(worker)
} else {
Err(worker.context(format!("proxy failure: {} and client error", text)))
Expand Down
20 changes: 16 additions & 4 deletions lib/src/tar/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,28 @@ async fn filter_tar_async(
mut dest: impl AsyncWrite + Send + Unpin,
) -> Result<BTreeMap<String, u32>> {
let (tx_buf, mut rx_buf) = tokio::io::duplex(8192);
// The source must be moved to the heap so we know it is stable for passing to the worker thread
let src = Box::pin(src);
let tar_transformer = tokio::task::spawn_blocking(move || -> Result<_> {
let src = tokio_util::io::SyncIoBridge::new(src);
let tar_transformer = tokio::task::spawn_blocking(move || {
let mut src = tokio_util::io::SyncIoBridge::new(src);
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);
filter_tar(src, dest)
let r = filter_tar(&mut src, dest);
// Pass ownership of the input stream back to the caller - see below.
(r, src)
});
let copier = tokio::io::copy(&mut rx_buf, &mut dest);
let (r, v) = tokio::join!(tar_transformer, copier);
let _v: u64 = v?;
r?
let (r, src) = r?;
// Note that the worker thread took temporary ownership of the input stream; we only close
// it at this point, after we're sure we've done all processing of the input. The reason
// for this is that both the skopeo process *or* us could encounter an error (see join_fetch).
// By ensuring we hold the stream open as long as possible, it ensures that we're going to
// see a remote error first, instead of the remote skopeo process seeing us close the pipe
// because we found an error.
drop(src);
// And pass back the result
r
}

/// Write the contents of a tarball as an ostree commit.
Expand Down

0 comments on commit 5d16ced

Please sign in to comment.