Skip to content

Commit

Permalink
fix(wal): disable pipeline background tasks for madsim
Browse files Browse the repository at this point in the history
The background task will block as madsim is single threaded

Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Jun 17, 2024
1 parent b65e429 commit eacbb23
Showing 1 changed file with 16 additions and 26 deletions.
42 changes: 16 additions & 26 deletions crates/curp/src/server/storage/wal/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ pub(super) struct FilePipeline {
stopped: Arc<AtomicBool>,
/// Join handle of the allocation task
file_alloc_task_handle: Option<JoinHandle<()>>,
#[cfg_attr(not(madsim), allow(unused))]
/// File count used in madsim tests
file_count: usize,
}

impl FilePipeline {
Expand All @@ -43,13 +46,13 @@ impl FilePipeline {
error!("Failed to clean up tmp files: {e}");
}

let (file_tx, file_rx) = flume::bounded(1);
let dir_c = dir.clone();
let stopped = Arc::new(AtomicBool::new(false));
let stopped_c = Arc::clone(&stopped);

#[cfg(not(madsim))]
{
let (file_tx, file_rx) = flume::bounded(1);
let file_alloc_task_handle = std::thread::spawn(move || {
let mut file_count = 0;
loop {
Expand Down Expand Up @@ -80,41 +83,19 @@ impl FilePipeline {
file_iter: Some(file_rx.into_iter()),
stopped,
file_alloc_task_handle: Some(file_alloc_task_handle),
file_count: 0,
}
}

#[cfg(madsim)]
{
let _ignore = tokio::spawn(async move {
let mut file_count = 0;
loop {
match Self::alloc(&dir_c, file_size, &mut file_count) {
Ok(file) => {
if file_tx.send_async(file).await.is_err() {
// The receiver is already dropped, stop this task
break;
}
if stopped_c.load(Ordering::Relaxed) {
if let Err(e) = Self::clean_up(&dir_c) {
error!("failed to clean up pipeline temp files: {e}");
}
break;
}
}
Err(e) => {
error!("failed to allocate file: {e}");
break;
}
}
}
});

Self {
dir,
file_size,
file_iter: Some(file_rx.into_iter()),
file_iter: None,
stopped,
file_alloc_task_handle: None,
file_count: 0,
}
}
}
Expand Down Expand Up @@ -161,6 +142,7 @@ impl Drop for FilePipeline {
impl Iterator for FilePipeline {
type Item = io::Result<LockedFile>;

#[cfg(not(madsim))]
fn next(&mut self) -> Option<Self::Item> {
if self.stopped.load(Ordering::Relaxed) {
return None;
Expand All @@ -171,6 +153,14 @@ impl Iterator for FilePipeline {
.next()
.map(Ok)
}

#[cfg(madsim)]
fn next(&mut self) -> Option<Self::Item> {
if self.stopped.load(Ordering::Relaxed) {
return None;
}
Some(Self::alloc(&self.dir, self.file_size, &mut self.file_count))
}
}

impl std::fmt::Debug for FilePipeline {
Expand Down

0 comments on commit eacbb23

Please sign in to comment.