feat(injector): add an extend method to Nucleo's injector #74

Merged

Changes from 6 commits
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -13,8 +13,8 @@ exclude = ["/typos.toml", "/tarpaulin.toml"]

[dependencies]
nucleo-matcher = { version = "0.3.1", path = "matcher" }
-parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"]}
+parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
rayon = "1.7.0"

[workspace]
-members = [ "matcher", "bench" ]
+members = ["matcher", "bench"]
194 changes: 194 additions & 0 deletions src/boxcar.rs
@@ -23,6 +23,7 @@

use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering};
use std::{ptr, slice};
@@ -182,6 +183,94 @@ impl<T> Vec<T> {
index
}

/// Extends the vector by appending multiple elements at once.
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
let count: u32 = values
.len()
.try_into()
.expect("overflowed maximum capacity");
if count == 0 {
assert!(
values.into_iter().next().is_none(),
"the `values` iterator reported an incorrect length"
);
return;
}

// Reserve all indices at once
let start_index: u32 = self
.inflight
.fetch_add(u64::from(count), Ordering::Release)
.try_into()
.expect("overflowed maximum capacity");

// Compute first and last locations
let start_location = Location::of(start_index);
let end_location = Location::of(start_index + count);

// Eagerly allocate the next bucket if the last entry of this batch lands close to the end of its bucket
let alloc_entry = end_location.alloc_next_bucket_entry();
if end_location.entry >= alloc_entry
&& (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry)
{
// This might be the last bucket, hence the check
if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
}
}

let mut bucket = unsafe { self.buckets.get_unchecked(start_location.bucket as usize) };
let mut entries = bucket.entries.load(Ordering::Acquire);
if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(start_location.bucket),
self.columns,
);
}
// Route each value to its corresponding bucket
let mut location;
let count = count as usize;
for (i, v) in values.into_iter().enumerate() {
// ExactSizeIterator is a safe trait whose implementations can have bugs or lie about their size.
// Unsafe code cannot rely on the reported length being correct.
assert!(i < count);

location =
Location::of(start_index + u32::try_from(i).expect("overflowed maximum capacity"));

// if we're starting to insert into a different bucket, allocate it beforehand
if location.entry == 0 && i != 0 {
// safety: `location.bucket` is always in bounds
bucket = unsafe { self.buckets.get_unchecked(location.bucket as usize) };
entries = bucket.entries.load(Ordering::Acquire);

if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(location.bucket),
self.columns,
);
}
}

unsafe {
let entry = Bucket::get(entries, location.entry, self.columns);

// Initialize matcher columns
for col in Entry::matcher_cols_raw(entry, self.columns) {
col.get().write(MaybeUninit::new(Utf32String::default()));
}
fill_columns(&v, Entry::matcher_cols_mut(entry, self.columns));
(*entry).slot.get().write(MaybeUninit::new(v));
(*entry).active.store(true, Ordering::Release);
}
}
}

/// race to initialize a bucket
fn get_or_alloc(bucket: &Bucket<T>, len: u32, cols: u32) -> *mut Entry<T> {
let entries = unsafe { Bucket::alloc(len, cols) };
@@ -557,6 +646,16 @@ impl Location {
fn bucket_len(bucket: u32) -> u32 {
1 << (bucket + SKIP_BUCKET)
}

/// The entry index at which the next bucket should be pre-allocated.
fn alloc_next_bucket_entry(&self) -> u32 {
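// Equivalent to `bucket_len - bucket_len / 8`: once a write lands in the final eighth
// of a bucket (e.g. entry 896 of a 1024-entry bucket), the next bucket gets pre-allocated.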
self.bucket_len - (self.bucket_len >> 3)
}

/// Whether the next bucket should be pre-allocated given the current position.
fn should_alloc_next_bucket(&self) -> bool {
self.entry == self.alloc_next_bucket_entry()
}
}

#[cfg(test)]
@@ -594,4 +693,99 @@ mod tests {
assert_eq!(max.bucket_len, 1 << 31);
assert_eq!(max.entry, (1 << 31) - 1);
}

#[test]
fn extend_unique_bucket() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..10, |_, _| {});
assert_eq!(vec.count(), 10);
for i in 0..10 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(10).is_none());
}

#[test]
fn extend_over_two_buckets() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..100, |_, _| {});
assert_eq!(vec.count(), 100);
for i in 0..100 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(100).is_none());
}

#[test]
fn extend_over_more_than_two_buckets() {
let vec = Vec::<u32>::with_capacity(1, 1);
vec.extend(0..1000, |_, _| {});
assert_eq!(vec.count(), 1000);
for i in 0..1000 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(1000).is_none());
}

#[test]
/// test that ExactSizeIterator returning incorrect length is caught (0 AND more than reported)
fn extend_with_incorrect_reported_len_is_caught() {
struct IncorrectLenIter {
len: usize,
iter: std::ops::Range<u32>,
}

impl Iterator for IncorrectLenIter {
type Item = u32;

fn next(&mut self) -> Option<Self::Item> {
self.iter.next()
}
}

impl ExactSizeIterator for IncorrectLenIter {
fn len(&self) -> usize {
self.len
}
}

let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 10,
iter: (0..12),
};
// this should panic
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());

let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 12,
iter: (0..10),
};
// this shouldn't panic; the remaining reserved slots are simply never written
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_ok());
// 12 slots are reserved, but only the first 10 entries are written
assert_eq!(vec.count(), 12);
for i in 0..10 {
assert_eq!(*vec.get(i).unwrap().data, i);
}
assert!(vec.get(10).is_none());

let vec = Vec::<u32>::with_capacity(1, 1);
let iter = IncorrectLenIter {
len: 0,
iter: (0..2),
};
// this should panic
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
}

// test that a `values` iterator whose length exceeds the boxcar's maximum capacity is caught
#[test]
fn extend_over_max_capacity() {
let vec = Vec::<u32>::with_capacity(1, 1);
let count = MAX_ENTRIES as usize + 2;
let iter = std::iter::repeat(0).take(count);
assert!(std::panic::catch_unwind(|| vec.extend(iter, |_, _| {})).is_err());
}
}
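The core trick in `extend` above is that a single `fetch_add` on `inflight` reserves a contiguous block of indices for the whole batch, after which each element is written into its pre-computed slot and published via the `active` flag. Below is a minimal, self-contained sketch of that reservation pattern; the `ToyBoxcar` type, its fixed capacity, and the `OnceLock`-based slots are illustrative simplifications, not part of this PR.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;

/// Toy append-only vector with pre-allocated slots: writers reserve a
/// contiguous block of indices with a single `fetch_add`.
struct ToyBoxcar<T> {
    inflight: AtomicU64,
    slots: Vec<OnceLock<T>>,
}

impl<T> ToyBoxcar<T> {
    fn new(capacity: usize) -> Self {
        Self {
            inflight: AtomicU64::new(0),
            slots: (0..capacity).map(|_| OnceLock::new()).collect(),
        }
    }

    /// Mirrors the shape of `boxcar::Vec::extend`: one atomic reservation for
    /// the whole batch, then each element is written to its own slot.
    fn extend<I>(&self, values: I)
    where
        I: IntoIterator<Item = T> + ExactSizeIterator,
    {
        let count = values.len() as u64;
        // Reserve `count` consecutive indices up front; concurrent callers get
        // disjoint ranges, so no further coordination is needed while writing.
        let start = self.inflight.fetch_add(count, Ordering::Relaxed) as usize;
        for (i, v) in values.into_iter().enumerate() {
            // Publishing the value makes the slot visible to readers, playing
            // the role of `active.store(true, Ordering::Release)` above.
            self.slots[start + i].set(v).ok().expect("slot written twice");
        }
    }

    fn get(&self, index: usize) -> Option<&T> {
        self.slots.get(index)?.get()
    }
}

fn main() {
    let vec = ToyBoxcar::new(16);
    vec.extend(0..10u32);
    assert_eq!(vec.get(3), Some(&3));
    assert!(vec.get(10).is_none());
}
```

Because concurrent callers receive disjoint index ranges from that single atomic add, writers never contend on individual slots, which is where the reduced contention mentioned in the `Injector::extend` docs below comes from.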
17 changes: 17 additions & 0 deletions src/lib.rs
@@ -79,6 +79,23 @@ impl<T> Injector<T> {
idx
}

/// Appends multiple elements to the list of matched items.
/// This function is lock-free and wait-free.
///
/// You should favor this function over `push` if at least one of the following is true:
/// - the number of items you're adding can be computed beforehand and is typically larger
///   than 1k
/// - you're able to batch incoming items
/// - you're adding items from multiple threads concurrently (this function results in less
///   contention)
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
self.items.extend(values, fill_columns);
(self.notify)();
}

/// Returns the total number of items injected in the matcher. This might
/// not match the number of items in the match snapshot (if the matcher
/// is still running)
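For reference, a hypothetical call site for the new method. The `inject_batch` helper, the single-column assumption, and the `From<&str>` conversion into `Utf32String` are assumptions for illustration, not part of this diff.

```rust
use nucleo::Injector;

/// Hypothetical helper: feed an already-collected batch through the new
/// `Injector::extend` instead of calling `push` once per item.
fn inject_batch(injector: &Injector<String>, batch: Vec<String>) {
    // `Vec`'s `IntoIter` is an `ExactSizeIterator`, so the whole batch is
    // reserved in the underlying boxcar with a single atomic add.
    injector.extend(batch.into_iter(), |item, columns| {
        // Assumes the matcher was configured with a single column, filled
        // from the item's text (`From<&str> for Utf32String`).
        columns[0] = item.as_str().into();
    });
}
```

Note that the bound is on the iterator itself, so a `Vec` has to be converted with `into_iter()` before being passed; in exchange, the whole batch is reserved with one atomic add and the notify callback fires once per call instead of once per pushed item.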