diff --git a/nomt/src/beatree/leaf/node.rs b/nomt/src/beatree/leaf/node.rs index 5a42803f..eac90e24 100644 --- a/nomt/src/beatree/leaf/node.rs +++ b/nomt/src/beatree/leaf/node.rs @@ -43,6 +43,9 @@ pub const MAX_LEAF_VALUE_SIZE: usize = (LEAF_NODE_BODY_SIZE / 3) - 32; /// Note that this gives an overflow value cell maximum size of 100 bytes. pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 23; +/// We use the high bit to encode whether a cell is an overflow cell. +const OVERFLOW_BIT: u16 = 1 << 15; + pub struct LeafNode { pub inner: Box, } @@ -62,29 +65,30 @@ impl LeafNode { key } - pub fn value(&self, i: usize) -> &[u8] { - let range = self.value_range(self.cell_pointers(), i); - &self.inner[range] + pub fn value(&self, i: usize) -> (&[u8], bool) { + let (range, overflow) = self.value_range(self.cell_pointers(), i); + (&self.inner[range], overflow) } - pub fn get(&self, key: &Key) -> Option<&[u8]> { + pub fn get(&self, key: &Key) -> Option<(&[u8], bool)> { let cell_pointers = self.cell_pointers(); search(cell_pointers, key) .ok() - .map(|index| &self.inner[self.value_range(cell_pointers, index)]) + .map(|index| self.value_range(cell_pointers, index)) + .map(|(range, overflow)| (&self.inner[range], overflow)) } // returns the range at which the value of a cell is stored - fn value_range(&self, cell_pointers: &[[u8; 34]], index: usize) -> Range { - let start = cell_offset(cell_pointers, index); + fn value_range(&self, cell_pointers: &[[u8; 34]], index: usize) -> (Range, bool) { + let (start, overflow) = cell_offset(cell_pointers, index); let end = if index == cell_pointers.len() - 1 { PAGE_SIZE } else { - cell_offset(cell_pointers, index + 1) + cell_offset(cell_pointers, index + 1).0 }; - start..end + (start..end, overflow) } fn cell_pointers(&self) -> &[[u8; 34]] { @@ -122,13 +126,13 @@ impl LeafBuilder { } } - pub fn push(&mut self, key: Key, value: &[u8]) { + pub fn push_cell(&mut self, key: Key, value: &[u8], overflow: bool) { assert!(self.index < self.leaf.n()); let offset = PAGE_SIZE - self.remaining_value_size; let cell_pointer = &mut self.leaf.cell_pointers_mut()[self.index]; - encode_cell_pointer(&mut cell_pointer[..], key, offset); + encode_cell_pointer(&mut cell_pointer[..], key, offset, overflow); self.leaf.inner[offset..][..value.len()].copy_from_slice(value); self.index += 1; @@ -145,16 +149,28 @@ pub fn body_size(n: usize, value_size_sum: usize) -> usize { n * 34 + value_size_sum } -fn cell_offset(cell_pointers: &[[u8; 34]], index: usize) -> usize { +// get the cell offset and whether the cell is an overflow cell. +fn cell_offset(cell_pointers: &[[u8; 34]], index: usize) -> (usize, bool) { let mut buf = [0; 2]; buf.copy_from_slice(&cell_pointers[index][32..34]); - u16::from_le_bytes(buf) as usize + let val = u16::from_le_bytes(buf); + ( + (val & !OVERFLOW_BIT) as usize, + val & OVERFLOW_BIT == OVERFLOW_BIT, + ) } -// panics if offset is bigger then u16 or `cell` length is less than 34. -fn encode_cell_pointer(cell: &mut [u8], key: [u8; 32], offset: usize) { +// panics if offset is bigger than 2^15 - 1. +fn encode_cell_pointer(cell: &mut [u8], key: [u8; 32], offset: usize, overflow: bool) { + let mut val = u16::try_from(offset).unwrap(); + assert!(val < OVERFLOW_BIT); + + if overflow { + val |= OVERFLOW_BIT; + } + cell[0..32].copy_from_slice(&key); - cell[32..34].copy_from_slice(&(u16::try_from(offset).unwrap()).to_le_bytes()); + cell[32..34].copy_from_slice(&val.to_le_bytes()); } // look for key in the node. the return value has the same semantics as std binary_search*. diff --git a/nomt/src/beatree/leaf/overflow.rs b/nomt/src/beatree/leaf/overflow.rs index cd6ea7e9..55066df3 100644 --- a/nomt/src/beatree/leaf/overflow.rs +++ b/nomt/src/beatree/leaf/overflow.rs @@ -73,6 +73,16 @@ pub fn chunk(value: &[u8], leaf_writer: &mut LeafStoreWriter) -> Vec cell } +/// Encode a list of page numbers into an overflow cell. +pub fn encode_cell(pages: &[PageNumber]) -> Vec { + let mut v = vec![0u8; pages.len() * 4]; + for (pn, slice) in pages.iter().zip(v.chunks_mut(4)) { + slice.copy_from_slice(&pn.0.to_le_bytes()); + } + + v +} + fn total_needed_pages(value_size: usize) -> usize { let mut encoded_size = value_size; let mut total_pages = needed_pages(encoded_size); diff --git a/nomt/src/beatree/ops/mod.rs b/nomt/src/beatree/ops/mod.rs index 248e81b3..37580ec2 100644 --- a/nomt/src/beatree/ops/mod.rs +++ b/nomt/src/beatree/ops/mod.rs @@ -45,7 +45,8 @@ pub fn lookup( }; // TODO: handle overflow. - Ok(leaf.get(&key).map(|v| v.to_vec())) + let maybe_value = leaf.get(&key).map(|(v, _is_overflow)| v.to_vec()); + Ok(maybe_value) } /// Binary search a branch node for the child node containing the key. This returns the last child diff --git a/nomt/src/beatree/ops/update/leaf.rs b/nomt/src/beatree/ops/update/leaf.rs index 2763b2d2..6d055725 100644 --- a/nomt/src/beatree/ops/update/leaf.rs +++ b/nomt/src/beatree/ops/update/leaf.rs @@ -34,11 +34,12 @@ impl BaseLeaf { self.node.key(i) } - fn key_value(&self, i: usize) -> (Key, &[u8]) { - (self.node.key(i), self.node.value(i)) + fn key_cell(&self, i: usize) -> (Key, &[u8], bool) { + let (value, overflow) = self.node.value(i); + (self.node.key(i), value, overflow) } - fn next_value(&self) -> &[u8] { + fn next_cell(&self) -> (&[u8], bool) { self.node.value(self.iter_pos) } @@ -48,7 +49,7 @@ impl BaseLeaf { } enum LeafOp { - Insert(Key, Vec), + Insert(Key, Vec, bool), Keep(usize, usize), } @@ -95,11 +96,19 @@ impl LeafUpdater { self.cutoff = cutoff; } - pub fn ingest(&mut self, key: Key, value_change: Option>) { - self.keep_up_to(Some(&key)); + /// Ingest a key/cell pair. Provide a callback which is called if this deletes an existing + /// overflow cell. + pub fn ingest( + &mut self, + key: Key, + value_change: Option>, + overflow: bool, + with_deleted_overflow: impl FnMut(&[u8]), + ) { + self.keep_up_to(Some(&key), with_deleted_overflow); if let Some(value) = value_change { - self.ops.push(LeafOp::Insert(key, value)); + self.ops.push(LeafOp::Insert(key, value, overflow)); self.bulk_split_step(self.ops.len() - 1); } } @@ -116,7 +125,8 @@ impl LeafUpdater { branch_updater.possibly_delete(base.separator); } - self.keep_up_to(None); + // no cells are going to be deleted from this point onwards - this keeps everything. + self.keep_up_to(None, |_| {}); // note: if we need a merge, it'd be more efficient to attempt to combine it with the last // leaf of the bulk split first rather than pushing the ops onwards. probably irrelevant @@ -163,7 +173,7 @@ impl LeafUpdater { } } - fn keep_up_to(&mut self, up_to: Option<&Key>) { + fn keep_up_to(&mut self, up_to: Option<&Key>, mut with_deleted_overflow: impl FnMut(&[u8])) { while let Some(next_key) = self.base.as_ref().and_then(|b| b.next_key()) { let Some(ref mut base_node) = self.base else { return; @@ -175,12 +185,15 @@ impl LeafUpdater { break; } + let (val, overflow) = base_node.next_cell(); if order == Ordering::Greater { - let size = base_node.next_value().len(); - self.ops.push(LeafOp::Keep(base_node.iter_pos, size)); + self.ops.push(LeafOp::Keep(base_node.iter_pos, val.len())); base_node.advance_iter(); self.bulk_split_step(self.ops.len() - 1); } else { + if overflow { + with_deleted_overflow(val); + } base_node.advance_iter(); } } @@ -191,7 +204,7 @@ impl LeafUpdater { fn bulk_split_step(&mut self, op_index: usize) { let item_size = match self.ops[op_index] { LeafOp::Keep(_, size) => size, - LeafOp::Insert(_, ref val) => val.len(), + LeafOp::Insert(_, ref val, _) => val.len(), }; let body_size_after = self.gauge.body_size_after(item_size); @@ -292,7 +305,7 @@ impl LeafUpdater { while left_gauge.body_size() < midpoint { let item_size = match self.ops[split_point] { LeafOp::Keep(_, size) => size, - LeafOp::Insert(_, ref val) => val.len(), + LeafOp::Insert(_, ref val, _) => val.len(), }; if left_gauge.body_size_after(item_size) > LEAF_NODE_BODY_SIZE { @@ -327,7 +340,7 @@ impl LeafUpdater { for op in &self.ops[split_point..] { let item_size = match op { LeafOp::Keep(_, size) => *size, - LeafOp::Insert(_, ref val) => val.len(), + LeafOp::Insert(_, ref val, _) => val.len(), }; right_gauge.ingest(item_size); @@ -365,24 +378,24 @@ impl LeafUpdater { // then replace `Keep` ops with pure key-value ops, preparing for the base to be changed. for op in self.ops.iter_mut() { let LeafOp::Keep(i, _) = *op else { continue }; - let (k, v) = base.key_value(i); - *op = LeafOp::Insert(k, v.to_vec()); + let (k, v, o) = base.key_cell(i); + *op = LeafOp::Insert(k, v.to_vec(), o); } } fn op_key(&self, leaf_op: &LeafOp) -> Key { // UNWRAP: `Keep` leaf ops only exist when base is `Some`. match leaf_op { - LeafOp::Insert(k, _) => *k, + LeafOp::Insert(k, _, _) => *k, LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key(*i), } } - fn op_key_value<'a>(&'a self, leaf_op: &'a LeafOp) -> (Key, &'a [u8]) { + fn op_cell<'a>(&'a self, leaf_op: &'a LeafOp) -> (Key, &'a [u8], bool) { // UNWRAP: `Keep` leaf ops only exist when base is `Some`. match leaf_op { - LeafOp::Insert(k, v) => (*k, &v[..]), - LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key_value(*i), + LeafOp::Insert(k, v, o) => (*k, &v[..], *o), + LeafOp::Keep(i, _) => self.base.as_ref().unwrap().key_cell(*i), } } @@ -391,15 +404,15 @@ impl LeafUpdater { .iter() .map(|op| match op { LeafOp::Keep(_, size) => *size, - LeafOp::Insert(_, v) => v.len(), + LeafOp::Insert(_, v, _) => v.len(), }) .sum(); let mut leaf_builder = LeafBuilder::new(ops.len(), total_value_size); for op in ops { - let (k, v) = self.op_key_value(op); + let (k, v, o) = self.op_cell(op); - leaf_builder.push(k, v); + leaf_builder.push_cell(k, v, o); } leaf_builder.finish() } diff --git a/nomt/src/beatree/ops/update/mod.rs b/nomt/src/beatree/ops/update/mod.rs index 074b1d69..85d87a7c 100644 --- a/nomt/src/beatree/ops/update/mod.rs +++ b/nomt/src/beatree/ops/update/mod.rs @@ -9,7 +9,8 @@ use crate::beatree::{ branch::{BranchNodePool, BRANCH_NODE_BODY_SIZE}, index::Index, leaf::{ - node::{LeafNode, LEAF_NODE_BODY_SIZE}, + node::{LeafNode, LEAF_NODE_BODY_SIZE, MAX_LEAF_VALUE_SIZE}, + overflow, store::{LeafStoreReader, LeafStoreWriter}, }, Key, @@ -148,7 +149,20 @@ impl Updater { fn ingest(&mut self, key: Key, value_change: Option>, ctx: &mut Ctx) { self.digest_until(Some(key), ctx); - self.leaf_updater.ingest(key, value_change); + + let (value_change, overflow) = if let Some(ref large_value) = value_change + .as_ref() + .filter(|v| v.len() > MAX_LEAF_VALUE_SIZE) + { + let pages = overflow::chunk(large_value, &mut ctx.leaf_writer); + (Some(overflow::encode_cell(&pages)), true) + } else { + (value_change, false) + }; + + // TODO: delete or schedule delete of all pages used by the given overflow cell. + self.leaf_updater + .ingest(key, value_change, overflow, |_overflow_cell| {}); } fn complete(&mut self, ctx: &mut Ctx) {