diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 1b062a4b3af1..5c7919d70325 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -576,6 +576,39 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter in Mito engine. +[region_engine.mito.bloom_filter] + +## Whether to create the bloom filter on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the bloom filter on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the bloom filter on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for bloom filter creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + +## Cache size for bloom filter metadata. +metadata_cache_size = "4MiB" + +## Cache size for bloom filter content. +content_cache_size = "128MiB" + +## Page size for bloom filter content cache. +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 77445f8883bf..52fb74b24197 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -619,6 +619,39 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter in Mito engine. +[region_engine.mito.bloom_filter] + +## Whether to create the bloom filter on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the bloom filter on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the bloom filter on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for bloom filter creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + +## Cache size for bloom filter metadata. +metadata_cache_size = "4MiB" + +## Cache size for bloom filter content. +content_cache_size = "128MiB" + +## Page size for bloom filter content cache. +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index da95334782a7..db79983e6274 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -320,7 +320,7 @@ mod tests { #[tokio::test] async fn test_bloom_filter_creator_batch_push() { let mut writer = Cursor::new(Vec::new()); - let mut creator = BloomFilterCreator::new( + let mut creator: BloomFilterCreator = BloomFilterCreator::new( 2, Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 752b36fd1e0b..7ea5221bd425 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -154,6 +154,7 @@ impl AccessLayer { index_options: request.index_options, inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, + bloom_filter_config: request.bloom_filter_config, } .build() .await; @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_config: BloomFilterConfig, } pub(crate) async fn new_fs_cache_store(root: &str) -> Result { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index fc9972de5305..8fc3db79c7e9 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -125,6 +125,7 @@ impl WriteCache { index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, + bloom_filter_config: write_request.bloom_filter_config, } .build() .await; @@ -378,6 +379,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -470,6 +472,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e7d5e779b675..15503ce3cbb0 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -301,6 +301,7 @@ impl Compactor for DefaultCompactor { let merge_mode = compaction_region.current_version.options.merge_mode(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); + let bloom_filter_config = compaction_region.engine_config.bloom_filter.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -325,6 +326,7 @@ impl Compactor for DefaultCompactor { index_options, inverted_index_config, fulltext_index_config, + bloom_filter_config, }, &write_opts, ) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7a1574c850ae..cdb5cad75ac6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -117,6 +117,8 @@ pub struct MitoConfig { pub inverted_index: InvertedIndexConfig, /// Full-text index configs. pub fulltext_index: FulltextIndexConfig, + /// Bloom filter configs. + pub bloom_filter: BloomFilterConfig, /// Memtable config pub memtable: MemtableConfig, @@ -155,6 +157,7 @@ impl Default for MitoConfig { index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), + bloom_filter: BloomFilterConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), }; @@ -511,6 +514,58 @@ impl FulltextIndexConfig { } } +/// Configuration options for the bloom filter. +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct BloomFilterConfig { + /// Whether to create the index on flush: automatically or never. + pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for creating the index. + pub mem_threshold_on_create: MemoryThreshold, + + /// Cache size for metadata of the index. Setting it to 0 to disable the cache. + pub metadata_cache_size: ReadableSize, + /// Cache size for the index content. Setting it to 0 to disable the cache. + pub content_cache_size: ReadableSize, + /// Page size for the index content. + pub content_cache_page_size: ReadableSize, +} + +impl Default for BloomFilterConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, + metadata_cache_size: ReadableSize::mb(4), + content_cache_size: ReadableSize::mb(128), + content_cache_page_size: ReadableSize::mb(8), + } + } +} + +impl BloomFilterConfig { + pub fn mem_threshold_on_create(&self) -> Option { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize) + } else { + Some(ReadableSize::mb(64).as_bytes() as usize) + } + } + MemoryThreshold::Unlimited => None, + MemoryThreshold::Size(size) => Some(size.as_bytes() as usize), + } + } +} + /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 64a739068ad9..bae0047d1089 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -360,6 +360,7 @@ impl RegionFlushTask { index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), + bloom_filter_config: self.engine_config.bloom_filter.clone(), }; let Some(sst_info) = self .access_layer diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 84f0736ac27a..a8b94d5abdbc 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::OperationType; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; @@ -179,6 +179,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_config: BloomFilterConfig, } impl<'a> IndexerBuilder<'a> { @@ -320,7 +321,10 @@ impl<'a> IndexerBuilder<'a> { } fn build_bloom_filter_indexer(&self) -> Option { - let create = true; // TODO(zhongzc): add config for bloom filter + let create = match self.op_type { + OperationType::Flush => self.bloom_filter_config.create_on_flush.auto(), + OperationType::Compact => self.bloom_filter_config.create_on_compaction.auto(), + }; if !create { debug!( @@ -330,7 +334,7 @@ impl<'a> IndexerBuilder<'a> { return None; } - let mem_limit = Some(100 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let mem_limit = self.bloom_filter_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( self.file_id, self.metadata, @@ -494,6 +498,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -528,12 +533,14 @@ mod tests { ..Default::default() }, fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let indexer = IndexerBuilder { op_type: OperationType::Compact, @@ -542,19 +549,44 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig { create_on_compaction: Mode::Disable, ..Default::default() }, + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let indexer = IndexerBuilder { + op_type: OperationType::Compact, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig { + create_on_compaction: Mode::Disable, + ..Default::default() + }, + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -579,6 +611,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -603,6 +636,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -627,6 +661,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; @@ -658,6 +693,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_config: BloomFilterConfig::default(), } .build() .await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fb2824790848..611f73a3cb89 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -955,6 +955,13 @@ apply_on_query = "auto" mem_threshold_on_create = "auto" compress = true +[region_engine.mito.bloom_filter] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" +content_cache_page_size = "8MiB" + [region_engine.mito.memtable] type = "time_series"