Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: increase io_uring ring capacity #807

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions nomt/src/io/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use slab::Slab;
use std::collections::VecDeque;
use threadpool::ThreadPool;

const RING_CAPACITY: u32 = 128;
const RING_CAPACITY: u32 = 1024;

// max number of inflight requests is bounded by the slab.
const MAX_IN_FLIGHT: usize = RING_CAPACITY as usize;
Expand All @@ -19,12 +19,11 @@ pub fn start_io_worker(
page_pool: PagePool,
io_workers_tp: &ThreadPool,
io_workers: usize,
iopoll: bool,
) -> Sender<IoPacket> {
// main bound is from the pending slab.
let (command_tx, command_rx) = crossbeam_channel::unbounded();

start_workers(page_pool, io_workers_tp, command_rx, io_workers, iopoll);
start_workers(page_pool, io_workers_tp, command_rx, io_workers);

command_tx
}
Expand All @@ -34,25 +33,20 @@ fn start_workers(
io_workers_tp: &ThreadPool,
command_rx: Receiver<IoPacket>,
io_workers: usize,
iopoll: bool,
) {
for _ in 0..io_workers {
io_workers_tp.execute({
let page_pool = page_pool.clone();
let command_rx = command_rx.clone();
move || run_worker(page_pool, command_rx, iopoll)
move || run_worker(page_pool, command_rx)
});
}
}

fn run_worker(page_pool: PagePool, command_rx: Receiver<IoPacket>, iopoll: bool) {
fn run_worker(page_pool: PagePool, command_rx: Receiver<IoPacket>) {
let mut pending: Slab<PendingIo> = Slab::with_capacity(MAX_IN_FLIGHT);

let mut ring_builder = IoUring::<squeue::Entry, cqueue::Entry>::builder();
if iopoll {
ring_builder.setup_iopoll();
}
let mut ring = ring_builder
let mut ring = IoUring::<squeue::Entry, cqueue::Entry>::builder()
.build(RING_CAPACITY)
.expect("Error building io_uring");

Expand Down
6 changes: 3 additions & 3 deletions nomt/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ struct IoPacket {

/// Create an I/O worker managing an io_uring and sending responses back via channels to a number
/// of handles.
pub fn start_io_pool(io_workers: usize, iopoll: bool, page_pool: PagePool) -> IoPool {
pub fn start_io_pool(io_workers: usize, page_pool: PagePool) -> IoPool {
let io_workers_tp = ThreadPool::with_name("io-worker".to_string(), io_workers);
let sender = platform::start_io_worker(page_pool.clone(), &io_workers_tp, io_workers, iopoll);
let sender = platform::start_io_worker(page_pool.clone(), &io_workers_tp, io_workers);
let sender = Some(Arc::new(sender));
IoPool {
sender,
Expand All @@ -115,7 +115,7 @@ pub fn start_io_pool(io_workers: usize, iopoll: bool, page_pool: PagePool) -> Io

#[cfg(test)]
pub fn start_test_io_pool(io_workers: usize, page_pool: PagePool) -> IoPool {
start_io_pool(io_workers, false, page_pool)
start_io_pool(io_workers, page_pool)
}

/// A manager for the broader I/O pool. This can be used to create new I/O handles.
Expand Down
1 change: 0 additions & 1 deletion nomt/src/io/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub fn start_io_worker(
page_pool: PagePool,
io_workers_tp: &ThreadPool,
io_workers: usize,
_iopoll: bool,
) -> Sender<IoPacket> {
let (command_tx, command_rx) = crossbeam_channel::unbounded();

Expand Down
8 changes: 1 addition & 7 deletions nomt/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,20 @@ impl Store {

cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
// iopoll does not play nice with FUSE and tmpfs. A symptom is ENOSUPP.
// O_DIRECT is not supported on tmpfs.
let iopoll: bool;
let o_direct: bool;
match crate::sys::linux::fs_check(&db_dir_fd) {
Ok(fsck) => {
iopoll = !(fsck.is_fuse() || fsck.is_tmpfs());
o_direct = !fsck.is_tmpfs();
},
Err(_) => {
iopoll = false;
o_direct = false;
},
}
} else {
let iopoll = true;
}
}

let io_pool = io::start_io_pool(o.io_workers, iopoll, page_pool.clone());
let io_pool = io::start_io_pool(o.io_workers, page_pool.clone());

let meta_fd = {
let mut options = OpenOptions::new();
Expand Down
5 changes: 0 additions & 5 deletions nomt/src/sys/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ impl FsCheck {
pub fn is_tmpfs(&self) -> bool {
self.stat.f_type == libc::TMPFS_MAGIC
}

/// Returns true if the filesystem is backed by FUSE.
pub fn is_fuse(&self) -> bool {
self.stat.f_type == libc::FUSE_SUPER_MAGIC
}
}

/// fallocate changes the size of the file to the given length if it's less than the current size.
Expand Down