From c8de8b80f441f0bcb9088cfaa94121e87bbc0375 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 16 Aug 2024 17:14:33 +0800 Subject: [PATCH] fix(fulltext-index): single segment is not sufficient for >50M rows SST (#4552) * fix(fulltext-index): single segment is not sufficient for a >50M rows SST Signed-off-by: Zhenchi * fix: update doc comment Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 1 + src/index/Cargo.toml | 1 + src/index/src/fulltext_index/create.rs | 2 +- .../src/fulltext_index/create/tantivy.rs | 65 ++++++++++++++----- src/index/src/fulltext_index/error.rs | 20 +++++- .../src/fulltext_index/search/tantivy.rs | 57 +++++++++++++--- 6 files changed, 117 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12a4fdca9890..bdd59547a9e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5020,6 +5020,7 @@ dependencies = [ "common-base", "common-error", "common-macro", + "common-runtime", "common-telemetry", "common-test-util", "fst", diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 8e2c05c6ad05..772177147ae2 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -15,6 +15,7 @@ bytes.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true +common-runtime.workspace = true common-telemetry.workspace = true fst.workspace = true futures.workspace = true diff --git a/src/index/src/fulltext_index/create.rs b/src/index/src/fulltext_index/create.rs index 43b905968816..99567a3f723e 100644 --- a/src/index/src/fulltext_index/create.rs +++ b/src/index/src/fulltext_index/create.rs @@ -15,7 +15,7 @@ mod tantivy; use async_trait::async_trait; -pub use tantivy::{TantivyFulltextIndexCreator, TEXT_FIELD_NAME}; +pub use tantivy::{TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME}; use crate::fulltext_index::error::Result; diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index 
16064e5aeeac..aa5966b218c3 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -16,28 +16,33 @@ use std::path::Path; use async_trait::async_trait; use snafu::{OptionExt, ResultExt}; -use tantivy::schema::{Schema, TEXT}; +use tantivy::indexer::NoMergePolicy; +use tantivy::schema::{Schema, STORED, TEXT}; use tantivy::store::{Compressor, ZstdCompressor}; use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager}; -use tantivy::{doc, Index, SingleSegmentIndexWriter}; +use tantivy::{doc, Index, IndexWriter}; use tantivy_jieba::JiebaTokenizer; use crate::fulltext_index::create::FulltextIndexCreator; -use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, Result, TantivySnafu}; +use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu}; use crate::fulltext_index::{Analyzer, Config}; pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text"; +pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid"; /// `TantivyFulltextIndexCreator` is a fulltext index creator using tantivy. -/// -/// Here use a single segment to store the index so the maximum capacity for -/// the index is limited to 2<<31 rows (around 2 billion rows). pub struct TantivyFulltextIndexCreator { /// The tantivy index writer. - writer: Option<SingleSegmentIndexWriter>, + writer: Option<IndexWriter>, /// The field for the text. text_field: tantivy::schema::Field, + + /// The field for the row id. + rowid_field: tantivy::schema::Field, + + /// The current max row id. 
+ max_rowid: u64, } impl TantivyFulltextIndexCreator { @@ -51,6 +56,7 @@ impl TantivyFulltextIndexCreator { let mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT); + let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED); let schema = schema_builder.build(); let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?; @@ -59,10 +65,17 @@ impl TantivyFulltextIndexCreator { let memory_limit = Self::sanitize_memory_limit(memory_limit); - let writer = SingleSegmentIndexWriter::new(index, memory_limit).context(TantivySnafu)?; + // Use one thread to keep order of the row id. + let writer = index + .writer_with_num_threads(1, memory_limit) + .context(TantivySnafu)?; + writer.set_merge_policy(Box::new(NoMergePolicy)); + Ok(Self { writer: Some(writer), text_field, + rowid_field, + max_rowid: 0, }) } @@ -86,7 +99,7 @@ impl TantivyFulltextIndexCreator { // Port from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX} const MARGIN_IN_BYTES: usize = 1_000_000; const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize; - const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES; + const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES - 1; memory_limit.clamp(MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX) } @@ -96,20 +109,25 @@ impl TantivyFulltextIndexCreator { impl FulltextIndexCreator for TantivyFulltextIndexCreator { async fn push_text(&mut self, text: &str) -> Result<()> { let writer = self.writer.as_mut().context(FinishedSnafu)?; - let doc = doc!(self.text_field => text); - writer.add_document(doc).context(TantivySnafu) + let doc = doc!(self.text_field => text, self.rowid_field => self.max_rowid); + self.max_rowid += 1; + writer.add_document(doc).context(TantivySnafu)?; + Ok(()) } async fn finish(&mut self) -> Result<()> { - let writer = 
self.writer.take().context(FinishedSnafu)?; - writer.finalize().map(|_| ()).context(TantivySnafu) + let mut writer = self.writer.take().context(FinishedSnafu)?; + common_runtime::spawn_blocking_global(move || { + writer.commit().context(TantivySnafu)?; + writer.wait_merging_threads().context(TantivySnafu) + }) + .await + .context(JoinSnafu)? } fn memory_usage(&self) -> usize { - self.writer - .as_ref() - .map(|writer| writer.mem_usage()) - .unwrap_or(0) + // Unable to get the memory usage of `IndexWriter`. + 0 } } @@ -118,6 +136,8 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use tantivy::collector::DocSetCollector; use tantivy::query::QueryParser; + use tantivy::schema::Value; + use tantivy::TantivyDocument; use super::*; @@ -235,7 +255,16 @@ mod tests { ); let query = query_parser.parse_query(query).unwrap(); let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let mut res = docs.into_iter().map(|d| d.doc_id).collect::<Vec<_>>(); + + let mut res = vec![]; + let rowid_field = searcher.schema().get_field(ROWID_FIELD_NAME).unwrap(); + for doc_addr in &docs { + let doc: TantivyDocument = searcher.doc(*doc_addr).unwrap(); + let rowid = doc.get_first(rowid_field).unwrap().as_u64().unwrap(); + assert_eq!(rowid as u32, doc_addr.doc_id); + res.push(rowid as u32); + } + res.sort(); assert_eq!(res, *expected); } diff --git a/src/index/src/fulltext_index/error.rs b/src/index/src/fulltext_index/error.rs index e4ecfc24fc80..26a433110449 100644 --- a/src/index/src/fulltext_index/error.rs +++ b/src/index/src/fulltext_index/error.rs @@ -19,6 +19,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; +use tantivy::DocAddress; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -48,6 +49,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Tantivy doc not found"))] + TantivyDocNotFound { + #[snafu(implicit)] + location: Location, + doc_addr: 
DocAddress, + }, + #[snafu(display("Operate on a finished creator"))] Finished { #[snafu(implicit)] @@ -60,6 +68,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Join error"))] + Join { + #[snafu(source)] + error: tokio::task::JoinError, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -67,10 +83,10 @@ impl ErrorExt for Error { use Error::*; match self { - Tantivy { .. } => StatusCode::Internal, + Tantivy { .. } | TantivyDocNotFound { .. } => StatusCode::Internal, TantivyParser { .. } => StatusCode::InvalidSyntax, - Io { .. } | Finished { .. } => StatusCode::Unexpected, + Io { .. } | Finished { .. } | Join { .. } => StatusCode::Unexpected, External { source, .. } => source.status_code(), } diff --git a/src/index/src/fulltext_index/search/tantivy.rs b/src/index/src/fulltext_index/search/tantivy.rs index 0927f5dbfdef..61c87e863f13 100644 --- a/src/index/src/fulltext_index/search/tantivy.rs +++ b/src/index/src/fulltext_index/search/tantivy.rs @@ -12,20 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use std::path::Path; use std::time::Instant; use async_trait::async_trait; use common_telemetry::debug; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tantivy::collector::DocSetCollector; use tantivy::query::QueryParser; -use tantivy::schema::Field; -use tantivy::{Index, IndexReader, ReloadPolicy}; +use tantivy::schema::{Field, Value}; +use tantivy::{Index, IndexReader, ReloadPolicy, TantivyDocument}; -use crate::fulltext_index::create::TEXT_FIELD_NAME; -use crate::fulltext_index::error::{Result, TantivyParserSnafu, TantivySnafu}; +use crate::fulltext_index::create::{ROWID_FIELD_NAME, TEXT_FIELD_NAME}; +use crate::fulltext_index::error::{ + Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu, +}; use crate::fulltext_index::search::{FulltextIndexSearcher, RowId}; /// `TantivyFulltextIndexSearcher` is a searcher using Tantivy. @@ -77,9 +79,48 @@ impl FulltextIndexSearcher for TantivyFulltextIndexSearcher { let query = query_parser .parse_query(query) .context(TantivyParserSnafu)?; - let docs = searcher + let doc_addrs = searcher .search(&query, &DocSetCollector) .context(TantivySnafu)?; - Ok(docs.into_iter().map(|d| d.doc_id).collect()) + + let seg_metas = self + .index + .searchable_segment_metas() + .context(TantivySnafu)?; + + // FAST PATH: only one segment, the doc id is the same as the row id. + // Also for compatibility with the old version. + if seg_metas.len() == 1 { + return Ok(doc_addrs.into_iter().map(|d| d.doc_id).collect()); + } + + // SLOW PATH: multiple segments, need to calculate the row id. 
+ let rowid_field = searcher + .schema() + .get_field(ROWID_FIELD_NAME) + .context(TantivySnafu)?; + let mut seg_offsets = HashMap::with_capacity(seg_metas.len()); + let mut res = BTreeSet::new(); + for doc_addr in doc_addrs { + let offset = if let Some(offset) = seg_offsets.get(&doc_addr.segment_ord) { + *offset + } else { + // Calculate the offset at the first time meeting the segment and cache it since + // the offset is the same for all rows in the same segment. + let doc: TantivyDocument = searcher.doc(doc_addr).context(TantivySnafu)?; + let rowid = doc + .get_first(rowid_field) + .and_then(|v| v.as_u64()) + .context(TantivyDocNotFoundSnafu { doc_addr })?; + + let offset = rowid as u32 - doc_addr.doc_id; + seg_offsets.insert(doc_addr.segment_ord, offset); + offset + }; + + res.insert(doc_addr.doc_id + offset); + } + + Ok(res) } }