Skip to content

Commit

Permalink
Improve IndexWriter customisation via builder (#2562)
Browse files Browse the repository at this point in the history
* Improve `IndexWriter` customisation via builder

* Remove change noise from PR

* Correct documentation

* Resolve comments and add test
  • Loading branch information
ChillFish8 authored Jan 2, 2025
1 parent 8edb439 commit 148594f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 56 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ fastdivide = "0.4.0"
itertools = "0.13.0"
measure_time = "0.9.0"
arc-swap = "1.5.0"
bon = "3.3.1"

columnar = { version = "0.3", path = "./columnar", package = "tantivy-columnar" }
sstable = { version = "0.3", path = "./sstable", package = "tantivy-sstable", optional = true }
Expand Down
62 changes: 32 additions & 30 deletions src/index/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::index::{IndexMeta, SegmentId, SegmentMeta, SegmentMetaInventory};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::index_writer::{
IndexWriterOptions, MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN,
};
use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
Expand All @@ -24,8 +26,6 @@ use crate::schema::{Field, FieldType, Schema};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::SegmentReader;

const DEFAULT_NUM_MERGE_THREADS: usize = 4;

fn load_metas(
directory: &dyn Directory,
inventory: &SegmentMetaInventory,
Expand Down Expand Up @@ -521,30 +521,24 @@ impl Index {
load_metas(self.directory(), &self.inventory)
}

/// Open a new index writer. Attempts to acquire a lockfile.
/// Open a new index writer with the given options. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `num_threads` defines the number of indexing workers that should work at the same time.
///
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
/// thread.
///
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
/// - `options` defines the writer configuration which includes things like buffer sizes,
/// indexer threads, etc...
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the lockfile already exists, returns `TantivyError::LockFailure`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads_and_num_merge_threads<D: Document>(
pub fn writer_with_options<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
num_merge_threads: usize,
options: IndexWriterOptions,
) -> crate::Result<IndexWriter<D>> {
let directory_lock = self
.directory
Expand All @@ -560,32 +554,40 @@ impl Index {
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
memory_arena_in_bytes_per_thread,
directory_lock,
num_merge_threads,
)

IndexWriter::new(self, options, directory_lock)
}

/// Creates a multithreaded writer with 4 merge threads.
/// Open a new index writer. Attempts to acquire a lockfile.
///
/// The lockfile should be deleted on drop, but it is possible
/// that due to a panic or other error, a stale lockfile will be
/// left in the index directory. If you are sure that no other
/// `IndexWriter` on the system is accessing the index directory,
/// it is safe to manually delete the lockfile.
///
/// - `num_threads` defines the number of indexing workers that should work at the same time.
///
/// - `overall_memory_budget_in_bytes` sets the amount of memory allocated for all indexing
/// thread.
///
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer_with_num_threads<D: Document>(
&self,
num_threads: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter<D>> {
self.writer_with_num_threads_and_num_merge_threads(
num_threads,
overall_memory_budget_in_bytes,
DEFAULT_NUM_MERGE_THREADS,
)
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
let options = IndexWriterOptions::builder()
.num_worker_threads(num_threads)
.memory_budget_per_thread(memory_arena_in_bytes_per_thread)
.build();
self.writer_with_options(options)
}

/// Helper to create an index writer for tests.
Expand Down
89 changes: 64 additions & 25 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ fn error_in_index_worker_thread(context: &str) -> TantivyError {
))
}

#[derive(Clone, bon::Builder)]
/// A builder for creating a new [IndexWriter] for an index.
pub struct IndexWriterOptions {
#[builder(default = MEMORY_BUDGET_NUM_BYTES_MIN)]
/// The memory budget per indexer thread.
///
/// When an indexer thread has buffered this much data in memory
/// it will flush the segment to disk (although this is not searchable until commit is called.)
memory_budget_per_thread: usize,
#[builder(default = 1)]
/// The number of indexer worker threads to use.
num_worker_threads: usize,
#[builder(default = 4)]
/// Defines the number of merger threads to use.
num_merge_threads: usize,
}

/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
Expand All @@ -58,8 +75,7 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

index: Index,

// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,

workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,

Expand All @@ -70,9 +86,6 @@ pub struct IndexWriter<D: Document = TantivyDocument> {

worker_id: usize,

num_threads: usize,
num_merge_threads: usize,

delete_queue: DeleteQueue,

stamper: Stamper,
Expand Down Expand Up @@ -266,24 +279,27 @@ impl<D: Document> IndexWriter<D> {
/// `TantivyError::InvalidArgument`
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_budget_in_bytes_per_thread: usize,
options: IndexWriterOptions,
directory_lock: DirectoryLock,
num_merge_threads: usize,
) -> crate::Result<Self> {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
if options.memory_budget_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
if options.memory_budget_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if options.num_worker_threads == 0 {
let err_msg = "At least one worker thread is required, got 0".to_string();
return Err(TantivyError::InvalidArgument(err_msg));
}

let (document_sender, document_receiver) =
crossbeam_channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);

Expand All @@ -297,23 +313,20 @@ impl<D: Document> IndexWriter<D> {
index.clone(),
stamper.clone(),
&delete_queue.cursor(),
num_merge_threads,
options.num_merge_threads,
)?;

let mut index_writer = Self {
_directory_lock: Some(directory_lock),

memory_budget_in_bytes_per_thread,
options: options.clone(),
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,

segment_updater,

workers_join_handle: vec![],
num_threads,

num_merge_threads,

delete_queue,

Expand Down Expand Up @@ -406,7 +419,7 @@ impl<D: Document> IndexWriter<D> {

let mut delete_cursor = self.delete_queue.cursor();

let mem_budget = self.memory_budget_in_bytes_per_thread;
let mem_budget = self.options.memory_budget_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
Expand Down Expand Up @@ -459,7 +472,7 @@ impl<D: Document> IndexWriter<D> {
}

fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
for _ in 0..self.options.num_worker_threads {
self.add_indexing_worker()?;
}
Ok(())
Expand Down Expand Up @@ -561,13 +574,7 @@ impl<D: Document> IndexWriter<D> {
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");

let new_index_writer = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_budget_in_bytes_per_thread,
directory_lock,
self.num_merge_threads,
)?;
let new_index_writer = IndexWriter::new(&self.index, self.options.clone(), directory_lock)?;

// the current `self` is dropped right away because of this call.
//
Expand Down Expand Up @@ -821,7 +828,7 @@ mod tests {
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
Expand Down Expand Up @@ -2542,4 +2549,36 @@ mod tests {
index_writer.commit().unwrap();
Ok(())
}

#[test]
fn test_writer_options_validation() {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bool_field("example", STORED);

Check warning on line 2556 in src/indexer/index_writer.rs

View workflow job for this annotation

GitHub Actions / clippy

unused variable: `field`

warning: unused variable: `field` --> src/indexer/index_writer.rs:2556:13 | 2556 | let field = schema_builder.add_bool_field("example", STORED); | ^^^^^ help: if this is intentional, prefix it with an underscore: `_field` | = note: `#[warn(unused_variables)]` on by default
let index = Index::create_in_ram(schema_builder.build());

let opt_wo_threads = IndexWriterOptions::builder().num_worker_threads(0).build();
let result = index.writer_with_options::<TantivyDocument>(opt_wo_threads);
assert!(result.is_err(), "Writer should reject 0 thread count");
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(10 << 10)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too low memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));

let opt_with_low_memory = IndexWriterOptions::builder()
.memory_budget_per_thread(5 << 30)
.build();
let result = index.writer_with_options::<TantivyDocument>(opt_with_low_memory);
assert!(
result.is_err(),
"Writer should reject options with too high memory size"
);
assert!(matches!(result, Err(TantivyError::InvalidArgument(_))));
}
}
2 changes: 1 addition & 1 deletion src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;

pub use self::index_writer::IndexWriter;
pub use self::index_writer::{IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
Expand Down

0 comments on commit 148594f

Please sign in to comment.