Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): add bloom filter config #5237

Merged
merged 11 commits into from
Dec 26, 2024
33 changes: 33 additions & 0 deletions config/datanode.example.toml
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,39 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter]

## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## Cache size for bloom filter metadata.
metadata_cache_size = "4MiB"

## Cache size for bloom filter content.
content_cache_size = "128MiB"

## Page size for bloom filter content cache.
content_cache_page_size = "8MiB"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking: Configuring cache size for each index type is verbose. We may need to provide a global index cache.


[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
33 changes: 33 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,39 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## The options for bloom filter in Mito engine.
[region_engine.mito.bloom_filter]

## Whether to create the bloom filter on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the bloom filter on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the bloom filter on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for bloom filter creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## Cache size for bloom filter metadata.
metadata_cache_size = "4MiB"

## Cache size for bloom filter content.
content_cache_size = "128MiB"

## Page size for bloom filter content cache.
content_cache_page_size = "8MiB"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
2 changes: 1 addition & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
use crate::prelude::ConcreteDataType;
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkippingIndexOptions,
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, SkipIndexType, SkippingIndexOptions,
evenyag marked this conversation as resolved.
Show resolved Hide resolved
COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE, COLUMN_FULLTEXT_OPT_KEY_ANALYZER,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY,
Expand Down
8 changes: 4 additions & 4 deletions src/index/src/bloom_filter/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl BloomFilterCreator {
/// `rows_per_segment` <= 0
pub fn new(
rows_per_segment: usize,
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
Expand Down Expand Up @@ -252,7 +252,7 @@ mod tests {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down Expand Up @@ -320,9 +320,9 @@ mod tests {
#[tokio::test]
async fn test_bloom_filter_creator_batch_push() {
let mut writer = Cursor::new(Vec::new());
let mut creator = BloomFilterCreator::new(
let mut creator: BloomFilterCreator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down
8 changes: 4 additions & 4 deletions src/index/src/bloom_filter/creator/finalize_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct FinalizedBloomFilterStorage {
intermediate_prefix: String,

/// The provider for intermediate Bloom filter files.
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,

/// The memory usage of the in-memory Bloom filters.
memory_usage: usize,
Expand All @@ -59,7 +59,7 @@ pub struct FinalizedBloomFilterStorage {
impl FinalizedBloomFilterStorage {
/// Creates a new `FinalizedBloomFilterStorage`.
pub fn new(
intermediate_provider: Box<dyn ExternalTempFileProvider>,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
Expand Down Expand Up @@ -132,7 +132,7 @@ impl FinalizedBloomFilterStorage {
/// Drains the storage and returns a stream of finalized Bloom filter segments.
pub async fn drain(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + '_>>> {
) -> Result<Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>> {
// FAST PATH: memory only
if self.intermediate_file_id_counter == 0 {
return Ok(Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))));
Expand Down Expand Up @@ -257,7 +257,7 @@ mod tests {

let global_memory_usage = Arc::new(AtomicUsize::new(0));
let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
let provider = Box::new(mock_provider);
let provider = Arc::new(mock_provider);
let mut storage = FinalizedBloomFilterStorage::new(
provider,
global_memory_usage.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/bloom_filter/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ mod tests {
let mut writer = Cursor::new(vec![]);
let mut creator = BloomFilterCreator::new(
2,
Box::new(MockExternalTempFileProvider::new()),
Arc::new(MockExternalTempFileProvider::new()),
Arc::new(AtomicUsize::new(0)),
None,
);
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -154,6 +154,7 @@ impl AccessLayer {
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
bloom_filter_config: request.bloom_filter_config,
}
.build()
.await;
Expand Down Expand Up @@ -198,6 +199,7 @@ pub(crate) struct SstWriteRequest {
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_config: BloomFilterConfig,
}

pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl WriteCache {
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
bloom_filter_config: write_request.bloom_filter_config,
}
.build()
.await;
Expand Down Expand Up @@ -378,6 +379,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_config: Default::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -470,6 +472,7 @@ mod tests {
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
16 changes: 4 additions & 12 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use common_telemetry::{info, warn};
use common_time::TimeToLive;
use object_store::manager::ObjectStoreManagerRef;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
Expand All @@ -41,7 +40,7 @@ use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::file::{FileMeta, IndexType};
use crate::sst::file::FileMeta;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
Expand Down Expand Up @@ -302,6 +301,7 @@ impl Compactor for DefaultCompactor {
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_config = compaction_region.engine_config.bloom_filter.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -326,6 +326,7 @@ impl Compactor for DefaultCompactor {
index_options,
inverted_index_config,
fulltext_index_config,
bloom_filter_config,
},
&write_opts,
)
Expand All @@ -336,16 +337,7 @@ impl Compactor for DefaultCompactor {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: {
let mut indexes = SmallVec::new();
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
available_indexes: sst_info.index_metadata.build_available_indexes(),
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
Expand Down
55 changes: 55 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ pub struct MitoConfig {
pub inverted_index: InvertedIndexConfig,
/// Full-text index configs.
pub fulltext_index: FulltextIndexConfig,
/// Bloom filter configs.
pub bloom_filter: BloomFilterConfig,

/// Memtable config
pub memtable: MemtableConfig,
Expand Down Expand Up @@ -155,6 +157,7 @@ impl Default for MitoConfig {
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
bloom_filter: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
};
Expand Down Expand Up @@ -511,6 +514,58 @@ impl FulltextIndexConfig {
}
}

/// Configuration options for the bloom filter.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct BloomFilterConfig {
/// Whether to create the index on flush: automatically or never.
pub create_on_flush: Mode,
/// Whether to create the index on compaction: automatically or never.
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Memory threshold for creating the index.
pub mem_threshold_on_create: MemoryThreshold,

/// Cache size for metadata of the index. Setting it to 0 to disable the cache.
pub metadata_cache_size: ReadableSize,
/// Cache size for the index content. Setting it to 0 to disable the cache.
pub content_cache_size: ReadableSize,
/// Page size for the index content.
pub content_cache_page_size: ReadableSize,
zhongzc marked this conversation as resolved.
Show resolved Hide resolved
}

impl Default for BloomFilterConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
metadata_cache_size: ReadableSize::mb(4),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::mb(8),
}
}
}

impl BloomFilterConfig {
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}

/// Divide cpu num by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
Expand Down
25 changes: 22 additions & 3 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to retrieve fulltext options from column metadata"))]
FulltextOptions {
#[snafu(display("Failed to retrieve index options from column metadata"))]
IndexOptions {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
Expand Down Expand Up @@ -904,6 +904,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to push value to bloom filter"))]
PushBloomFilterValue {
source: index::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to finish bloom filter"))]
BloomFilterFinish {
source: index::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -1029,7 +1043,7 @@ impl ErrorExt for Error {
UnsupportedOperation { .. } => StatusCode::Unsupported,
RemoteCompaction { .. } => StatusCode::Unexpected,

FulltextOptions { source, .. } => source.status_code(),
IndexOptions { source, .. } => source.status_code(),
CreateFulltextCreator { source, .. } => source.status_code(),
CastVector { source, .. } => source.status_code(),
FulltextPushText { source, .. }
Expand All @@ -1039,7 +1053,12 @@ impl ErrorExt for Error {
RegionBusy { .. } => StatusCode::RegionBusy,
GetSchemaMetadata { source, .. } => source.status_code(),
Timeout { .. } => StatusCode::Cancelled,

DecodeArrowRowGroup { .. } => StatusCode::Internal,

PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => {
source.status_code()
}
}
}

Expand Down
Loading
Loading