Skip to content

Commit

Permalink
simplified (hopefully)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Mar 16, 2024
1 parent e69fcc6 commit 2d8c989
Show file tree
Hide file tree
Showing 10 changed files with 195 additions and 208 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ thiserror = "1"
tracing = "0.1.37"

[dev-dependencies]
mock_instant = "0.3"
criterion = "0.4"
futures = "0.3"
proptest = "1"
Expand Down
30 changes: 3 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,16 @@ pub use multi_record_log::{MultiRecordLog, SyncPolicy};
#[derive(Debug)]
pub struct Record<'a> {
pub position: u64,
payload: PagesBuf<'a>,
pub payload: PagesBuf<'a>,
}


impl<'a> Record<'a> {
#[cfg(test)]
pub fn payload_equal(&self, mut payload: &[u8]) -> bool {
use bytes::Buf;
let mut self_payload = self.payload;
if self_payload.remaining() != payload.len() {
return false;
}
while self_payload.has_remaining() {
let chunk = self_payload.chunk();
let chunk_len = chunk.len();
if chunk != &payload[..chunk_len] {
return false;
}
self_payload.advance(chunk_len);
payload = &payload[chunk_len..];
}
true
pub fn payload_equal(&self, payload: &[u8]) -> bool {
self.payload.to_cow() == payload
}
}

// impl<'a> Record<'a> {
// pub fn new(position: u64, payload: &'a [u8]) -> Self {
// Record {
// position,
// payload: Cow::Borrowed(payload),
// }
// }
// }

#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub struct FileNumber {
file_number: Arc<u64>,
Expand Down
87 changes: 57 additions & 30 deletions src/mem/arena.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::time::{Duration, Instant};
use std::time::Duration;
#[cfg(not(test))]
use std::time::Instant;

#[cfg(test)]
use mock_instant::Instant;

#[cfg(not(test))]
pub const PAGE_SIZE: usize = 1 << 20;
Expand Down Expand Up @@ -60,7 +65,7 @@ impl Default for ArenaStats {
fn default() -> ArenaStats {
ArenaStats {
// We arbitrarily initialize num used pages former to 100.
max_num_used_pages_former: 100,
max_num_used_pages_former: 0,
max_num_used_pages_current: 0,
call_counter: 0u8,
next_window_start: Instant::now(),
Expand All @@ -76,48 +81,43 @@ impl ArenaStats {
self.next_window_start = now + WINDOW;
}

/// Records the number of used pages, and returns an estimation of the maximum number of pages
/// in the last 5 minutes.
pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize {
// The only function of the call counter is to avoid calling `Instant::now()`
// at every single call.
self.call_counter = self.call_counter.wrapping_add(1);
self.call_counter = (self.call_counter + 1) % 64;
if self.call_counter == 0u8 {
let now = Instant::now();
if now > self.next_window_start {
self.roll(now);
}
}
self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages);
self.target_num_pages()
}

// This method returns a target number of pages.
//
// If we currently have a number of allocated pages higher than this, we need to free
// pages until we reach this number.
fn target_num_pages(&self) -> usize {
let max_over_both_windows = self
.max_num_used_pages_former
.max(self.max_num_used_pages_current);
(max_over_both_windows + 10).max(max_over_both_windows * 105 / 100)
self.max_num_used_pages_former
.max(self.max_num_used_pages_current)
}
}

impl Arena {
/// Returns an allocated page id.
pub fn get_page_id(&mut self) -> PageId {
pub fn acquire_page(&mut self) -> PageId {
if let Some(page_id) = self.free_page_ids.pop() {
assert!(self.pages[page_id.0].is_some());
self.gc();
return page_id;
}
let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice();
if let Some(free_slot) = self.free_slots.pop() {
let slot = &mut self.pages[free_slot.0];
assert!(slot.is_none());
*slot = Some(page);
return free_slot;
self.gc();
free_slot
} else {
let new_page_id = self.pages.len();
self.pages.push(Some(page));
self.gc();
PageId(new_page_id)
}
}
Expand All @@ -141,7 +141,11 @@ impl Arena {
/// `gc` releases memory by deallocating ALL of the free pages.
pub fn gc(&mut self) {
let num_used_pages = self.num_used_pages();
let target_num_pages = self.stats.record_num_used_page(num_used_pages);
let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages);
// We pick a target slightly higher than the maximum number of pages used in the last 5
// minutes to avoid needless allocations when we are experience a general increase
// in memory usage.
let target_num_pages = (max_used_num_pages_in_last_5_min * 105 / 100).max(10);
let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages);
assert!(num_pages_to_free <= self.free_page_ids.len());
for _ in 0..num_pages_to_free {
Expand All @@ -162,40 +166,63 @@ impl Arena {
self.pages.len() - self.free_slots.len() - self.free_page_ids.len()
}

pub fn capacity(&self) -> usize {
self.num_allocated_pages() * PAGE_SIZE
}

pub fn unused_capacity(&self) -> usize {
self.free_page_ids.len() * PAGE_SIZE
}
}

#[cfg(test)]
mod tests {
use mock_instant::MockClock;

use super::*;

#[test]
fn test_arena_simple() {
let mut arena = Arena::default();
assert_eq!(arena.capacity(), 0);
assert_eq!(arena.get_page_id(), PageId(0));
assert_eq!(arena.get_page_id(), PageId(1));
assert_eq!(arena.num_allocated_pages(), 0);
assert_eq!(arena.acquire_page(), PageId(0));
assert_eq!(arena.acquire_page(), PageId(1));
arena.release_page(PageId(0));
assert_eq!(arena.get_page_id(), PageId(0));
assert_eq!(arena.acquire_page(), PageId(0));
}

#[test]
fn test_arena_gc() {
let mut arena = Arena::default();
assert_eq!(arena.capacity(), 0);
assert_eq!(arena.get_page_id(), PageId(0));
assert_eq!(arena.get_page_id(), PageId(1));
assert_eq!(arena.num_allocated_pages(), 0);
assert_eq!(arena.acquire_page(), PageId(0));
assert_eq!(arena.acquire_page(), PageId(1));
arena.release_page(PageId(1));
assert_eq!(arena.num_allocated_pages(), 2);
arena.gc();
assert_eq!(arena.num_allocated_pages(), 2);
assert_eq!(arena.get_page_id(), PageId(1));
assert_eq!(arena.acquire_page(), PageId(1));
assert_eq!(arena.num_allocated_pages(), 2);
}

#[test]
fn test_arena_stats() {
let mut arena_stats = ArenaStats::default();
for _ in 0..256 {
assert_eq!(arena_stats.record_num_used_page(10), 10);
}
MockClock::advance(WINDOW.mul_f32(1.1f32));
for _ in 0..256 {
assert_eq!(arena_stats.record_num_used_page(1), 10);
}
MockClock::advance(WINDOW.mul_f32(1.1f32));
for _ in 0..256 {
arena_stats.record_num_used_page(1);
}
assert_eq!(arena_stats.record_num_used_page(1), 1);
assert_eq!(arena_stats.record_num_used_page(2), 2);
for _ in 0..256 {
assert_eq!(arena_stats.record_num_used_page(1), 2);
}
MockClock::advance(WINDOW);
for _ in 0..256 {
assert_eq!(arena_stats.record_num_used_page(1), 2);
}
}
}
4 changes: 2 additions & 2 deletions src/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ mod queue;
mod queues;
mod rolling_buffer;

use self::arena::{Arena, PAGE_SIZE};
use self::arena::Arena;
pub(crate) use self::queue::MemQueue;
pub(crate) use self::queues::MemQueues;
use self::rolling_buffer::RollingBuffer;
pub use self::rolling_buffer::PagesBuf;
use self::rolling_buffer::RollingBuffer;

#[cfg(test)]
mod tests;
42 changes: 14 additions & 28 deletions src/mem/queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::collections::VecDeque;
use std::ops::{Bound, RangeBounds};

use crate::error::AppendError;
use crate::mem::{Arena, RollingBuffer};
use crate::{FileNumber, Record};
Expand All @@ -16,30 +18,17 @@ struct RecordMeta {
pub(crate) struct MemQueue {
// Concatenated records
concatenated_records: RollingBuffer,
// If `record_metas` is not empty, `start_position` should be the position of the first record.
start_position: u64,
record_metas: Vec<RecordMeta>,
record_metas: VecDeque<RecordMeta>,
}

// fn concatenate_buffers<'a>(mut buf: impl Buf + 'a) -> Cow<'a, [u8]> {
// let first_chunk: &'a buf = buf.chunk();
// if buf.remaining() == first_chunk.len() {
// return Cow::Borrowed(first_chunk);
// }
// let mut concatenated_buffer: Vec<u8> = Vec::with_capacity(buf.remaining());
// while buf.has_remaining() {
// let chunk = buf.chunk();
// concatenated_buffer.extend_from_slice(chunk);
// buf.advance(chunk.len());
// }
// Cow::Owned(concatenated_buffer)
// }

impl MemQueue {
pub fn with_next_position(next_position: u64) -> Self {
MemQueue {
concatenated_records: RollingBuffer::new(),
start_position: next_position,
record_metas: Vec::new(),
record_metas: Default::default(),
}
}

Expand All @@ -54,10 +43,10 @@ impl MemQueue {

/// Returns the last record stored in the queue.
pub fn last_record<'a>(&'a self, arena: &'a Arena) -> Option<Record<'a>> {
let record = self.record_metas.last()?;
let record = self.record_metas.back()?;
let record_payload = self
.concatenated_records
.get_range_buf(record.start_offset.., arena);
.get_range(record.start_offset.., arena);
Some(Record {
position: record.position,
payload: record_payload,
Expand All @@ -67,7 +56,7 @@ impl MemQueue {
/// Returns what the next position should be.
pub fn next_position(&self) -> u64 {
self.record_metas
.last()
.back()
.map(|record| record.position + 1)
.unwrap_or(self.start_position)
}
Expand All @@ -92,7 +81,7 @@ impl MemQueue {
self.start_position = target_position;
}

let file_number = if let Some(record_meta) = self.record_metas.last_mut() {
let file_number = if let Some(record_meta) = self.record_metas.back_mut() {
if record_meta.file_number.as_ref() == Some(file_number) {
record_meta.file_number.take().unwrap()
} else {
Expand All @@ -103,11 +92,11 @@ impl MemQueue {
};

let record_meta = RecordMeta {
start_offset: self.concatenated_records.len(),
start_offset: self.concatenated_records.end_offset(),
file_number: Some(file_number),
position: target_position,
};
self.record_metas.push(record_meta);
self.record_metas.push_back(record_meta);
self.concatenated_records.extend_from_slice(payload, arena);
Ok(())
}
Expand Down Expand Up @@ -149,9 +138,9 @@ impl MemQueue {
} else {
Bound::Unbounded
};
let payload= self
let payload = self
.concatenated_records
.get_range_buf((start_bound, end_bound), arena);
.get_range((start_bound, end_bound), arena);
// let payload = concatenate_buffers(payload_buf);
Record { position, payload }
})
Expand All @@ -178,11 +167,8 @@ impl MemQueue {

let start_offset_to_keep: usize = self.record_metas[first_record_to_keep].start_offset;
self.record_metas.drain(..first_record_to_keep);
// for record_meta in &mut self.record_metas {
// record_meta.start_offset -= start_offset_to_keep;
// }
self.concatenated_records
.truncate_up_to_included(start_offset_to_keep, arena);
.truncate_up_to_excluded(start_offset_to_keep, arena);
self.start_position = truncate_up_to_pos + 1;
first_record_to_keep
}
Expand Down
5 changes: 1 addition & 4 deletions src/mem/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,7 @@ impl MemQueues {
let size = self
.queues
.iter()
.map(|(name, queue)| {
dbg!(queue.size());
name.len() + queue.size()
})
.map(|(name, queue)| name.len() + queue.size())
.sum();

let capacity = self
Expand Down
Loading

0 comments on commit 2d8c989

Please sign in to comment.