Skip to content

Commit

Permalink
feat: add config for inverted index page size
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Dec 12, 2024
1 parent d334dd3 commit 6586bba
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 44 deletions.
1 change: 1 addition & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
Expand Down
9 changes: 9 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,15 @@ mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""

## Cache size for inverted index metadata.
metadata_cache_size = "64MiB"

## Cache size for inverted index content.
content_cache_size = "128MiB"

## Page size for inverted index content cache.
content_cache_page_size = "8MiB"

## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

Expand Down
3 changes: 3 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ metadata_cache_size = "64MiB"
## Cache size for inverted index content.
content_cache_size = "128MiB"

## Page size for inverted index content cache.
content_cache_page_size = "8MiB"

## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

Expand Down
13 changes: 8 additions & 5 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::mem;
use std::sync::Arc;

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::notification::RemovalCause;
Expand Down Expand Up @@ -55,9 +54,6 @@ const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";

/// Page size for cache. Currently only used for inverted index cache.
const PAGE_SIZE: usize = 8192 * 1024; // 8MB

/// Manages cached data for the engine.
///
/// All caches are disabled by default.
Expand Down Expand Up @@ -248,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
index_content_page_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
Expand Down Expand Up @@ -290,6 +287,12 @@ impl CacheManagerBuilder {
self
}

/// Sets page size for index content.
pub fn index_content_page_size(mut self, bytes: u64) -> Self {
self.index_content_page_size = bytes;
self
}

