feat: warm-up worker properly propagates errors
gabriele-0201 authored and pepyakin committed Feb 12, 2025
1 parent ec0a0b7 commit daac3a3
Showing 7 changed files with 60 additions and 89 deletions.
27 changes: 9 additions & 18 deletions nomt/src/bitbox/mod.rs
@@ -545,29 +545,23 @@ impl PageLoader {
}

/// Advance the state of the given page load, blocking the current thread.
/// Fails if the I/O pool is down.
///
/// Panics if the page load needs a completion.
/// Panics if the page load needs a completion or if the I/O pool is down.
///
/// This returns `Ok(true)` if the page request has been submitted and a completion will be
/// coming. `Ok(false)` means that the page is guaranteed to be fresh.
/// This returns `true` if the page request has been submitted and a completion will be
/// coming. `false` means that the page is guaranteed to be fresh.
///
/// An `IoCommand` of kind `Read` will be submitted along the I/O handle with the provided
/// user-data.
///
/// Note that the page loaded by the I/O pool may be a misprobe. You must use
/// [`PageLoad::try_complete`] to verify whether the hash-table probe has completed or must be
/// tried again.
pub fn probe(
&self,
load: &mut PageLoad,
io_handle: &IoHandle,
user_data: u64,
) -> anyhow::Result<bool> {
pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool {
let bucket = loop {
match load.probe_sequence.next(&self.meta_map) {
ProbeResult::Tombstone(_) => continue,
ProbeResult::Empty(_) => return Ok(false),
ProbeResult::Empty(_) => return false,
ProbeResult::PossibleHit(bucket) => break BucketIndex(bucket),
}
};
@@ -580,13 +574,10 @@
user_data,
};

match io_handle.send(command) {
Ok(()) => {
load.state = PageLoadState::Submitted;
Ok(true)
}
Err(_) => anyhow::bail!("I/O pool hangup"),
}
// UNWRAP: I/O pool is not expected to hangup.
io_handle.send(command).unwrap();
load.state = PageLoadState::Submitted;
true
}
}

2 changes: 1 addition & 1 deletion nomt/src/lib.rs
@@ -609,7 +609,7 @@ impl<T: HashAlgorithm> Session<T> {

let merkle_update_handle = self
.merkle_updater
.update_and_prove::<T>(compact_actuals, self.witness_mode.0);
.update_and_prove::<T>(compact_actuals, self.witness_mode.0)?;

let mut tx = self.store.new_value_tx();
for (path, read_write) in actuals {
18 changes: 11 additions & 7 deletions nomt/src/merkle/mod.rs
@@ -22,6 +22,7 @@ use crate::{
page_cache::{Page, PageCache, ShardIndex},
rw_pass_cell::WritePassEnvelope,
store::{BucketIndex, DirtyPage, SharedMaybeBucketIndex, Store},
task::{join_task, spawn_task, TaskResult},
HashAlgorithm, Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
};
use threadpool::ThreadPool;
@@ -214,7 +215,7 @@ impl Updater {
self,
read_write: Vec<(KeyPath, KeyReadWrite)>,
witness: bool,
) -> UpdateHandle {
) -> std::io::Result<UpdateHandle> {
if let Some(ref warm_up) = self.warm_up {
let _ = warm_up.finish_tx.send(());
}
@@ -229,9 +230,8 @@
let shard_regions = (0..num_workers).map(ShardIndex::Shard).collect::<Vec<_>>();

// receive warm-ups from worker.
// TODO: handle error better.
let (warm_ups, warm_page_set) = if let Some(ref warm_up) = self.warm_up {
let output = warm_up.output_rx.recv().unwrap();
let output = join_task(&warm_up.output_rx)?;
(output.paths, Some(output.pages))
} else {
(HashMap::new(), None)
@@ -262,11 +262,11 @@
spawn_updater::<H>(&self.worker_tp, params, worker_tx.clone());
}

UpdateHandle {
Ok(UpdateHandle {
shared,
worker_rx,
num_workers,
}
})
}
}

@@ -462,7 +462,7 @@ impl UpdateShared {
struct WarmUpHandle {
finish_tx: Sender<()>,
warmup_tx: Sender<WarmUpCommand>,
output_rx: Receiver<WarmUpOutput>,
output_rx: Receiver<TaskResult<std::io::Result<WarmUpOutput>>>,
}

fn spawn_warm_up<H: HashAlgorithm>(
@@ -473,7 +473,11 @@ fn spawn_warm_up<H: HashAlgorithm>(
let (output_tx, output_rx) = channel::bounded(1);
let (finish_tx, finish_rx) = channel::bounded(1);

worker_tp.execute(move || worker::run_warm_up::<H>(params, warmup_rx, finish_rx, output_tx));
spawn_task(
&worker_tp,
move || worker::run_warm_up::<H>(params, warmup_rx, finish_rx),
output_tx,
);

WarmUpHandle {
warmup_tx,
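
Note: the `spawn_task` / `join_task` / `TaskResult` helpers imported above live in the crate's `task` module, which is not among the seven files changed by this commit. As a rough, hypothetical sketch of the pattern the call sites imply — the worker closure's return value (or panic) is caught and forwarded over the bounded channel so the spawner can rethrow or propagate it — something like the following would fit:

use crossbeam::channel::{Receiver, Sender};
use threadpool::ThreadPool;

/// The caught outcome of a worker closure: either its return value or its panic payload.
pub struct TaskResult<T>(std::thread::Result<T>);

/// Run `task` on the pool and send its outcome, including panics, to `output_tx`.
pub fn spawn_task<T, F>(tp: &ThreadPool, task: F, output_tx: Sender<TaskResult<T>>)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    tp.execute(move || {
        // Catch panics so the receiving side is never left waiting on a dead worker.
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(task));
        let _ = output_tx.send(TaskResult(result));
    });
}

/// Wait for the worker's outcome, rethrowing its panic on the calling thread if it had one.
pub fn join_task<T>(output_rx: &Receiver<TaskResult<T>>) -> T {
    // UNWRAP: the worker always sends exactly one result before exiting.
    match output_rx.recv().unwrap().0 {
        Ok(value) => value,
        Err(panic_payload) => std::panic::resume_unwind(panic_payload),
    }
}

With `T = std::io::Result<WarmUpOutput>`, the `join_task(&warm_up.output_rx)?` call in `update_and_prove` surfaces warm-up I/O errors to the caller instead of unwrapping a bare channel receive.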
56 changes: 23 additions & 33 deletions nomt/src/merkle/seek.rs
@@ -329,14 +329,12 @@ impl<H: HashAlgorithm> Seeker<H> {
}

/// Try to submit as many requests as possible.
pub fn submit_all(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
pub fn submit_all(&mut self, page_set: &mut PageSet) {
if !self.has_room() {
return Ok(());
return;
}
self.submit_idle_page_loads()?;
self.submit_idle_key_path_requests(page_set)?;

Ok(())
self.submit_idle_page_loads();
self.submit_idle_key_path_requests(page_set);
}

/// Take the result of a complete request.
@@ -365,7 +363,7 @@
}

/// Try to process the next I/O. Does not block the current thread.
pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
if let Ok(io) = self.io_handle.try_recv() {
self.handle_completion(page_set, io)?;
}
@@ -374,8 +372,11 @@
}

/// Block on processing the next I/O. Blocks the current thread.
pub fn recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
let io = self.io_handle.recv()?;
///
/// Panics if the I/O pool is down.
pub fn recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
// UNWRAP: I/O pool is not expected to hangup.
let io = self.io_handle.recv().unwrap();
self.handle_completion(page_set, io)?;
Ok(())
}
@@ -393,35 +394,31 @@
}

// resubmit all idle page loads until no more remain.
fn submit_idle_page_loads(&mut self) -> anyhow::Result<()> {
fn submit_idle_page_loads(&mut self) {
while let Some(slab_index) = self.idle_page_loads.pop_front() {
self.submit_idle_page_load(slab_index)?;
self.submit_idle_page_load(slab_index);
}

Ok(())
}

// submit the next page for each idle key path request until backpressuring or no more progress
// can be made.
fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) {
while self.has_room() {
match self.idle_requests.pop_front() {
None => return Ok(()),
None => return,
Some(request_index) => {
self.submit_key_path_request(page_set, request_index)?;
self.submit_key_path_request(page_set, request_index);
}
}
}

Ok(())
}

// submit a page load which is currently in the slab, but idle.
fn submit_idle_page_load(&mut self, slab_index: usize) -> anyhow::Result<()> {
fn submit_idle_page_load(&mut self, slab_index: usize) {
if let IoRequest::Merkle(ref mut page_load) = self.io_slab[slab_index] {
if !self
.page_loader
.probe(page_load, &self.io_handle, slab_index as u64)?
.probe(page_load, &self.io_handle, slab_index as u64)
{
// PANIC: seek should really never reach fresh pages. we only request a page if
// we seek to an internal node above it, and in that case the page really should
@@ -434,18 +431,12 @@ impl<H: HashAlgorithm> Seeker<H> {
unreachable!()
}
}

Ok(())
}

// submit the next page for this key path request.
fn submit_key_path_request(
&mut self,
page_set: &mut PageSet,
request_index: usize,
) -> anyhow::Result<()> {
fn submit_key_path_request(&mut self, page_set: &mut PageSet, request_index: usize) {
let i = if request_index < self.processed {
return Ok(());
return;
} else {
request_index - self.processed
};
@@ -484,7 +475,8 @@ impl<H: HashAlgorithm> Seeker<H> {
let load = self.page_loader.start_load(page_id.clone());
vacant_entry.insert(vec![request_index]);
let slab_index = self.io_slab.insert(IoRequest::Merkle(load));
return self.submit_idle_page_load(slab_index);
self.submit_idle_page_load(slab_index);
return;
}
IoQuery::LeafPage(page_number) => {
let vacant_entry = match self.io_waiters.entry(IoQuery::LeafPage(page_number)) {
@@ -511,17 +503,15 @@

vacant_entry.insert(vec![request_index]);
assert_eq!(slab_index, self.io_slab.insert(IoRequest::Leaf(leaf_load)));
return Ok(());
return;
}
}
}
}
}

Ok(())
}

fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> anyhow::Result<()> {
fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> std::io::Result<()> {
io.result?;
let slab_index = io.command.user_data as usize;

30 changes: 11 additions & 19 deletions nomt/src/merkle/worker.rs
@@ -10,7 +10,7 @@
//! Updates are performed while the next fetch is pending, unless all fetches in
//! the range have completed.
use crossbeam::channel::{Receiver, Select, Sender, TryRecvError};
use crossbeam::channel::{Receiver, Select, TryRecvError};

use nomt_core::{
page_id::ROOT_PAGE_ID,
@@ -62,8 +62,7 @@ pub(super) fn run_warm_up<H: HashAlgorithm>(
params: WarmUpParams,
warmup_rx: Receiver<WarmUpCommand>,
finish_rx: Receiver<()>,
output_tx: Sender<WarmUpOutput>,
) {
) -> std::io::Result<WarmUpOutput> {
let page_loader = params.store.page_loader();
let io_handle = params.store.io_pool().make_handle();
let page_io_receiver = io_handle.receiver().clone();
@@ -81,14 +80,7 @@
true,
);

let result = warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx);

match result {
Err(_) => return,
Ok(res) => {
let _ = output_tx.send(res);
}
}
warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx)
}

pub(super) fn run_update<H: HashAlgorithm>(params: UpdateParams) -> anyhow::Result<WorkerOutput> {
@@ -130,7 +122,7 @@ fn warm_up_phase<H: HashAlgorithm>(
mut page_set: PageSet,
warmup_rx: Receiver<WarmUpCommand>,
finish_rx: Receiver<()>,
) -> anyhow::Result<WarmUpOutput> {
) -> std::io::Result<WarmUpOutput> {
let mut select_all = Select::new();
let warmup_idx = select_all.recv(&warmup_rx);
let finish_idx = select_all.recv(&finish_rx);
@@ -148,14 +140,14 @@
continue;
}

seeker.submit_all(&mut page_set)?;
seeker.submit_all(&mut page_set);
if !seeker.has_room() {
// block on interrupt or next page ready.
let index = select_no_work.ready();
if index == finish_no_work_idx {
match finish_rx.try_recv() {
Err(TryRecvError::Empty) => continue,
Err(e) => anyhow::bail!(e),
Err(_) => panic!("Warm-Up worker, unexpected failure of the finish channel",),
Ok(()) => break,
}
} else if index == page_no_work_idx {
@@ -169,14 +161,14 @@
if index == finish_idx {
match finish_rx.try_recv() {
Err(TryRecvError::Empty) => continue,
Err(e) => anyhow::bail!(e),
Err(_) => panic!("Warm-Up worker, unexpected failure of the finish channel",),
Ok(()) => break,
}
} else if index == warmup_idx {
let warm_up_command = match warmup_rx.try_recv() {
Ok(command) => command,
Err(TryRecvError::Empty) => continue,
Err(e) => anyhow::bail!(e),
Err(_) => panic!("Warm-Up worker, unexpected failure of the warmup channel"),
};

seeker.push(warm_up_command.key_path);
@@ -193,7 +185,7 @@
warm_ups.insert(result.key, result);
continue;
}
seeker.submit_all(&mut page_set)?;
seeker.submit_all(&mut page_set);
if seeker.has_live_requests() {
seeker.recv_page(&mut page_set)?;
}
@@ -496,7 +488,7 @@ impl<H: HashAlgorithm> RangeUpdater<H> {
}
}

seeker.submit_all(page_set)?;
seeker.submit_all(page_set);
if !seeker.has_room() && seeker.has_live_requests() {
// no way to push work until at least one page fetch has concluded.
seeker.recv_page(page_set)?;
@@ -515,7 +507,7 @@ impl<H: HashAlgorithm> RangeUpdater<H> {
}
} else {
seeker.push(self.shared.read_write[next_push].0);
seeker.submit_all(page_set)?;
seeker.submit_all(page_set);
}
}

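
The worker-side changes above draw a consistent line between the two failure modes: a disconnected control channel (finish or warm-up) indicates a bug in the spawner and panics, while genuine I/O failures are returned as `std::io::Error` and propagated with `?`. A small self-contained illustration of that split — not NOMT code, just the shape of the policy:

use crossbeam::channel::{Receiver, TryRecvError};

/// Returns `true` once the spawner has asked the worker to wrap up.
fn finish_requested(finish_rx: &Receiver<()>) -> bool {
    match finish_rx.try_recv() {
        Ok(()) => true,
        Err(TryRecvError::Empty) => false,
        // A hung-up control channel means the spawner died mid-update: a bug, not an I/O error.
        Err(TryRecvError::Disconnected) => panic!("unexpected failure of the finish channel"),
    }
}

/// Runs units of I/O until asked to finish; I/O errors bubble up, channel bugs panic.
fn worker_loop(finish_rx: Receiver<()>) -> std::io::Result<u64> {
    let mut units_done = 0;
    while !finish_requested(&finish_rx) {
        // Real I/O failures propagate to the spawner via `?` (and, ultimately, the join).
        units_done += do_one_unit_of_io()?;
    }
    Ok(units_done)
}

/// Stand-in for an actual page read.
fn do_one_unit_of_io() -> std::io::Result<u64> {
    Ok(1)
}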
2 changes: 1 addition & 1 deletion nomt/src/store/mod.rs
@@ -231,7 +231,7 @@ impl Store {
let io_handle = self.io_pool().make_handle();
let mut page_load = page_loader.start_load(page_id);
loop {
if !page_loader.probe(&mut page_load, &io_handle, 0)? {
if !page_loader.probe(&mut page_load, &io_handle, 0) {
return Ok(None);
}

14 changes: 4 additions & 10 deletions nomt/src/store/page_loader.rs
@@ -14,18 +14,12 @@ impl PageLoader {
}

/// Advance the state of the given page load, blocking the current thread.
/// Fails if the I/O pool is down.
///
/// Panics if the page load needs a completion.
/// Panics if the page load needs a completion or if the I/O pool is down.
///
/// This returns `Ok(true)` if the page request has been submitted and a completion will be
/// coming. `Ok(false)` means that the page is guaranteed to be fresh.
pub fn probe(
&self,
load: &mut PageLoad,
io_handle: &IoHandle,
user_data: u64,
) -> anyhow::Result<bool> {
/// This returns `true` if the page request has been submitted and a completion will be
/// coming. `false` means that the page is guaranteed to be fresh.
pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool {
self.inner.probe(load, io_handle, user_data)
}
}
