Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: preload needed beatree leaves before update #326

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions nomt/src/beatree/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use free_list::FreeList;

mod free_list;

/// The number of a page.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
/// The number of a page
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PageNumber(pub u32);

impl PageNumber {
Expand Down Expand Up @@ -64,15 +64,8 @@ impl AllocatorReader {

/// Returns the page with the specified page number. Blocks the current thread.
pub fn query(&self, pn: PageNumber) -> Box<Page> {
let page = Box::new(Page::zeroed());

let command = IoCommand {
kind: IoKind::Read(self.store_file.as_raw_fd(), pn.0 as u64, page),
user_data: 0,
};

self.io_handle
.send(command)
.send(self.io_command(pn, 0))
.expect("I/O store worker dropped");

// wait for completion
Expand All @@ -82,6 +75,21 @@ impl AllocatorReader {

page
}

/// Get a reference to the I/O handle.
pub fn io_handle(&self) -> &IoHandle {
&self.io_handle
}

/// Create an I/O command for querying a page by number.
pub fn io_command(&self, pn: PageNumber, user_data: u64) -> IoCommand {
let page = Box::new(Page::zeroed());

IoCommand {
kind: IoKind::Read(self.store_file.as_raw_fd(), pn.0 as u64, page),
user_data,
}
}
}

impl AllocatorWriter {
Expand Down
11 changes: 10 additions & 1 deletion nomt/src/beatree/leaf/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
allocator::{AllocatorCommitOutput, AllocatorReader, AllocatorWriter, PageNumber},
leaf::node::LeafNode,
},
io::{IoPool, Page},
io::{IoPool, IoCommand, IoHandle, Page},
};

use std::fs::File;
Expand Down Expand Up @@ -58,6 +58,15 @@ impl LeafStoreReader {
pub fn query(&self, pn: PageNumber) -> Box<Page> {
self.allocator_reader.query(pn)
}

pub fn io_handle(&self) -> &IoHandle {
self.allocator_reader.io_handle()
}

/// Create an I/O command for querying a page by number.
pub fn io_command(&self, pn: PageNumber, user_data: u64) -> IoCommand {
self.allocator_reader.io_command(pn, user_data)
}
}

impl LeafStoreWriter {
Expand Down
54 changes: 45 additions & 9 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use bitvec::prelude::*;

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

use crate::beatree::{
allocator::PageNumber,
Expand Down Expand Up @@ -49,6 +49,8 @@ pub fn update(
leaf_writer: &mut LeafStoreWriter,
bbn_writer: &mut bbn::BbnStoreWriter,
) -> Result<Vec<BranchId>> {
let leaf_pages = preload_leaves(leaf_reader, &bbn_index, &bnp, changeset.keys().cloned())?;

let mut ctx = Ctx {
bbn_index,
bbn_writer,
Expand All @@ -57,7 +59,7 @@ pub fn update(
leaf_writer,
};

let mut updater = Updater::new(&ctx);
let mut updater = Updater::new(&ctx, leaf_pages);
for (key, value_change) in changeset {
updater.ingest(*key, value_change.clone(), &mut ctx);
}
Expand All @@ -79,10 +81,11 @@ struct Updater {
branch_updater: BranchUpdater,
leaf_updater: LeafUpdater,
obsolete_branches: Vec<BranchId>,
leaf_pages: HashMap<PageNumber, LeafNode>,
}

impl Updater {
fn new(ctx: &Ctx) -> Self {
fn new(ctx: &Ctx, mut leaf_pages: HashMap<PageNumber, LeafNode>) -> Self {
let first = ctx.bbn_index.first();

// UNWRAP: all nodes in index must exist.
Expand Down Expand Up @@ -115,9 +118,8 @@ impl Updater {
})
.map(|(id, separator)| BaseLeaf {
id,
node: LeafNode {
inner: ctx.leaf_reader.query(id),
},
node: leaf_pages.remove(&id)
.unwrap_or_else(|| LeafNode { inner: ctx.leaf_reader.query(id) }),
iter_pos: 0,
separator,
});
Expand All @@ -140,6 +142,7 @@ impl Updater {
branch_updater,
leaf_updater,
obsolete_branches: Vec::new(),
leaf_pages,
}
}

Expand Down Expand Up @@ -209,9 +212,8 @@ impl Updater {
fn reset_leaf_base(&mut self, target: Key, ctx: &Ctx) -> Result<(), ()> {
let branch = self.branch_updater.base().ok_or(())?;
let (i, leaf_pn) = super::search_branch(&branch.node, target).ok_or(())?;
let leaf = LeafNode {
inner: ctx.leaf_reader.query(leaf_pn),
};
let leaf = self.leaf_pages.remove(&leaf_pn)
.unwrap_or_else(|| LeafNode { inner: ctx.leaf_reader.query(leaf_pn) });

let separator = reconstruct_key(branch.node.prefix(), branch.node.separator(i));

Expand Down Expand Up @@ -258,3 +260,37 @@ pub fn reconstruct_key(prefix: &BitSlice<u8, Msb0>, separator: &BitSlice<u8, Msb
key.view_bits_mut::<Msb0>()[prefix.len()..][..separator.len()].copy_from_bitslice(separator);
key
}

fn preload_leaves(
leaf_reader: &LeafStoreReader,
bbn_index: &Index,
bnp: &BranchNodePool,
keys: impl IntoIterator<Item=Key>,
) -> Result<HashMap<PageNumber, LeafNode>> {
let mut leaf_pages = HashMap::new();
let mut last_pn = None;

let mut submissions = 0;
for key in keys {
let Some(branch_id) = bbn_index.lookup(key) else { continue };
// UNWRAP: all branches in index exist.
let branch = bnp.checkout(branch_id).unwrap();
let Some((_, leaf_pn)) = super::search_branch(&branch, key) else { continue };
if last_pn == Some(leaf_pn) { continue }
last_pn = Some(leaf_pn);
leaf_reader.io_handle().send(leaf_reader.io_command(leaf_pn, leaf_pn.0 as u64))
.expect("I/O Pool Disconnected");

submissions += 1;
}

for _ in 0..submissions {
let completion = leaf_reader.io_handle().recv().expect("I/O Pool Disconnected");
completion.result?;
let pn = PageNumber(completion.command.user_data as u32);
let page = completion.command.kind.unwrap_buf();
leaf_pages.insert(pn, LeafNode { inner: page });
}

Ok(leaf_pages)
}