Skip to content

Commit

Permalink
Added a summary method to extract meta information about the queue
Browse files Browse the repository at this point in the history
preventing GC.
  • Loading branch information
fulmicoton committed Mar 22, 2024
1 parent 187486f commit 1dae21d
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 11 deletions.
10 changes: 4 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ use mrecordlog::MultiRecordLog;

fn main() -> Result<(), Box<dyn Error>> {
let multi_record_log = MultiRecordLog::open(Path::new("."))?;
for queue in multi_record_log.list_queues() {
println!("queue {queue}");
let mut range = multi_record_log.range(queue, ..).unwrap();
let first = range.next().map(|record| record.position);
let last = range.last().map(|record| record.position);
println!("{first:?}..{last:?}");
let summary = multi_record_log.summary();
for (queue, summary) in summary.queues {
println!("{}", queue);
println!("{summary:?}");
}
Ok(())
}
2 changes: 2 additions & 0 deletions src/mem/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
mod queue;
mod queues;
mod summary;

pub(crate) use self::queue::MemQueue;
pub(crate) use self::queues::MemQueues;
pub use self::summary::{QueueSummary, QueuesSummary};

#[cfg(test)]
mod tests;
22 changes: 22 additions & 0 deletions src/mem/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};

use crate::error::AppendError;
use crate::mem::QueueSummary;
use crate::rolling::FileNumber;
use crate::Record;

Expand Down Expand Up @@ -114,10 +115,31 @@ impl MemQueue {
}
}

pub fn summary(&self) -> QueueSummary {
QueueSummary {
start: self.start_position(),
end: self.last_position(),
file_number: self.first_file_number(),
}
}

pub fn is_empty(&self) -> bool {
self.record_metas.is_empty()
}

pub(crate) fn first_file_number(&self) -> Option<u64> {
let file_number: &FileNumber = self
.record_metas
.iter()
.filter_map(|record_meta| record_meta.file_number.as_ref())
.next()?;
Some(file_number.file_number())
}

pub(crate) fn start_position(&self) -> u64 {
self.start_position
}

/// Returns the position of the last record appended to the queue.
pub fn last_position(&self) -> Option<u64> {
self.next_position().checked_sub(1)
Expand Down
10 changes: 9 additions & 1 deletion src/mem/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::ops::RangeBounds;
use tracing::{info, warn};

use crate::error::{AlreadyExists, AppendError, MissingQueue};
use crate::mem::MemQueue;
use crate::mem::{MemQueue, QueuesSummary};
use crate::rolling::FileNumber;
use crate::Record;

Expand All @@ -23,6 +23,14 @@ impl MemQueues {
Ok(())
}

pub fn summary(&self) -> QueuesSummary {
let mut summary = QueuesSummary::default();
for (queue_name, queue) in &self.queues {
summary.queues.insert(queue_name.clone(), queue.summary());
}
summary
}

pub fn delete_queue(&mut self, queue: &str) -> Result<(), MissingQueue> {
info!(queue = queue, "deleting queue");
if self.queues.remove(queue).is_none() {
Expand Down
15 changes: 15 additions & 0 deletions src/mem/summary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use std::collections::BTreeMap;

use serde::Serialize;

#[derive(Default, Serialize, Debug)]
pub struct QueueSummary {
pub start: u64,
pub end: Option<u64>,
pub file_number: Option<u64>,
}

#[derive(Default, Serialize)]
pub struct QueuesSummary {
pub queues: BTreeMap<String, QueueSummary>,
}
8 changes: 6 additions & 2 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tracing::{debug, event_enabled, info, warn, Level};
use crate::error::{
AppendError, CreateQueueError, DeleteQueueError, MissingQueue, ReadRecordError, TruncateError,
};
use crate::mem::MemQueue;
use crate::mem::{MemQueue, QueuesSummary};
use crate::record::{MultiPlexedRecord, MultiRecord};
use crate::recordlog::RecordWriter;
use crate::rolling::RollingWriter;
Expand Down Expand Up @@ -79,6 +79,10 @@ impl MultiRecordLog {
Self::open_with_prefs(directory_path, SyncPolicy::OnAppend)
}

pub fn summary(&self) -> QueuesSummary {
self.in_mem_queues.summary()
}

/// Open the multi record log, syncing following the provided policy.
pub fn open_with_prefs(
directory_path: &Path,
Expand All @@ -91,7 +95,7 @@ impl MultiRecordLog {
debug!("loading wal");
loop {
let file_number = record_reader.read().current_file().clone();
let Ok(record) = record_reader.read_record() else {
let Ok(record) = record_reader.read_record::<MultiPlexedRecord>() else {
warn!("Detected corrupted record: some data may have been lost");
continue;
};
Expand Down
46 changes: 45 additions & 1 deletion src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::error;
use crate::error::MultiRecordCorruption;
use crate::Serializable;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) enum MultiPlexedRecord<'a> {
/// Adds multiple records to a specific queue.
AppendRecords {
Expand All @@ -27,6 +27,50 @@ pub(crate) enum MultiPlexedRecord<'a> {
},
}

impl<'a> std::fmt::Debug for MultiPlexedRecord<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::AppendRecords {
queue,
position,
records,
} => f
.debug_struct("AppendRecords")
.field("queue", queue)
.field("position", position)
.field("records_len", &records.count())
.finish(),
Self::Truncate { queue, position } => f
.debug_struct("Truncate")
.field("queue", queue)
.field("position", position)
.finish(),
Self::RecordPosition { queue, position } => f
.debug_struct("RecordPosition")
.field("queue", queue)
.field("position", position)
.finish(),
Self::DeleteQueue { queue, position } => f
.debug_struct("DeleteQueue")
.field("queue", queue)
.field("position", position)
.finish(),
}
}
}

impl<'a> MultiPlexedRecord<'a> {
#[allow(dead_code)]
pub fn queue_id(&self) -> &'a str {
match self {
Self::AppendRecords { queue, .. } => queue,
Self::Truncate { queue, .. } => queue,
Self::RecordPosition { queue, .. } => queue,
Self::DeleteQueue { queue, .. } => queue,
}
}
}

#[repr(u8)]
#[derive(Clone, Copy, Debug)]
enum RecordType {
Expand Down
1 change: 0 additions & 1 deletion src/rolling/file_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl FileNumber {
format!("wal-{:020}", self.file_number)
}

#[cfg(test)]
pub fn file_number(&self) -> u64 {
*self.file_number
}
Expand Down

0 comments on commit 1dae21d

Please sign in to comment.