Skip to content

Commit

Permalink
Small changes in the Executor API. (#2391)
Browse files Browse the repository at this point in the history
Warning, this change is mildly not backward compatible
so I bumped tantivy's version.
  • Loading branch information
fulmicoton authored May 10, 2024
1 parent 1ee5f90 commit 6181c1e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 87 deletions.
44 changes: 29 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.22.0"
version = "0.23.0"
authors = ["Paul Masurel <[email protected]>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
Expand All @@ -15,12 +15,16 @@ rust-version = "1.63"
exclude = ["benches/*.json", "benches/*.txt"]

[dependencies]
oneshot = "0.1.5"
# Switch back to the non-forked oneshot crate once https://github.com/faern/oneshot/pull/35 is merged
oneshot = { git = "https://github.com/fulmicoton/oneshot.git", rev = "c10a3ba" }
base64 = "0.22.0"
byteorder = "1.4.3"
crc32fast = "1.3.2"
once_cell = "1.10.0"
regex = { version = "1.5.5", default-features = false, features = ["std", "unicode"] }
regex = { version = "1.5.5", default-features = false, features = [
"std",
"unicode",
] }
aho-corasick = "1.0"
tantivy-fst = "0.5"
memmap2 = { version = "0.9.0", optional = true }
Expand All @@ -36,7 +40,9 @@ uuid = { version = "1.0.0", features = ["v4", "serde"] }
crossbeam-channel = "0.5.4"
rust-stemmers = "1.2.0"
downcast-rs = "1.2.0"
bitpacking = { version = "0.9.2", default-features = false, features = ["bitpacker4x"] }
bitpacking = { version = "0.9.2", default-features = false, features = [
"bitpacker4x",
] }
census = "0.4.2"
rustc-hash = "1.1.0"
thiserror = "1.0.30"
Expand All @@ -51,13 +57,13 @@ itertools = "0.12.0"
measure_time = "0.8.2"
arc-swap = "1.5.0"

columnar = { version= "0.3", path="./columnar", package ="tantivy-columnar" }
sstable = { version= "0.3", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version= "0.3", path="./stacker", package ="tantivy-stacker" }
query-grammar = { version= "0.22.0", path="./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version= "0.6", path="./bitpacker" }
common = { version= "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version= "0.3", path="./tokenizer-api", package="tantivy-tokenizer-api" }
columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
stacker = { version = "0.3", path = "./stacker", package = "tantivy-stacker" }
query-grammar = { version = "0.22.0", path = "./query-grammar", package = "tantivy-query-grammar" }
tantivy-bitpacker = { version = "0.6", path = "./bitpacker" }
common = { version = "0.7", path = "./common/", package = "tantivy-common" }
tokenizer-api = { version = "0.3", path = "./tokenizer-api", package = "tantivy-tokenizer-api" }
sketches-ddsketch = { version = "0.2.1", features = ["use_serde"] }
futures-util = { version = "0.3.28", optional = true }
fnv = "1.0.7"
Expand All @@ -66,7 +72,7 @@ fnv = "1.0.7"
winapi = "0.3.9"

[dev-dependencies]
binggan = "0.5.1"
binggan = "0.5.2"
rand = "0.8.5"
maplit = "1.0.2"
matches = "0.1.9"
Expand Down Expand Up @@ -112,7 +118,7 @@ lz4-compression = ["lz4_flex"]
zstd-compression = ["zstd"]

failpoints = ["fail", "fail/failpoints"]
unstable = [] # useful for benches.
unstable = [] # useful for benches.

quickwit = ["sstable", "futures-util"]

Expand All @@ -122,7 +128,16 @@ quickwit = ["sstable", "futures-util"]
compare_hash_only = ["stacker/compare_hash_only"]

[workspace]
members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sstable", "tokenizer-api", "columnar"]
members = [
"query-grammar",
"bitpacker",
"common",
"ownedbytes",
"stacker",
"sstable",
"tokenizer-api",
"columnar",
]

# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
Expand All @@ -147,4 +162,3 @@ harness = false
[[bench]]
name = "agg_bench"
harness = false

76 changes: 16 additions & 60 deletions src/core/executor.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use std::sync::Arc;

#[cfg(feature = "quickwit")]
use futures_util::{future::Either, FutureExt};
use rayon::{ThreadPool, ThreadPoolBuilder};

use crate::TantivyError;

/// Search executor whether search request are single thread or multithread.
///
/// We don't expose Rayon thread pool directly here for several reasons.
///
/// First dependency hell. It is not a good idea to expose the
/// API of a dependency, knowing it might conflict with a different version
/// used by the client. Second, we may stop using rayon in the future.
/// Executor makes it possible to run tasks in single thread or
/// in a thread pool.
#[derive(Clone)]
pub enum Executor {
/// Single thread variant of an Executor
SingleThread,
/// Thread pool variant of an Executor
ThreadPool(ThreadPool),
ThreadPool(Arc<rayon::ThreadPool>),
}

#[cfg(feature = "quickwit")]
impl From<Arc<rayon::ThreadPool>> for Executor {
fn from(thread_pool: Arc<rayon::ThreadPool>) -> Self {
Executor::ThreadPool(thread_pool)
}
}

impl Executor {
Expand All @@ -26,11 +30,11 @@ impl Executor {

/// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> crate::Result<Executor> {
let pool = ThreadPoolBuilder::new()
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{prefix}{num}"))
.build()?;
Ok(Executor::ThreadPool(pool))
Ok(Executor::ThreadPool(Arc::new(pool)))
}

/// Perform a map in the thread pool.
Expand Down Expand Up @@ -105,7 +109,7 @@ impl Executor {
match self {
Executor::SingleThread => Either::Left(std::future::ready(Ok(cpu_intensive_task()))),
Executor::ThreadPool(pool) => {
let (sender, receiver) = oneshot_with_sentinel::channel();
let (sender, receiver) = oneshot::channel();
pool.spawn(|| {
if sender.is_closed() {
return;
Expand All @@ -121,54 +125,6 @@ impl Executor {
}
}

#[cfg(feature = "quickwit")]
mod oneshot_with_sentinel {
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
// TODO get ride of this if oneshot ever gains a is_closed()

pub struct SenderWithSentinel<T> {
tx: oneshot::Sender<T>,
guard: Arc<()>,
}

pub struct ReceiverWithSentinel<T> {
rx: oneshot::Receiver<T>,
_guard: Arc<()>,
}

pub fn channel<T>() -> (SenderWithSentinel<T>, ReceiverWithSentinel<T>) {
let (tx, rx) = oneshot::channel();
let guard = Arc::new(());
(
SenderWithSentinel {
tx,
guard: guard.clone(),
},
ReceiverWithSentinel { rx, _guard: guard },
)
}

impl<T> SenderWithSentinel<T> {
pub fn send(self, message: T) -> Result<(), oneshot::SendError<T>> {
self.tx.send(message)
}

pub fn is_closed(&self) -> bool {
Arc::strong_count(&self.guard) == 1
}
}

impl<T> std::future::Future for ReceiverWithSentinel<T> {
type Output = Result<T, oneshot::RecvError>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.rx).poll(ctx)
}
}
}

#[cfg(test)]
mod tests {
use super::Executor;
Expand Down
17 changes: 6 additions & 11 deletions src/index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::fmt;
#[cfg(feature = "mmap")]
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::available_parallelism;

use super::segment::Segment;
Expand Down Expand Up @@ -294,7 +293,7 @@ pub struct Index {
directory: ManagedDirectory,
schema: Schema,
settings: IndexSettings,
executor: Arc<Executor>,
executor: Executor,
tokenizers: TokenizerManager,
fast_field_tokenizers: TokenizerManager,
inventory: SegmentMetaInventory,
Expand All @@ -319,23 +318,19 @@ impl Index {
///
/// By default the executor is single thread, and simply runs in the calling thread.
pub fn search_executor(&self) -> &Executor {
self.executor.as_ref()
&self.executor
}

/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) -> crate::Result<()> {
self.executor = Arc::new(Executor::multi_thread(num_threads, "tantivy-search-")?);
self.executor = Executor::multi_thread(num_threads, "tantivy-search-")?;
Ok(())
}

/// Custom thread pool by a outer thread pool.
pub fn set_shared_multithread_executor(
&mut self,
shared_thread_pool: Arc<Executor>,
) -> crate::Result<()> {
self.executor = shared_thread_pool.clone();
Ok(())
pub fn set_executor(&mut self, executor: Executor) {
self.executor = executor;
}

/// Replace the default single thread search executor pool
Expand Down Expand Up @@ -419,7 +414,7 @@ impl Index {
schema,
tokenizers: TokenizerManager::default(),
fast_field_tokenizers: TokenizerManager::default(),
executor: Arc::new(Executor::single_thread()),
executor: Executor::single_thread(),
inventory,
}
}
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ pub mod tests {
}

#[test]
#[cfg(not(feature = "lz4"))]
fn test_version_string() {
use regex::Regex;
let regex_ptn = Regex::new(
Expand Down

0 comments on commit 6181c1e

Please sign in to comment.