Skip to content

Commit

Permalink
refactor: wal pipeline will exit task on drop
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Apr 19, 2024
1 parent 1c442b6 commit 710ffb1
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use std::{
Arc,
},
task::Poll,
thread::JoinHandle,
};

use clippy_utilities::OverflowArithmetic;
use event_listener::Event;
use flume::r#async::RecvStream;
use futures::{FutureExt, StreamExt};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio_stream::Stream;
use tracing::error;

Expand All @@ -28,10 +28,12 @@ pub(super) struct FilePipeline {
dir: PathBuf,
/// The size of the temp file
file_size: u64,
/// The file receive stream
file_stream: flume::IntoIter<LockedFile>,
/// The file receive iterator
file_iter: Option<flume::IntoIter<LockedFile>>,
/// Stopped flag
stopped: Arc<AtomicBool>,
/// Join handle of the allocation task
file_alloc_task_handle: Option<JoinHandle<()>>,
}

impl FilePipeline {
Expand All @@ -47,7 +49,7 @@ impl FilePipeline {
let stopped_c = Arc::clone(&stopped);

#[cfg(not(madsim))]
let _ignore = std::thread::spawn(move || {
let file_alloc_task_handle = std::thread::spawn(move || {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Expand Down Expand Up @@ -99,8 +101,9 @@ impl FilePipeline {
Ok(Self {
dir,
file_size,
file_stream: file_rx.into_iter(),
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: Some(file_alloc_task_handle),
})
}

Expand Down Expand Up @@ -132,20 +135,27 @@ impl FilePipeline {
}
}

#[allow(clippy::unwrap_used)] // Option is always `Some`
impl Drop for FilePipeline {
fn drop(&mut self) {
self.stop();
// Drops the file rx so that the allocation task could exit
drop(self.file_iter.take());
if let Err(e) = self.file_alloc_task_handle.take().unwrap().join() {
error!("failed to join file allocation task: {e:?}");
}
}
}

#[allow(clippy::unwrap_used)] // Option is always `Some`
impl Iterator for FilePipeline {
type Item = io::Result<LockedFile>;

fn next(&mut self) -> Option<Self::Item> {
if self.stopped.load(Ordering::Relaxed) {
return None;
}
self.file_stream.next().map(Ok)
self.file_iter.as_mut().unwrap().next().map(Ok)
}
}

Expand Down

0 comments on commit 710ffb1

Please sign in to comment.