Skip to content

Commit

Permalink
feat: read and delete overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Aug 25, 2024
1 parent 9f01711 commit 3065ac0
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 9 deletions.
86 changes: 82 additions & 4 deletions nomt/src/beatree/leaf/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use crate::{
io::{Page, PAGE_SIZE},
};

use super::{node::MAX_OVERFLOW_CELL_NODE_POINTERS, store::LeafStoreWriter};
use super::{
node::MAX_OVERFLOW_CELL_NODE_POINTERS,
store::{LeafStoreReader, LeafStoreWriter},
};

const BODY_SIZE: usize = PAGE_SIZE - 4;
const MAX_PNS: usize = BODY_SIZE / 4;
Expand Down Expand Up @@ -73,10 +76,25 @@ pub fn chunk(value: &[u8], leaf_writer: &mut LeafStoreWriter) -> Vec<PageNumber>
cell
}

/// Decode an overflow cell, returning the size of the value plus the pages numbers within the cell.
pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, impl Iterator<Item = PageNumber> + 'a) {
assert!(raw.len() >= 12);
assert_eq!(raw.len() % 4, 0);

let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap());

let iter = raw[8..]
.chunks(4)
.map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));

(value_size as usize, iter)
}

/// Encode a list of page numbers into an overflow cell.
pub fn encode_cell(pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; pages.len() * 4];
for (pn, slice) in pages.iter().zip(v.chunks_mut(4)) {
pub fn encode_cell(value_size: usize, pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; 8 + pages.len() * 4];
v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes());
for (pn, slice) in pages[8..].iter().zip(v.chunks_mut(4)) {
slice.copy_from_slice(&pn.0.to_le_bytes());
}

Expand Down Expand Up @@ -109,3 +127,63 @@ fn total_needed_pages(value_size: usize) -> usize {
fn needed_pages(size: usize) -> usize {
(size + BODY_SIZE - 1) / BODY_SIZE
}

pub fn read(cell: &[u8], leaf_reader: &LeafStoreReader) -> Vec<u8> {
let (value_size, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let mut value = Vec::with_capacity(value_size);

let mut page_numbers = Vec::with_capacity(total_pages);
page_numbers.extend(cell_pages);

for i in 0..total_pages {
let page = leaf_reader.query(page_numbers[i]);
let (page_pns, bytes) = read_page(&page);
page_numbers.extend(page_pns);
value.extend(bytes);
}

assert_eq!(page_numbers.len(), total_pages);
assert_eq!(value.len(), value_size);

value
}

pub fn delete(cell: &[u8], leaf_reader: &LeafStoreReader, leaf_writer: &mut LeafStoreWriter) {
let (value_size, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let mut page_numbers = Vec::with_capacity(total_pages);
page_numbers.extend(cell_pages);

for i in 0..total_pages {
let page = leaf_reader.query(page_numbers[i]);
let (page_pns, bytes) = read_page(&page);
page_numbers.extend(page_pns);

// stop at the first page containing value data. no more pages will have more
// page numbers to release.
if bytes.len() > 0 {
break;
}
}

assert_eq!(page_numbers.len(), total_pages);

for pn in page_numbers {
leaf_writer.release(pn);
}
}

fn read_page<'a>(page: &'a Page) -> (impl Iterator<Item = PageNumber> + 'a, &'a [u8]) {
let n_pages = u16::from_le_bytes(page[0..2].try_into().unwrap()) as usize;
let n_bytes = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;

let iter = page[2..][..n_pages * 4]
.chunks(4)
.map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));

let bytes = &page[2 + n_pages * 4..][..n_bytes];
(iter, bytes)
}
10 changes: 8 additions & 2 deletions nomt/src/beatree/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ pub fn lookup(
inner: leaf_store.query(leaf_pn),
};

// TODO: handle overflow.
let maybe_value = leaf.get(&key).map(|(v, _is_overflow)| v.to_vec());
let maybe_value = leaf.get(&key).map(|(v, is_overflow)| {
if is_overflow {
leaf::overflow::read(v, leaf_store)
} else {
v.to_vec()
}
});

Ok(maybe_value)
}

Expand Down
8 changes: 5 additions & 3 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,16 @@ impl Updater {
.filter(|v| v.len() > MAX_LEAF_VALUE_SIZE)
{
let pages = overflow::chunk(large_value, &mut ctx.leaf_writer);
(Some(overflow::encode_cell(&pages)), true)
(Some(overflow::encode_cell(large_value.len(), &pages)), true)
} else {
(value_change, false)
};

// TODO: delete or schedule delete of all pages used by the given overflow cell.
let delete_overflow = |overflow_cell: &[u8]| {
overflow::delete(overflow_cell, &ctx.leaf_reader, &mut ctx.leaf_writer)
};
self.leaf_updater
.ingest(key, value_change, overflow, |_overflow_cell| {});
.ingest(key, value_change, overflow, delete_overflow);
}

fn complete(&mut self, ctx: &mut Ctx) {
Expand Down

0 comments on commit 3065ac0

Please sign in to comment.