diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs
index 117bc721..b277d4ba 100644
--- a/nomt/src/bitbox/mod.rs
+++ b/nomt/src/bitbox/mod.rs
@@ -545,12 +545,11 @@ impl PageLoader {
     }
 
     /// Advance the state of the given page load, blocking the current thread.
-    /// Fails if the I/O pool is down.
     ///
-    /// Panics if the page load needs a completion.
+    /// Panics if the page load needs a completion or if the I/O pool is down.
     ///
-    /// This returns `Ok(true)` if the page request has been submitted and a completion will be
-    /// coming. `Ok(false)` means that the page is guaranteed to be fresh.
+    /// This returns `true` if the page request has been submitted and a completion will be
+    /// coming. `false` means that the page is guaranteed to be fresh.
     ///
     /// An `IoCommand` of kind `Read` will be submitted along the I/O handle with the provided
     /// user-data.
@@ -558,16 +557,11 @@ impl PageLoader {
     /// Note that the page loaded by the I/O pool may be a misprobe. You must use
     /// [`PageLoad::try_complete`] to verify whether the hash-table probe has completed or must be
     /// tried again.
-    pub fn probe(
-        &self,
-        load: &mut PageLoad,
-        io_handle: &IoHandle,
-        user_data: u64,
-    ) -> anyhow::Result<bool> {
+    pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool {
         let bucket = loop {
             match load.probe_sequence.next(&self.meta_map) {
                 ProbeResult::Tombstone(_) => continue,
-                ProbeResult::Empty(_) => return Ok(false),
+                ProbeResult::Empty(_) => return false,
                 ProbeResult::PossibleHit(bucket) => break BucketIndex(bucket),
             }
         };
@@ -580,13 +574,10 @@ impl PageLoader {
             user_data,
         };
 
-        match io_handle.send(command) {
-            Ok(()) => {
-                load.state = PageLoadState::Submitted;
-                Ok(true)
-            }
-            Err(_) => anyhow::bail!("I/O pool hangup"),
-        }
+        // UNWRAP: I/O pool is not expected to hangup.
+        io_handle.send(command).unwrap();
+        load.state = PageLoadState::Submitted;
+        true
     }
 }
 
diff --git a/nomt/src/lib.rs b/nomt/src/lib.rs
index 450b80a6..f30df458 100644
--- a/nomt/src/lib.rs
+++ b/nomt/src/lib.rs
@@ -609,7 +609,7 @@ impl Session {
 
         let merkle_update_handle = self
             .merkle_updater
-            .update_and_prove::(compact_actuals, self.witness_mode.0);
+            .update_and_prove::(compact_actuals, self.witness_mode.0)?;
 
         let mut tx = self.store.new_value_tx();
         for (path, read_write) in actuals {
diff --git a/nomt/src/merkle/mod.rs b/nomt/src/merkle/mod.rs
index cfba43a0..e9570953 100644
--- a/nomt/src/merkle/mod.rs
+++ b/nomt/src/merkle/mod.rs
@@ -22,6 +22,7 @@ use crate::{
     page_cache::{Page, PageCache, ShardIndex},
     rw_pass_cell::WritePassEnvelope,
     store::{BucketIndex, DirtyPage, SharedMaybeBucketIndex, Store},
+    task::{join_task, spawn_task, TaskResult},
     HashAlgorithm, Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
 };
 use threadpool::ThreadPool;
@@ -214,7 +215,7 @@ impl Updater {
         self,
         read_write: Vec<(KeyPath, KeyReadWrite)>,
         witness: bool,
-    ) -> UpdateHandle {
+    ) -> std::io::Result<UpdateHandle> {
         if let Some(ref warm_up) = self.warm_up {
             let _ = warm_up.finish_tx.send(());
         }
@@ -229,9 +230,8 @@ impl Updater {
         let shard_regions = (0..num_workers).map(ShardIndex::Shard).collect::<Vec<_>>();
         // receive warm-ups from worker.
-        // TODO: handle error better.
         let (warm_ups, warm_page_set) = if let Some(ref warm_up) = self.warm_up {
-            let output = warm_up.output_rx.recv().unwrap();
+            let output = join_task(&warm_up.output_rx)?;
             (output.paths, Some(output.pages))
         } else {
             (HashMap::new(), None)
         };
@@ -262,11 +262,11 @@ impl Updater {
             spawn_updater::(&self.worker_tp, params, worker_tx.clone());
         }
 
-        UpdateHandle {
+        Ok(UpdateHandle {
             shared,
             worker_rx,
             num_workers,
-        }
+        })
     }
 }
 
@@ -462,7 +462,7 @@ impl UpdateShared {
 struct WarmUpHandle {
     finish_tx: Sender<()>,
     warmup_tx: Sender,
-    output_rx: Receiver,
+    output_rx: Receiver>>,
 }
 
 fn spawn_warm_up(
@@ -473,7 +473,11 @@ fn spawn_warm_up(
     let (output_tx, output_rx) = channel::bounded(1);
     let (finish_tx, finish_rx) = channel::bounded(1);
 
-    worker_tp.execute(move || worker::run_warm_up::(params, warmup_rx, finish_rx, output_tx));
+    spawn_task(
+        &worker_tp,
+        move || worker::run_warm_up::(params, warmup_rx, finish_rx),
+        output_tx,
+    );
 
     WarmUpHandle {
         warmup_tx,
diff --git a/nomt/src/merkle/seek.rs b/nomt/src/merkle/seek.rs
index 6c58f900..4d15cc61 100644
--- a/nomt/src/merkle/seek.rs
+++ b/nomt/src/merkle/seek.rs
@@ -329,14 +329,12 @@ impl Seeker {
     }
 
     /// Try to submit as many requests as possible.
-    pub fn submit_all(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
+    pub fn submit_all(&mut self, page_set: &mut PageSet) {
         if !self.has_room() {
-            return Ok(());
+            return;
         }
-        self.submit_idle_page_loads()?;
-        self.submit_idle_key_path_requests(page_set)?;
-
-        Ok(())
+        self.submit_idle_page_loads();
+        self.submit_idle_key_path_requests(page_set);
     }
 
     /// Take the result of a complete request.
@@ -365,7 +363,7 @@ impl Seeker {
     }
 
     /// Try to process the next I/O. Does not block the current thread.
-    pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
+    pub fn try_recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
         if let Ok(io) = self.io_handle.try_recv() {
             self.handle_completion(page_set, io)?;
         }
@@ -374,8 +372,11 @@ impl Seeker {
 
     /// Block on processing the next I/O. Blocks the current thread.
-    pub fn recv_page(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
-        let io = self.io_handle.recv()?;
+    ///
+    /// Panics if the I/O pool is down.
+    pub fn recv_page(&mut self, page_set: &mut PageSet) -> std::io::Result<()> {
+        // UNWRAP: I/O pool is not expected to hangup.
+        let io = self.io_handle.recv().unwrap();
         self.handle_completion(page_set, io)?;
         Ok(())
     }
 
@@ -393,35 +394,31 @@ impl Seeker {
     }
 
     // resubmit all idle page loads until no more remain.
-    fn submit_idle_page_loads(&mut self) -> anyhow::Result<()> {
+    fn submit_idle_page_loads(&mut self) {
         while let Some(slab_index) = self.idle_page_loads.pop_front() {
-            self.submit_idle_page_load(slab_index)?;
+            self.submit_idle_page_load(slab_index);
         }
-
-        Ok(())
     }
 
     // submit the next page for each idle key path request until backpressuring or no more progress
     // can be made.
-    fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) -> anyhow::Result<()> {
+    fn submit_idle_key_path_requests(&mut self, page_set: &mut PageSet) {
         while self.has_room() {
             match self.idle_requests.pop_front() {
-                None => return Ok(()),
+                None => return,
                 Some(request_index) => {
-                    self.submit_key_path_request(page_set, request_index)?;
+                    self.submit_key_path_request(page_set, request_index);
                 }
             }
         }
-
-        Ok(())
     }
 
     // submit a page load which is currently in the slab, but idle.
-    fn submit_idle_page_load(&mut self, slab_index: usize) -> anyhow::Result<()> {
+    fn submit_idle_page_load(&mut self, slab_index: usize) {
         if let IoRequest::Merkle(ref mut page_load) = self.io_slab[slab_index] {
             if !self
                 .page_loader
-                .probe(page_load, &self.io_handle, slab_index as u64)?
+                .probe(page_load, &self.io_handle, slab_index as u64)
             {
                 // PANIC: seek should really never reach fresh pages. we only request a page if
                 // we seek to an internal node above it, and in that case the page really should
@@ -434,18 +431,12 @@ impl Seeker {
                 unreachable!()
             }
         }
-
-        Ok(())
     }
 
     // submit the next page for this key path request.
-    fn submit_key_path_request(
-        &mut self,
-        page_set: &mut PageSet,
-        request_index: usize,
-    ) -> anyhow::Result<()> {
+    fn submit_key_path_request(&mut self, page_set: &mut PageSet, request_index: usize) {
         let i = if request_index < self.processed {
-            return Ok(());
+            return;
         } else {
             request_index - self.processed
         };
@@ -484,7 +475,8 @@ impl Seeker {
                             let load = self.page_loader.start_load(page_id.clone());
                             vacant_entry.insert(vec![request_index]);
                             let slab_index = self.io_slab.insert(IoRequest::Merkle(load));
-                            return self.submit_idle_page_load(slab_index);
+                            self.submit_idle_page_load(slab_index);
+                            return;
                         }
                         IoQuery::LeafPage(page_number) => {
                             let vacant_entry = match self.io_waiters.entry(IoQuery::LeafPage(page_number)) {
@@ -511,17 +503,15 @@ impl Seeker {
                             vacant_entry.insert(vec![request_index]);
                             assert_eq!(slab_index, self.io_slab.insert(IoRequest::Leaf(leaf_load)));
-                            return Ok(());
+                            return;
                         }
                     }
                 }
             }
         }
-
-        Ok(())
     }
 
-    fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> anyhow::Result<()> {
+    fn handle_completion(&mut self, page_set: &mut PageSet, io: CompleteIo) -> std::io::Result<()> {
         io.result?;
 
         let slab_index = io.command.user_data as usize;
diff --git a/nomt/src/merkle/worker.rs b/nomt/src/merkle/worker.rs
index cebd4e69..832ccf67 100644
--- a/nomt/src/merkle/worker.rs
+++ b/nomt/src/merkle/worker.rs
@@ -10,7 +10,7 @@
 //! Updates are performed while the next fetch is pending, unless all fetches in
 //! the range have completed.
 
-use crossbeam::channel::{Receiver, Select, Sender, TryRecvError};
+use crossbeam::channel::{Receiver, Select, TryRecvError};
 
 use nomt_core::{
     page_id::ROOT_PAGE_ID,
@@ -62,8 +62,7 @@ pub(super) fn run_warm_up(
     params: WarmUpParams,
     warmup_rx: Receiver,
     finish_rx: Receiver<()>,
-    output_tx: Sender,
-) {
+) -> std::io::Result {
     let page_loader = params.store.page_loader();
     let io_handle = params.store.io_pool().make_handle();
     let page_io_receiver = io_handle.receiver().clone();
@@ -81,14 +80,7 @@
         true,
     );
 
-    let result = warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx);
-
-    match result {
-        Err(_) => return,
-        Ok(res) => {
-            let _ = output_tx.send(res);
-        }
-    }
+    warm_up_phase(page_io_receiver, seeker, page_set, warmup_rx, finish_rx)
 }
 
 pub(super) fn run_update(params: UpdateParams) -> anyhow::Result {
@@ -130,7 +122,7 @@ fn warm_up_phase(
     mut page_set: PageSet,
     warmup_rx: Receiver,
     finish_rx: Receiver<()>,
-) -> anyhow::Result {
+) -> std::io::Result {
     let mut select_all = Select::new();
     let warmup_idx = select_all.recv(&warmup_rx);
     let finish_idx = select_all.recv(&finish_rx);
@@ -148,14 +140,14 @@ fn warm_up_phase(
             continue;
         }
 
-        seeker.submit_all(&mut page_set)?;
+        seeker.submit_all(&mut page_set);
         if !seeker.has_room() {
             // block on interrupt or next page ready.
             let index = select_no_work.ready();
             if index == finish_no_work_idx {
                 match finish_rx.try_recv() {
                     Err(TryRecvError::Empty) => continue,
-                    Err(e) => anyhow::bail!(e),
+                    Err(_) => panic!("Warm-Up worker, unexpected failure of the finish channel",),
                     Ok(()) => break,
                 }
             } else if index == page_no_work_idx {
@@ -169,14 +161,14 @@ fn warm_up_phase(
         if index == finish_idx {
             match finish_rx.try_recv() {
                 Err(TryRecvError::Empty) => continue,
-                Err(e) => anyhow::bail!(e),
+                Err(_) => panic!("Warm-Up worker, unexpected failure of the finish channel",),
                 Ok(()) => break,
             }
         } else if index == warmup_idx {
             let warm_up_command = match warmup_rx.try_recv() {
                 Ok(command) => command,
                 Err(TryRecvError::Empty) => continue,
-                Err(e) => anyhow::bail!(e),
+                Err(_) => panic!("Warm-Up worker, unexpected failure of the warmup channel"),
             };
 
             seeker.push(warm_up_command.key_path);
@@ -193,7 +185,7 @@ fn warm_up_phase(
                 warm_ups.insert(result.key, result);
                 continue;
             }
-            seeker.submit_all(&mut page_set)?;
+            seeker.submit_all(&mut page_set);
             if seeker.has_live_requests() {
                 seeker.recv_page(&mut page_set)?;
             }
@@ -496,7 +488,7 @@ impl RangeUpdater {
                 }
             }
 
-            seeker.submit_all(page_set)?;
+            seeker.submit_all(page_set);
            if !seeker.has_room() && seeker.has_live_requests() {
                 // no way to push work until at least one page fetch has concluded.
                 seeker.recv_page(page_set)?;
@@ -515,7 +507,7 @@ impl RangeUpdater {
                 }
             } else {
                 seeker.push(self.shared.read_write[next_push].0);
-                seeker.submit_all(page_set)?;
+                seeker.submit_all(page_set);
             }
         }
 
diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs
index ea302019..d02a5011 100644
--- a/nomt/src/store/mod.rs
+++ b/nomt/src/store/mod.rs
@@ -231,7 +231,7 @@ impl Store {
         let io_handle = self.io_pool().make_handle();
         let mut page_load = page_loader.start_load(page_id);
         loop {
-            if !page_loader.probe(&mut page_load, &io_handle, 0)? {
+            if !page_loader.probe(&mut page_load, &io_handle, 0) {
                 return Ok(None);
             }
 
diff --git a/nomt/src/store/page_loader.rs b/nomt/src/store/page_loader.rs
index 477f9407..f24dff42 100644
--- a/nomt/src/store/page_loader.rs
+++ b/nomt/src/store/page_loader.rs
@@ -14,18 +14,12 @@ impl PageLoader {
     }
 
     /// Advance the state of the given page load, blocking the current thread.
-    /// Fails if the I/O pool is down.
     ///
-    /// Panics if the page load needs a completion.
+    /// Panics if the page load needs a completion or if the I/O pool is down.
     ///
-    /// This returns `Ok(true)` if the page request has been submitted and a completion will be
-    /// coming. `Ok(false)` means that the page is guaranteed to be fresh.
-    pub fn probe(
-        &self,
-        load: &mut PageLoad,
-        io_handle: &IoHandle,
-        user_data: u64,
-    ) -> anyhow::Result<bool> {
+    /// This returns `true` if the page request has been submitted and a completion will be
+    /// coming. `false` means that the page is guaranteed to be fresh.
+    pub fn probe(&self, load: &mut PageLoad, io_handle: &IoHandle, user_data: u64) -> bool {
         self.inner.probe(load, io_handle, user_data)
     }
 }
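Note on the new `task` module used above: the diff imports `join_task`, `spawn_task`, and `TaskResult` but does not include the module itself. The sketch below is only an assumption about how those helpers could be shaped so that `join_task(&warm_up.output_rx)?` surfaces a worker's `std::io::Error` to the caller and re-raises a worker panic; the `TaskResult` layout and both function bodies are hypothetical, not the crate's actual implementation.

use crossbeam::channel::{Receiver, Sender};
use threadpool::ThreadPool;

// Hypothetical outcome of a pooled task: either its return value or the payload
// of a panic that unwound out of it.
pub enum TaskResult<T> {
    Ok(T),
    Panic(Box<dyn std::any::Any + Send + 'static>),
}

// Run `task` on the thread pool and ship its outcome over `output_tx`.
pub fn spawn_task<T, F>(tp: &ThreadPool, task: F, output_tx: Sender<TaskResult<T>>)
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + std::panic::UnwindSafe + 'static,
{
    tp.execute(move || {
        let result = match std::panic::catch_unwind(task) {
            Ok(value) => TaskResult::Ok(value),
            Err(payload) => TaskResult::Panic(payload),
        };
        // The receiver may already be gone; nothing useful to do about that here.
        let _ = output_tx.send(result);
    });
}

// Wait for the worker's single result: propagate its I/O error, resume its panic.
pub fn join_task<T>(output_rx: &Receiver<TaskResult<std::io::Result<T>>>) -> std::io::Result<T> {
    // UNWRAP: the worker always sends exactly one result before exiting.
    match output_rx.recv().unwrap() {
        TaskResult::Ok(result) => result,
        TaskResult::Panic(payload) => std::panic::resume_unwind(payload),
    }
}

Under that reading, `Updater::update_and_prove` returning `std::io::Result<UpdateHandle>` reports only genuine I/O failures from the warm-up worker, while channel hangups and worker panics surface as panics in the caller, consistent with the new `probe` and `recv_page` behavior.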