From 8c2797d8878bf74db462e03bced850b6ef7269b7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 25 Aug 2024 12:29:34 -0500 Subject: [PATCH] feat: read and delete overflow --- nomt/src/beatree/leaf/overflow.rs | 84 ++++++++++++++++++++++++++++-- nomt/src/beatree/ops/mod.rs | 8 ++- nomt/src/beatree/ops/update/mod.rs | 10 ++-- 3 files changed, 93 insertions(+), 9 deletions(-) diff --git a/nomt/src/beatree/leaf/overflow.rs b/nomt/src/beatree/leaf/overflow.rs index f6185ae2..921e249b 100644 --- a/nomt/src/beatree/leaf/overflow.rs +++ b/nomt/src/beatree/leaf/overflow.rs @@ -17,7 +17,7 @@ use crate::{ use super::{ node::MAX_OVERFLOW_CELL_NODE_POINTERS, - store::LeafStoreWriter, + store::{LeafStoreReader, LeafStoreWriter}, }; const BODY_SIZE: usize = PAGE_SIZE - 4; @@ -72,10 +72,24 @@ pub fn chunk(value: &[u8], leaf_writer: &mut LeafStoreWriter) -> Vec cell } +/// Decode an overflow cell, returning the size of the value plus the pages numbers within the cell. +pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, impl Iterator + 'a) { + assert!(raw.len() >= 12); + assert_eq!(raw.len() % 4, 0); + + let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap()); + + let iter = raw[8..].chunks(4) + .map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap()))); + + (value_size as usize, iter) +} + /// Encode a list of page numbers into an overflow cell. -pub fn encode_cell(pages: &[PageNumber]) -> Vec { - let mut v = vec![0u8; pages.len() * 4]; - for (pn, slice) in pages.iter().zip(v.chunks_mut(4)) { +pub fn encode_cell(value_size: usize, pages: &[PageNumber]) -> Vec { + let mut v = vec![0u8; 8 + pages.len() * 4]; + v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes()); + for (pn, slice) in pages[8..].iter().zip(v.chunks_mut(4)) { slice.copy_from_slice(&pn.0.to_le_bytes()); } @@ -106,3 +120,65 @@ fn total_needed_pages(value_size: usize) -> usize { fn needed_pages(size: usize) -> usize { (size + BODY_SIZE - 1) / BODY_SIZE } + +pub fn read(cell: &[u8], leaf_reader: &LeafStoreReader) -> Vec { + let (value_size, cell_pages) = decode_cell(cell); + let total_pages = total_needed_pages(value_size); + + let mut value = Vec::with_capacity(value_size); + + let mut page_numbers = Vec::with_capacity(total_pages); + page_numbers.extend(cell_pages); + + for i in 0..total_pages { + let page = leaf_reader.query(page_numbers[i]); + let (page_pns, bytes) = read_page(&page); + page_numbers.extend(page_pns); + value.extend(bytes); + + } + + assert_eq!(page_numbers.len(), total_pages); + assert_eq!(value.len(), value_size); + + value +} + +pub fn delete( + cell: &[u8], + leaf_reader: &LeafStoreReader, + leaf_writer: &mut LeafStoreWriter, +) { + let (value_size, cell_pages) = decode_cell(cell); + let total_pages = total_needed_pages(value_size); + + let mut page_numbers = Vec::with_capacity(total_pages); + page_numbers.extend(cell_pages); + + for i in 0..total_pages { + let page = leaf_reader.query(page_numbers[i]); + let (page_pns, bytes) = read_page(&page); + page_numbers.extend(page_pns); + + // stop at the first page containing value data. no more pages will have more + // page numbers to release. + if bytes.len() > 0 { break } + } + + assert_eq!(page_numbers.len(), total_pages); + + for pn in page_numbers { + leaf_writer.release(pn); + } +} + +fn read_page<'a>(page: &'a Page) -> (impl Iterator + 'a, &'a [u8]) { + let n_pages = u16::from_le_bytes(page[0..2].try_into().unwrap()) as usize; + let n_bytes = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize; + + let iter = page[2..][..n_pages * 4].chunks(4) + .map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap()))); + + let bytes = &page[2 + n_pages * 4..][..n_bytes]; + (iter, bytes) +} diff --git a/nomt/src/beatree/ops/mod.rs b/nomt/src/beatree/ops/mod.rs index 89130057..dfc24d10 100644 --- a/nomt/src/beatree/ops/mod.rs +++ b/nomt/src/beatree/ops/mod.rs @@ -41,8 +41,12 @@ pub fn lookup( let leaf = LeafNode { inner: leaf_store.query(leaf_pn) }; - // TODO: handle overflow. - let maybe_value = leaf.get(&key).map(|(v, _is_overflow)| v.to_vec()); + let maybe_value = leaf.get(&key).map(|(v, is_overflow)| if is_overflow { + leaf::overflow::read(v, leaf_store) + } else { + v.to_vec() + }); + Ok(maybe_value) } diff --git a/nomt/src/beatree/ops/update/mod.rs b/nomt/src/beatree/ops/update/mod.rs index fa4c2d08..4a75c718 100644 --- a/nomt/src/beatree/ops/update/mod.rs +++ b/nomt/src/beatree/ops/update/mod.rs @@ -148,13 +148,17 @@ impl Updater { let (value_change, overflow) = if let Some(ref large_value) = value_change.as_ref().filter(|v| v.len() > MAX_LEAF_VALUE_SIZE) { let pages = overflow::chunk(large_value, &mut ctx.leaf_writer); - (Some(overflow::encode_cell(&pages)), true) + (Some(overflow::encode_cell(large_value.len(), &pages)), true) } else { (value_change, false) }; - // TODO: delete or schedule delete of all pages used by the given overflow cell. - self.leaf_updater.ingest(key, value_change, overflow, |_overflow_cell| {}); + let delete_overflow = |overflow_cell: &[u8]| overflow::delete( + overflow_cell, + &ctx.leaf_reader, + &mut ctx.leaf_writer, + ); + self.leaf_updater.ingest(key, value_change, overflow, delete_overflow); } fn complete(&mut self, ctx: &mut Ctx) {