From f3cc793e1be16ca38fb48c1d869d6a84ffa0d219 Mon Sep 17 00:00:00 2001
From: amunra
Date: Tue, 5 Dec 2023 19:25:18 +0000
Subject: [PATCH] s3s-fs: fix incomplete uploads by writing via a temp file

---
 crates/s3s-fs/src/fs.rs | 82 ++++++++++++++++++++++++++++++++++++++++-
 crates/s3s-fs/src/s3.rs | 20 ++++------
 2 files changed, 87 insertions(+), 15 deletions(-)

diff --git a/crates/s3s-fs/src/fs.rs b/crates/s3s-fs/src/fs.rs
index e95d3ce4..84e07f47 100644
--- a/crates/s3s-fs/src/fs.rs
+++ b/crates/s3s-fs/src/fs.rs
@@ -7,10 +7,11 @@ use s3s::dto;
 use std::env;
 use std::ops::Not;
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
 
 use tokio::fs;
 use tokio::fs::File;
-use tokio::io::AsyncReadExt;
+use tokio::io::{AsyncReadExt, BufWriter};
 
 use md5::{Digest, Md5};
 use path_absolutize::Absolutize;
@@ -19,14 +20,35 @@ use uuid::Uuid;
 #[derive(Debug)]
 pub struct FileSystem {
     pub(crate) root: PathBuf,
+    tmp_file_counter: AtomicU64,
 }
 
 pub(crate) type InternalInfo = serde_json::Map<String, serde_json::Value>;
 
+fn clean_old_tmp_files(root: &Path) -> std::io::Result<()> {
+    let entries = match std::fs::read_dir(root) {
+        Ok(entries) => Ok(entries),
+        Err(ref io_err) if io_err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
+        Err(io_err) => Err(io_err),
+    }?;
+    for entry in entries {
+        let entry = entry?;
+        let file_name = entry.file_name();
+        let Some(file_name) = file_name.to_str() else { continue };
+        // See `FileSystem::prepare_file_write` for the temp file naming scheme.
+        if file_name.starts_with(".tmp.") && file_name.ends_with(".internal.part") {
+            std::fs::remove_file(entry.path())?;
+        }
+    }
+    Ok(())
+}
+
 impl FileSystem {
     pub fn new(root: impl AsRef<Path>) -> Result<Self> {
         let root = env::current_dir()?.join(root).canonicalize()?;
-        Ok(Self { root })
+        clean_old_tmp_files(&root)?;
+        let tmp_file_counter = AtomicU64::new(0);
+        Ok(Self { root, tmp_file_counter })
     }
 
     pub(crate) fn resolve_abs_path(&self, path: impl AsRef<Path>) -> Result<PathBuf> {
@@ -146,4 +168,60 @@ impl FileSystem {
         }
         Ok(())
     }
+
+    /// Write to the filesystem atomically.
+    /// This is done by first writing to a temporary location and then moving the file.
+    /// The returned `FileWriter` yields the final disk location once `done()` is called.
+    pub(crate) async fn prepare_file_write(&self, bucket: &str, key: &str) -> Result<FileWriter> {
+        let final_path = Some(self.get_object_path(bucket, key)?);
+        let tmp_name = format!(".tmp.{}.internal.part", self.tmp_file_counter.fetch_add(1, Ordering::SeqCst));
+        let tmp_path = self.resolve_abs_path(tmp_name)?;
+        let file = File::create(&tmp_path).await?;
+        let writer = BufWriter::new(file);
+        Ok(FileWriter {
+            tmp_path,
+            final_path,
+            writer,
+            clean_tmp: true,
+        })
+    }
+}
+
+pub(crate) struct FileWriter {
+    tmp_path: PathBuf,
+    final_path: Option<PathBuf>,
+    writer: BufWriter<File>,
+    clean_tmp: bool,
+}
+
+impl FileWriter {
+    pub(crate) fn tmp_path(&self) -> &Path {
+        &self.tmp_path
+    }
+
+    pub(crate) fn final_path(&self) -> &Path {
+        self.final_path.as_ref().unwrap()
+    }
+
+    pub(crate) fn writer(&mut self) -> &mut BufWriter<File> {
+        &mut self.writer
+    }
+
+    pub(crate) async fn done(mut self) -> Result<PathBuf> {
+        if let Some(final_dir_path) = self.final_path().parent() {
+            fs::create_dir_all(&final_dir_path).await?;
+        }
+
+        fs::rename(&self.tmp_path, &self.final_path()).await?;
+        self.clean_tmp = false;
+        Ok(self.final_path.take().unwrap())
+    }
+}
+
+impl Drop for FileWriter {
+    fn drop(&mut self) {
+        if self.clean_tmp {
+            let _ = std::fs::remove_file(&self.tmp_path);
+        }
+    }
 }
diff --git a/crates/s3s-fs/src/s3.rs b/crates/s3s-fs/src/s3.rs
index 0b9165e1..49ca2d06 100644
--- a/crates/s3s-fs/src/s3.rs
+++ b/crates/s3s-fs/src/s3.rs
@@ -466,10 +466,7 @@ impl S3 for FileSystem {
             return Ok(S3Response::new(output));
         }
 
-        let object_path = self.get_object_path(&bucket, &key)?;
-        if let Some(dir_path) = object_path.parent() {
-            try_!(fs::create_dir_all(&dir_path).await);
-        }
+        let mut file_writer = self.prepare_file_write(&bucket, &key).await?;
 
         let mut md5_hash = Md5::new();
         let stream = body.inspect_ok(|bytes| {
             md5_hash.update(bytes.as_ref());
             checksum.update(bytes.as_ref());
         });
 
-        let file = try_!(fs::File::create(&object_path).await);
-        let mut writer = BufWriter::new(file);
+        let size = copy_bytes(stream, file_writer.writer()).await?;
+        let object_path = file_writer.done().await?;
 
-        let size = copy_bytes(stream, &mut writer).await?;
         let md5_sum = hex(md5_hash.finalize());
         let checksum = checksum.finalize();
@@ -711,9 +707,7 @@
 
         self.delete_upload_id(&upload_id).await?;
 
-        let object_path = self.get_object_path(&bucket, &key)?;
-        let file = try_!(fs::File::create(&object_path).await);
-        let mut writer = BufWriter::new(file);
+        let mut file_writer = self.prepare_file_write(&bucket, &key).await?;
 
         let mut cnt: i32 = 0;
         for part in multipart_upload.parts.into_iter().flatten() {
@@ -726,12 +720,12 @@
 
             let part_path = self.resolve_abs_path(format!(".upload_id-{upload_id}.part-{part_number}"))?;
             let mut reader = try_!(fs::File::open(&part_path).await);
-            let size = try_!(tokio::io::copy(&mut reader, &mut writer).await);
+            let size = try_!(tokio::io::copy(&mut reader, &mut file_writer.writer()).await);
 
-            debug!(from = %part_path.display(), to = %object_path.display(), ?size, "write file");
+            debug!(from = %part_path.display(), tmp = %file_writer.tmp_path().display(), to = %file_writer.final_path().display(), ?size, "write file");
             try_!(fs::remove_file(&part_path).await);
         }
-        drop(writer);
+        let object_path = file_writer.done().await?;
 
         let file_size = try_!(fs::metadata(&object_path).await).len();
         let md5_sum = self.get_md5_sum(&bucket, &key).await?;
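
Usage sketch (illustrative, not part of the patch): with the `prepare_file_write` /
`FileWriter` API added above, a put-object style handler streams the body into a hidden
temp file and only renames it into place once the write has fully succeeded. `bucket`,
`key`, and `stream` stand in for the caller's values.

    // Create ".tmp.<n>.internal.part" under the filesystem root, wrapped in a BufWriter.
    let mut file_writer = self.prepare_file_write(&bucket, &key).await?;
    // Stream the request body into the temp file.
    let size = copy_bytes(stream, file_writer.writer()).await?;
    // Rename the temp file onto the final object path.
    // If anything fails before this point, FileWriter's Drop impl deletes the
    // temp file, so an interrupted upload never leaves a truncated object behind.
    let object_path = file_writer.done().await?;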