feat: wait for shutdown
This changeset stops `Nomt::drop` from returning until all the IO
workers have finished. It also releases the flock on the database
directory only after the IO workers have finished.

This matters when creating a NOMT instance right after shutting down the
previous one, as might happen upon recovery after poisoning.
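
In outline, teardown now follows the pattern below. This is a minimal stand-alone sketch using the `threadpool` crate; `Db` and its fields are illustrative stand-ins, not NOMT's actual types:

```rust
use std::fs::File;
use threadpool::ThreadPool;

// Stand-ins for NOMT's internals: a pool of IO worker threads and an
// advisory-lock guard (flock) on the database directory.
struct Db {
    io_workers: ThreadPool,
    flock: Option<File>,
}

impl Drop for Db {
    fn drop(&mut self) {
        // 1. Block until every IO worker job has returned.
        self.io_workers.join();
        // 2. Only then release the directory lock, so whichever process
        //    acquires it next can never overlap with in-flight writes.
        drop(self.flock.take());
    }
}
```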
pepyakin committed Feb 10, 2025
1 parent 0fdf074 commit 9960dec
Showing 5 changed files with 103 additions and 41 deletions.
8 changes: 3 additions & 5 deletions nomt/src/beatree/ops/update/leaf_stage.rs
@@ -22,7 +22,7 @@ use crate::beatree::{
     },
     Key, ValueChange,
 };
-use crate::io::{IoCommand, IoHandle, IoKind, IoPool};
+use crate::io::{IoCommand, IoHandle, IoKind};
 use crate::task::{join_task, spawn_task};
 
 /// Tracker of all changes that happen to leaves during an update
@@ -135,7 +135,7 @@ pub fn run(
             &leaf_cache,
             &leaf_reader,
             &bbn_index,
-            io_handle.io_pool(),
+            io_handle.make_new_sibiling_handle(),
             changeset[worker_params.op_range.clone()]
                 .iter()
                 .map(|(k, _)| *k),
@@ -699,11 +699,9 @@ fn preload_and_prepare(
     leaf_cache: &LeafCache,
     leaf_reader: &StoreReader,
     bbn_index: &Index,
-    io_pool: &IoPool,
+    io_handle: IoHandle,
     changeset: impl IntoIterator<Item = Key>,
 ) -> std::io::Result<Vec<PreparedLeaf>> {
-    let io_handle = io_pool.make_handle();
-
     let mut changeset_leaves: Vec<PreparedLeaf> = Vec::new();
     let mut submissions = 0;
     for key in changeset {
26 changes: 15 additions & 11 deletions nomt/src/io/linux.rs
@@ -3,6 +3,7 @@ use crossbeam_channel::{Receiver, Sender, TryRecvError};
 use io_uring::{cqueue, opcode, squeue, types, IoUring};
 use slab::Slab;
 use std::collections::VecDeque;
+use threadpool::ThreadPool;
 
 const RING_CAPACITY: u32 = 128;
 
@@ -14,30 +15,33 @@ struct PendingIo {
     completion_sender: Sender<CompleteIo>,
 }
 
-pub fn start_io_worker(page_pool: PagePool, io_workers: usize, iopoll: bool) -> Sender<IoPacket> {
+pub fn start_io_worker(
+    page_pool: PagePool,
+    io_workers_tp: &ThreadPool,
+    io_workers: usize,
+    iopoll: bool,
+) -> Sender<IoPacket> {
     // main bound is from the pending slab.
     let (command_tx, command_rx) = crossbeam_channel::unbounded();
 
-    start_workers(page_pool, command_rx, io_workers, iopoll);
+    start_workers(page_pool, io_workers_tp, command_rx, io_workers, iopoll);
 
     command_tx
 }
 
 fn start_workers(
     page_pool: PagePool,
+    io_workers_tp: &ThreadPool,
     command_rx: Receiver<IoPacket>,
     io_workers: usize,
     iopoll: bool,
 ) {
-    for i in 0..io_workers {
-        let _ = std::thread::Builder::new()
-            .name(format!("io_worker-{i}"))
-            .spawn({
-                let page_pool = page_pool.clone();
-                let command_rx = command_rx.clone();
-                move || run_worker(page_pool, command_rx, iopoll)
-            })
-            .unwrap();
+    for _ in 0..io_workers {
+        io_workers_tp.execute({
+            let page_pool = page_pool.clone();
+            let command_rx = command_rx.clone();
+            move || run_worker(page_pool, command_rx, iopoll)
+        });
     }
 }
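
Moving the workers onto the `threadpool` crate is what makes a blocking shutdown possible: unlike the detached `std::thread` spawns above, `ThreadPool::join` lets the owner wait until the worker closures have returned. A minimal sketch of that lifecycle, assuming the `threadpool` and `crossbeam-channel` crates (payload type and worker count are placeholders):

```rust
use threadpool::ThreadPool;

fn main() {
    let tp = ThreadPool::with_name("io-worker".to_string(), 2);
    let (tx, rx) = crossbeam_channel::unbounded::<u32>();

    for _ in 0..2 {
        let rx = rx.clone();
        tp.execute(move || {
            // Each worker loops until every sender is gone and the
            // channel disconnects, mirroring `run_worker` above.
            while let Ok(item) = rx.recv() {
                println!("processed {item}");
            }
        });
    }

    tx.send(1).unwrap();
    drop(tx);  // disconnects the channel: both worker loops exit
    tp.join(); // returns only once both worker closures have returned
}
```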

72 changes: 57 additions & 15 deletions nomt/src/io/mod.rs
@@ -3,7 +3,13 @@ std::compile_error!("NOMT only supports Unix-based OSs");
 
 use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError};
 use page_pool::Page;
-use std::{fmt, fs::File, os::fd::RawFd, sync::Arc};
+use std::{
+    fmt,
+    fs::File,
+    os::fd::RawFd,
+    sync::{Arc, Weak},
+};
+use threadpool::ThreadPool;
 
 #[cfg(target_os = "linux")]
 #[path = "linux.rs"]
@@ -97,31 +103,41 @@ struct IoPacket {
 /// Create an I/O worker managing an io_uring and sending responses back via channels to a number
 /// of handles.
 pub fn start_io_pool(io_workers: usize, iopoll: bool, page_pool: PagePool) -> IoPool {
-    let sender = platform::start_io_worker(page_pool.clone(), io_workers, iopoll);
-    IoPool { sender, page_pool }
+    let io_workers_tp = ThreadPool::with_name("io-worker".to_string(), io_workers);
+    let sender = platform::start_io_worker(page_pool.clone(), &io_workers_tp, io_workers, iopoll);
+    let sender = Some(Arc::new(sender));
+    IoPool {
+        sender,
+        page_pool,
+        io_workers_tp,
+    }
 }
 
 #[cfg(test)]
 pub fn start_test_io_pool(io_workers: usize, page_pool: PagePool) -> IoPool {
-    let sender = platform::start_io_worker(page_pool.clone(), io_workers, false);
-    IoPool { sender, page_pool }
+    start_io_pool(io_workers, false, page_pool)
 }
 
 /// A manager for the broader I/O pool. This can be used to create new I/O handles.
+///
+/// Dropping this does not close any outstanding I/O handles or shut down I/O workers.
 #[derive(Clone)]
 pub struct IoPool {
-    sender: Sender<IoPacket>,
+    sender: Option<Arc<Sender<IoPacket>>>,
     page_pool: PagePool,
+    io_workers_tp: ThreadPool,
 }
 
 impl IoPool {
     /// Create a new I/O handle.
+    ///
+    /// This will panic if the I/O pool has been shut down.
     pub fn make_handle(&self) -> IoHandle {
         let (completion_sender, completion_receiver) = crossbeam_channel::unbounded();
+        let sender = self
+            .sender
+            .as_ref()
+            .expect("call to make_handle after shutdown");
+        let sender = Arc::downgrade(sender);
         IoHandle {
-            io_pool: self.clone(),
+            sender,
             completion_sender,
             completion_receiver,
         }
@@ -130,6 +146,18 @@ impl IoPool {
     pub fn page_pool(&self) -> &PagePool {
         &self.page_pool
     }
+
+    /// Initiate the shutdown procedure.
+    ///
+    /// This will return only after all the I/O workers are shut down.
+    pub fn shutdown(&mut self) {
+        // There is only a single strong reference to the sender, dropping it will close the
+        // channel, causing the I/O workers to shut down.
+        let sender = self.sender.take().unwrap();
+        assert_eq!(Arc::strong_count(&sender), 1);
+        drop(sender);
+        self.io_workers_tp.join();
+    }
 }
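
The shutdown above hinges on reference counting: the pool holds the only strong `Arc` around the command `Sender`, while handles keep `Weak` references, so dropping that single strong reference disconnects the channel no matter how many handles remain alive. The pattern in isolation, with a string payload standing in for `IoPacket`:

```rust
use crossbeam_channel::unbounded;
use std::sync::Arc;

fn main() {
    let (tx, rx) = unbounded::<&'static str>();
    let strong = Arc::new(tx);
    let weak = Arc::downgrade(&strong);

    // A live handle upgrades its Weak and sends as usual.
    weak.upgrade().unwrap().send("packet").unwrap();

    // Shutdown: drop the single strong reference. The channel disconnects
    // even though `weak` (and any clone of it) is still around.
    assert_eq!(Arc::strong_count(&strong), 1);
    drop(strong);

    assert_eq!(rx.recv().unwrap(), "packet"); // already-queued work drains
    assert!(rx.recv().is_err());              // then receivers see disconnection
    assert!(weak.upgrade().is_none());        // and late senders get no channel
}
```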

/// A handle for submitting I/O commands and receiving their completions.
@@ -143,16 +171,19 @@ impl IoPool {
 /// This is safe to use across multiple threads, but care must be taken by the user for correctness.
 #[derive(Clone)]
 pub struct IoHandle {
-    io_pool: IoPool,
+    sender: Weak<Sender<IoPacket>>,
     completion_sender: Sender<CompleteIo>,
     completion_receiver: Receiver<CompleteIo>,
 }
 
 impl IoHandle {
     /// Send an I/O command. This fails if the channel has hung up, but does not block the thread.
     pub fn send(&self, command: IoCommand) -> Result<(), SendError<IoCommand>> {
-        self.io_pool
-            .sender
+        let sender = match self.sender.upgrade() {
+            Some(sender) => sender,
+            None => return Err(SendError(command)),
+        };
+        sender
             .send(IoPacket {
                 command,
                 completion_sender: self.completion_sender.clone(),
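
After shutdown, then, `send` degrades gracefully: a failed `Weak::upgrade` is reported as the same `SendError` a disconnected channel would produce, handing the command back to the caller instead of panicking. The shape of that path reduced to essentials — a string payload, without the `IoPacket` wrapping and error mapping of the real code:

```rust
use crossbeam_channel::{SendError, Sender};
use std::sync::Weak;

fn send(sender: &Weak<Sender<String>>, command: String) -> Result<(), SendError<String>> {
    let Some(sender) = sender.upgrade() else {
        // Pool already shut down: return the command to the caller.
        return Err(SendError(command));
    };
    sender.send(command)
}
```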
@@ -176,8 +207,19 @@ impl IoHandle {
         &self.completion_receiver
     }
 
-    pub fn io_pool(&self) -> &IoPool {
-        &self.io_pool
+    /// Creates a new handle that can be used to submit I/O commands.
+    ///
+    /// Unlike [`Self::clone`] this creates a new handle that can be used independently of the
+    /// original handle.
+    ///
+    /// This will panic if the I/O pool has been shut down.
+    pub fn make_new_sibiling_handle(&self) -> IoHandle {
+        let (completion_sender, completion_receiver) = crossbeam_channel::unbounded();
+        IoHandle {
+            sender: self.sender.clone(),
+            completion_sender,
+            completion_receiver,
+        }
     }
 }
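
The point of the sibling constructor, as opposed to `Clone`, is which completion queue a handle drains: a clone shares the original's queue, so two independent users would steal each other's completions, while a sibling gets a private queue. Illustrated with bare channels standing in for the I/O machinery:

```rust
use crossbeam_channel::unbounded;

fn main() {
    // Clone-like sharing: both receivers drain the same completion queue,
    // so a completion meant for one user can be consumed by the other.
    let (done_tx, done_rx) = unbounded::<u32>();
    let cloned = done_rx.clone();
    done_tx.send(7).unwrap();
    assert_eq!(cloned.recv().unwrap(), 7); // the clone "stole" this completion

    // Sibling-like independence: a fresh queue, invisible to the first handle.
    let (sib_tx, sib_rx) = unbounded::<u32>();
    sib_tx.send(8).unwrap();
    assert_eq!(sib_rx.recv().unwrap(), 8);
    assert!(done_rx.try_recv().is_err()); // nothing leaked across queues
}
```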

21 changes: 14 additions & 7 deletions nomt/src/io/unix.rs
@@ -1,17 +1,27 @@
 use super::{CompleteIo, IoCommand, IoKind, IoKindResult, IoPacket, PagePool, PAGE_SIZE};
 use crossbeam_channel::{Receiver, Sender};
+use threadpool::ThreadPool;
 
-pub fn start_io_worker(page_pool: PagePool, io_workers: usize, _iopoll: bool) -> Sender<IoPacket> {
+pub fn start_io_worker(
+    page_pool: PagePool,
+    io_workers_tp: &ThreadPool,
+    io_workers: usize,
+    _iopoll: bool,
+) -> Sender<IoPacket> {
     let (command_tx, command_rx) = crossbeam_channel::unbounded();
 
     for _ in 0..io_workers {
-        spawn_worker_thread(page_pool.clone(), command_rx.clone());
+        spawn_worker_thread(page_pool.clone(), io_workers_tp, command_rx.clone());
     }
 
     command_tx
 }
 
-fn spawn_worker_thread(page_pool: PagePool, command_rx: Receiver<IoPacket>) {
+fn spawn_worker_thread(
+    page_pool: PagePool,
+    io_workers_tp: &ThreadPool,
+    command_rx: Receiver<IoPacket>,
+) {
     let work = move || loop {
         let Ok(packet) = command_rx.recv() else {
             // Why the `drop` here?
@@ -29,10 +39,7 @@ fn spawn_worker_thread(page_pool: PagePool, command_rx: Receiver<IoPacket>) {
         let _ = packet.completion_sender.send(complete);
     };
 
-    std::thread::Builder::new()
-        .name("nomt-io-worker".to_string())
-        .spawn(work)
-        .unwrap();
+    io_workers_tp.execute(work);
 }
 
 fn execute(mut command: IoCommand) -> CompleteIo {
17 changes: 14 additions & 3 deletions nomt/src/store/mod.rs
@@ -44,8 +44,7 @@ struct Shared {
     rollback: Option<Rollback>,
     io_pool: IoPool,
     meta_fd: File,
-    #[allow(unused)]
-    flock: flock::Flock,
+    flock: Option<flock::Flock>,
     poisoned: AtomicBool,
 
     // Retained for the lifetime of the store.
@@ -199,7 +198,7 @@ impl Store {
                 io_pool,
                 _db_dir_fd: db_dir_fd,
                 meta_fd,
-                flock,
+                flock: Some(flock),
                 poisoned: false.into(),
             }),
         })
@@ -312,6 +311,18 @@ impl Store {
     }
 }
 
+impl Drop for Shared {
+    fn drop(&mut self) {
+        // `Shared` is dropped, meaning no more commits are expected. Therefore, we can
+        // wait for IO workers to finish their work and then drop the flock. The order is important
+        // because we need to ensure that the flock is only dropped after the IO workers are done.
+        // Otherwise, these IO workers might still be writing to the files while another process
+        // acquired the flock.
+        self.io_pool.shutdown();
+        drop(self.flock.take());
+    }
+}
+
 /// An atomic transaction on raw key/value pairs to be applied against the store
 /// with [`Store::commit`].
 pub struct ValueTransaction {
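
For context on why this ordering matters: the directory lock is an advisory `flock(2)`, so it only excludes the next process at acquisition time — it does nothing to stop writes already in flight. A sketch of such a lock guard, using the `libc` crate directly (NOMT's own `flock` module will differ in detail):

```rust
use std::fs::File;
use std::os::fd::AsRawFd;

struct DirLock(File);

impl DirLock {
    fn acquire(dir: &std::path::Path) -> std::io::Result<DirLock> {
        let file = File::open(dir)?;
        // Non-blocking exclusive advisory lock on the directory fd.
        if unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) } != 0 {
            return Err(std::io::Error::last_os_error());
        }
        Ok(DirLock(file))
    }
}
// Dropping `DirLock` closes the fd, which releases the lock — hence it must
// be dropped strictly after `IoPool::shutdown` returns.
```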
