diff --git a/src/main.rs b/src/main.rs index dfbbf5a..6e4b361 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,12 +5,10 @@ use mrecordlog::MultiRecordLog; fn main() -> Result<(), Box> { let multi_record_log = MultiRecordLog::open(Path::new("."))?; - for queue in multi_record_log.list_queues() { - println!("queue {queue}"); - let mut range = multi_record_log.range(queue, ..).unwrap(); - let first = range.next().map(|record| record.position); - let last = range.last().map(|record| record.position); - println!("{first:?}..{last:?}"); + let summary = multi_record_log.summary(); + for (queue, summary) in summary.queues { + println!("{}", queue); + println!("{summary:?}"); } Ok(()) } diff --git a/src/mem/mod.rs b/src/mem/mod.rs index 11db7db..93dc2c8 100644 --- a/src/mem/mod.rs +++ b/src/mem/mod.rs @@ -1,8 +1,10 @@ mod queue; mod queues; +mod summary; pub(crate) use self::queue::MemQueue; pub(crate) use self::queues::MemQueues; +pub use self::summary::{QueueSummary, QueuesSummary}; #[cfg(test)] mod tests; diff --git a/src/mem/queue.rs b/src/mem/queue.rs index 74b66a2..70979e2 100644 --- a/src/mem/queue.rs +++ b/src/mem/queue.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::ops::{Bound, RangeBounds}; use crate::error::AppendError; +use crate::mem::QueueSummary; use crate::rolling::FileNumber; use crate::Record; @@ -114,10 +115,31 @@ impl MemQueue { } } + pub fn summary(&self) -> QueueSummary { + QueueSummary { + start: self.start_position(), + end: self.last_position(), + file_number: self.first_file_number(), + } + } + pub fn is_empty(&self) -> bool { self.record_metas.is_empty() } + pub(crate) fn first_file_number(&self) -> Option { + let file_number: &FileNumber = self + .record_metas + .iter() + .filter_map(|record_meta| record_meta.file_number.as_ref()) + .next()?; + Some(file_number.file_number()) + } + + pub(crate) fn start_position(&self) -> u64 { + self.start_position + } + /// Returns the position of the last record appended to the queue. pub fn last_position(&self) -> Option { self.next_position().checked_sub(1) diff --git a/src/mem/queues.rs b/src/mem/queues.rs index 3adbbac..0610be6 100644 --- a/src/mem/queues.rs +++ b/src/mem/queues.rs @@ -4,7 +4,7 @@ use std::ops::RangeBounds; use tracing::{info, warn}; use crate::error::{AlreadyExists, AppendError, MissingQueue}; -use crate::mem::MemQueue; +use crate::mem::{MemQueue, QueuesSummary}; use crate::rolling::FileNumber; use crate::Record; @@ -23,6 +23,14 @@ impl MemQueues { Ok(()) } + pub fn summary(&self) -> QueuesSummary { + let mut summary = QueuesSummary::default(); + for (queue_name, queue) in &self.queues { + summary.queues.insert(queue_name.clone(), queue.summary()); + } + summary + } + pub fn delete_queue(&mut self, queue: &str) -> Result<(), MissingQueue> { info!(queue = queue, "deleting queue"); if self.queues.remove(queue).is_none() { diff --git a/src/mem/summary.rs b/src/mem/summary.rs new file mode 100644 index 0000000..71f62b2 --- /dev/null +++ b/src/mem/summary.rs @@ -0,0 +1,15 @@ +use std::collections::BTreeMap; + +use serde::Serialize; + +#[derive(Default, Serialize, Debug)] +pub struct QueueSummary { + pub start: u64, + pub end: Option, + pub file_number: Option, +} + +#[derive(Default, Serialize)] +pub struct QueuesSummary { + pub queues: BTreeMap, +} diff --git a/src/multi_record_log.rs b/src/multi_record_log.rs index 44963bc..ca3ca22 100644 --- a/src/multi_record_log.rs +++ b/src/multi_record_log.rs @@ -9,7 +9,7 @@ use tracing::{debug, event_enabled, info, warn, Level}; use crate::error::{ AppendError, CreateQueueError, DeleteQueueError, MissingQueue, ReadRecordError, TruncateError, }; -use crate::mem::MemQueue; +use crate::mem::{MemQueue, QueuesSummary}; use crate::record::{MultiPlexedRecord, MultiRecord}; use crate::recordlog::RecordWriter; use crate::rolling::RollingWriter; @@ -79,6 +79,10 @@ impl MultiRecordLog { Self::open_with_prefs(directory_path, SyncPolicy::OnAppend) } + pub fn summary(&self) -> QueuesSummary { + self.in_mem_queues.summary() + } + /// Open the multi record log, syncing following the provided policy. pub fn open_with_prefs( directory_path: &Path, @@ -91,7 +95,7 @@ impl MultiRecordLog { debug!("loading wal"); loop { let file_number = record_reader.read().current_file().clone(); - let Ok(record) = record_reader.read_record() else { + let Ok(record) = record_reader.read_record::() else { warn!("Detected corrupted record: some data may have been lost"); continue; }; diff --git a/src/record.rs b/src/record.rs index 6807820..aa5b774 100644 --- a/src/record.rs +++ b/src/record.rs @@ -6,7 +6,7 @@ use tracing::error; use crate::error::MultiRecordCorruption; use crate::Serializable; -#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Copy, Clone, Eq, PartialEq)] pub(crate) enum MultiPlexedRecord<'a> { /// Adds multiple records to a specific queue. AppendRecords { @@ -27,6 +27,50 @@ pub(crate) enum MultiPlexedRecord<'a> { }, } +impl<'a> std::fmt::Debug for MultiPlexedRecord<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::AppendRecords { + queue, + position, + records, + } => f + .debug_struct("AppendRecords") + .field("queue", queue) + .field("position", position) + .field("records_len", &records.count()) + .finish(), + Self::Truncate { queue, position } => f + .debug_struct("Truncate") + .field("queue", queue) + .field("position", position) + .finish(), + Self::RecordPosition { queue, position } => f + .debug_struct("RecordPosition") + .field("queue", queue) + .field("position", position) + .finish(), + Self::DeleteQueue { queue, position } => f + .debug_struct("DeleteQueue") + .field("queue", queue) + .field("position", position) + .finish(), + } + } +} + +impl<'a> MultiPlexedRecord<'a> { + #[allow(dead_code)] + pub fn queue_id(&self) -> &'a str { + match self { + Self::AppendRecords { queue, .. } => queue, + Self::Truncate { queue, .. } => queue, + Self::RecordPosition { queue, .. } => queue, + Self::DeleteQueue { queue, .. } => queue, + } + } +} + #[repr(u8)] #[derive(Clone, Copy, Debug)] enum RecordType { diff --git a/src/rolling/file_number.rs b/src/rolling/file_number.rs index 6f697ad..b928857 100644 --- a/src/rolling/file_number.rs +++ b/src/rolling/file_number.rs @@ -118,7 +118,6 @@ impl FileNumber { format!("wal-{:020}", self.file_number) } - #[cfg(test)] pub fn file_number(&self) -> u64 { *self.file_number }