Skip to content

Commit

Permalink
feat: overflow cells, writing overflow values
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Aug 25, 2024
1 parent d2b7ad6 commit 45c77dc
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 42 deletions.
45 changes: 29 additions & 16 deletions nomt/src/beatree/leaf/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub const MAX_LEAF_VALUE_SIZE: usize = (LEAF_NODE_BODY_SIZE / 3) - 32;
/// Note that this gives an overflow value cell maximum size of 100 bytes.
pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 23;

/// We use the high bit to encode whether a cell is an overflow cell.
///
/// Despite its name, this constant selects the overflow *flag* (bit 15) of the 16-bit word
/// stored in the last two bytes of a cell pointer; the low 15 bits of that word carry the
/// cell's offset, which is why encoded offsets must be strictly smaller than this value.
const OFFSET_MASK: u16 = 1 << 15;

/// A leaf node backed by a single boxed page.
///
/// Judging by the accessors in this file, the page is addressed through 34-byte cell pointers
/// (a 32-byte key followed by a 2-byte little-endian offset/flag word) plus the value bytes
/// those pointers refer to.
pub struct LeafNode {
// The raw page holding this node's data.
pub inner: Box<Page>,
}
Expand All @@ -63,29 +66,30 @@ impl LeafNode {
key
}

