Skip to content

Commit

Permalink
feat: read and delete overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Aug 25, 2024
1 parent 45c77dc commit 8c2797d
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 9 deletions.
84 changes: 80 additions & 4 deletions nomt/src/beatree/leaf/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{

use super::{
node::MAX_OVERFLOW_CELL_NODE_POINTERS,
store::LeafStoreWriter,
store::{LeafStoreReader, LeafStoreWriter},
};

const BODY_SIZE: usize = PAGE_SIZE - 4;
Expand Down Expand Up @@ -72,10 +72,24 @@ pub fn chunk(value: &[u8], leaf_writer: &mut LeafStoreWriter) -> Vec<PageNumber>
cell
}

/// Decode an overflow cell, returning the size of the value plus the pages numbers within the cell.
pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, impl Iterator<Item=PageNumber> + 'a) {
assert!(raw.len() >= 12);
assert_eq!(raw.len() % 4, 0);

let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap());

let iter = raw[8..].chunks(4)
.map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));

(value_size as usize, iter)
}

/// Encode a list of page numbers into an overflow cell.
pub fn encode_cell(pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; pages.len() * 4];
for (pn, slice) in pages.iter().zip(v.chunks_mut(4)) {
pub fn encode_cell(value_size: usize, pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; 8 + pages.len() * 4];
v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes());
for (pn, slice) in pages[8..].iter().zip(v.chunks_mut(4)) {
slice.copy_from_slice(&pn.0.to_le_bytes());
}

Expand Down Expand Up @@ -106,3 +120,65 @@ fn total_needed_pages(value_size: usize) -> usize {
fn needed_pages(size: usize) -> usize {
(size + BODY_SIZE - 1) / BODY_SIZE
}

pub fn read(cell: &[u8], leaf_reader: &LeafStoreReader) -> Vec<u8> {
let (value_size, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let mut value = Vec::with_capacity(value_size);

let mut page_numbers = Vec::with_capacity(total_pages);
page_numbers.extend(cell_pages);

for i in 0..total_pages {
let page = leaf_reader.query(page_numbers[i]);
let (page_pns, bytes) = read_page(&page);
page_numbers.extend(page_pns);
value.extend(bytes);

}

assert_eq!(page_numbers.len(), total_pages);
assert_eq!(value.len(), value_size);

value
}

pub fn delete(
cell: &[u8],
leaf_reader: &LeafStoreReader,
leaf_writer: &mut LeafStoreWriter,
) {
let (value_size, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let mut page_numbers = Vec::with_capacity(total_pages);
page_numbers.extend(cell_pages);

for i in 0..total_pages {
let page = leaf_reader.query(page_numbers[i]);
let (page_pns, bytes) = read_page(&page);
page_numbers.extend(page_pns);

// stop at the first page containing value data. no more pages will have more
// page numbers to release.
if bytes.len() > 0 { break }
}

assert_eq!(page_numbers.len(), total_pages);

for pn in page_numbers {
leaf_writer.release(pn);
}
}

fn read_page<'a>(page: &'a Page) -> (impl Iterator<Item=PageNumber> + 'a, &'a [u8]) {
let n_pages = u16::from_le_bytes(page[0..2].try_into().unwrap()) as usize;
let n_bytes = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;

let iter = page[2..][..n_pages * 4].chunks(4)
.map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));

let bytes = &page[2 + n_pages * 4..][..n_bytes];
(iter, bytes)
}
8 changes: 6 additions & 2 deletions nomt/src/beatree/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ pub fn lookup(

let leaf = LeafNode { inner: leaf_store.query(leaf_pn) };

// TODO: handle overflow.
let maybe_value = leaf.get(&key).map(|(v, _is_overflow)| v.to_vec());
let maybe_value = leaf.get(&key).map(|(v, is_overflow)| if is_overflow {
leaf::overflow::read(v, leaf_store)
} else {
v.to_vec()
});

Ok(maybe_value)
}

Expand Down
10 changes: 7 additions & 3 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,17 @@ impl Updater {
let (value_change, overflow) = if let Some(ref large_value) =
value_change.as_ref().filter(|v| v.len() > MAX_LEAF_VALUE_SIZE) {
let pages = overflow::chunk(large_value, &mut ctx.leaf_writer);
(Some(overflow::encode_cell(&pages)), true)
(Some(overflow::encode_cell(large_value.len(), &pages)), true)
} else {
(value_change, false)
};

// TODO: delete or schedule delete of all pages used by the given overflow cell.
self.leaf_updater.ingest(key, value_change, overflow, |_overflow_cell| {});
let delete_overflow = |overflow_cell: &[u8]| overflow::delete(
overflow_cell,
&ctx.leaf_reader,
&mut ctx.leaf_writer,
);
self.leaf_updater.ingest(key, value_change, overflow, delete_overflow);
}

fn complete(&mut self, ctx: &mut Ctx) {
Expand Down

0 comments on commit 8c2797d

Please sign in to comment.