diff --git a/config/config.md b/config/config.md index 85f5e481afd5..3574be2411f9 100644 --- a/config/config.md +++ b/config/config.md @@ -157,6 +157,11 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter in Mito engine. | +| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the bloom filter on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -486,6 +491,11 @@ | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | +| `region_engine.mito.bloom_filter_index` | -- | -- | The options for bloom filter index in Mito engine. | +| `region_engine.mito.bloom_filter_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 1b062a4b3af1..05b55d6f7e35 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -576,6 +576,30 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter index in Mito engine. +[region_engine.mito.bloom_filter_index] + +## Whether to create the index on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the index on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the index on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for the index creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 77445f8883bf..3e9cfc0694b8 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -619,6 +619,30 @@ apply_on_query = "auto" ## - `[size]` e.g. `64MB`: fixed memory threshold mem_threshold_on_create = "auto" +## The options for bloom filter in Mito engine. +[region_engine.mito.bloom_filter_index] + +## Whether to create the bloom filter on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the bloom filter on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the bloom filter on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for bloom filter creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index da95334782a7..db79983e6274 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -320,7 +320,7 @@ mod tests { #[tokio::test] async fn test_bloom_filter_creator_batch_push() { let mut writer = Cursor::new(Vec::new()); - let mut creator = BloomFilterCreator::new( + let mut creator: BloomFilterCreator = BloomFilterCreator::new( 2, Arc::new(MockExternalTempFileProvider::new()), Arc::new(AtomicUsize::new(0)), diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 752b36fd1e0b..0d6204d02416 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -154,6 +154,7 @@ impl AccessLayer { index_options: request.index_options, inverted_index_config: request.inverted_index_config, fulltext_index_config: request.fulltext_index_config, + bloom_filter_index_config: request.bloom_filter_index_config, } .build() .await; @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_index_config: BloomFilterConfig, } pub(crate) async fn new_fs_cache_store(root: &str) -> Result { diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index fc9972de5305..18fe41c5f614 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -125,6 +125,7 @@ impl WriteCache { index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, fulltext_index_config: write_request.fulltext_index_config, + bloom_filter_index_config: write_request.bloom_filter_index_config, } .build() .await; @@ -378,6 +379,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_index_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -470,6 +472,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: Default::default(), fulltext_index_config: Default::default(), + bloom_filter_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index e7d5e779b675..58425f4d79e3 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -301,6 +301,8 @@ impl Compactor for DefaultCompactor { let merge_mode = compaction_region.current_version.options.merge_mode(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); + let bloom_filter_index_config = + compaction_region.engine_config.bloom_filter_index.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -325,6 +327,7 @@ impl Compactor for DefaultCompactor { index_options, inverted_index_config, fulltext_index_config, + bloom_filter_index_config, }, &write_opts, ) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 7a1574c850ae..1c06fb3f1aa0 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -117,6 +117,8 @@ pub struct MitoConfig { pub inverted_index: InvertedIndexConfig, /// Full-text index configs. pub fulltext_index: FulltextIndexConfig, + /// Bloom filter index configs. + pub bloom_filter_index: BloomFilterConfig, /// Memtable config pub memtable: MemtableConfig, @@ -155,6 +157,7 @@ impl Default for MitoConfig { index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), fulltext_index: FulltextIndexConfig::default(), + bloom_filter_index: BloomFilterConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), }; @@ -511,6 +514,48 @@ impl FulltextIndexConfig { } } +/// Configuration options for the bloom filter. +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct BloomFilterConfig { + /// Whether to create the index on flush: automatically or never. + pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for creating the index. + pub mem_threshold_on_create: MemoryThreshold, +} + +impl Default for BloomFilterConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, + } + } +} + +impl BloomFilterConfig { + pub fn mem_threshold_on_create(&self) -> Option { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize) + } else { + Some(ReadableSize::mb(64).as_bytes() as usize) + } + } + MemoryThreshold::Unlimited => None, + MemoryThreshold::Size(size) => Some(size.as_bytes() as usize), + } + } +} + /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 98160eadc46a..71caf363c02c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -433,7 +433,7 @@ impl EngineInner { .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) - // .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) // TODO(ruihang): wait for #5237 + .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) .with_start_time(query_start); Ok(scan_region) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 64a739068ad9..dd844a7d534c 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -360,6 +360,7 @@ impl RegionFlushTask { index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), fulltext_index_config: self.engine_config.fulltext_index.clone(), + bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(), }; let Some(sst_info) = self .access_layer diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2ce3367b409b..5cd99fe3778e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -231,7 +231,6 @@ impl ScanRegion { /// Sets whether to ignore bloom filter. #[must_use] - #[allow(dead_code)] // TODO(ruihang): waiting for #5237 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self { self.ignore_bloom_filter = ignore; self diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 0b2822c04a7c..dc0f0978f84c 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -33,7 +33,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::OperationType; -use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; +use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; @@ -179,6 +179,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, pub(crate) fulltext_index_config: FulltextIndexConfig, + pub(crate) bloom_filter_index_config: BloomFilterConfig, } impl<'a> IndexerBuilder<'a> { @@ -320,7 +321,10 @@ impl<'a> IndexerBuilder<'a> { } fn build_bloom_filter_indexer(&self) -> Option { - let create = true; // TODO(zhongzc): add config for bloom filter + let create = match self.op_type { + OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(), + OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(), + }; if !create { debug!( @@ -330,7 +334,7 @@ impl<'a> IndexerBuilder<'a> { return None; } - let mem_limit = Some(16 * 1024 * 1024); // TODO(zhongzc): add config for bloom filter + let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create(); let indexer = BloomFilterIndexer::new( self.file_id, self.metadata, @@ -496,6 +500,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -530,12 +535,14 @@ mod tests { ..Default::default() }, fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_none()); assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_some()); let indexer = IndexerBuilder { op_type: OperationType::Compact, @@ -544,19 +551,44 @@ mod tests { metadata: &metadata, row_group_size: 1024, puffin_manager: factory.build(mock_object_store()), - intermediate_manager: intm_manager, + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig { create_on_compaction: Mode::Disable, ..Default::default() }, + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_some()); assert!(indexer.fulltext_indexer.is_none()); + assert!(indexer.bloom_filter_indexer.is_some()); + + let indexer = IndexerBuilder { + op_type: OperationType::Compact, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig { + create_on_compaction: Mode::Disable, + ..Default::default() + }, + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); + assert!(indexer.bloom_filter_indexer.is_none()); } #[tokio::test] @@ -581,6 +613,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -605,6 +638,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -629,6 +663,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; @@ -660,6 +695,7 @@ mod tests { index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), fulltext_index_config: FulltextIndexConfig::default(), + bloom_filter_index_config: BloomFilterConfig::default(), } .build() .await; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index fb2824790848..1e1de235a88b 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -955,6 +955,12 @@ apply_on_query = "auto" mem_threshold_on_create = "auto" compress = true +[region_engine.mito.bloom_filter_index] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] type = "time_series"