/// Returns the bytes of the `i`-th cell together with a flag that is `true` when the cell is
/// an overflow cell.
///
/// NOTE(review): this text is a rendered diff without +/- markers. The first three lines below
/// are the pre-change version (returning only `&[u8]`); the three after them are the
/// replacement that also reports the overflow flag.
pub fn value(&self, i: usize) -> &[u8] {
let range = self.value_range(self.cell_pointers(), i);
&self.inner[range]
pub fn value(&self, i: usize) -> (&[u8], bool) {
let (range, overflow) = self.value_range(self.cell_pointers(), i);
(&self.inner[range], overflow)
}

/// Looks `key` up in this leaf and, when present, returns its cell bytes plus a flag telling
/// whether the cell is an overflow cell. Returns `None` when `search` does not find the key.
///
/// NOTE(review): rendered diff without +/- markers. The first signature and the first `.map`
/// line are the pre-change version; the second signature and the last two `.map` lines
/// replace them.
pub fn get(&self, key: &Key) -> Option<&[u8]> {
pub fn get(&self, key: &Key) -> Option<(&[u8], bool)> {
let cell_pointers = self.cell_pointers();

// `.ok()` keeps only the exact-match index and discards the "not found" result.
search(cell_pointers, key)
.ok()
.map(|index| &self.inner[self.value_range(cell_pointers, index)])
.map(|index| self.value_range(cell_pointers, index))
.map(|(range, overflow)| (&self.inner[range], overflow))
}

// returns the range at which the value of a cell is stored, along with whether the cell is
// an overflow cell.
//
// A cell's bytes run from its own offset up to the offset of the next cell; the last cell
// runs to the end of the page (`PAGE_SIZE`).
//
// NOTE(review): rendered diff without +/- markers. Each pre-change line (plain `Range<usize>`
// return, un-tupled `cell_offset` results) is immediately followed by its replacement.
fn value_range(&self, cell_pointers: &[[u8; 34]], index: usize) -> Range<usize> {
let start = cell_offset(cell_pointers, index);
fn value_range(&self, cell_pointers: &[[u8; 34]], index: usize) -> (Range<usize>, bool) {
let (start, overflow) = cell_offset(cell_pointers, index);
let end = if index == cell_pointers.len() - 1 {
PAGE_SIZE
} else {
cell_offset(cell_pointers, index + 1)
// only the offset of the following cell matters here; its overflow flag is ignored.
cell_offset(cell_pointers, index + 1).0
};

start..end
(start..end, overflow)
}

fn cell_pointers(&self) -> &[[u8; 34]] {
Expand Down Expand Up @@ -123,13 +127,13 @@ impl LeafBuilder {
}
}

pub fn push(&mut self, key: Key, value: &[u8]) {
pub fn push_cell(&mut self, key: Key, value: &[u8], overflow: bool) {
assert!(self.index < self.leaf.n());

let offset = PAGE_SIZE - self.remaining_value_size;
let cell_pointer = &mut self.leaf.cell_pointers_mut()[self.index];

encode_cell_pointer(&mut cell_pointer[..], key, offset);
encode_cell_pointer(&mut cell_pointer[..], key, offset, overflow);
self.leaf.inner[offset..][..value.len()].copy_from_slice(value);

self.index += 1;
Expand All @@ -146,16 +150,25 @@ pub fn body_size(n: usize, value_size_sum: usize) -> usize {
n * 34 + value_size_sum
}

fn cell_offset(cell_pointers: &[[u8; 34]], index: usize) -> usize {
// get the cell offset and whether the cell is an overflow cell.
fn cell_offset(cell_pointers: &[[u8; 34]], index: usize) -> (usize, bool) {
let mut buf = [0; 2];
buf.copy_from_slice(&cell_pointers[index][32..34]);
u16::from_le_bytes(buf) as usize
let val = u16::from_le_bytes(buf);
((val ^ OFFSET_MASK) as usize, val & OFFSET_MASK != 0)
}

// panics if offset is bigger than u16 or `cell` length is less than 34.
fn encode_cell_pointer(cell: &mut [u8], key: [u8; 32], offset: usize) {
// Writes a cell pointer into `cell`: the 32-byte key, followed by a little-endian u16 holding
// the offset in its low 15 bits and the overflow flag in its high bit.
//
// panics if offset is bigger than 2^15 - 1.
// also panics if `cell` is shorter than 34 bytes, since bytes 0..34 are written by index.
//
// NOTE(review): rendered diff without +/- markers. The first comment/signature pair above and
// the first `cell[32..34]` write below are the pre-change lines; the lines that follow them
// are the replacements.
fn encode_cell_pointer(cell: &mut [u8], key: [u8; 32], offset: usize, overflow: bool) {
let mut val = u16::try_from(offset).unwrap();
// the high bit is reserved for the overflow flag, so the offset itself must not use it.
assert!(val < OFFSET_MASK);

if overflow {
val |= OFFSET_MASK;
}

cell[0..32].copy_from_slice(&key);
cell[32..34].copy_from_slice(&(u16::try_from(offset).unwrap()).to_le_bytes());
cell[32..34].copy_from_slice(&val.to_le_bytes());
}

// look for key in the node. the return value has the same semantics as std binary_search*.
Expand Down
10 changes: 10 additions & 0 deletions nomt/src/beatree/leaf/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ pub fn chunk(value: &[u8], leaf_writer: &mut LeafStoreWriter) -> Vec<PageNumber>
cell
}

/// Serialize a list of page numbers into the byte body of an overflow cell.
///
/// Every page number contributes its four little-endian bytes, in input order, so the result
/// is `4 * pages.len()` bytes long (and empty for an empty list).
pub fn encode_cell(pages: &[PageNumber]) -> Vec<u8> {
    let mut encoded = Vec::with_capacity(pages.len() * 4);
    for page in pages {
        encoded.extend_from_slice(&page.0.to_le_bytes());
    }
    encoded
}

fn total_needed_pages(value_size: usize) -> usize {
let mut encoded_size = value_size;
let mut total_pages = needed_pages(encoded_size);
Expand Down
3 changes: 2 additions & 1 deletion nomt/src/beatree/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub fn lookup(
let leaf = LeafNode { inner: leaf_store.query(leaf_pn) };

// TODO: handle overflow.
Ok(leaf.get(&key).map(|v| v.to_vec()))
let maybe_value = leaf.get(&key).map(|(v, _is_overflow)| v.to_vec());
Ok(maybe_value)
}

/// Binary search a branch node for the child node containing the key. This returns the last child
Expand Down
63 changes: 40 additions & 23 deletions nomt/src/beatree/ops/update/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ impl BaseLeaf {
self.node.key(i)
}

// Returns the key, cell bytes and overflow flag of the `i`-th entry of the wrapped leaf node.
//
// NOTE(review): rendered diff without +/- markers. The first two lines are the pre-change
// `key_value`, which returned only the key and value; `key_cell` below replaces it.
fn key_value(&self, i: usize) -> (Key, &[u8]) {
(self.node.key(i), self.node.value(i))
fn key_cell(&self, i: usize) -> (Key, &[u8], bool) {
let (value, overflow) = self.node.value(i);
(self.node.key(i), value, overflow)
}

// Returns the cell bytes and overflow flag of the entry at the current iteration position
// (`iter_pos`), without advancing it.
//
// NOTE(review): rendered diff without +/- markers. The `next_value` signature is the
// pre-change line; `next_cell` is its replacement.
fn next_value(&self) -> &[u8] {
fn next_cell(&self) -> (&[u8], bool) {
self.node.value(self.iter_pos)
}

Expand All @@ -48,7 +49,7 @@ impl BaseLeaf {
}

// A pending operation used to build the updated leaf(s).
//
// NOTE(review): rendered diff without +/- markers. The two-field `Insert` is the pre-change
// variant; the three-field `Insert` replaces it.
enum LeafOp {
Insert(Key, Vec<u8>),
// Insert a cell: key, cell bytes, and whether the cell is an overflow cell.
Insert(Key, Vec<u8>, bool),
// Keep an entry of the base leaf: its index in the base and the size of its cell bytes.
Keep(usize, usize),
}

Expand Down Expand Up @@ -95,11 +96,19 @@ impl LeafUpdater {
self.cutoff = cutoff;
}

// NOTE(review): rendered diff without +/- markers. The next two lines are the pre-change
// signature and first statement; the documented multi-line signature below replaces them.
pub fn ingest(&mut self, key: Key, value_change: Option<Vec<u8>>) {
self.keep_up_to(Some(&key));
/// Ingest a key/cell pair. Provide a callback which is called if this deletes an existing
/// overflow cell.
///
/// `value_change` is the new cell content, or `None` when nothing is to be inserted for
/// `key`. `overflow` states whether that content is an overflow cell. The callback receives
/// the bytes of the existing overflow cell that is dropped.
pub fn ingest(
&mut self,
key: Key,
value_change: Option<Vec<u8>>,
overflow: bool,
with_deleted_overflow: impl FnMut(&[u8]),
) {
// process the base leaf's entries up to `key` before handling `key` itself.
self.keep_up_to(Some(&key), with_deleted_overflow);

if let Some(value) = value_change {
self.ops.push(LeafOp::Insert(key, value));
self.ops.push(LeafOp::Insert(key, value, overflow));
// run the bulk-split step for the op that was just pushed.
self.bulk_split_step(self.ops.len() - 1);
}
}
Expand All @@ -116,7 +125,8 @@ impl LeafUpdater {
branch_updater.possibly_delete(base.separator);
}

self.keep_up_to(None);
// no cells are going to be deleted from this point onwards - this keeps everything.
self.keep_up_to(None, |_| {});

// note: if we need a merge, it'd be more efficient to attempt to combine it with the last
// leaf of the bulk split first rather than pushing the ops onwards. probably irrelevant
Expand Down Expand Up @@ -163,7 +173,11 @@ impl LeafUpdater {
}
}

fn keep_up_to(&mut self, up_to: Option<&Key>) {
fn keep_up_to(
&mut self,
up_to: Option<&Key>,
mut with_deleted_overflow: impl FnMut(&[u8]),
) {
while let Some(next_key) = self.base.as_ref().and_then(|b| b.next_key()) {
let Some(ref mut base_node) = self.base else {
return;
Expand All @@ -175,12 +189,15 @@ impl LeafUpdater {
break;
}

let (val, overflow) = base_node.next_cell();
if order == Ordering::Greater {
let size = base_node.next_value().len();
self.ops.push(LeafOp::Keep(base_node.iter_pos, size));
self.ops.push(LeafOp::Keep(base_node.iter_pos, val.len()));
base_node.advance_iter();
self.bulk_split_step(self.ops.len() - 1);
} else {
if overflow {
with_deleted_overflow(val);
}
base_node.advance_iter();
}
}
Expand All @@ -191,7 +208,7 @@ impl LeafUpdater {
fn bulk_split_step(&mut self, op_index: usize) {
let item_size = match self.ops[op_index] {
LeafOp::Keep(_, size) => size,
LeafOp::Insert(_, ref val) => val.len(),
LeafOp::Insert(_, ref val, _) => val.len(),
};

let body_size_after = self.gauge.body_size_after(item_size);
Expand Down Expand Up @@ -292,7 +309,7 @@ impl LeafUpdater {
while left_gauge.body_size() < midpoint {
let item_size = match self.ops[split_point] {
LeafOp::Keep(_, size) => size,
LeafOp::Insert(_, ref val) => val.len(),
LeafOp::Insert(_, ref val, _) => val.len(),
};

if left_gauge.body_size_after(item_size) > LEAF_NODE_BODY_SIZE {
Expand Down Expand Up @@ -327,7 +344,7 @@ impl LeafUpdater {
for op in &self.ops[split_point..] {
let item_size = match op {
LeafOp::Keep(_, size) => *size,
LeafOp::Insert(_, ref val) => val.len(),
LeafOp::Insert(_, ref val, _) => val.len(),
};

right_gauge.ingest(item_size);
Expand Down Expand Up @@ -365,24 +382,24 @@ impl LeafUpdater {
// then replace `Keep` ops with pure key-value ops, preparing for the base to be changed.
for op in self.ops.iter_mut() {
let LeafOp::Keep(i, _) = *op else { continue };
let (k, v) = base.key_value(i);
*op = LeafOp::Insert(k, v.to_vec());
let (k, v, o) = base.key_cell(i);
*op = LeafOp::Insert(k, v.to_vec(), o);
}
}

// Returns the key a leaf op refers to: the key carried by an `Insert`, or the key stored at
// the kept index of the base leaf for a `Keep`.
//
// NOTE(review): rendered diff without +/- markers. The two-field `Insert` arm is the
// pre-change line; the three-field arm replaces it.
fn op_key(&self, leaf_op: &LeafOp) -> Key {
// UNWRAP: `Keep` leaf ops only exist when base is `Some`.
match leaf_op {
LeafOp::Insert(k, _) => *k,
LeafOp::Insert(k, _, _) => *k,
LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key(*i),
}
}

// Resolves a leaf op into its key, cell bytes and overflow flag. `Insert` ops carry all three
// directly; `Keep` ops read them from the base leaf at the kept index.
//
// NOTE(review): rendered diff without +/- markers. `op_key_value` and the arms returning
// two-element tuples are the pre-change lines; `op_cell` and the three-element arms replace
// them.
fn op_key_value<'a>(&'a self, leaf_op: &'a LeafOp) -> (Key, &'a [u8]) {
fn op_cell<'a>(&'a self, leaf_op: &'a LeafOp) -> (Key, &'a [u8], bool) {
// UNWRAP: `Keep` leaf ops only exist when base is `Some`.
match leaf_op {
LeafOp::Insert(k, v) => (*k, &v[..]),
LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key_value(*i),
LeafOp::Insert(k, v, o) => (*k, &v[..], *o),
LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key_cell(*i),
}
}

Expand All @@ -391,15 +408,15 @@ impl LeafUpdater {
.iter()
.map(|op| match op {
LeafOp::Keep(_, size) => *size,
LeafOp::Insert(_, v) => v.len(),
LeafOp::Insert(_, v, _) => v.len(),
})
.sum();

let mut leaf_builder = LeafBuilder::new(ops.len(), total_value_size);
for op in ops {
let (k, v) = self.op_key_value(op);
let (k, v, o) = self.op_cell(op);

leaf_builder.push(k, v);
leaf_builder.push_cell(k, v, o);
}
leaf_builder.finish()
}
Expand Down
15 changes: 13 additions & 2 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::beatree::{
branch::{BranchNodePool, BRANCH_NODE_BODY_SIZE},
index::Index,
leaf::{
node::{LeafNode, LEAF_NODE_BODY_SIZE},
node::{LeafNode, LEAF_NODE_BODY_SIZE, MAX_LEAF_VALUE_SIZE},
overflow,
store::{LeafStoreReader, LeafStoreWriter},
},
Key,
Expand Down Expand Up @@ -143,7 +144,17 @@ impl Updater {

// Feeds one key change into the leaf updater, first converting values that are too large for
// a leaf into overflow cells.
//
// NOTE(review): rendered diff without +/- markers. The two-argument `leaf_updater.ingest`
// call right below is the pre-change line; the rest of the body replaces it.
fn ingest(&mut self, key: Key, value_change: Option<Vec<u8>>, ctx: &mut Ctx) {
self.digest_until(Some(key), ctx);
self.leaf_updater.ingest(key, value_change);

// values longer than MAX_LEAF_VALUE_SIZE are chunked via `overflow::chunk`, and the leaf
// stores only the encoded list of the resulting page numbers, flagged as an overflow cell.
let (value_change, overflow) = if let Some(ref large_value) =
value_change.as_ref().filter(|v| v.len() > MAX_LEAF_VALUE_SIZE) {
let pages = overflow::chunk(large_value, &mut ctx.leaf_writer);
(Some(overflow::encode_cell(&pages)), true)
} else {
// small values and deletions (`None`) pass through unchanged.
(value_change, false)
};

// TODO: delete or schedule delete of all pages used by the given overflow cell.
// until then the deleted-overflow callback is a no-op.
self.leaf_updater.ingest(key, value_change, overflow, |_overflow_cell| {});
}

fn complete(&mut self, ctx: &mut Ctx) {
Expand Down

0 comments on commit 45c77dc

Please sign in to comment.