diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 55aee5b15400..f0bc36288d22 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -342,7 +342,7 @@ impl CacheManagerBuilder { .build() }); let inverted_index_cache = - InvertedIndexCache::new(self.index_metadata_size, self.index_content_size); + InvertedIndexCache::new(self.index_metadata_size, self.index_content_size, PAGE_SIZE); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { Cache::builder() .max_capacity(self.selector_result_cache_size) diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 749d0dacc881..14345a7256c0 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -24,7 +24,6 @@ use index::inverted_index::FstMap; use prost::Message; use snafu::ResultExt; -use super::PAGE_SIZE; use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; @@ -61,7 +60,7 @@ where offset: u64, size: u32, ) -> index::inverted_index::error::Result> { - let indexes = IndexKey::index(self.file_id, offset, size); + let indexes = IndexKey::index(self.file_id, offset, size, self.cache.page_size); // Size is 0, return empty data. if indexes.is_empty() { return Ok(Vec::new()); @@ -79,7 +78,10 @@ where None => { let page = self .inner - .seek_read(index.page_id * (PAGE_SIZE as u64), PAGE_SIZE as u32) + .seek_read( + index.page_id * (self.cache.page_size as u64), + self.cache.page_size as u32, + ) .await?; self.cache.put_index(index.clone(), Arc::new(page.clone())); CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); @@ -87,9 +89,13 @@ where } }; if index.page_id == first_page_id { - data.extend_from_slice(&page[IndexKey::offset_to_first_range(offset, size)]); + data.extend_from_slice( + &page[IndexKey::offset_to_first_range(offset, size, self.cache.page_size)], + ); } else if index.page_id == last_page_id { - data.extend_from_slice(&page[IndexKey::offset_to_last_range(offset, size)]); + data.extend_from_slice( + &page[IndexKey::offset_to_last_range(offset, size, self.cache.page_size)], + ); } else { data.extend_from_slice(&page); } @@ -153,33 +159,36 @@ pub struct IndexKey { } impl IndexKey { - fn offset_to_page_id(offset: u64) -> u64 { - offset / (PAGE_SIZE as u64) + fn offset_to_page_id(offset: u64, page_size: usize) -> u64 { + offset / (page_size as u64) } - fn size_to_page_num(size: u32) -> u32 { - size / (PAGE_SIZE as u32) + if size % (PAGE_SIZE as u32) == 0 { 0 } else { 1 } + fn size_to_page_num(offset: u64, size: u32, page_size: usize) -> u32 { + let first_page = Self::offset_to_page_id(offset, page_size); + let last_page = Self::offset_to_page_id(offset + size as u64, page_size); + (last_page - first_page + 1) as u32 } /// Ranges of first page. /// For example, if offset is 1000 and size is 5000 and PAGE_SIZE is 4096, then the first page is 1000..4096. - fn offset_to_first_range(offset: u64, size: u32) -> Range { - (offset % (PAGE_SIZE as u64)) as usize..if size > PAGE_SIZE as u32 { - PAGE_SIZE - } else { - (offset % (PAGE_SIZE as u64) + size as u64) as usize - } + fn offset_to_first_range(offset: u64, size: u32, page_size: usize) -> Range { + (offset % (page_size as u64)) as usize + ..if size > page_size as u32 - (offset % (page_size as u64)) as u32 { + page_size + } else { + (offset % (page_size as u64) + size as u64) as usize + } } /// Ranges of last page. /// For example, if offset is 1000 and size is 5000 and PAGE_SIZE is 4096, then the last page is 0..904. - fn offset_to_last_range(_offset: u64, size: u32) -> Range { - 0..(size % (PAGE_SIZE as u32)) as usize + fn offset_to_last_range(offset: u64, size: u32, page_size: usize) -> Range { + 0..((size as u64 + offset) % (page_size as u64)) as usize } - fn index(file_id: FileId, offset: u64, size: u32) -> Vec { - let page_id = Self::offset_to_page_id(offset); - let page_num = Self::size_to_page_num(size); + fn index(file_id: FileId, offset: u64, size: u32, page_size: usize) -> Vec { + let page_id = Self::offset_to_page_id(offset, page_size); + let page_num = Self::size_to_page_num(offset, size, page_size); (0..page_num) .map(|i| Self { file_id, @@ -196,11 +205,13 @@ pub struct InvertedIndexCache { index_metadata: moka::sync::Cache>, /// Cache for inverted index content. index: moka::sync::Cache>>, + // Page size for index content. + page_size: usize, } impl InvertedIndexCache { /// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`. - pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self { + pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: usize) -> Self { common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}"); let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap) .name("inverted_index_metadata") @@ -225,6 +236,7 @@ impl InvertedIndexCache { Self { index_metadata, index: index_cache, + page_size, } } } @@ -328,14 +340,12 @@ mod test { let mut cached_reader = CachedInvertedIndexBlobReader::new( FileId::random(), reader, - Arc::new(InvertedIndexCache::new(8192, 8192)), + Arc::new(InvertedIndexCache::new(8192, 8192, 10)), ); - let metadata = cached_reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); assert_eq!(metadata.segment_row_count, 1); assert_eq!(metadata.metas.len(), 2); - // tag0 let tag0 = metadata.metas.get("tag0").unwrap(); let stats0 = tag0.stats.as_ref().unwrap(); diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 6db1ef6e0b7b..2fe209ad2d8c 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -446,7 +446,7 @@ mod tests { move |expr| { let _d = &d; - let cache = Arc::new(InvertedIndexCache::new(10, 10)); + let cache = Arc::new(InvertedIndexCache::new(10, 10, 100)); let applier = InvertedIndexApplierBuilder::new( region_dir.clone(), object_store.clone(),