Skip to content

Commit

Permalink
fix(fulltext-index): single segment is not sufficient for >50M rows S…
Browse files Browse the repository at this point in the history
…ST (#4552)

* fix(fulltext-index): single segment is not sufficient for a >50M rows SST

Signed-off-by: Zhenchi <[email protected]>

* fix: update doc comment

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Aug 16, 2024
1 parent ec59ce5 commit c8de8b8
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
fst.workspace = true
futures.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/fulltext_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mod tantivy;

use async_trait::async_trait;
pub use tantivy::{TantivyFulltextIndexCreator, TEXT_FIELD_NAME};
pub use tantivy::{TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME};

use crate::fulltext_index::error::Result;

Expand Down
65 changes: 47 additions & 18 deletions src/index/src/fulltext_index/create/tantivy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@ use std::path::Path;

use async_trait::async_trait;
use snafu::{OptionExt, ResultExt};
use tantivy::schema::{Schema, TEXT};
use tantivy::indexer::NoMergePolicy;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::{Compressor, ZstdCompressor};
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
use tantivy::{doc, Index, SingleSegmentIndexWriter};
use tantivy::{doc, Index, IndexWriter};
use tantivy_jieba::JiebaTokenizer;

use crate::fulltext_index::create::FulltextIndexCreator;
use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, Result, TantivySnafu};
use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu};
use crate::fulltext_index::{Analyzer, Config};

pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid";

/// `TantivyFulltextIndexCreator` is a fulltext index creator using tantivy.
///
/// Here use a single segment to store the index so the maximum capacity for
/// the index is limited to 2<<31 rows (around 2 billion rows).
pub struct TantivyFulltextIndexCreator {
/// The tantivy index writer.
writer: Option<SingleSegmentIndexWriter>,
writer: Option<IndexWriter>,

/// The field for the text.
text_field: tantivy::schema::Field,

/// The field for the row id.
rowid_field: tantivy::schema::Field,

/// The current max row id.
max_rowid: u64,
}

impl TantivyFulltextIndexCreator {
Expand All @@ -51,6 +56,7 @@ impl TantivyFulltextIndexCreator {

let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT);
let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED);
let schema = schema_builder.build();

let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
Expand All @@ -59,10 +65,17 @@ impl TantivyFulltextIndexCreator {

let memory_limit = Self::sanitize_memory_limit(memory_limit);

let writer = SingleSegmentIndexWriter::new(index, memory_limit).context(TantivySnafu)?;
// Use one thread to keep order of the row id.
let writer = index
.writer_with_num_threads(1, memory_limit)
.context(TantivySnafu)?;
writer.set_merge_policy(Box::new(NoMergePolicy));

Ok(Self {
writer: Some(writer),
text_field,
rowid_field,
max_rowid: 0,
})
}

Expand All @@ -86,7 +99,7 @@ impl TantivyFulltextIndexCreator {
// Port from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX}
const MARGIN_IN_BYTES: usize = 1_000_000;
const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES - 1;

memory_limit.clamp(MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX)
}
Expand All @@ -96,20 +109,25 @@ impl TantivyFulltextIndexCreator {
impl FulltextIndexCreator for TantivyFulltextIndexCreator {
async fn push_text(&mut self, text: &str) -> Result<()> {
let writer = self.writer.as_mut().context(FinishedSnafu)?;
let doc = doc!(self.text_field => text);
writer.add_document(doc).context(TantivySnafu)
let doc = doc!(self.text_field => text, self.rowid_field => self.max_rowid);
self.max_rowid += 1;
writer.add_document(doc).context(TantivySnafu)?;
Ok(())
}

async fn finish(&mut self) -> Result<()> {
let writer = self.writer.take().context(FinishedSnafu)?;
writer.finalize().map(|_| ()).context(TantivySnafu)
let mut writer = self.writer.take().context(FinishedSnafu)?;
common_runtime::spawn_blocking_global(move || {
writer.commit().context(TantivySnafu)?;
writer.wait_merging_threads().context(TantivySnafu)
})
.await
.context(JoinSnafu)?
}

fn memory_usage(&self) -> usize {
self.writer
.as_ref()
.map(|writer| writer.mem_usage())
.unwrap_or(0)
// Unable to get the memory usage of `IndexWriter`.
0
}
}

Expand All @@ -118,6 +136,8 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use tantivy::collector::DocSetCollector;
use tantivy::query::QueryParser;
use tantivy::schema::Value;
use tantivy::TantivyDocument;

use super::*;

Expand Down Expand Up @@ -235,7 +255,16 @@ mod tests {
);
let query = query_parser.parse_query(query).unwrap();
let docs = searcher.search(&query, &DocSetCollector).unwrap();
let mut res = docs.into_iter().map(|d| d.doc_id).collect::<Vec<_>>();

let mut res = vec![];
let rowid_field = searcher.schema().get_field(ROWID_FIELD_NAME).unwrap();
for doc_addr in &docs {
let doc: TantivyDocument = searcher.doc(*doc_addr).unwrap();
let rowid = doc.get_first(rowid_field).unwrap().as_u64().unwrap();
assert_eq!(rowid as u32, doc_addr.doc_id);
res.push(rowid as u32);
}

res.sort();
assert_eq!(res, *expected);
}
Expand Down
20 changes: 18 additions & 2 deletions src/index/src/fulltext_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use tantivy::DocAddress;

