diff --git a/nomt/src/beatree/ops/update/leaf_stage.rs b/nomt/src/beatree/ops/update/leaf_stage.rs index 855341d6..ba26e669 100644 --- a/nomt/src/beatree/ops/update/leaf_stage.rs +++ b/nomt/src/beatree/ops/update/leaf_stage.rs @@ -22,7 +22,7 @@ use crate::beatree::{ }, Key, ValueChange, }; -use crate::io::{IoCommand, IoHandle, IoKind, IoPool}; +use crate::io::{IoCommand, IoHandle, IoKind}; use crate::task::{join_task, spawn_task}; /// Tracker of all changes that happen to leaves during an update @@ -135,7 +135,7 @@ pub fn run( &leaf_cache, &leaf_reader, &bbn_index, - io_handle.io_pool(), + io_handle.make_new_sibiling_handle(), changeset[worker_params.op_range.clone()] .iter() .map(|(k, _)| *k), @@ -699,11 +699,9 @@ fn preload_and_prepare( leaf_cache: &LeafCache, leaf_reader: &StoreReader, bbn_index: &Index, - io_pool: &IoPool, + io_handle: IoHandle, changeset: impl IntoIterator, ) -> std::io::Result> { - let io_handle = io_pool.make_handle(); - let mut changeset_leaves: Vec = Vec::new(); let mut submissions = 0; for key in changeset { diff --git a/nomt/src/io/linux.rs b/nomt/src/io/linux.rs index 9e0ebf27..b8c48fc9 100644 --- a/nomt/src/io/linux.rs +++ b/nomt/src/io/linux.rs @@ -3,6 +3,7 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError}; use io_uring::{cqueue, opcode, squeue, types, IoUring}; use slab::Slab; use std::collections::VecDeque; +use threadpool::ThreadPool; const RING_CAPACITY: u32 = 128; @@ -14,30 +15,33 @@ struct PendingIo { completion_sender: Sender, } -pub fn start_io_worker(page_pool: PagePool, io_workers: usize, iopoll: bool) -> Sender { +pub fn start_io_worker( + page_pool: PagePool, + io_workers_tp: &ThreadPool, + io_workers: usize, + iopoll: bool, +) -> Sender { // main bound is from the pending slab. 
let (command_tx, command_rx) = crossbeam_channel::unbounded(); - start_workers(page_pool, command_rx, io_workers, iopoll); + start_workers(page_pool, io_workers_tp, command_rx, io_workers, iopoll); command_tx } fn start_workers( page_pool: PagePool, + io_workers_tp: &ThreadPool, command_rx: Receiver, io_workers: usize, iopoll: bool, ) { - for i in 0..io_workers { - let _ = std::thread::Builder::new() - .name(format!("io_worker-{i}")) - .spawn({ - let page_pool = page_pool.clone(); - let command_rx = command_rx.clone(); - move || run_worker(page_pool, command_rx, iopoll) - }) - .unwrap(); + for _ in 0..io_workers { + io_workers_tp.execute({ + let page_pool = page_pool.clone(); + let command_rx = command_rx.clone(); + move || run_worker(page_pool, command_rx, iopoll) + }); } } diff --git a/nomt/src/io/mod.rs b/nomt/src/io/mod.rs index 9f1d0ef8..4b2e8ba5 100644 --- a/nomt/src/io/mod.rs +++ b/nomt/src/io/mod.rs @@ -3,7 +3,13 @@ std::compile_error!("NOMT only supports Unix-based OSs"); use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; use page_pool::Page; -use std::{fmt, fs::File, os::fd::RawFd, sync::Arc}; +use std::{ + fmt, + fs::File, + os::fd::RawFd, + sync::{Arc, Weak}, +}; +use threadpool::ThreadPool; #[cfg(target_os = "linux")] #[path = "linux.rs"] @@ -97,31 +103,41 @@ struct IoPacket { /// Create an I/O worker managing an io_uring and sending responses back via channels to a number /// of handles. 
pub fn start_io_pool(io_workers: usize, iopoll: bool, page_pool: PagePool) -> IoPool { - let sender = platform::start_io_worker(page_pool.clone(), io_workers, iopoll); - IoPool { sender, page_pool } + let io_workers_tp = ThreadPool::with_name("io-worker".to_string(), io_workers); + let sender = platform::start_io_worker(page_pool.clone(), &io_workers_tp, io_workers, iopoll); + let sender = Some(Arc::new(sender)); + IoPool { + sender, + page_pool, + io_workers_tp, + } } #[cfg(test)] pub fn start_test_io_pool(io_workers: usize, page_pool: PagePool) -> IoPool { - let sender = platform::start_io_worker(page_pool.clone(), io_workers, false); - IoPool { sender, page_pool } + start_io_pool(io_workers, false, page_pool) } /// A manager for the broader I/O pool. This can be used to create new I/O handles. -/// -/// Dropping this does not close any outstanding I/O handles or shut down I/O workers. -#[derive(Clone)] pub struct IoPool { - sender: Sender, + sender: Option>>, page_pool: PagePool, + io_workers_tp: ThreadPool, } impl IoPool { /// Create a new I/O handle. + /// + /// This will panic if the I/O pool has been shut down. pub fn make_handle(&self) -> IoHandle { let (completion_sender, completion_receiver) = crossbeam_channel::unbounded(); + let sender = self + .sender + .as_ref() + .expect("call to make_handle after shutdown"); + let sender = Arc::downgrade(sender); IoHandle { - io_pool: self.clone(), + sender, completion_sender, completion_receiver, } @@ -130,6 +146,18 @@ impl IoPool { pub fn page_pool(&self) -> &PagePool { &self.page_pool } + + /// Initiate the shutdown procedure. + /// + /// This will return only after all the I/O workers are shut down. + pub fn shutdown(&mut self) { + // There is only a single strong reference to the sender, dropping it will close the + // channel, causing the I/O workers to shut down. 
+ let sender = self.sender.take().unwrap(); + assert_eq!(Arc::strong_count(&sender), 1); + drop(sender); + self.io_workers_tp.join(); + } } /// A handle for submitting I/O commands and receiving their completions. @@ -143,7 +171,7 @@ /// This is safe to use across multiple threads, but care must be taken by the user for correctness. #[derive(Clone)] pub struct IoHandle { - io_pool: IoPool, + sender: Weak>, completion_sender: Sender, completion_receiver: Receiver, } @@ -151,8 +179,11 @@ pub struct IoHandle { impl IoHandle { /// Send an I/O command. This fails if the channel has hung up, but does not block the thread. pub fn send(&self, command: IoCommand) -> Result<(), SendError> { - self.io_pool - .sender + let sender = match self.sender.upgrade() { + Some(sender) => sender, + None => return Err(SendError(command)), + }; + sender .send(IoPacket { command, completion_sender: self.completion_sender.clone(), @@ -176,8 +207,19 @@ impl IoHandle { &self.completion_receiver } - pub fn io_pool(&self) -> &IoPool { - &self.io_pool + /// Creates a new handle that can be used to submit I/O commands. + /// + /// Unlike [`Self::clone`] this creates a new handle that can be used independently of the + /// original handle. + /// + /// Unlike [`IoPool::make_handle`], this does not panic if the I/O pool has been shut down; sends through the returned handle will fail instead. 
+ pub fn make_new_sibiling_handle(&self) -> IoHandle { + let (completion_sender, completion_receiver) = crossbeam_channel::unbounded(); + IoHandle { + sender: self.sender.clone(), + completion_sender, + completion_receiver, + } } } diff --git a/nomt/src/io/unix.rs b/nomt/src/io/unix.rs index ee1b4297..061104a4 100644 --- a/nomt/src/io/unix.rs +++ b/nomt/src/io/unix.rs @@ -1,17 +1,27 @@ use super::{CompleteIo, IoCommand, IoKind, IoKindResult, IoPacket, PagePool, PAGE_SIZE}; use crossbeam_channel::{Receiver, Sender}; +use threadpool::ThreadPool; -pub fn start_io_worker(page_pool: PagePool, io_workers: usize, _iopoll: bool) -> Sender { +pub fn start_io_worker( + page_pool: PagePool, + io_workers_tp: &ThreadPool, + io_workers: usize, + _iopoll: bool, +) -> Sender { let (command_tx, command_rx) = crossbeam_channel::unbounded(); for _ in 0..io_workers { - spawn_worker_thread(page_pool.clone(), command_rx.clone()); + spawn_worker_thread(page_pool.clone(), io_workers_tp, command_rx.clone()); } command_tx } -fn spawn_worker_thread(page_pool: PagePool, command_rx: Receiver) { +fn spawn_worker_thread( + page_pool: PagePool, + io_workers_tp: &ThreadPool, + command_rx: Receiver, +) { let work = move || loop { let Ok(packet) = command_rx.recv() else { // Why the `drop` here? @@ -29,10 +39,7 @@ fn spawn_worker_thread(page_pool: PagePool, command_rx: Receiver) { let _ = packet.completion_sender.send(complete); }; - std::thread::Builder::new() - .name("nomt-io-worker".to_string()) - .spawn(work) - .unwrap(); + io_workers_tp.execute(work); } fn execute(mut command: IoCommand) -> CompleteIo { diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs index ea302019..c5ae39f1 100644 --- a/nomt/src/store/mod.rs +++ b/nomt/src/store/mod.rs @@ -44,8 +44,7 @@ struct Shared { rollback: Option, io_pool: IoPool, meta_fd: File, - #[allow(unused)] - flock: flock::Flock, + flock: Option, poisoned: AtomicBool, // Retained for the lifetime of the store. 
@@ -199,7 +198,7 @@ impl Store { io_pool, _db_dir_fd: db_dir_fd, meta_fd, - flock, + flock: Some(flock), poisoned: false.into(), }), }) @@ -312,6 +311,18 @@ impl Store { } } +impl Drop for Shared { + fn drop(&mut self) { + // `Shared` is dropped, meaning no more commits are expected. Therefore, we can + // wait for IO workers to finish their work and then drop the flock. The order is important + // because we need to ensure that the flock is only dropped after the IO workers are done. + // Otherwise, these IO workers might still be writing to the files while another process + // acquired the flock. + self.io_pool.shutdown(); + drop(self.flock.take()); + } +} + /// An atomic transaction on raw key/value pairs to be applied against the store /// with [`Store::commit`]. pub struct ValueTransaction {