From cd08195510869709fcda8ef93cab00ac334050af Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 10:24:54 +0000 Subject: [PATCH 1/8] feat(bloom-filter): integrate indexer with mito2 Signed-off-by: Zhenchi --- src/datatypes/src/schema.rs | 2 +- src/index/src/bloom_filter/creator.rs | 129 ++++- .../bloom_filter/creator/finalize_segment.rs | 15 +- src/index/src/bloom_filter/reader.rs | 2 +- src/mito2/src/compaction/compactor.rs | 14 +- src/mito2/src/error.rs | 24 +- src/mito2/src/flush.rs | 14 +- src/mito2/src/sst/file.rs | 7 + src/mito2/src/sst/index.rs | 147 ++++- src/mito2/src/sst/index/bloom_filter.rs | 17 + .../src/sst/index/bloom_filter/creator.rs | 530 ++++++++++++++++++ .../sst/index/{inverted_index => }/codec.rs | 0 .../src/sst/index/fulltext_index/creator.rs | 16 +- src/mito2/src/sst/index/indexer/abort.rs | 22 + src/mito2/src/sst/index/indexer/finish.rs | 65 ++- src/mito2/src/sst/index/indexer/update.rs | 28 + src/mito2/src/sst/index/intermediate.rs | 157 ++++++ src/mito2/src/sst/index/inverted_index.rs | 1 - .../index/inverted_index/applier/builder.rs | 2 +- .../src/sst/index/inverted_index/creator.rs | 9 +- .../inverted_index/creator/temp_provider.rs | 182 ------ 21 files changed, 1145 insertions(+), 238 deletions(-) create mode 100644 src/mito2/src/sst/index/bloom_filter.rs create mode 100644 src/mito2/src/sst/index/bloom_filter/creator.rs rename src/mito2/src/sst/index/{inverted_index => }/codec.rs (100%) delete mode 100644 src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index c537a4608b42..2dcaf3ecb52a 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -28,7 +28,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; use crate::prelude::ConcreteDataType; pub use crate::schema::column_schema::{ - ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions, + ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkipIndexType, SkippingIndexOptions, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index 2f10521559a5..da95334782a7 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -73,7 +73,7 @@ impl BloomFilterCreator { /// `rows_per_segment` <= 0 pub fn new( rows_per_segment: usize, - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -96,6 +96,49 @@ impl BloomFilterCreator { } } + /// Adds multiple rows of elements to the bloom filter. If the number of accumulated rows + /// reaches `rows_per_segment`, it finalizes the current segment. 
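+    ///
+    /// A minimal usage sketch, assuming a `creator` built with `rows_per_segment = 2`
+    /// (as in the tests below):
+    ///
+    /// ```ignore
+    /// // Push 5 identical rows, each contributing the elements "a" and "b";
+    /// // two segments are finalized and the fifth row stays in the current one.
+    /// creator
+    ///     .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
+    ///     .await?;
+    /// ```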
+ pub async fn push_n_row_elems( + &mut self, + mut nrows: usize, + elems: impl IntoIterator, + ) -> Result<()> { + if nrows == 0 { + return Ok(()); + } + if nrows == 1 { + return self.push_row_elems(elems).await; + } + + let elems = elems.into_iter().collect::>(); + while nrows > 0 { + let rows_to_seg_end = + self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment); + let rows_to_push = nrows.min(rows_to_seg_end); + nrows -= rows_to_push; + + self.accumulated_row_count += rows_to_push; + + let mut mem_diff = 0; + for elem in &elems { + let len = elem.len(); + let is_new = self.cur_seg_distinct_elems.insert(elem.clone()); + if is_new { + mem_diff += len; + } + } + self.cur_seg_distinct_elems_mem_usage += mem_diff; + self.global_memory_usage + .fetch_add(mem_diff, Ordering::Relaxed); + + if self.accumulated_row_count % self.rows_per_segment == 0 { + self.finalize_segment().await?; + } + } + + Ok(()) + } + /// Adds a row of elements to the bloom filter. If the number of accumulated rows /// reaches `rows_per_segment`, it finalizes the current segment. pub async fn push_row_elems(&mut self, elems: impl IntoIterator) -> Result<()> { @@ -181,6 +224,13 @@ impl BloomFilterCreator { } } +impl Drop for BloomFilterCreator { + fn drop(&mut self) { + self.global_memory_usage + .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed); + } +} + #[cfg(test)] mod tests { use fastbloom::BloomFilter; @@ -202,7 +252,7 @@ mod tests { let mut writer = Cursor::new(Vec::new()); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); @@ -266,4 +316,79 @@ mod tests { assert!(bfs[1].contains(&b"e")); assert!(bfs[1].contains(&b"f")); } + + #[tokio::test] + async fn test_bloom_filter_creator_batch_push() { + let mut writer = Cursor::new(Vec::new()); + let mut creator = BloomFilterCreator::new( + 2, + Arc::new(MockExternalTempFileProvider::new()), + Arc::new(AtomicUsize::new(0)), + None, + ); + + creator + .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()]) + .await + .unwrap(); + assert!(creator.cur_seg_distinct_elems_mem_usage > 0); + assert!(creator.memory_usage() > 0); + + creator + .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()]) + .await + .unwrap(); + assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0); + assert!(creator.memory_usage() > 0); + + creator + .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()]) + .await + .unwrap(); + assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0); + assert!(creator.memory_usage() > 0); + + creator.finish(&mut writer).await.unwrap(); + + let bytes = writer.into_inner(); + let total_size = bytes.len(); + let meta_size_offset = total_size - 4; + let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap()); + + let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4]; + let meta: BloomFilterMeta = serde_json::from_slice(meta_bytes).unwrap(); + + assert_eq!(meta.rows_per_segment, 2); + assert_eq!(meta.seg_count, 10); + assert_eq!(meta.row_count, 20); + assert_eq!( + meta.bloom_filter_segments_size + meta_bytes.len() + 4, + total_size + ); + + let mut bfs = Vec::new(); + for segment in meta.bloom_filter_segments { + let bloom_filter_bytes = + &bytes[segment.offset as usize..(segment.offset + segment.size) as usize]; + let v = u64_vec_from_bytes(bloom_filter_bytes); + let bloom_filter = BloomFilter::from_vec(v) + .seed(&SEED) + 
.expected_items(segment.elem_count); + bfs.push(bloom_filter); + } + + assert_eq!(bfs.len(), 10); + for bf in bfs.iter().take(3) { + assert!(bf.contains(&b"a")); + assert!(bf.contains(&b"b")); + } + for bf in bfs.iter().take(5).skip(2) { + assert!(bf.contains(&b"c")); + assert!(bf.contains(&b"d")); + } + for bf in bfs.iter().take(10).skip(5) { + assert!(bf.contains(&b"e")); + assert!(bf.contains(&b"f")); + } + } } diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs index 65b090de3eee..e97652f5fc6a 100644 --- a/src/index/src/bloom_filter/creator/finalize_segment.rs +++ b/src/index/src/bloom_filter/creator/finalize_segment.rs @@ -43,7 +43,7 @@ pub struct FinalizedBloomFilterStorage { intermediate_prefix: String, /// The provider for intermediate Bloom filter files. - intermediate_provider: Box, + intermediate_provider: Arc, /// The memory usage of the in-memory Bloom filters. memory_usage: usize, @@ -59,7 +59,7 @@ pub struct FinalizedBloomFilterStorage { impl FinalizedBloomFilterStorage { /// Creates a new `FinalizedBloomFilterStorage`. pub fn new( - intermediate_provider: Box, + intermediate_provider: Arc, global_memory_usage: Arc, global_memory_usage_threshold: Option, ) -> Self { @@ -132,7 +132,7 @@ impl FinalizedBloomFilterStorage { /// Drains the storage and returns a stream of finalized Bloom filter segments. pub async fn drain( &mut self, - ) -> Result> + '_>>> { + ) -> Result> + Send + '_>>> { // FAST PATH: memory only if self.intermediate_file_id_counter == 0 { return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok)))); @@ -183,6 +183,13 @@ impl FinalizedBloomFilterStorage { } } +impl Drop for FinalizedBloomFilterStorage { + fn drop(&mut self) { + self.global_memory_usage + .fetch_sub(self.memory_usage, Ordering::Relaxed); + } +} + /// A finalized Bloom filter segment. 
#[derive(Debug, Clone, PartialEq, Eq)] pub struct FinalizedBloomFilterSegment { @@ -250,7 +257,7 @@ mod tests { let global_memory_usage = Arc::new(AtomicUsize::new(0)); let global_memory_usage_threshold = Some(1024 * 1024); // 1MB - let provider = Box::new(mock_provider); + let provider = Arc::new(mock_provider); let mut storage = FinalizedBloomFilterStorage::new( provider, global_memory_usage.clone(), diff --git a/src/index/src/bloom_filter/reader.rs b/src/index/src/bloom_filter/reader.rs index 788afe033124..6dc592100fcf 100644 --- a/src/index/src/bloom_filter/reader.rs +++ b/src/index/src/bloom_filter/reader.rs @@ -190,7 +190,7 @@ mod tests { let mut writer = Cursor::new(vec![]); let mut creator = BloomFilterCreator::new( 2, - Box::new(MockExternalTempFileProvider::new()), + Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), None, ); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e2499140fd61..e7d5e779b675 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -21,7 +21,6 @@ use common_telemetry::{info, warn}; use common_time::TimeToLive; use object_store::manager::ObjectStoreManagerRef; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -41,7 +40,7 @@ use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; use crate::schedule::scheduler::LocalScheduler; -use crate::sst::file::{FileMeta, IndexType}; +use crate::sst::file::FileMeta; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -336,16 +335,7 @@ impl Compactor for DefaultCompactor { time_range: sst_info.time_range, level: output.output_level, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 82b86a21554c..2e7dd4a461ed 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -806,8 +806,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to retrieve fulltext options from column metadata"))] - FulltextOptions { + #[snafu(display("Failed to retrieve index options from column metadata"))] + IndexOptions { #[snafu(implicit)] location: Location, source: datatypes::error::Error, @@ -894,6 +894,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to push value to bloom filter"))] + PushBloomFilterValue { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to finish bloom filter"))] + BloomFilterFinish { + source: index::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1019,7 +1033,7 @@ impl ErrorExt for Error { UnsupportedOperation { .. 
} => StatusCode::Unsupported, RemoteCompaction { .. } => StatusCode::Unexpected, - FulltextOptions { source, .. } => source.status_code(), + IndexOptions { source, .. } => source.status_code(), CreateFulltextCreator { source, .. } => source.status_code(), CastVector { source, .. } => source.status_code(), FulltextPushText { source, .. } @@ -1029,6 +1043,10 @@ impl ErrorExt for Error { RegionBusy { .. } => StatusCode::RegionBusy, GetSchemaMetadata { source, .. } => source.status_code(), Timeout { .. } => StatusCode::Cancelled, + + PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => { + source.status_code() + } } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index b522f225f9f0..64a739068ad9 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{debug, error, info, trace}; -use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use strum::IntoStaticStr; @@ -45,7 +44,7 @@ use crate::request::{ SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{Job, SchedulerRef}; -use crate::sst::file::{FileId, FileMeta, IndexType}; +use crate::sst::file::{FileId, FileMeta}; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -378,16 +377,7 @@ impl RegionFlushTask { time_range: sst_info.time_range, level: 0, file_size: sst_info.file_size, - available_indexes: { - let mut indexes = SmallVec::new(); - if sst_info.index_metadata.inverted_index.is_available() { - indexes.push(IndexType::InvertedIndex); - } - if sst_info.index_metadata.fulltext_index.is_available() { - indexes.push(IndexType::FulltextIndex); - } - indexes - }, + available_indexes: sst_info.index_metadata.build_available_indexes(), index_file_size: sst_info.index_metadata.file_size, num_rows: sst_info.num_rows as u64, num_row_groups: sst_info.num_row_groups, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 5a9932ab433b..844d3e5d08f8 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -143,6 +143,8 @@ pub enum IndexType { InvertedIndex, /// Full-text index. FulltextIndex, + /// Bloom filter. + BloomFilter, } impl FileMeta { @@ -156,6 +158,11 @@ impl FileMeta { self.available_indexes.contains(&IndexType::FulltextIndex) } + /// Returns true if the file has a bloom filter + pub fn bloom_filter_available(&self) -> bool { + self.available_indexes.contains(&IndexType::BloomFilter) + } + /// Returns the size of the inverted index file pub fn inverted_index_size(&self) -> Option { if self.available_indexes.len() == 1 && self.inverted_index_available() { diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1972f3d7abb6..84f0736ac27a 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub(crate) mod bloom_filter; +mod codec; pub(crate) mod fulltext_index; mod indexer; pub(crate) mod intermediate; @@ -22,8 +24,10 @@ pub(crate) mod store; use std::num::NonZeroUsize; +use bloom_filter::creator::BloomFilterIndexer; use common_telemetry::{debug, warn}; use puffin_manager::SstPuffinManager; +use smallvec::SmallVec; use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; @@ -33,13 +37,14 @@ use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; -use crate::sst::file::FileId; +use crate::sst::file::{FileId, IndexType}; use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::inverted_index::creator::InvertedIndexer; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; +pub(crate) const TYPE_BLOOM_FILTER: &str = "bloom_filter"; /// Output of the index creation. #[derive(Debug, Clone, Default)] @@ -50,6 +55,24 @@ pub struct IndexOutput { pub inverted_index: InvertedIndexOutput, /// Fulltext index output. pub fulltext_index: FulltextIndexOutput, + /// Bloom filter output. + pub bloom_filter: BloomFilterOutput, +} + +impl IndexOutput { + pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> { + let mut indexes = SmallVec::new(); + if self.inverted_index.is_available() { + indexes.push(IndexType::InvertedIndex); + } + if self.fulltext_index.is_available() { + indexes.push(IndexType::FulltextIndex); + } + if self.bloom_filter.is_available() { + indexes.push(IndexType::BloomFilter); + } + indexes + } } /// Base output of the index creation. @@ -73,6 +96,8 @@ impl IndexBaseOutput { pub type InvertedIndexOutput = IndexBaseOutput; /// Output of the fulltext index creation. pub type FulltextIndexOutput = IndexBaseOutput; +/// Output of the bloom filter creation. +pub type BloomFilterOutput = IndexBaseOutput; /// The index creator that hides the error handling details. 
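+/// Build, update, or finish failures panic under `cfg!(any(test, feature = "test"))`
+/// and are downgraded to warnings otherwise, so an indexing problem never fails the
+/// SST write itself.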
#[derive(Default)] @@ -86,6 +111,8 @@ pub struct Indexer { last_mem_inverted_index: usize, fulltext_indexer: Option, last_mem_fulltext_index: usize, + bloom_filter_indexer: Option, + last_mem_bloom_filter: usize, } impl Indexer { @@ -129,6 +156,15 @@ impl Indexer { .with_label_values(&[TYPE_FULLTEXT_INDEX]) .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64); self.last_mem_fulltext_index = fulltext_mem; + + let bloom_filter_mem = self + .bloom_filter_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()); + INDEX_CREATE_MEMORY_USAGE + .with_label_values(&[TYPE_BLOOM_FILTER]) + .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64); + self.last_mem_bloom_filter = bloom_filter_mem; } } @@ -158,7 +194,11 @@ impl<'a> IndexerBuilder<'a> { indexer.inverted_indexer = self.build_inverted_indexer(); indexer.fulltext_indexer = self.build_fulltext_indexer().await; - if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() { + indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(); + if indexer.inverted_indexer.is_none() + && indexer.fulltext_indexer.is_none() + && indexer.bloom_filter_indexer.is_none() + { indexer.abort().await; return Indexer::default(); } @@ -278,6 +318,53 @@ impl<'a> IndexerBuilder<'a> { None } + + fn build_bloom_filter_indexer(&self) -> Option { + let create = true; // TODO(zhongzc): add config for bloom filter + + if !create { + debug!( + "Skip creating bloom filter due to config, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return None; + } + + let mem_limit = Some(100 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let indexer = BloomFilterIndexer::new( + self.file_id, + self.metadata, + self.intermediate_manager.clone(), + mem_limit, + ); + + let err = match indexer { + Ok(indexer) => { + if indexer.is_none() { + debug!( + "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + return indexer; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create bloom filter, region_id: {}, file_id: {}, err: {}", + self.metadata.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to create bloom filter, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + } + + None + } } #[cfg(test)] @@ -286,7 +373,7 @@ mod tests { use api::v1::SemanticType; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, FulltextOptions}; + use datatypes::schema::{ColumnSchema, FulltextOptions, SkipIndexType, SkippingIndexOptions}; use object_store::services::Memory; use object_store::ObjectStore; use puffin_manager::PuffinManagerFactory; @@ -298,12 +385,14 @@ mod tests { struct MetaConfig { with_tag: bool, with_fulltext: bool, + with_skipping_bloom: bool, } fn mock_region_metadata( MetaConfig { with_tag, with_fulltext, + with_skipping_bloom, }: MetaConfig, ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); @@ -354,6 +443,24 @@ mod tests { builder.push_column_metadata(column); } + if with_skipping_bloom { + let column_schema = + ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false) + .with_skipping_options(SkippingIndexOptions { + granularity: 42, + index_type: SkipIndexType::BloomFilter, + }) + .unwrap(); + + let column = ColumnMetadata { + column_schema, + semantic_type: SemanticType::Field, + column_id: 5, + }; + + 
builder.push_column_metadata(column); + } + Arc::new(builder.build().unwrap()) } @@ -374,6 +481,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -392,6 +500,7 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); } #[tokio::test] @@ -403,6 +512,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -456,6 +566,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: false, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -474,10 +585,12 @@ mod tests { assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: false, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, @@ -486,7 +599,7 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), @@ -496,6 +609,31 @@ mod tests { assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + with_skipping_bloom: false, + }); + let indexer = IndexerBuilder { + op_type: OperationType::Flush, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -507,6 +645,7 @@ mod tests { let metadata = mock_region_metadata(MetaConfig { with_tag: true, with_fulltext: true, + with_skipping_bloom: true, }); let indexer = IndexerBuilder { op_type: OperationType::Flush, diff --git a/src/mito2/src/sst/index/bloom_filter.rs b/src/mito2/src/sst/index/bloom_filter.rs new file mode 100644 index 000000000000..347195a3b16b --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod creator; + +const INDEX_BLOB_TYPE: &str = "greptime-bloom-filter-v1"; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs new file mode 100644 index 000000000000..1517a38a2efc --- /dev/null +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -0,0 +1,530 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use common_telemetry::warn; +use datatypes::schema::SkipIndexType; +use index::bloom_filter::creator::BloomFilterCreator; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::ColumnId; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::error::{ + BiErrorsSnafu, BloomFilterFinishSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, + PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, +}; +use crate::read::Batch; +use crate::row_converter::SortField; +use crate::sst::file::FileId; +use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; +use crate::sst::index::TYPE_BLOOM_FILTER; + +/// The buffer size for the pipe used to send index data to the puffin blob. +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// The indexer for the bloom filter index. +pub struct BloomFilterIndexer { + /// The bloom filter creators. + creators: HashMap, + + /// The provider for intermediate files. + temp_file_provider: Arc, + + /// Codec for decoding primary keys. + codec: IndexValuesCodec, + + /// Whether the indexing process has been aborted. + aborted: bool, + + /// The statistics of the indexer. + stats: Statistics, + + /// The global memory usage. + global_memory_usage: Arc, +} + +impl BloomFilterIndexer { + /// Creates a new bloom filter indexer. 
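+    ///
+    /// Returns `Ok(None)` when no column in `metadata` is declared with a
+    /// bloom-filter skipping index, so callers can skip bloom filter creation
+    /// entirely for such regions.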
+ pub fn new( + sst_file_id: FileId, + metadata: &RegionMetadataRef, + intermediate_manager: IntermediateManager, + memory_usage_threshold: Option, + ) -> Result> { + let mut creators = HashMap::new(); + + let temp_file_provider = Arc::new(TempFileProvider::new( + IntermediateLocation::new(&metadata.region_id, &sst_file_id), + intermediate_manager, + )); + let global_memory_usage = Arc::new(AtomicUsize::new(0)); + + for column in &metadata.column_metadatas { + let options = + column + .column_schema + .skipping_index_options() + .context(IndexOptionsSnafu { + column_name: &column.column_schema.name, + })?; + + let options = match options { + Some(options) if options.index_type == SkipIndexType::BloomFilter => options, + _ => continue, + }; + + let creator = BloomFilterCreator::new( + options.granularity as _, + temp_file_provider.clone(), + global_memory_usage.clone(), + memory_usage_threshold, + ); + creators.insert(column.column_id, creator); + } + + if creators.is_empty() { + return Ok(None); + } + + let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); + let indexer = Self { + creators, + temp_file_provider, + codec, + aborted: false, + stats: Statistics::new(TYPE_BLOOM_FILTER), + global_memory_usage, + }; + Ok(Some(indexer)) + } + + /// Updates index with a batch of rows. + /// Garbage will be cleaned up if failed to update. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn update(&mut self, batch: &Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.creators.is_empty() { + return Ok(()); + } + + if let Err(update_err) = self.do_update(batch).await { + // clean up garbage if failed to update + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Finishes index creation and cleans up garbage. + /// Returns the number of rows and bytes written. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if self.stats.row_count() == 0 { + // no IO is performed, no garbage to clean up, just return + return Ok((0, 0)); + } + + let finish_res = self.do_finish(puffin_writer).await; + // clean up garbage no matter finish successfully or not + if let Err(err) = self.do_cleanup().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to clean up index creator, err: {err}",); + } else { + warn!(err; "Failed to clean up index creator"); + } + } + + finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) + } + + /// Aborts index creation and clean up garbage. + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + self.aborted = true; + + self.do_cleanup().await + } + + async fn do_update(&mut self, batch: &Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + + let n = batch.num_rows(); + guard.inc_row_count(n); + + // Tags + for ((col_id, _), field, value) in self.codec.decode(batch.primary_key())? 
{ + let Some(creator) = self.creators.get_mut(col_id) else { + continue; + }; + let elems = value + .map(|v| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(v.as_value_ref(), field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + creator + .push_n_row_elems(n, elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + + // Fields + for field in batch.fields() { + let Some(creator) = self.creators.get_mut(&field.column_id) else { + continue; + }; + + let sort_field = SortField::new(field.data.data_type()); + for i in 0..n { + let value = field.data.get_ref(i); + let elems = (!value.is_null()) + .then(|| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)?; + Ok(buf) + }) + .transpose()?; + + creator + .push_row_elems(elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + } + Ok(()) + } + + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { + let mut guard = self.stats.record_finish(); + + for (id, creator) in &mut self.creators { + let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?; + guard.inc_byte_count(written_bytes); + } + + Ok(()) + } + + async fn do_cleanup(&mut self) -> Result<()> { + let mut _guard = self.stats.record_cleanup(); + + self.creators.clear(); + self.temp_file_provider.cleanup().await + } + + /// Data flow of finishing index: + /// + /// ```text + /// (In Memory Buffer) + /// ┌──────┐ + /// ┌─────────────┐ │ PIPE │ + /// │ │ write index data │ │ + /// │ IndexWriter ├──────────────────►│ tx │ + /// │ │ │ │ + /// └─────────────┘ │ │ + /// ┌─────────────────┤ rx │ + /// ┌─────────────┐ │ read as blob └──────┘ + /// │ │ │ + /// │ PuffinWriter├─┤ + /// │ │ │ copy to file ┌──────┐ + /// └─────────────┘ └────────────────►│ File │ + /// └──────┘ + /// ``` + /// + /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator` + async fn do_finish_single_creator( + col_id: &ColumnId, + creator: &mut BloomFilterCreator, + puffin_writer: &mut SstPuffinWriter, + ) -> Result { + let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + + let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id); + let (index_finish, puffin_add_blob) = futures::join!( + creator.finish(tx.compat_write()), + puffin_writer.put_blob(&blob_name, rx.compat(), PutOptions::default()) + ); + + match ( + puffin_add_blob.context(PuffinAddBlobSnafu), + index_finish.context(BloomFilterFinishSnafu), + ) { + (Err(e1), Err(e2)) => BiErrorsSnafu { + first: Box::new(e1), + second: Box::new(e2), + } + .fail()?, + + (Ok(_), e @ Err(_)) => e?, + (e @ Err(_), Ok(_)) => e.map(|_| ())?, + (Ok(written_bytes), Ok(_)) => { + return Ok(written_bytes); + } + } + + Ok(0) + } + + /// Returns the memory usage of the indexer. + pub fn memory_usage(&self) -> usize { + self.global_memory_usage + .load(std::sync::atomic::Ordering::Relaxed) + } + + /// Returns the column ids to be indexed. 
+ pub fn column_ids(&self) -> impl Iterator + use<'_> { + self.creators.keys().copied() + } +} + +#[cfg(test)] +mod tests { + use std::iter; + + use api::v1::SemanticType; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, SkippingIndexOptions}; + use datatypes::value::ValueRef; + use datatypes::vectors::{UInt64Vector, UInt8Vector}; + use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; + use object_store::services::Memory; + use object_store::ObjectStore; + use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + use crate::read::BatchColumn; + use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + use crate::sst::index::puffin_manager::PuffinManagerFactory; + + fn mock_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + async fn new_intm_mgr(path: impl AsRef) -> IntermediateManager { + IntermediateManager::init_fs(path).await.unwrap() + } + + /// tag_str: + /// - type: string + /// - index: bloom filter + /// - granularity: 2 + /// - column_id: 1 + /// + /// ts: + /// - type: timestamp + /// - index: time index + /// - column_id: 2 + /// + /// field_u64: + /// - type: uint64 + /// - index: bloom filter + /// - granularity: 4 + /// - column_id: 3 + fn mock_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_str", + ConcreteDataType::string_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions { + index_type: SkipIndexType::BloomFilter, + granularity: 2, + }) + .unwrap(), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_u64", + ConcreteDataType::uint64_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions { + index_type: SkipIndexType::BloomFilter, + granularity: 4, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .primary_key(vec![1]); + + Arc::new(builder.build().unwrap()) + } + + fn new_batch(str_tag: impl AsRef, u64_field: impl IntoIterator) -> Batch { + let fields = vec![SortField::new(ConcreteDataType::string_datatype())]; + let codec = McmpRowCodec::new(fields); + let row: [ValueRef; 1] = [str_tag.as_ref().into()]; + let primary_key = codec.encode(row.into_iter()).unwrap(); + + let u64_field = BatchColumn { + column_id: 3, + data: Arc::new(UInt64Vector::from_iter_values(u64_field)), + }; + let num_rows = u64_field.data.len(); + + Batch::new( + primary_key, + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt64Vector::from_iter_values( + iter::repeat(0).take(num_rows), + )), + Arc::new(UInt8Vector::from_iter_values( + iter::repeat(1).take(num_rows), + )), + vec![u64_field], + ) + .unwrap() + } + + #[tokio::test] + async fn test_bloom_filter_indexer() { + let prefix = "test_bloom_filter_indexer_"; + let object_store = mock_object_store(); + let intm_mgr = new_intm_mgr(prefix).await; + let region_metadata = mock_region_metadata(); + let memory_usage_threshold = Some(1024); + + 
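+        // Both `tag_str` (granularity 2) and `field_u64` (granularity 4) carry a
+        // bloom-filter skipping index, so `new` returns `Some(indexer)` here.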
let mut indexer = BloomFilterIndexer::new( + FileId::random(), + ®ion_metadata, + intm_mgr, + memory_usage_threshold, + ) + .unwrap() + .unwrap(); + + // push 20 rows + let batch = new_batch("tag1", 0..10); + indexer.update(&batch).await.unwrap(); + + let batch = new_batch("tag2", 10..20); + indexer.update(&batch).await.unwrap(); + + let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; + let puffin_manager = factory.build(object_store); + + let index_file_name = "index_file"; + let mut puffin_writer = puffin_manager.writer(index_file_name).await.unwrap(); + let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap(); + assert_eq!(row_count, 20); + assert!(byte_count > 0); + puffin_writer.finish().await.unwrap(); + + let puffin_reader = puffin_manager.reader(index_file_name).await.unwrap(); + + // tag_str + { + let blob_guard = puffin_reader + .blob("greptime-bloom-filter-v1-1") + .await + .unwrap(); + let reader = blob_guard.reader().await.unwrap(); + let mut bloom_filter = BloomFilterReaderImpl::new(reader); + let metadata = bloom_filter.metadata().await.unwrap(); + + assert_eq!(metadata.bloom_filter_segments.len(), 10); + for i in 0..5 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i]) + .await + .unwrap(); + assert!(bf.contains(b"tag1")); + } + for i in 5..10 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i]) + .await + .unwrap(); + assert!(bf.contains(b"tag2")); + } + } + + // field_u64 + { + let sort_field = SortField::new(ConcreteDataType::uint64_datatype()); + + let blob_guard = puffin_reader + .blob("greptime-bloom-filter-v1-3") + .await + .unwrap(); + let reader = blob_guard.reader().await.unwrap(); + let mut bloom_filter = BloomFilterReaderImpl::new(reader); + let metadata = bloom_filter.metadata().await.unwrap(); + + assert_eq!(metadata.bloom_filter_segments.len(), 5); + for i in 0u64..20 { + let bf = bloom_filter + .bloom_filter(&metadata.bloom_filter_segments[i as usize / 4]) + .await + .unwrap(); + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf) + .unwrap(); + + assert!(bf.contains(&buf)); + } + } + } +} diff --git a/src/mito2/src/sst/index/inverted_index/codec.rs b/src/mito2/src/sst/index/codec.rs similarity index 100% rename from src/mito2/src/sst/index/inverted_index/codec.rs rename to src/mito2/src/sst/index/codec.rs diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 416e39d9dd5e..41fa15bd7c72 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -27,8 +27,7 @@ use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::error::{ CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu, - FulltextOptionsSnafu, FulltextPushTextSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, - Result, + FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result, }; use crate::read::Batch; use crate::sst::file::FileId; @@ -61,13 +60,12 @@ impl FulltextIndexer { let mut creators = HashMap::new(); for column in &metadata.column_metadatas { - let options = - column - .column_schema - .fulltext_options() - .context(FulltextOptionsSnafu { - column_name: &column.column_schema.name, - })?; + let options = column + .column_schema + .fulltext_options() + .context(IndexOptionsSnafu { + column_name: &column.column_schema.name, + 
})?; // Relax the type constraint here as many types can be casted to string. diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 68034d48fb29..4a49bc4ee84c 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -20,6 +20,7 @@ impl Indexer { pub(crate) async fn do_abort(&mut self) { self.do_abort_inverted_index().await; self.do_abort_fulltext_index().await; + self.do_abort_bloom_filter().await; self.puffin_manager = None; } @@ -64,4 +65,25 @@ impl Indexer { ); } } + + async fn do_abort_bloom_filter(&mut self) { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return; + }; + let Err(err) = indexer.abort().await else { + return; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to abort bloom filter, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to abort bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index a0157a9b66f4..74c693f6a5b4 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -15,11 +15,14 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; +use crate::sst::index::bloom_filter::creator::BloomFilterIndexer; use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; -use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput}; +use crate::sst::index::{ + BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput, +}; impl Indexer { pub(crate) async fn do_finish(&mut self) -> IndexOutput { @@ -46,6 +49,12 @@ impl Indexer { return IndexOutput::default(); } + let success = self.do_finish_bloom_filter(&mut writer, &mut output).await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + output.file_size = self.do_finish_puffin_writer(writer).await; output } @@ -169,6 +178,43 @@ impl Indexer { false } + async fn do_finish_bloom_filter( + &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.bloom_filter_indexer.take() else { + return true; + }; + + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_bloom_filter_output( + &mut index_output.bloom_filter, + row_count, + byte_count, + &indexer, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + fn fill_inverted_index_output( &mut self, output: &mut InvertedIndexOutput, @@ -202,4 +248,21 @@ impl Indexer { output.row_count = row_count; output.columns = indexer.column_ids().collect(); } + + fn fill_bloom_filter_output( + &mut self, + output: &mut BloomFilterOutput, + row_count: RowCount, + byte_count: ByteCount, + indexer: &BloomFilterIndexer, + ) { + debug!( + "Bloom filter created, region_id: {}, file_id: {}, written_bytes: 
{}, written_rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = indexer.column_ids().collect(); + } } diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index c08f171bb415..e93762a6322f 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -29,6 +29,9 @@ impl Indexer { if !self.do_update_fulltext_index(batch).await { self.do_abort().await; } + if !self.do_update_bloom_filter(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. @@ -80,4 +83,29 @@ impl Indexer { false } + + /// Returns false if the update failed. + async fn do_update_bloom_filter(&mut self, batch: &Batch) -> bool { + let Some(creator) = self.bloom_filter_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update bloom filter, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update bloom filter, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } } diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index d0da804c745b..fd8845f96ac3 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -14,13 +14,25 @@ use std::path::PathBuf; +use async_trait::async_trait; +use common_error::ext::BoxedError; use common_telemetry::warn; +use futures::{AsyncRead, AsyncWrite}; +use index::error as index_error; +use index::error::Result as IndexResult; +use index::external_provider::ExternalTempFileProvider; use object_store::util::{self, normalize_dir}; +use snafu::ResultExt; use store_api::storage::{ColumnId, RegionId}; use uuid::Uuid; use crate::access_layer::new_fs_cache_store; use crate::error::Result; +use crate::metrics::{ + INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, +}; use crate::sst::file::FileId; use crate::sst::index::store::InstrumentedStore; @@ -129,14 +141,105 @@ impl IntermediateLocation { } } +/// `TempFileProvider` implements `ExternalTempFileProvider`. +/// It uses `InstrumentedStore` to create and read intermediate files. +pub(crate) struct TempFileProvider { + /// Provides the location of intermediate files. + location: IntermediateLocation, + /// Provides store to access to intermediate files. 
+ manager: IntermediateManager, +} + +#[async_trait] +impl ExternalTempFileProvider for TempFileProvider { + async fn create( + &self, + file_group: &str, + file_id: &str, + ) -> IndexResult> { + let path = self.location.file_path(file_group, file_id); + let writer = self + .manager + .store() + .writer( + &path, + &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, + &INDEX_INTERMEDIATE_WRITE_OP_TOTAL, + &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + Ok(Box::new(writer)) + } + + async fn read_all( + &self, + file_group: &str, + ) -> IndexResult)>> { + let file_group_path = self.location.file_group_path(file_group); + let entries = self + .manager + .store() + .list(&file_group_path) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + let mut readers = Vec::with_capacity(entries.len()); + + for entry in entries { + if entry.metadata().is_dir() { + warn!("Unexpected entry in index creation dir: {:?}", entry.path()); + continue; + } + + let im_file_id = self.location.im_file_id_from_path(entry.path()); + + let reader = self + .manager + .store() + .reader( + entry.path(), + &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, + &INDEX_INTERMEDIATE_READ_OP_TOTAL, + &INDEX_INTERMEDIATE_SEEK_OP_TOTAL, + ) + .await + .map_err(BoxedError::new) + .context(index_error::ExternalSnafu)?; + readers.push((im_file_id, Box::new(reader) as _)); + } + + Ok(readers) + } +} + +impl TempFileProvider { + /// Creates a new `TempFileProvider`. + pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { + Self { location, manager } + } + + /// Removes all intermediate files. + pub async fn cleanup(&self) -> Result<()> { + self.manager + .store() + .remove_all(self.location.dir_to_cleanup()) + .await + } +} + #[cfg(test)] mod tests { use std::ffi::OsStr; use common_test_util::temp_dir; + use futures::{AsyncReadExt, AsyncWriteExt}; use regex::Regex; + use store_api::storage::RegionId; use super::*; + use crate::sst::file::FileId; #[tokio::test] async fn test_manager() { @@ -212,4 +315,58 @@ mod tests { .is_match(&pi.next().unwrap().to_string_lossy())); // fulltext path assert!(pi.next().is_none()); } + + #[tokio::test] + async fn test_temp_file_provider_basic() { + let temp_dir = temp_dir::create_temp_dir("intermediate"); + let path = temp_dir.path().display().to_string(); + + let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); + let store = IntermediateManager::init_fs(path).await.unwrap(); + let provider = TempFileProvider::new(location.clone(), store); + + let file_group = "tag0"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_id = "0000000100"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"world").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let file_group = "tag1"; + let file_id = "0000000010"; + let mut writer = provider.create(file_group, file_id).await.unwrap(); + writer.write_all(b"foo").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + + let readers = provider.read_all("tag0").await.unwrap(); + assert_eq!(readers.len(), 2); + for (_, mut reader) in readers { + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert!(matches!(buf.as_slice(), b"hello" | 
b"world")); + } + let readers = provider.read_all("tag1").await.unwrap(); + assert_eq!(readers.len(), 1); + let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, b"foo"); + + provider.cleanup().await.unwrap(); + + assert!(provider + .manager + .store() + .list(location.dir_to_cleanup()) + .await + .unwrap() + .is_empty()); + } } diff --git a/src/mito2/src/sst/index/inverted_index.rs b/src/mito2/src/sst/index/inverted_index.rs index d325f735a431..73dca4ac47f2 100644 --- a/src/mito2/src/sst/index/inverted_index.rs +++ b/src/mito2/src/sst/index/inverted_index.rs @@ -13,7 +13,6 @@ // limitations under the License. pub(crate) mod applier; -mod codec; pub(crate) mod creator; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index c2f90b293003..e14bb89bd1c9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -37,8 +37,8 @@ use crate::cache::file_cache::FileCacheRef; use crate::cache::index::inverted_index::InvertedIndexCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; +use crate::sst::index::codec::IndexValueCodec; use crate::sst::index::inverted_index::applier::InvertedIndexApplier; -use crate::sst::index::inverted_index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; /// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan. diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 0076322fccbd..138035d554a1 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub(crate) mod temp_provider; - use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -38,9 +36,10 @@ use crate::error::{ use crate::read::Batch; use crate::row_converter::SortField; use crate::sst::file::FileId; -use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; -use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec}; -use crate::sst::index::inverted_index::creator::temp_provider::TempFileProvider; +use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::intermediate::{ + IntermediateLocation, IntermediateManager, TempFileProvider, +}; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; diff --git a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs b/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs deleted file mode 100644 index 1822f3119459..000000000000 --- a/src/mito2/src/sst/index/inverted_index/creator/temp_provider.rs +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; -use common_error::ext::BoxedError; -use common_telemetry::warn; -use futures::{AsyncRead, AsyncWrite}; -use index::error as index_error; -use index::error::Result as IndexResult; -use index::external_provider::ExternalTempFileProvider; -use snafu::ResultExt; - -use crate::error::Result; -use crate::metrics::{ - INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, INDEX_INTERMEDIATE_READ_BYTES_TOTAL, - INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, - INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, -}; -use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; - -/// `TempFileProvider` implements `ExternalTempFileProvider`. -/// It uses `InstrumentedStore` to create and read intermediate files. -pub(crate) struct TempFileProvider { - /// Provides the location of intermediate files. - location: IntermediateLocation, - /// Provides store to access to intermediate files. - manager: IntermediateManager, -} - -#[async_trait] -impl ExternalTempFileProvider for TempFileProvider { - async fn create( - &self, - file_group: &str, - file_id: &str, - ) -> IndexResult> { - let path = self.location.file_path(file_group, file_id); - let writer = self - .manager - .store() - .writer( - &path, - &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, - &INDEX_INTERMEDIATE_WRITE_OP_TOTAL, - &INDEX_INTERMEDIATE_FLUSH_OP_TOTAL, - ) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - Ok(Box::new(writer)) - } - - async fn read_all( - &self, - file_group: &str, - ) -> IndexResult)>> { - let file_group_path = self.location.file_group_path(file_group); - let entries = self - .manager - .store() - .list(&file_group_path) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - let mut readers = Vec::with_capacity(entries.len()); - - for entry in entries { - if entry.metadata().is_dir() { - warn!("Unexpected entry in index creation dir: {:?}", entry.path()); - continue; - } - - let im_file_id = self.location.im_file_id_from_path(entry.path()); - - let reader = self - .manager - .store() - .reader( - entry.path(), - &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, - &INDEX_INTERMEDIATE_READ_OP_TOTAL, - &INDEX_INTERMEDIATE_SEEK_OP_TOTAL, - ) - .await - .map_err(BoxedError::new) - .context(index_error::ExternalSnafu)?; - readers.push((im_file_id, Box::new(reader) as _)); - } - - Ok(readers) - } -} - -impl TempFileProvider { - /// Creates a new `TempFileProvider`. - pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { - Self { location, manager } - } - - /// Removes all intermediate files. 
- pub async fn cleanup(&self) -> Result<()> { - self.manager - .store() - .remove_all(self.location.dir_to_cleanup()) - .await - } -} - -#[cfg(test)] -mod tests { - use common_test_util::temp_dir; - use futures::{AsyncReadExt, AsyncWriteExt}; - use store_api::storage::RegionId; - - use super::*; - use crate::sst::file::FileId; - - #[tokio::test] - async fn test_temp_file_provider_basic() { - let temp_dir = temp_dir::create_temp_dir("intermediate"); - let path = temp_dir.path().display().to_string(); - - let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); - let store = IntermediateManager::init_fs(path).await.unwrap(); - let provider = TempFileProvider::new(location.clone(), store); - - let file_group = "tag0"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"hello").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_id = "0000000100"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"world").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let file_group = "tag1"; - let file_id = "0000000010"; - let mut writer = provider.create(file_group, file_id).await.unwrap(); - writer.write_all(b"foo").await.unwrap(); - writer.flush().await.unwrap(); - writer.close().await.unwrap(); - - let readers = provider.read_all("tag0").await.unwrap(); - assert_eq!(readers.len(), 2); - for (_, mut reader) in readers { - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert!(matches!(buf.as_slice(), b"hello" | b"world")); - } - let readers = provider.read_all("tag1").await.unwrap(); - assert_eq!(readers.len(), 1); - let mut reader = readers.into_iter().map(|x| x.1).next().unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - assert_eq!(buf, b"foo"); - - provider.cleanup().await.unwrap(); - - assert!(provider - .manager - .store() - .list(location.dir_to_cleanup()) - .await - .unwrap() - .is_empty()); - } -} From 009cbe5c4658cb9176b170dc9f19ccad7ceec799 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 11:13:16 +0000 Subject: [PATCH 2/8] feat(config) add bloom filter config Signed-off-by: Zhenchi --- config/datanode.example.toml | 33 ++++++++++++++++ config/standalone.example.toml | 33 ++++++++++++++++ src/index/src/bloom_filter/creator.rs | 2 +- src/mito2/src/access_layer.rs | 4 +- src/mito2/src/cache/write_cache.rs | 3 ++ src/mito2/src/compaction/compactor.rs | 2 + src/mito2/src/config.rs | 55 +++++++++++++++++++++++++++ src/mito2/src/flush.rs | 1 + src/mito2/src/sst/index.rs | 44 +++++++++++++++++++-- tests-integration/tests/http.rs | 7 ++++ 10 files changed, 178 insertions(+), 6 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 1b062a4b3af1..5c7919d70325 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -576,6 +576,39 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter in Mito engine. +[region_engine.mito.bloom_filter] + +## Whether to create the bloom filter on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the bloom filter on compaction. 
+## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the bloom filter on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for bloom filter creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + +## Cache size for bloom filter metadata. +metadata_cache_size = "4MiB" + +## Cache size for bloom filter content. +content_cache_size = "128MiB" + +## Page size for bloom filter content cache. +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 77445f8883bf..52fb74b24197 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -619,6 +619,39 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter in Mito engine. +[region_engine.mito.bloom_filter] + +## Whether to create the bloom filter on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the bloom filter on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the bloom filter on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for bloom filter creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + +## Cache size for bloom filter metadata. +metadata_cache_size = "4MiB" + +## Cache size for bloom filter content. +content_cache_size = "128MiB" + +## Page size for bloom filter content cache. +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] ## Memtable type. 
## - `time_series`: time-series memtable diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index da95334782a7..db79983e6274 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -320,7 +320,7 @@ mod tests { #[tokio::test] async fn test_bloom_filter_creator_batch_push() { let mut writer = Cursor::new(Vec::new()); - let mut creator = BloomFilterCreator::new( + let mut creator: BloomFilterCreator = BloomFilterCreator::new( 2, Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 752b36fd1e0b..7ea5221bd425 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -154,6 +154,7 @@ impl AccessLayer { index_options: request.index_options, inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, + bloom_filter_config: request.bloom_filter_config, } .build() .await; @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_config: BloomFilterConfig, } pub(crate) async fn new_fs_cache_store(root: &str) -> Result { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index fc9972de5305..8fc3db79c7e9 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -125,6 +125,7 @@ impl WriteCache { index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, + bloom_filter_config: write_request.bloom_filter_config, } .build() .await; @@ -378,6 +379,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -470,6 +472,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e7d5e779b675..15503ce3cbb0 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -301,6 +301,7 @@ impl Compactor for DefaultCompactor { let merge_mode = compaction_region.current_version.options.merge_mode(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); + let bloom_filter_config = compaction_region.engine_config.bloom_filter.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -325,6 +326,7 @@ impl Compactor for DefaultCompactor { 
index_options, inverted_index_config, fulltext_index_config, + bloom_filter_config, }, &write_opts, ) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7a1574c850ae..cdb5cad75ac6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -117,6 +117,8 @@ pub struct MitoConfig { pub inverted_index: InvertedIndexConfig, /// Full-text index configs. pub fulltext_index: FulltextIndexConfig, + /// Bloom filter configs. + pub bloom_filter: BloomFilterConfig, /// Memtable config pub memtable: MemtableConfig, @@ -155,6 +157,7 @@ impl Default for MitoConfig { index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), + bloom_filter: BloomFilterConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), }; @@ -511,6 +514,58 @@ impl FulltextIndexConfig { } } +/// Configuration options for the bloom filter. +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct BloomFilterConfig { + /// Whether to create the index on flush: automatically or never. + pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for creating the index. + pub mem_threshold_on_create: MemoryThreshold, + + /// Cache size for metadata of the index. Setting it to 0 to disable the cache. + pub metadata_cache_size: ReadableSize, + /// Cache size for the index content. Setting it to 0 to disable the cache. + pub content_cache_size: ReadableSize, + /// Page size for the index content. + pub content_cache_page_size: ReadableSize, +} + +impl Default for BloomFilterConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, + metadata_cache_size: ReadableSize::mb(4), + content_cache_size: ReadableSize::mb(128), + content_cache_page_size: ReadableSize::mb(8), + } + } +} + +impl BloomFilterConfig { + pub fn mem_threshold_on_create(&self) -> Option { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize) + } else { + Some(ReadableSize::mb(64).as_bytes() as usize) + } + } + MemoryThreshold::Unlimited => None, + MemoryThreshold::Size(size) => Some(size.as_bytes() as usize), + } + } +} + /// Divide cpu num by a non-zero `divisor` and returns at least 1. 
fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 64a739068ad9..bae0047d1089 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -360,6 +360,7 @@ impl RegionFlushTask { index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), + bloom_filter_config: self.engine_config.bloom_filter.clone(), }; let Some(sst_info) = self .access_layer diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 84f0736ac27a..a8b94d5abdbc 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::OperationType; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; @@ -179,6 +179,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_config: BloomFilterConfig, } impl<'a> IndexerBuilder<'a> { @@ -320,7 +321,10 @@ impl<'a> IndexerBuilder<'a> { } fn build_bloom_filter_indexer(&self) -> Option { - let create = true; // TODO(zhongzc): add config for bloom filter + let create = match self.op_type { + OperationType::Flush => self.bloom_filter_config.create_on_flush.auto(), + OperationType::Compact => self.bloom_filter_config.create_on_compaction.auto(), + }; if !create { debug!( @@ -330,7 +334,7 @@ impl<'a> IndexerBuilder<'a> { return None; } - let mem_limit = Some(100 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let mem_limit = self.bloom_filter_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( self.file_id, self.metadata, @@ -494,6 +498,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -528,12 +533,14 @@ mod tests { ..Default::default() }, fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let indexer = IndexerBuilder { op_type: OperationType::Compact, @@ -542,19 +549,44 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig { create_on_compaction: Mode::Disable, ..Default::default() }, + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let indexer = IndexerBuilder { + op_type: OperationType::Compact, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + 
row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig { + create_on_compaction: Mode::Disable, + ..Default::default() + }, + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -579,6 +611,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -603,6 +636,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -627,6 +661,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -658,6 +693,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fb2824790848..611f73a3cb89 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -955,6 +955,13 @@ apply_on_query = "auto" mem_threshold_on_create = "auto" compress = true +[region_engine.mito.bloom_filter] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] type = "time_series" From 61c9b2f8e54243f77f31c5a392d53e1770f35641 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 15:24:39 +0000 Subject: [PATCH 3/8] fix Signed-off-by: Zhenchi --- src/mito2/src/sst/index.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 0fef8bd38b94..daaba859f45e 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -377,7 +377,9 @@ mod tests { use api::v1::SemanticType; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, FulltextOptions, SkipIndexType, SkippingIndexOptions}; + use datatypes::schema::{ + ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType, + }; use object_store::services::Memory; use object_store::ObjectStore; use puffin_manager::PuffinManagerFactory; From 664975ccf9bff119b96bfd61e146a110db03d0f2 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 25 Dec 2024 15:26:46 +0000 Subject: [PATCH 4/8] fix docs Signed-off-by: Zhenchi --- config/config.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/config/config.md b/config/config.md index 85f5e481afd5..39ab7979fc58 100644 --- a/config/config.md +++ b/config/config.md @@ -157,6 +157,14 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter` | -- | -- | The options for bloom filter in Mito engine. | +| `region_engine.mito.bloom_filter.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | +| `region_engine.mito.bloom_filter.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | +| `region_engine.mito.bloom_filter.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -486,6 +494,14 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter` | -- | -- | The options for bloom filter in Mito engine. | +| `region_engine.mito.bloom_filter.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | +| `region_engine.mito.bloom_filter.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | +| `region_engine.mito.bloom_filter.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | From e178ae9b0f5cfb1d3e897baea3b630b5b800a5b5 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 26 Dec 2024 03:39:52 +0000 Subject: [PATCH 5/8] address comments Signed-off-by: Zhenchi --- config/datanode.example.toml | 18 +++++++++--------- config/standalone.example.toml | 2 +- src/mito2/src/access_layer.rs | 4 ++-- src/mito2/src/cache/write_cache.rs | 6 +++--- src/mito2/src/compaction/compactor.rs | 5 +++-- src/mito2/src/config.rs | 6 +++--- src/mito2/src/flush.rs | 2 +- src/mito2/src/sst/index.rs | 24 ++++++++++++------------ tests-integration/tests/http.rs | 2 +- 9 files changed, 35 insertions(+), 34 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 5c7919d70325..6013c9a5c39a 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -576,37 +576,37 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" -## The options for bloom filter in Mito engine. -[region_engine.mito.bloom_filter] +## The options for bloom filter index in Mito engine. +[region_engine.mito.bloom_filter_index] -## Whether to create the bloom filter on flush. +## Whether to create the index on flush. ## - `auto`: automatically (default) ## - `disable`: never create_on_flush = "auto" -## Whether to create the bloom filter on compaction. +## Whether to create the index on compaction. ## - `auto`: automatically (default) ## - `disable`: never create_on_compaction = "auto" -## Whether to apply the bloom filter on query +## Whether to apply the index on query ## - `auto`: automatically (default) ## - `disable`: never apply_on_query = "auto" -## Memory threshold for bloom filter creation. +## Memory threshold for the index creation. ## - `auto`: automatically determine the threshold based on the system memory size (default) ## - `unlimited`: no memory limit ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" -## Cache size for bloom filter metadata. +## Cache size for the index metadata. metadata_cache_size = "4MiB" -## Cache size for bloom filter content. +## Cache size for the index content. content_cache_size = "128MiB" -## Page size for bloom filter content cache. +## Page size for the index content cache. content_cache_page_size = "8MiB" [region_engine.mito.memtable] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 52fb74b24197..763045351c30 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -620,7 +620,7 @@ apply_on_query = "auto" mem_threshold_on_create = "auto" ## The options for bloom filter in Mito engine. -[region_engine.mito.bloom_filter] +[region_engine.mito.bloom_filter_index] ## Whether to create the bloom filter on flush. 
## - `auto`: automatically (default) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 7ea5221bd425..0d6204d02416 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -154,7 +154,7 @@ impl AccessLayer { index_options: request.index_options, inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, - bloom_filter_config: request.bloom_filter_config, + bloom_filter_index_config: request.bloom_filter_index_config, } .build() .await; @@ -199,7 +199,7 @@ pub(crate) struct SstWriteRequest { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, - pub(crate) bloom_filter_config: BloomFilterConfig, + pub(crate) bloom_filter_index_config: BloomFilterConfig, } pub(crate) async fn new_fs_cache_store(root: &str) -> Result { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 8fc3db79c7e9..18fe41c5f614 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -125,7 +125,7 @@ impl WriteCache { index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, - bloom_filter_config: write_request.bloom_filter_config, + bloom_filter_index_config: write_request.bloom_filter_index_config, } .build() .await; @@ -379,7 +379,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), - bloom_filter_config: Default::default(), + bloom_filter_index_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -472,7 +472,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), - bloom_filter_config: Default::default(), + bloom_filter_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 15503ce3cbb0..58425f4d79e3 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -301,7 +301,8 @@ impl Compactor for DefaultCompactor { let merge_mode = compaction_region.current_version.options.merge_mode(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); - let bloom_filter_config = compaction_region.engine_config.bloom_filter.clone(); + let bloom_filter_index_config = + compaction_region.engine_config.bloom_filter_index.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -326,7 +327,7 @@ impl Compactor for DefaultCompactor { index_options, inverted_index_config, fulltext_index_config, - bloom_filter_config, + bloom_filter_index_config, }, &write_opts, ) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index cdb5cad75ac6..400a5ecf85c6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -117,8 +117,8 @@ pub struct MitoConfig { pub inverted_index: InvertedIndexConfig, /// Full-text index configs. pub fulltext_index: FulltextIndexConfig, - /// Bloom filter configs. - pub bloom_filter: BloomFilterConfig, + /// Bloom filter index configs. 
+ pub bloom_filter_index: BloomFilterConfig, /// Memtable config pub memtable: MemtableConfig, @@ -157,7 +157,7 @@ impl Default for MitoConfig { index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), - bloom_filter: BloomFilterConfig::default(), + bloom_filter_index: BloomFilterConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), }; diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index bae0047d1089..dd844a7d534c 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -360,7 +360,7 @@ impl RegionFlushTask { index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), - bloom_filter_config: self.engine_config.bloom_filter.clone(), + bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(), }; let Some(sst_info) = self .access_layer diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index daaba859f45e..e575ac687b8c 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -179,7 +179,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, - pub(crate) bloom_filter_config: BloomFilterConfig, + pub(crate) bloom_filter_index_config: BloomFilterConfig, } impl<'a> IndexerBuilder<'a> { @@ -322,8 +322,8 @@ impl<'a> IndexerBuilder<'a> { fn build_bloom_filter_indexer(&self) -> Option { let create = match self.op_type { - OperationType::Flush => self.bloom_filter_config.create_on_flush.auto(), - OperationType::Compact => self.bloom_filter_config.create_on_compaction.auto(), + OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), + OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), }; if !create { @@ -334,7 +334,7 @@ impl<'a> IndexerBuilder<'a> { return None; } - let mem_limit = self.bloom_filter_config.mem_threshold_on_create(); + let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( self.file_id, self.metadata, @@ -500,7 +500,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -535,7 +535,7 @@ mod tests { ..Default::default() }, fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -558,7 +558,7 @@ mod tests { create_on_compaction: Mode::Disable, ..Default::default() }, - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -578,7 +578,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig { + bloom_filter_index_config: BloomFilterConfig { create_on_compaction: Mode::Disable, ..Default::default() }, @@ -613,7 +613,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), 
fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -638,7 +638,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -663,7 +663,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -695,7 +695,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), - bloom_filter_config: BloomFilterConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 611f73a3cb89..28593f13adb0 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -955,7 +955,7 @@ apply_on_query = "auto" mem_threshold_on_create = "auto" compress = true -[region_engine.mito.bloom_filter] +[region_engine.mito.bloom_filter_index] create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" From 7f36ef1515ec11fe2f7cfc9507b164dc20f9c473 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 26 Dec 2024 03:43:11 +0000 Subject: [PATCH 6/8] fix docs Signed-off-by: Zhenchi --- config/config.md | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/config/config.md b/config/config.md index 39ab7979fc58..f1b7437a41ae 100644 --- a/config/config.md +++ b/config/config.md @@ -157,14 +157,14 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter` | -- | -- | The options for bloom filter in Mito engine. | -| `region_engine.mito.bloom_filter.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | -| `region_engine.mito.bloom_filter.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | -| `region_engine.mito.bloom_filter.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | +| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. | +| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter_index.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | +| `region_engine.mito.bloom_filter_index.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | +| `region_engine.mito.bloom_filter_index.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -494,14 +494,14 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter` | -- | -- | The options for bloom filter in Mito engine. | -| `region_engine.mito.bloom_filter.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | -| `region_engine.mito.bloom_filter.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | -| `region_engine.mito.bloom_filter.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | -| `region_engine.mito.bloom_filter.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | +| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. | +| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter_index.metadata_cache_size` | String | `4MiB` | Cache size for the index metadata. | +| `region_engine.mito.bloom_filter_index.content_cache_size` | String | `128MiB` | Cache size for the index content. | +| `region_engine.mito.bloom_filter_index.content_cache_page_size` | String | `8MiB` | Page size for the index content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | From 7b138154c1243bf129f6ad3746e13a192f3af5cd Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 26 Dec 2024 04:06:04 +0000 Subject: [PATCH 7/8] merge Signed-off-by: Zhenchi --- src/mito2/src/engine.rs | 1 + src/mito2/src/read/scan_region.rs | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 9b912318e16b..71caf363c02c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -433,6 +433,7 @@ impl EngineInner { .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) + .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) .with_start_time(query_start); Ok(scan_region) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2dfa22f9f1c9..7966f164b6a5 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -223,6 +223,13 @@ impl ScanRegion { self } + /// Sets whether to ignore bloom filter. + #[must_use] + pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self { + self.ignore_bloom_filter = ignore; + self + } + #[must_use] pub(crate) fn with_start_time(mut self, now: Instant) -> Self { self.start_time = Some(now); From 44f7aa5d24e9d4aa444e2e6e093f756dc8a2f55c Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 26 Dec 2024 04:08:54 +0000 Subject: [PATCH 8/8] remove cache config Signed-off-by: Zhenchi --- config/config.md | 6 ------ config/datanode.example.toml | 9 --------- config/standalone.example.toml | 9 --------- src/mito2/src/config.rs | 10 ---------- tests-integration/tests/http.rs | 1 - 5 files changed, 35 deletions(-) diff --git a/config/config.md b/config/config.md index f1b7437a41ae..3574be2411f9 100644 --- a/config/config.md +++ b/config/config.md @@ -162,9 +162,6 @@ | `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter_index.metadata_cache_size` | String | `4MiB` | Cache size for bloom filter metadata. | -| `region_engine.mito.bloom_filter_index.content_cache_size` | String | `128MiB` | Cache size for bloom filter content. | -| `region_engine.mito.bloom_filter_index.content_cache_page_size` | String | `8MiB` | Page size for bloom filter content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -499,9 +496,6 @@ | `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | -| `region_engine.mito.bloom_filter_index.metadata_cache_size` | String | `4MiB` | Cache size for the index metadata. | -| `region_engine.mito.bloom_filter_index.content_cache_size` | String | `128MiB` | Cache size for the index content. | -| `region_engine.mito.bloom_filter_index.content_cache_page_size` | String | `8MiB` | Page size for the index content cache. | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 6013c9a5c39a..05b55d6f7e35 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -600,15 +600,6 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" -## Cache size for the index metadata. -metadata_cache_size = "4MiB" - -## Cache size for the index content. -content_cache_size = "128MiB" - -## Page size for the index content cache. -content_cache_page_size = "8MiB" - [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 763045351c30..3e9cfc0694b8 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -643,15 +643,6 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" -## Cache size for bloom filter metadata. -metadata_cache_size = "4MiB" - -## Cache size for bloom filter content. -content_cache_size = "128MiB" - -## Page size for bloom filter content cache. -content_cache_page_size = "8MiB" - [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 400a5ecf85c6..1c06fb3f1aa0 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -527,13 +527,6 @@ pub struct BloomFilterConfig { pub apply_on_query: Mode, /// Memory threshold for creating the index. pub mem_threshold_on_create: MemoryThreshold, - - /// Cache size for metadata of the index. Setting it to 0 to disable the cache. - pub metadata_cache_size: ReadableSize, - /// Cache size for the index content. Setting it to 0 to disable the cache. - pub content_cache_size: ReadableSize, - /// Page size for the index content. - pub content_cache_page_size: ReadableSize, } impl Default for BloomFilterConfig { @@ -543,9 +536,6 @@ impl Default for BloomFilterConfig { create_on_compaction: Mode::Auto, apply_on_query: Mode::Auto, mem_threshold_on_create: MemoryThreshold::Auto, - metadata_cache_size: ReadableSize::mb(4), - content_cache_size: ReadableSize::mb(128), - content_cache_page_size: ReadableSize::mb(8), } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 28593f13adb0..1e1de235a88b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -960,7 +960,6 @@ create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" mem_threshold_on_create = "auto" -content_cache_page_size = "8MiB" [region_engine.mito.memtable] type = "time_series"
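
For readers tracing how this series gates bloom filter index creation, the standalone Rust sketch below mirrors the shape of `IndexerBuilder::build_bloom_filter_indexer` together with the `create_on_flush` / `create_on_compaction` / `mem_threshold_on_create` options documented in the patches. It is an illustration, not code from the series: `Mode`, `OperationType`, and `BloomFilterIndexConfig` here are simplified stand-ins for the crate-internal types, and the behavior assumed for `Mode::auto()` is that only the `auto` setting enables creation.

/// Simplified stand-ins for the crate-internal types; illustrative only.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    Auto,
    Disable,
}

impl Mode {
    /// Assumed semantics of the `Mode::auto()` helper used in the diff:
    /// creation is enabled only when the option is `auto`.
    fn auto(self) -> bool {
        self == Mode::Auto
    }
}

#[derive(Debug, Clone, Copy)]
enum OperationType {
    Flush,
    Compact,
}

/// Simplified view of `BloomFilterConfig`: only the fields the decision needs.
struct BloomFilterIndexConfig {
    create_on_flush: Mode,
    create_on_compaction: Mode,
    /// `None` means "unlimited", matching `mem_threshold_on_create()` returning `Option<usize>`.
    mem_threshold_on_create: Option<usize>,
}

/// Sketch of the decision made in `IndexerBuilder::build_bloom_filter_indexer`:
/// pick the mode that matches the operation, and only build the indexer when it is `auto`.
/// `Some(limit)` means "build with this optional memory budget"; `None` means "skip".
fn should_build_bloom_filter(
    op: OperationType,
    cfg: &BloomFilterIndexConfig,
) -> Option<Option<usize>> {
    let create = match op {
        OperationType::Flush => cfg.create_on_flush.auto(),
        OperationType::Compact => cfg.create_on_compaction.auto(),
    };
    create.then_some(cfg.mem_threshold_on_create)
}

fn main() {
    let cfg = BloomFilterIndexConfig {
        create_on_flush: Mode::Auto,
        create_on_compaction: Mode::Disable,
        mem_threshold_on_create: Some(64 * 1024 * 1024),
    };
    // Built on flush with a 64 MiB budget, skipped on compaction.
    println!("{:?}", should_build_bloom_filter(OperationType::Flush, &cfg));
    println!("{:?}", should_build_bloom_filter(OperationType::Compact, &cfg));
}

Under this reading, a flush with the default configuration builds the indexer with the resolved memory budget, while setting `create_on_compaction = "disable"` skips it during compaction, which matches the expectations encoded in the `sst/index.rs` tests of the series.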