#[derive(Snafu)]
#[snafu(visibility(pub))]
Expand Down Expand Up @@ -48,6 +49,13 @@ pub enum Error {
location: Location,
},

#[snafu(display("Tantivy doc not found"))]
TantivyDocNotFound {
#[snafu(implicit)]
location: Location,
doc_addr: DocAddress,
},

#[snafu(display("Operate on a finished creator"))]
Finished {
#[snafu(implicit)]
Expand All @@ -60,17 +68,25 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Join error"))]
Join {
#[snafu(source)]
error: tokio::task::JoinError,
#[snafu(implicit)]
location: Location,
},
}

impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;

match self {
Tantivy { .. } => StatusCode::Internal,
Tantivy { .. } | TantivyDocNotFound { .. } => StatusCode::Internal,
TantivyParser { .. } => StatusCode::InvalidSyntax,

Io { .. } | Finished { .. } => StatusCode::Unexpected,
Io { .. } | Finished { .. } | Join { .. } => StatusCode::Unexpected,

External { source, .. } => source.status_code(),
}
Expand Down
57 changes: 49 additions & 8 deletions src/index/src/fulltext_index/search/tantivy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap};
use std::path::Path;
use std::time::Instant;

use async_trait::async_trait;
use common_telemetry::debug;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use tantivy::collector::DocSetCollector;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::{Index, IndexReader, ReloadPolicy};
use tantivy::schema::{Field, Value};
use tantivy::{Index, IndexReader, ReloadPolicy, TantivyDocument};

use crate::fulltext_index::create::TEXT_FIELD_NAME;
use crate::fulltext_index::error::{Result, TantivyParserSnafu, TantivySnafu};
use crate::fulltext_index::create::{ROWID_FIELD_NAME, TEXT_FIELD_NAME};
use crate::fulltext_index::error::{
Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};

/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
Expand Down Expand Up @@ -77,9 +79,48 @@ impl FulltextIndexSearcher for TantivyFulltextIndexSearcher {
let query = query_parser
.parse_query(query)
.context(TantivyParserSnafu)?;
let docs = searcher
let doc_addrs = searcher
.search(&query, &DocSetCollector)
.context(TantivySnafu)?;
Ok(docs.into_iter().map(|d| d.doc_id).collect())

let seg_metas = self
.index
.searchable_segment_metas()
.context(TantivySnafu)?;

// FAST PATH: only one segment, the doc id is the same as the row id.
// Also for compatibility with the old version.
if seg_metas.len() == 1 {
return Ok(doc_addrs.into_iter().map(|d| d.doc_id).collect());
}

// SLOW PATH: multiple segments, need to calculate the row id.
let rowid_field = searcher
.schema()
.get_field(ROWID_FIELD_NAME)
.context(TantivySnafu)?;
let mut seg_offsets = HashMap::with_capacity(seg_metas.len());
let mut res = BTreeSet::new();
for doc_addr in doc_addrs {
let offset = if let Some(offset) = seg_offsets.get(&doc_addr.segment_ord) {
*offset
} else {
// Calculate the offset at the first time meeting the segment and cache it since
// the offset is the same for all rows in the same segment.
let doc: TantivyDocument = searcher.doc(doc_addr).context(TantivySnafu)?;
let rowid = doc
.get_first(rowid_field)
.and_then(|v| v.as_u64())
.context(TantivyDocNotFoundSnafu { doc_addr })?;

let offset = rowid as u32 - doc_addr.doc_id;
seg_offsets.insert(doc_addr.segment_ord, offset);
offset
};

res.insert(doc_addr.doc_id + offset);
}

Ok(res)
}
}

0 comments on commit c8de8b8

Please sign in to comment.