diff --git a/config/config.md b/config/config.md index 6a500a5b4a34..4f66ef30573a 100644 --- a/config/config.md +++ b/config/config.md @@ -150,6 +150,7 @@ | `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | | `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | +| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. | | `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. | | `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0ba80a9f7d92..90a4d69b2e89 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -543,6 +543,15 @@ mem_threshold_on_create = "auto" ## Deprecated, use `region_engine.mito.index.aux_path` instead. intermediate_path = "" +## Cache size for inverted index metadata. +metadata_cache_size = "64MiB" + +## Cache size for inverted index content. +content_cache_size = "128MiB" + +## Page size for inverted index content cache. +content_cache_page_size = "8MiB" + ## The options for full-text index in Mito engine. [region_engine.mito.fulltext_index] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 8eae532d6166..b73246d37f0a 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -588,6 +588,9 @@ metadata_cache_size = "64MiB" ## Cache size for inverted index content. content_cache_size = "128MiB" +## Page size for inverted index content cache. +content_cache_page_size = "8MiB" + ## The options for full-text index in Mito engine. [region_engine.mito.fulltext_index] diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 587b60560224..03cf9136245a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -26,7 +26,6 @@ use std::mem; use std::sync::Arc; use bytes::Bytes; -use common_base::readable_size::ReadableSize; use datatypes::value::Value; use datatypes::vectors::VectorRef; use moka::notification::RemovalCause; @@ -55,9 +54,6 @@ const FILE_TYPE: &str = "file"; /// Metrics type key for selector result cache. const SELECTOR_RESULT_TYPE: &str = "selector_result"; -/// Page size for cache. Currently only used for inverted index cache. -const PAGE_SIZE: usize = 8192 * 1024; // 8MB - /// Manages cached data for the engine. /// /// All caches are disabled by default. @@ -248,6 +244,7 @@ pub struct CacheManagerBuilder { page_cache_size: u64, index_metadata_size: u64, index_content_size: u64, + index_content_page_size: u64, puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, @@ -290,6 +287,12 @@ impl CacheManagerBuilder { self } + /// Sets page size for index content. + pub fn index_content_page_size(mut self, bytes: u64) -> Self { + self.index_content_page_size = bytes; + self + } + /// Sets cache size for puffin metadata. pub fn puffin_metadata_size(mut self, bytes: u64) -> Self { self.puffin_metadata_size = bytes; @@ -359,7 +362,7 @@ impl CacheManagerBuilder { let inverted_index_cache = InvertedIndexCache::new( self.index_metadata_size, self.index_content_size, - ReadableSize::kb(PAGE_SIZE as u64), + self.index_content_page_size, ); let puffin_metadata_cache = PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 242564977be5..c43bf5033a63 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; -use common_base::readable_size::ReadableSize; use common_base::BitVec; use index::inverted_index::error::DecodeFstSnafu; use index::inverted_index::format::reader::InvertedIndexReader; @@ -77,7 +76,7 @@ where } None => { CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); - let offset = index.page_id * (self.cache.page_size as u64); + let offset = index.page_id * self.cache.page_size; let size = self.cache.page_size as _; let page = self.inner.range_read(offset, size).await?; let page = Arc::new(page); @@ -92,7 +91,7 @@ where // last page range. when the first page is the last page, the range is not used. IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size) } else { - 0..self.cache.page_size + 0..self.cache.page_size as usize }; data.extend_from_slice(&page[range]); } @@ -154,12 +153,12 @@ pub struct IndexDataPageKey { impl IndexDataPageKey { /// Converts an offset to a page ID based on the page size. - fn calculate_page_id(offset: u64, page_size: usize) -> u64 { - offset / (page_size as u64) + fn calculate_page_id(offset: u64, page_size: u64) -> u64 { + offset / page_size } /// Calculates the total number of pages that a given size spans, starting from a specific offset. - fn calculate_page_count(offset: u64, size: u32, page_size: usize) -> u32 { + fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 { let start_page = Self::calculate_page_id(offset, page_size); let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size); (end_page - start_page + 1) as u32 @@ -167,21 +166,22 @@ impl IndexDataPageKey { /// Computes the byte range in the first page based on the offset and size. /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096. - fn calculate_first_page_range(offset: u64, size: u32, page_size: usize) -> Range { - let start = (offset % (page_size as u64)) as usize; + fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range { + let start = (offset % page_size) as usize; let end = if size > page_size as u32 - start as u32 { - page_size + page_size as usize } else { - (start as u64 + size as u64) as usize + start + size as usize }; start..end } /// Computes the byte range in the last page based on the offset and size. /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904. - fn calculate_last_page_range(offset: u64, size: u32, page_size: usize) -> Range { + fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range { let offset = offset as usize; let size = size as usize; + let page_size = page_size as usize; if (offset + size) % page_size == 0 { 0..page_size } else { @@ -190,7 +190,7 @@ impl IndexDataPageKey { } /// Generates a vector of IndexKey instances for the pages that a given offset and size span. - fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: usize) -> Vec { + fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec { let start_page = Self::calculate_page_id(offset, page_size); let total_pages = Self::calculate_page_count(offset, size, page_size); (0..total_pages) @@ -210,12 +210,12 @@ pub struct InvertedIndexCache { /// Cache for inverted index content. index: moka::sync::Cache>>, // Page size for index content. - page_size: usize, + page_size: u64, } impl InvertedIndexCache { /// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`. - pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: ReadableSize) -> Self { + pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self { common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}"); let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap) .name("inverted_index_metadata") @@ -240,14 +240,14 @@ impl InvertedIndexCache { Self { index_metadata, index: index_cache, - page_size: page_size.as_bytes() as usize, + page_size, } } } impl InvertedIndexCache { pub fn get_index_metadata(&self, file_id: FileId) -> Option> { - self.index_metadata.get(&&IndexMetadataKey { file_id }) + self.index_metadata.get(&IndexMetadataKey { file_id }) } pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc) { @@ -306,18 +306,15 @@ mod test { let size = 5000; let page_id = IndexDataPageKey::calculate_page_id(offset, page_size); let page_num = IndexDataPageKey::calculate_page_count(offset, size, page_size); - let indexes = - IndexDataPageKey::generate_page_keys(key.file_id, offset, size as u32, page_size); + let indexes = IndexDataPageKey::generate_page_keys(key.file_id, offset, size, page_size); assert_eq!(page_num, 2); assert_eq!(indexes.len(), page_num as usize); assert_eq!(indexes[0].page_id, page_id); assert_eq!(indexes[1].page_id, page_id + 1); - let first_range = - IndexDataPageKey::calculate_first_page_range(offset, size as u32, page_size); + let first_range = IndexDataPageKey::calculate_first_page_range(offset, size, page_size); assert_eq!(first_range, 1000..4096); - let last_range = - IndexDataPageKey::calculate_last_page_range(offset, size as u32, page_size); + let last_range = IndexDataPageKey::calculate_last_page_range(offset, size, page_size); assert_eq!(last_range, 0..1904); // Exactly end the last page. @@ -330,17 +327,14 @@ mod test { let size = 3096; let page_id = IndexDataPageKey::calculate_page_id(offset, page_size); let page_num = IndexDataPageKey::calculate_page_count(offset, size, page_size); - let indexes = - IndexDataPageKey::generate_page_keys(key.file_id, offset, size as u32, page_size); + let indexes = IndexDataPageKey::generate_page_keys(key.file_id, offset, size, page_size); assert_eq!(page_num, 1); assert_eq!(indexes.len(), page_num as usize); assert_eq!(indexes[0].page_id, page_id); - let first_range = - IndexDataPageKey::calculate_first_page_range(offset, size as u32, page_size); + let first_range = IndexDataPageKey::calculate_first_page_range(offset, size, page_size); assert_eq!(first_range, 1000..4096); - let last_range = - IndexDataPageKey::calculate_last_page_range(offset, size as u32, page_size); + let last_range = IndexDataPageKey::calculate_last_page_range(offset, size, page_size); // In this case, the last range will not be used. assert_eq!(last_range, 0..4096); } @@ -354,12 +348,13 @@ mod test { rng.fill_bytes(&mut data); let file_id = FileId::random(); - for _ in 0..1000 { + for _ in 0..100 { let offset = rng.gen_range(0..data.len() as u64); let size = rng.gen_range(0..data.len() as u32 - offset as u32); - let page_size = rng.gen_range(1..1024); + let page_size: usize = rng.gen_range(1..1024); - let indexes = IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size); + let indexes = + IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64); let page_num = indexes.len(); let mut read = Vec::with_capacity(size as usize); let last_index = indexes.len() - 1; @@ -372,22 +367,22 @@ mod test { }; let range = if i == 0 { // first page range - IndexDataPageKey::calculate_first_page_range(offset, size, page_size) + IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64) } else if i == last_index { // last page range. when the first page is the last page, the range is not used. - IndexDataPageKey::calculate_last_page_range(offset, size, page_size) + IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64) } else { 0..page_size }; read.extend_from_slice(&page[range]); } - let expected_range = offset as usize..(offset + size as u64) as usize; + let expected_range = offset as usize..(offset + size as u64 as u64) as usize; if read != data.get(expected_range).unwrap() { panic!( "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\n read len: {}, expected len: {}\n first page range: {:?}, last page range: {:?}, page num: {}", offset, size, page_size, read.len(), size as usize, - IndexDataPageKey::calculate_first_page_range(offset, size, page_size), - IndexDataPageKey::calculate_last_page_range(offset, size, page_size), page_num + IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64), + IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num ); } } @@ -439,7 +434,7 @@ mod test { let mut cached_reader = CachedInvertedIndexBlobReader::new( FileId::random(), reader, - Arc::new(InvertedIndexCache::new(8192, 8192, ReadableSize(10))), + Arc::new(InvertedIndexCache::new(8192, 8192, 100000000000)), ); let metadata = cached_reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index dda3f4271059..963089c60aed 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -416,6 +416,8 @@ pub struct InvertedIndexConfig { pub metadata_cache_size: ReadableSize, /// Cache size for inverted index content. Setting it to 0 to disable the cache. pub content_cache_size: ReadableSize, + /// Page size for inverted index content. + pub content_cache_page_size: ReadableSize, } impl InvertedIndexConfig { @@ -441,6 +443,7 @@ impl Default for InvertedIndexConfig { intermediate_path: String::new(), metadata_cache_size: ReadableSize::mb(64), content_cache_size: ReadableSize::mb(128), + content_cache_page_size: ReadableSize::mb(8), }; if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 4d741a30a7f0..15cba55c4437 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -302,7 +302,6 @@ mod tests { use std::iter; use api::v1::SemanticType; - use common_base::readable_size::ReadableSize; use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -449,7 +448,7 @@ mod tests { move |expr| { let _d = &d; - let cache = Arc::new(InvertedIndexCache::new(10, 10, ReadableSize(10))); + let cache = Arc::new(InvertedIndexCache::new(10, 10, 100)); let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES)); let applier = InvertedIndexApplierBuilder::new( region_dir.clone(), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index f8ab9c3f4edb..233ab9f056b1 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -170,6 +170,7 @@ impl WorkerGroup { .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes()) .index_content_size(config.inverted_index.content_cache_size.as_bytes()) + .index_content_page_size(config.inverted_index.content_cache_page_size.as_bytes()) .puffin_metadata_size(config.index.metadata_cache_size.as_bytes()) .write_cache(write_cache) .build(),