Skip to content

Commit

Permalink
Fixing 2 bugs that is preventing the deletion of files in mrecordlog.
Browse files Browse the repository at this point in the history
- the GC should happen after truncating in the memory queue.
- GC should happen when deleting a queue (not just upon truncation)
  • Loading branch information
fulmicoton committed Dec 15, 2023
1 parent ebee0fd commit c922214
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
42 changes: 31 additions & 11 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ impl MultiRecordLog {
}
// io errors are non-recoverable
let record_log_writer: RecordWriter<RollingWriter> = record_reader.into_writer().await?;
Ok(MultiRecordLog {
let mut multi_record_log = MultiRecordLog {
record_log_writer,
in_mem_queues,
next_sync: sync_policy.into(),
multi_record_spare_buffer: Vec::new(),
})
};
multi_record_log.run_gc_if_necessary().await?;
Ok(multi_record_log)
}

#[cfg(test)]
Expand All @@ -169,8 +171,9 @@ impl MultiRecordLog {
let position = self.in_mem_queues.next_position(queue)?;
let record = MultiPlexedRecord::DeleteQueue { queue, position };
self.record_log_writer.write_record(record).await?;
self.sync().await?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary().await?;
self.sync().await?;
Ok(())
}

Expand Down Expand Up @@ -252,14 +255,21 @@ impl MultiRecordLog {
Ok(Some(max_position))
}

async fn record_empty_queues_position(&mut self) -> Result<(), TruncateError> {
async fn record_empty_queues_position(&mut self) -> io::Result<()> {
let mut has_empty_queues = false;
for (queue_id, queue) in self.in_mem_queues.empty_queues() {
let next_position = queue.next_position();
let record = MultiPlexedRecord::RecordPosition {
queue: queue_id,
position: next_position,
};
self.record_log_writer.write_record(record).await?;
has_empty_queues = true
}
if has_empty_queues {
// We need to sync here! We are remove files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.sync().await?;
}
Ok(())
}
Expand All @@ -277,6 +287,17 @@ impl MultiRecordLog {
self.record_log_writer
.write_record(MultiPlexedRecord::Truncate { position, queue })
.await?;
let removed_count = self
.in_mem_queues
.truncate(queue, position)
.await
.unwrap_or(0);
self.run_gc_if_necessary().await?;
self.sync_on_policy().await?;
Ok(removed_count)
}

async fn run_gc_if_necessary(&mut self) -> io::Result<()> {
if self
.record_log_writer
.directory()
Expand All @@ -285,16 +306,15 @@ impl MultiRecordLog {
// We are about to delete files.
// Let's make sure we record the offsets of the empty queues
// so that we don't lose that information after dropping the files.
//
// But first we clone the current file number to make sure that the file that will
// contain the truncate positions it self won't be GC'ed.
let _file_number = self.record_log_writer.current_file().clone();
self.record_empty_queues_position().await?;

self.record_log_writer.directory().gc().await?;
}
self.sync_on_policy().await?;
let removed_count = self
.in_mem_queues
.truncate(queue, position)
.await
.unwrap_or(0);
Ok(removed_count)
Ok(())
}

pub fn range<R>(
Expand Down
19 changes: 19 additions & 0 deletions src/proptests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,22 @@ async fn test_multi_record() {
);
}
}

/// Unit tests reproducing bugs found with proptest in the past.
#[tokio::test]
async fn test_proptest_multirecord_reproduce_1() {
let block_size = 32_731;
let mut env = PropTestEnv::new(block_size).await;
env.apply(Operation::MultiAppend {
queue: "q1",
count: 4,
skip_one_pos: false,
})
.await;
env.apply(Operation::Truncate {
queue: "q1",
pos: 3,
})
.await;
env.apply(Operation::Reopen {}).await;
}

0 comments on commit c922214

Please sign in to comment.