From 7c870fab91e633b44175fa9806d46b87868125eb Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 4 Dec 2023 23:27:03 +0800 Subject: [PATCH 01/10] feat: implement WAL file pipeline Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/storage/wal/mod.rs | 3 + .../curp/src/server/storage/wal/pipeline.rs | 169 ++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 crates/curp/src/server/storage/wal/pipeline.rs diff --git a/crates/curp/src/server/storage/wal/mod.rs b/crates/curp/src/server/storage/wal/mod.rs index 38897fb62..edd644e8b 100644 --- a/crates/curp/src/server/storage/wal/mod.rs +++ b/crates/curp/src/server/storage/wal/mod.rs @@ -6,5 +6,8 @@ mod codec; /// WAL errors mod error; +/// File pipeline +mod pipeline; + /// File utils mod util; diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs new file mode 100644 index 000000000..2eb0ab1f4 --- /dev/null +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -0,0 +1,169 @@ +use std::{ + io, + path::{Path, PathBuf}, + task::Poll, +}; + +use clippy_utilities::OverflowArithmetic; +use event_listener::Event; +use flume::r#async::RecvStream; +use futures::{FutureExt, StreamExt}; +use thiserror::Error; +use tokio::task::JoinHandle; +use tokio_stream::Stream; +use tracing::error; + +use super::util::LockedFile; + +/// The temp file extension +const TEMP_FILE_EXT: &str = ".tmp"; + +/// The file pipeline, used for pipelining the creation of temp file +pub(super) struct FilePipeline { + /// The directory where the temp files are created + dir: PathBuf, + /// The size of the temp file + file_size: u64, + /// The file receive stream + file_stream: RecvStream<'static, LockedFile>, + /// The stop event listener + stop_event: Event, + /// The handle of the background file creation task + handle: JoinHandle>, +} + +impl FilePipeline { + /// Creates a new `FilePipeline` + #[allow(clippy::integer_arithmetic)] // Introduced by tokio::select! macro + pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { + Self::clean_up(&dir)?; + + let (file_tx, file_rx) = flume::bounded(1); + let stop_event = Event::new(); + let mut stop_listener = stop_event.listen(); + let dir_c = dir.clone(); + + let handle = tokio::spawn(async move { + let mut file_count = 0; + loop { + let file = Self::alloc(&dir_c, file_size, &mut file_count)?; + tokio::select! { + _ = &mut stop_listener => { + break; + } + result = file_tx.send_async(file) => { + // The receiver is already dropped, stop this task + if let Err(e) = result { + break; + } + } + } + } + Self::clean_up(&dir_c)?; + Ok(()) + }); + + Ok(Self { + dir, + file_size, + file_stream: file_rx.into_stream(), + stop_event, + handle, + }) + } + + /// Stops the pipeline + pub(super) fn stop(&self) { + self.stop_event.notify(1); + } + + /// Allocates a a new tempfile + fn alloc( + dir: impl AsRef, + file_size: u64, + file_count: &mut usize, + ) -> io::Result { + let fpath = PathBuf::from(dir.as_ref()).join(format!("{}{TEMP_FILE_EXT}", *file_count)); + let mut locked_file = LockedFile::open_rw(fpath)?; + locked_file.preallocate(file_size)?; + *file_count = file_count.overflow_add(1); + Ok(locked_file) + } + + /// Cleans up all unused tempfiles + fn clean_up(dir: &PathBuf) -> io::Result<()> { + for result in std::fs::read_dir(dir)? { + let file = result?; + if let Some(filename) = file.file_name().to_str() { + if filename.ends_with(TEMP_FILE_EXT) { + std::fs::remove_file(file.path())?; + } + } + } + Ok(()) + } +} + +impl Stream for FilePipeline { + type Item = io::Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + if self.handle.is_finished() { + if let Poll::Ready(result) = self.handle.poll_unpin(cx) { + match result { + Ok(Err(e)) => { + return Poll::Ready(Some(Err(e))); + } + Err(e) => { + return Poll::Ready(Some(Err(e.into()))); + } + Ok(Ok(_)) => return Poll::Ready(None), + } + } + return Poll::Ready(None); + } + + self.file_stream.poll_next_unpin(cx).map(|opt| opt.map(Ok)) + } +} + +impl std::fmt::Debug for FilePipeline { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FilePipeline") + .field("dir", &self.dir) + .field("file_size", &self.file_size) + .finish() + } +} + +#[cfg(test)] +mod tests { + use crate::server::storage::wal::util::get_file_paths_with_ext; + + use super::*; + + #[tokio::test] + async fn file_pipeline_is_ok() { + let file_size = 1024; + let dir = tempfile::tempdir().unwrap(); + let mut pipeline = FilePipeline::new(dir.as_ref().into(), file_size).unwrap(); + + let check_size = |mut file: LockedFile| { + let file = file.into_std(); + assert_eq!(file.metadata().unwrap().len(), file_size,); + }; + let file0 = pipeline.next().await.unwrap().unwrap(); + check_size(file0); + let file1 = pipeline.next().await.unwrap().unwrap(); + check_size(file1); + let paths = get_file_paths_with_ext(&dir, TEMP_FILE_EXT).unwrap(); + assert_eq!(paths.len(), 2); + pipeline.stop(); + assert!(pipeline.next().await.is_none()); + let paths_cleaned = get_file_paths_with_ext(dir, TEMP_FILE_EXT).unwrap(); + assert_eq!(paths_cleaned.len(), 0); + } +} From 3fc542c9222ffd9ac3bbf176da41ace7cade0746 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 21 Dec 2023 12:04:11 +0800 Subject: [PATCH 02/10] chore: move file count Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/storage/wal/pipeline.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 2eb0ab1f4..a4f2fc142 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -46,7 +46,8 @@ impl FilePipeline { let handle = tokio::spawn(async move { let mut file_count = 0; loop { - let file = Self::alloc(&dir_c, file_size, &mut file_count)?; + let file = Self::alloc(&dir_c, file_size, file_count)?; + file_count += 1; tokio::select! { _ = &mut stop_listener => { break; @@ -78,15 +79,10 @@ impl FilePipeline { } /// Allocates a a new tempfile - fn alloc( - dir: impl AsRef, - file_size: u64, - file_count: &mut usize, - ) -> io::Result { - let fpath = PathBuf::from(dir.as_ref()).join(format!("{}{TEMP_FILE_EXT}", *file_count)); + fn alloc(dir: impl AsRef, file_size: u64, file_count: usize) -> io::Result { + let fpath = PathBuf::from(dir.as_ref()).join(format!("{file_count}{TEMP_FILE_EXT}")); let mut locked_file = LockedFile::open_rw(fpath)?; locked_file.preallocate(file_size)?; - *file_count = file_count.overflow_add(1); Ok(locked_file) } From 498690c4792a8666099a71753a58eb5eb1283b64 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 21 Dec 2023 12:07:04 +0800 Subject: [PATCH 03/10] chore: remove stop listener Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index a4f2fc142..87d47acfc 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -26,8 +26,6 @@ pub(super) struct FilePipeline { file_size: u64, /// The file receive stream file_stream: RecvStream<'static, LockedFile>, - /// The stop event listener - stop_event: Event, /// The handle of the background file creation task handle: JoinHandle>, } @@ -40,7 +38,6 @@ impl FilePipeline { let (file_tx, file_rx) = flume::bounded(1); let stop_event = Event::new(); - let mut stop_listener = stop_event.listen(); let dir_c = dir.clone(); let handle = tokio::spawn(async move { @@ -48,16 +45,9 @@ impl FilePipeline { loop { let file = Self::alloc(&dir_c, file_size, file_count)?; file_count += 1; - tokio::select! { - _ = &mut stop_listener => { - break; - } - result = file_tx.send_async(file) => { - // The receiver is already dropped, stop this task - if let Err(e) = result { - break; - } - } + if let Err(e) = file_tx.send_async(file).await { + // The receiver is already dropped, stop this task + break; } } Self::clean_up(&dir_c)?; @@ -68,14 +58,13 @@ impl FilePipeline { dir, file_size, file_stream: file_rx.into_stream(), - stop_event, handle, }) } /// Stops the pipeline pub(super) fn stop(&self) { - self.stop_event.notify(1); + self.handle.abort(); } /// Allocates a a new tempfile From 0cae4eccb6f965888d6125db18f32f4826c233bb Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 2 Jan 2024 15:10:47 +0800 Subject: [PATCH 04/10] refactor: check stopped flag in wal file pipeline Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 87d47acfc..0592e1380 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -28,6 +28,8 @@ pub(super) struct FilePipeline { file_stream: RecvStream<'static, LockedFile>, /// The handle of the background file creation task handle: JoinHandle>, + /// Stopped flag + stopped: bool, } impl FilePipeline { @@ -50,7 +52,6 @@ impl FilePipeline { break; } } - Self::clean_up(&dir_c)?; Ok(()) }); @@ -59,12 +60,17 @@ impl FilePipeline { file_size, file_stream: file_rx.into_stream(), handle, + stopped: false, }) } /// Stops the pipeline - pub(super) fn stop(&self) { + pub(super) fn stop(&mut self) { self.handle.abort(); + self.stopped = true; + if let Err(e) = Self::clean_up(&self.dir) { + error!("failed to clean up pipeline files: {e}"); + } } /// Allocates a a new tempfile @@ -89,6 +95,12 @@ impl FilePipeline { } } +impl Drop for FilePipeline { + fn drop(&mut self) { + self.stop(); + } +} + impl Stream for FilePipeline { type Item = io::Result; @@ -111,6 +123,10 @@ impl Stream for FilePipeline { return Poll::Ready(None); } + if self.stopped { + return Poll::Ready(None); + } + self.file_stream.poll_next_unpin(cx).map(|opt| opt.map(Ok)) } } From 6fd37ecafaf27ac2b110ccd6a7d4d8455b91b0b3 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Wed, 3 Jan 2024 14:55:30 +0800 Subject: [PATCH 05/10] refactor: remove join handle of file allocation task in pipeline Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 0592e1380..968b5ad73 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -1,6 +1,10 @@ use std::{ io, path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::Poll, }; @@ -26,10 +30,8 @@ pub(super) struct FilePipeline { file_size: u64, /// The file receive stream file_stream: RecvStream<'static, LockedFile>, - /// The handle of the background file creation task - handle: JoinHandle>, /// Stopped flag - stopped: bool, + stopped: Arc, } impl FilePipeline { @@ -41,36 +43,46 @@ impl FilePipeline { let (file_tx, file_rx) = flume::bounded(1); let stop_event = Event::new(); let dir_c = dir.clone(); + let stopped = Arc::new(AtomicBool::new(false)); + let stopped_c = Arc::clone(&stopped); - let handle = tokio::spawn(async move { + let _ignore = tokio::spawn(async move { let mut file_count = 0; loop { - let file = Self::alloc(&dir_c, file_size, file_count)?; + let file = match Self::alloc(&dir_c, file_size, file_count) { + Ok(f) => f, + Err(e) => { + error!("failed to allocate new file: {e}"); + break; + } + }; file_count += 1; + + if stopped_c.load(Ordering::SeqCst) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline files: {e}"); + } + break; + } + if let Err(e) = file_tx.send_async(file).await { // The receiver is already dropped, stop this task break; } } - Ok(()) }); Ok(Self { dir, file_size, file_stream: file_rx.into_stream(), - handle, - stopped: false, + stopped, }) } /// Stops the pipeline pub(super) fn stop(&mut self) { - self.handle.abort(); - self.stopped = true; - if let Err(e) = Self::clean_up(&self.dir) { - error!("failed to clean up pipeline files: {e}"); - } + self.stopped.store(true, Ordering::SeqCst); } /// Allocates a a new tempfile @@ -108,22 +120,7 @@ impl Stream for FilePipeline { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - if self.handle.is_finished() { - if let Poll::Ready(result) = self.handle.poll_unpin(cx) { - match result { - Ok(Err(e)) => { - return Poll::Ready(Some(Err(e))); - } - Err(e) => { - return Poll::Ready(Some(Err(e.into()))); - } - Ok(Ok(_)) => return Poll::Ready(None), - } - } - return Poll::Ready(None); - } - - if self.stopped { + if self.stopped.load(Ordering::SeqCst) { return Poll::Ready(None); } From fced04c56a3309b08766ac8304706884666c0247 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Thu, 4 Jan 2024 17:45:37 +0800 Subject: [PATCH 06/10] chore: use relaxed ordering for stop flag Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/storage/wal/pipeline.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 968b5ad73..d9cdb37e2 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -58,7 +58,7 @@ impl FilePipeline { }; file_count += 1; - if stopped_c.load(Ordering::SeqCst) { + if stopped_c.load(Ordering::Relaxed) { if let Err(e) = Self::clean_up(&dir_c) { error!("failed to clean up pipeline files: {e}"); } @@ -82,7 +82,7 @@ impl FilePipeline { /// Stops the pipeline pub(super) fn stop(&mut self) { - self.stopped.store(true, Ordering::SeqCst); + self.stopped.store(true, Ordering::Relaxed); } /// Allocates a a new tempfile @@ -120,7 +120,7 @@ impl Stream for FilePipeline { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - if self.stopped.load(Ordering::SeqCst) { + if self.stopped.load(Ordering::Relaxed) { return Poll::Ready(None); } From b7705cb8d8b9db9cff71513036c7c06372d5fea2 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:24:58 +0800 Subject: [PATCH 07/10] chore: fix clippy Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- crates/curp/src/server/storage/wal/pipeline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index d9cdb37e2..79009f898 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -36,7 +36,7 @@ pub(super) struct FilePipeline { impl FilePipeline { /// Creates a new `FilePipeline` - #[allow(clippy::integer_arithmetic)] // Introduced by tokio::select! macro + #[allow(clippy::arithmetic_side_effects)] // Introduced by tokio::select! macro pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { Self::clean_up(&dir)?; From 19fc16ea764eca25d511ef958d5dc047b06b620b Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 26 Feb 2024 15:31:54 +0800 Subject: [PATCH 08/10] refactor: move file allocation task to a thread as it may block Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 79009f898..924be0fe0 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -36,7 +36,6 @@ pub(super) struct FilePipeline { impl FilePipeline { /// Creates a new `FilePipeline` - #[allow(clippy::arithmetic_side_effects)] // Introduced by tokio::select! macro pub(super) fn new(dir: PathBuf, file_size: u64) -> io::Result { Self::clean_up(&dir)?; @@ -46,28 +45,40 @@ impl FilePipeline { let stopped = Arc::new(AtomicBool::new(false)); let stopped_c = Arc::clone(&stopped); - let _ignore = tokio::spawn(async move { + #[cfg(not(madsim))] + let _ignore = std::thread::spawn(move || { let mut file_count = 0; loop { - let file = match Self::alloc(&dir_c, file_size, file_count) { - Ok(f) => f, + match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + Ok(file) => { + if file_tx.send(file).is_err() { + // The receiver is already dropped, stop this task + break; + } + } Err(e) => { - error!("failed to allocate new file: {e}"); + error!("failed to allocate file: {e}"); break; } - }; - file_count += 1; - - if stopped_c.load(Ordering::Relaxed) { - if let Err(e) = Self::clean_up(&dir_c) { - error!("failed to clean up pipeline files: {e}"); - } - break; } + } + }); - if let Err(e) = file_tx.send_async(file).await { - // The receiver is already dropped, stop this task - break; + #[cfg(madsim)] + let _ignore = tokio::spawn(async move { + let mut file_count = 0; + loop { + match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + Ok(file) => { + if file_tx.send_async(file).await.is_err() { + // The receiver is already dropped, stop this task + break; + } + } + Err(e) => { + error!("failed to allocate file: {e}"); + break; + } } } }); @@ -86,11 +97,20 @@ impl FilePipeline { } /// Allocates a a new tempfile - fn alloc(dir: impl AsRef, file_size: u64, file_count: usize) -> io::Result { - let fpath = PathBuf::from(dir.as_ref()).join(format!("{file_count}{TEMP_FILE_EXT}")); - let mut locked_file = LockedFile::open_rw(fpath)?; - locked_file.preallocate(file_size)?; - Ok(locked_file) + fn alloc( + dir: &PathBuf, + file_size: u64, + file_count: &mut usize, + stopped: &AtomicBool, + ) -> io::Result { + let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}")); + let mut file = LockedFile::open_rw(fpath)?; + file.preallocate(file_size)?; + *file_count = file_count.wrapping_add(1); + if stopped.load(Ordering::Relaxed) { + Self::clean_up(dir)?; + } + Ok(file) } /// Cleans up all unused tempfiles @@ -157,11 +177,7 @@ mod tests { check_size(file0); let file1 = pipeline.next().await.unwrap().unwrap(); check_size(file1); - let paths = get_file_paths_with_ext(&dir, TEMP_FILE_EXT).unwrap(); - assert_eq!(paths.len(), 2); pipeline.stop(); assert!(pipeline.next().await.is_none()); - let paths_cleaned = get_file_paths_with_ext(dir, TEMP_FILE_EXT).unwrap(); - assert_eq!(paths_cleaned.len(), 0); } } From 941449e4b32f3d4bc56cd842cd1d9ec32c50bf08 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Mon, 4 Mar 2024 10:50:11 +0800 Subject: [PATCH 09/10] refactor: exit allocation task on stop Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- .../curp/src/server/storage/wal/pipeline.rs | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/crates/curp/src/server/storage/wal/pipeline.rs b/crates/curp/src/server/storage/wal/pipeline.rs index 924be0fe0..534cf0daa 100644 --- a/crates/curp/src/server/storage/wal/pipeline.rs +++ b/crates/curp/src/server/storage/wal/pipeline.rs @@ -40,7 +40,6 @@ impl FilePipeline { Self::clean_up(&dir)?; let (file_tx, file_rx) = flume::bounded(1); - let stop_event = Event::new(); let dir_c = dir.clone(); let stopped = Arc::new(AtomicBool::new(false)); let stopped_c = Arc::clone(&stopped); @@ -49,12 +48,18 @@ impl FilePipeline { let _ignore = std::thread::spawn(move || { let mut file_count = 0; loop { - match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + match Self::alloc(&dir_c, file_size, &mut file_count) { Ok(file) => { if file_tx.send(file).is_err() { // The receiver is already dropped, stop this task break; } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; + } } Err(e) => { error!("failed to allocate file: {e}"); @@ -68,12 +73,18 @@ impl FilePipeline { let _ignore = tokio::spawn(async move { let mut file_count = 0; loop { - match Self::alloc(&dir_c, file_size, &mut file_count, &stopped_c) { + match Self::alloc(&dir_c, file_size, &mut file_count) { Ok(file) => { if file_tx.send_async(file).await.is_err() { // The receiver is already dropped, stop this task break; } + if stopped_c.load(Ordering::Relaxed) { + if let Err(e) = Self::clean_up(&dir_c) { + error!("failed to clean up pipeline temp files: {e}"); + } + break; + } } Err(e) => { error!("failed to allocate file: {e}"); @@ -97,19 +108,11 @@ impl FilePipeline { } /// Allocates a a new tempfile - fn alloc( - dir: &PathBuf, - file_size: u64, - file_count: &mut usize, - stopped: &AtomicBool, - ) -> io::Result { + fn alloc(dir: &PathBuf, file_size: u64, file_count: &mut usize) -> io::Result { let fpath = PathBuf::from(dir).join(format!("{file_count}{TEMP_FILE_EXT}")); let mut file = LockedFile::open_rw(fpath)?; file.preallocate(file_size)?; *file_count = file_count.wrapping_add(1); - if stopped.load(Ordering::Relaxed) { - Self::clean_up(dir)?; - } Ok(file) } From 296105fe01aafbd473d47fde088f32094892c5f3 Mon Sep 17 00:00:00 2001 From: bsbds <69835502+bsbds@users.noreply.github.com> Date: Tue, 5 Mar 2024 12:05:20 +0800 Subject: [PATCH 10/10] chore: bump mio version to 0.8.11 fix: https://rustsec.org/advisories/RUSTSEC-2024-0019 Signed-off-by: bsbds <69835502+bsbds@users.noreply.github.com> --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 75b829687..9aecd41b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1615,9 +1615,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "wasi",