/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
Expand Down Expand Up @@ -359,7 +362,7 @@ impl CacheManagerBuilder {
let inverted_index_cache = InvertedIndexCache::new(
self.index_metadata_size,
self.index_content_size,
ReadableSize::kb(PAGE_SIZE as u64),
self.index_content_page_size,
);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
Expand Down
69 changes: 32 additions & 37 deletions src/mito2/src/cache/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use common_base::readable_size::ReadableSize;
use common_base::BitVec;
use index::inverted_index::error::DecodeFstSnafu;
use index::inverted_index::format::reader::InvertedIndexReader;
Expand Down Expand Up @@ -77,7 +76,7 @@ where
}
None => {
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
let offset = index.page_id * (self.cache.page_size as u64);
let offset = index.page_id * self.cache.page_size;
let size = self.cache.page_size as _;
let page = self.inner.range_read(offset, size).await?;
let page = Arc::new(page);
Expand All @@ -92,7 +91,7 @@ where
// last page range. when the first page is the last page, the range is not used.
IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
} else {
0..self.cache.page_size
0..self.cache.page_size as usize
};
data.extend_from_slice(&page[range]);
}
Expand Down Expand Up @@ -154,34 +153,35 @@ pub struct IndexDataPageKey {

impl IndexDataPageKey {
/// Converts an offset to a page ID based on the page size.
fn calculate_page_id(offset: u64, page_size: usize) -> u64 {
offset / (page_size as u64)
fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
offset / page_size
}

/// Calculates the total number of pages that a given size spans, starting from a specific offset.
fn calculate_page_count(offset: u64, size: u32, page_size: usize) -> u32 {
fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
let start_page = Self::calculate_page_id(offset, page_size);
let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
(end_page - start_page + 1) as u32
}

/// Computes the byte range in the first page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
fn calculate_first_page_range(offset: u64, size: u32, page_size: usize) -> Range<usize> {
let start = (offset % (page_size as u64)) as usize;
fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let start = (offset % page_size) as usize;
let end = if size > page_size as u32 - start as u32 {
page_size
page_size as usize
} else {
(start as u64 + size as u64) as usize
start + size as usize
};
start..end
}

/// Computes the byte range in the last page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
fn calculate_last_page_range(offset: u64, size: u32, page_size: usize) -> Range<usize> {
fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let offset = offset as usize;
let size = size as usize;
let page_size = page_size as usize;
if (offset + size) % page_size == 0 {
0..page_size
} else {
Expand All @@ -190,7 +190,7 @@ impl IndexDataPageKey {
}

/// Generates a vector of IndexKey instances for the pages that a given offset and size span.
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: usize) -> Vec<Self> {
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<Self> {
let start_page = Self::calculate_page_id(offset, page_size);
let total_pages = Self::calculate_page_count(offset, size, page_size);
(0..total_pages)
Expand All @@ -210,12 +210,12 @@ pub struct InvertedIndexCache {
/// Cache for inverted index content.
index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
// Page size for index content.
page_size: usize,
page_size: u64,
}

impl InvertedIndexCache {
/// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`.
pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: ReadableSize) -> Self {
pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}");
let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
.name("inverted_index_metadata")
Expand All @@ -240,14 +240,14 @@ impl InvertedIndexCache {
Self {
index_metadata,
index: index_cache,
page_size: page_size.as_bytes() as usize,
page_size,
}
}
}

impl InvertedIndexCache {
pub fn get_index_metadata(&self, file_id: FileId) -> Option<Arc<InvertedIndexMetas>> {
self.index_metadata.get(&&IndexMetadataKey { file_id })
self.index_metadata.get(&IndexMetadataKey { file_id })
}

pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc<InvertedIndexMetas>) {
Expand Down Expand Up @@ -306,18 +306,15 @@ mod test {
let size = 5000;
let page_id = IndexDataPageKey::calculate_page_id(offset, page_size);
let page_num = IndexDataPageKey::calculate_page_count(offset, size, page_size);
let indexes =
IndexDataPageKey::generate_page_keys(key.file_id, offset, size as u32, page_size);
let indexes = IndexDataPageKey::generate_page_keys(key.file_id, offset, size, page_size);
assert_eq!(page_num, 2);
assert_eq!(indexes.len(), page_num as usize);
assert_eq!(indexes[0].page_id, page_id);
assert_eq!(indexes[1].page_id, page_id + 1);

let first_range =
IndexDataPageKey::calculate_first_page_range(offset, size as u32, page_size);
let first_range = IndexDataPageKey::calculate_first_page_range(offset, size, page_size);
assert_eq!(first_range, 1000..4096);
let last_range =
IndexDataPageKey::calculate_last_page_range(offset, size as u32, page_size);
let last_range = IndexDataPageKey::calculate_last_page_range(offset, size, page_size);
assert_eq!(last_range, 0..1904);

// Exactly end the last page.
Expand All @@ -330,17 +327,14 @@ mod test {
let size = 3096;
let page_id = IndexDataPageKey::calculate_page_id(offset, page_size);
let page_num = IndexDataPageKey::calculate_page_count(offset, size, page_size);
let indexes =
IndexDataPageKey::generate_page_keys(key.file_id, offset, size as u32, page_size);
let indexes = IndexDataPageKey::generate_page_keys(key.file_id, offset, size, page_size);
assert_eq!(page_num, 1);
assert_eq!(indexes.len(), page_num as usize);
assert_eq!(indexes[0].page_id, page_id);

let first_range =
IndexDataPageKey::calculate_first_page_range(offset, size as u32, page_size);
let first_range = IndexDataPageKey::calculate_first_page_range(offset, size, page_size);
assert_eq!(first_range, 1000..4096);
let last_range =
IndexDataPageKey::calculate_last_page_range(offset, size as u32, page_size);
let last_range = IndexDataPageKey::calculate_last_page_range(offset, size, page_size);
// In this case, the last range will not be used.
assert_eq!(last_range, 0..4096);
}
Expand All @@ -354,12 +348,13 @@ mod test {
rng.fill_bytes(&mut data);
let file_id = FileId::random();

for _ in 0..1000 {
for _ in 0..100 {
let offset = rng.gen_range(0..data.len() as u64);
let size = rng.gen_range(0..data.len() as u32 - offset as u32);
let page_size = rng.gen_range(1..1024);
let page_size: usize = rng.gen_range(1..1024);

let indexes = IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size);
let indexes =
IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
let page_num = indexes.len();
let mut read = Vec::with_capacity(size as usize);
let last_index = indexes.len() - 1;
Expand All @@ -372,22 +367,22 @@ mod test {
};
let range = if i == 0 {
// first page range
IndexDataPageKey::calculate_first_page_range(offset, size, page_size)
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
} else if i == last_index {
// last page range. when the first page is the last page, the range is not used.
IndexDataPageKey::calculate_last_page_range(offset, size, page_size)
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
} else {
0..page_size
};
read.extend_from_slice(&page[range]);
}
let expected_range = offset as usize..(offset + size as u64) as usize;
let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
if read != data.get(expected_range).unwrap() {
panic!(
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\n read len: {}, expected len: {}\n first page range: {:?}, last page range: {:?}, page num: {}",
offset, size, page_size, read.len(), size as usize,
IndexDataPageKey::calculate_first_page_range(offset, size, page_size),
IndexDataPageKey::calculate_last_page_range(offset, size, page_size), page_num
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
);
}
}
Expand Down Expand Up @@ -439,7 +434,7 @@ mod test {
let mut cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, ReadableSize(10))),
Arc::new(InvertedIndexCache::new(8192, 8192, 100000000000)),
);
let metadata = cached_reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ pub struct InvertedIndexConfig {
pub metadata_cache_size: ReadableSize,
/// Cache size for inverted index content. Setting it to 0 to disable the cache.
pub content_cache_size: ReadableSize,
/// Page size for inverted index content.
pub content_cache_page_size: ReadableSize,
}

impl InvertedIndexConfig {
Expand All @@ -441,6 +443,7 @@ impl Default for InvertedIndexConfig {
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::mb(8),
};

if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ mod tests {
use std::iter;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datafusion_expr::{binary_expr, col, lit, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand Down Expand Up @@ -449,7 +448,7 @@ mod tests {

move |expr| {
let _d = &d;
let cache = Arc::new(InvertedIndexCache::new(10, 10, ReadableSize(10)));
let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl WorkerGroup {
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
.index_content_page_size(config.inverted_index.content_cache_page_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
Expand Down

0 comments on commit 6586bba

Please sign in to comment.