feat: rework new seeker to be more aggressive in pushing I/O
rphmeier committed Aug 23, 2024
1 parent ce77bbf commit 9ff8125
Showing 2 changed files with 287 additions and 198 deletions.
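
The reworked warm-up loop in the diff below waits on its two command channels through crossbeam's Select rather than the previous blocking select! / non-blocking try_recv combination, so the worker can keep pushing page reads while still noticing a Prepare command promptly. A minimal, self-contained sketch of that two-receiver Select pattern (channel names and message types here are illustrative stand-ins, not NOMT's):

use crossbeam::channel::{unbounded, Select};
use std::thread;

fn main() {
    // Stand-ins for the worker's commit_rx and warmup_rx channels.
    let (commit_tx, commit_rx) = unbounded::<&'static str>();
    let (warmup_tx, warmup_rx) = unbounded::<u64>();

    thread::spawn(move || {
        warmup_tx.send(42).unwrap();
        commit_tx.send("prepare").unwrap();
    });

    for _ in 0..2 {
        // Register both receivers; recv() returns an index identifying the operation.
        let mut select = Select::new();
        let commit_idx = select.recv(&commit_rx);
        let _warmup_idx = select.recv(&warmup_rx);

        // Block until one of the channels is ready. When I/O is still in flight,
        // the worker below uses try_select() instead, so it never blocks while it
        // could be pushing more reads.
        let op = select.select();
        if op.index() == commit_idx {
            println!("commit command: {}", op.recv(&commit_rx).unwrap());
        } else {
            println!("warm-up key: {}", op.recv(&warmup_rx).unwrap());
        }
    }
}

In the worker, the blocking select() path is only taken when the seeker has no requests in flight; that choice is what makes the new loop more aggressive about keeping the I/O queue full.
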
nomt/src/commit/worker.rs (154 changes: 73 additions & 81 deletions)
@@ -12,15 +12,18 @@
 //! Updates are performed while the next fetch is pending, unless all fetches in
 //! the range have completed.
-use crossbeam::channel::{Receiver, Sender, TryRecvError};
+use crossbeam::channel::{Receiver, Select, Sender};
 
 use nomt_core::{
     page_id::{PageId, ROOT_PAGE_ID},
     proof::PathProofTerminal,
     trie::{KeyPath, Node, NodeHasher, ValueHash},
 };
 
-use std::sync::{Arc, Barrier};
+use std::{
+    sync::{Arc, Barrier},
+    time::Duration,
+};
 
 use super::{
     CommitCommand, CommitShared, KeyReadWrite, RootPagePending, ToWorker, WarmUpCommand,
@@ -93,64 +96,55 @@ fn warm_up_phase(
     read_pass: ReadPass<ShardIndex>,
     mut seeker: Seeker,
 ) -> anyhow::Result<()> {
+    const CHECK_PREPARE_DURATION: Duration = Duration::from_micros(250);
+
     let mut preparing = false;
     loop {
-        let (block, push) = match seeker.advance(&read_pass)? {
-            Interrupt::NoMoreWork => {
-                if preparing {
+        match seeker.advance_deadline(&read_pass, CHECK_PREPARE_DURATION)? {
+            Interrupt::HasRoom => {
+                let is_empty = seeker.is_empty();
+                if preparing && is_empty {
                     return Ok(());
-                } else {
-                    (true, true)
+                } else if preparing {
+                    continue;
                 }
-            }
-            Interrupt::HasRoom => {
-                if preparing {
-                    (false, false)
+
+                let mut select = Select::new();
+                let commit_idx = select.recv(&comms.commit_rx);
+                let _ = select.recv(&comms.warmup_rx);
+
+                let selected = if is_empty {
+                    select.select()
+                } else {
+                    match select.try_select() {
+                        Err(_) => continue,
+                        Ok(selected) => selected,
+                    }
+                };
+
+                if selected.index() == commit_idx {
+                    match selected.recv(&comms.commit_rx)? {
+                        ToWorker::Prepare => {
+                            preparing = true;
+                            if is_empty {
+                                return Ok(());
+                            } else {
+                                continue;
+                            }
+                        }
+                        ToWorker::Commit(_) => unreachable!(),
+                    }
                 } else {
-                    (false, true)
+                    seeker.push(selected.recv(&comms.warmup_rx)?.key_path)
                 }
             }
-            Interrupt::Completion(_) | Interrupt::SpecialPageCompletion => (false, false),
-        };
-
-        if preparing || !push {
-            continue;
-        }
-
-        let (commit_msg, warmup_msg) = if block {
-            crossbeam_channel::select! {
-                recv(comms.commit_rx) -> msg => (Some(msg?), None),
-                recv(comms.warmup_rx) -> msg => (None, Some(msg?)),
+            Interrupt::Deadline if !preparing => {
+                if let Ok(ToWorker::Prepare) = comms.commit_rx.try_recv() {
+                    preparing = true;
+                }
             }
-        } else {
-            (
-                match comms.commit_rx.try_recv() {
-                    Ok(msg) => Some(msg),
-                    Err(TryRecvError::Empty) => None,
-                    Err(TryRecvError::Disconnected) => anyhow::bail!(TryRecvError::Disconnected),
-                },
-                match comms.warmup_rx.try_recv() {
-                    Ok(msg) => Some(msg),
-                    Err(TryRecvError::Empty) => None,
-                    Err(TryRecvError::Disconnected) => anyhow::bail!(TryRecvError::Disconnected),
-                },
-            )
+            _ => continue,
-        };
-
-        match commit_msg {
-            None => {}
-            Some(ToWorker::Prepare) => {
-                preparing = true;
-                continue;
-            }
-            // prepare is always sent before commit.
-            Some(ToWorker::Commit(_)) => unreachable!(),
         }
-
-        match warmup_msg {
-            None => {}
-            Some(warm_up) => seeker.push(warm_up.key_path),
-        }
     }
 }

@@ -467,38 +461,19 @@ impl<H: NodeHasher> RangeCommitter<H> {
         let mut pushes = 0;
         let mut skips = 0;
 
-        let mut conclude_needs_extra = false;
-
+        // 1. drive until work is done.
         loop {
-            if start_index >= self.range_end && !conclude_needs_extra {
-                // PANIC: walker was configured with a parent page.
-                let (new_nodes, diffs) = match self.page_walker.conclude(&mut self.write_pass) {
-                    Ok(Output::Root(_, _)) => unreachable!(),
-                    Ok(Output::ChildPageRoots(new_nodes, diffs)) => (new_nodes, diffs),
-                    Err((NeedsPage(page), page_walker)) => {
-                        self.page_walker = page_walker;
-                        seeker.push_single_request(page);
-                        conclude_needs_extra = true;
-                        continue;
-                    }
-                };
-
-                assert!(!diffs.contains_key(&ROOT_PAGE_ID));
-                output.page_diffs = diffs;
-
-                self.shared.push_pending_root_nodes(new_nodes);
-
-                return Ok(self.write_pass.consume());
-            }
-
             match seeker.advance(self.write_pass.downgrade())? {
-                Interrupt::NoMoreWork | Interrupt::HasRoom => {
+                Interrupt::HasRoom => {
                     let next_push = start_index + pushes;
                     if next_push < self.range_end {
-                        seeker.push(self.shared.read_write[next_push].0);
                         pushes += 1;
+                        seeker.push(self.shared.read_write[next_push].0);
+                    } else if seeker.is_empty() {
+                        break;
                     }
                 }
+                Interrupt::Deadline => {}
                 Interrupt::Completion(seek_result) => {
                     // skip completions until we're past the end of the last batch.
                     if skips > 0 {
@@ -519,14 +494,31 @@
                     start_index = end_index;
                 }
                 Interrupt::SpecialPageCompletion => {
-                    if conclude_needs_extra {
-                        conclude_needs_extra = false;
-                    } else {
-                        self.reattempt_advance(seeker, output);
-                    }
+                    self.reattempt_advance(seeker, output);
                 }
             }
         }
+
+        // 2. conclude, driving additional page fetches as necessary.
+        loop {
+            // PANIC: walker was configured with a parent page.
+            let (new_nodes, diffs) = match self.page_walker.conclude(&mut self.write_pass) {
+                Ok(Output::Root(_, _)) => unreachable!(),
+                Ok(Output::ChildPageRoots(new_nodes, diffs)) => (new_nodes, diffs),
+                Err((NeedsPage(page), page_walker)) => {
+                    self.page_walker = page_walker;
+                    drive_page_fetch(seeker, self.write_pass.downgrade(), page)?;
+                    continue;
+                }
+            };
+
+            assert!(!diffs.contains_key(&ROOT_PAGE_ID));
+            output.page_diffs = diffs;
+
+            self.shared.push_pending_root_nodes(new_nodes);
+
+            return Ok(self.write_pass.consume());
+        }
     }
 }
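
The commit path above is now split into two phases: a loop that keeps the seeker saturated and applies completions, followed by a conclude loop that re-drives page fetches whenever the page walker reports NeedsPage. A rough sketch of that control-flow shape, using simplified stand-in types rather than NOMT's actual API:

// Illustrative stand-ins only; not NOMT's real Seeker/PageWalker types.
enum Interrupt {
    HasRoom,
    Deadline,
    Completion(u64),
}

enum Conclude {
    Done,
    NeedsPage(u64),
}

trait Seeker {
    fn advance(&mut self) -> Interrupt;
    fn push(&mut self, key: u64);
    fn is_empty(&self) -> bool;
    fn fetch_page(&mut self, page: u64);
}

fn commit(seeker: &mut dyn Seeker, keys: &[u64], mut conclude: impl FnMut() -> Conclude) {
    let mut pushes = 0;

    // 1. drive until work is done: push another key whenever there is room,
    //    and stop once everything pushed has completed.
    loop {
        match seeker.advance() {
            Interrupt::HasRoom => {
                if pushes < keys.len() {
                    pushes += 1;
                    seeker.push(keys[pushes - 1]);
                } else if seeker.is_empty() {
                    break;
                }
            }
            Interrupt::Deadline => {}
            Interrupt::Completion(_key) => { /* apply the update for this key */ }
        }
    }

    // 2. conclude, driving additional page fetches as necessary.
    loop {
        match conclude() {
            Conclude::Done => return,
            Conclude::NeedsPage(page) => seeker.fetch_page(page),
        }
    }
}
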

