Skip to content

Commit

Permalink
fix: math
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Dec 12, 2024
1 parent f20b94b commit 57e1bc3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl CacheManagerBuilder {
.build()
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size, PAGE_SIZE);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Expand Down
58 changes: 34 additions & 24 deletions src/mito2/src/cache/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use index::inverted_index::FstMap;
use prost::Message;
use snafu::ResultExt;

use super::PAGE_SIZE;
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

Expand Down Expand Up @@ -61,7 +60,7 @@ where
offset: u64,
size: u32,
) -> index::inverted_index::error::Result<Vec<u8>> {
let indexes = IndexKey::index(self.file_id, offset, size);
let indexes = IndexKey::index(self.file_id, offset, size, self.cache.page_size);
// Size is 0, return empty data.
if indexes.is_empty() {
return Ok(Vec::new());
Expand All @@ -79,17 +78,24 @@ where
None => {
let page = self
.inner
.seek_read(index.page_id * (PAGE_SIZE as u64), PAGE_SIZE as u32)
.seek_read(
index.page_id * (self.cache.page_size as u64),
self.cache.page_size as u32,
)
.await?;
self.cache.put_index(index.clone(), Arc::new(page.clone()));
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
Arc::new(page)
}
};
if index.page_id == first_page_id {
data.extend_from_slice(&page[IndexKey::offset_to_first_range(offset, size)]);
data.extend_from_slice(
&page[IndexKey::offset_to_first_range(offset, size, self.cache.page_size)],
);
} else if index.page_id == last_page_id {
data.extend_from_slice(&page[IndexKey::offset_to_last_range(offset, size)]);
data.extend_from_slice(
&page[IndexKey::offset_to_last_range(offset, size, self.cache.page_size)],
);
} else {
data.extend_from_slice(&page);
}
Expand Down Expand Up @@ -153,33 +159,36 @@ pub struct IndexKey {
}

impl IndexKey {
fn offset_to_page_id(offset: u64) -> u64 {
offset / (PAGE_SIZE as u64)
fn offset_to_page_id(offset: u64, page_size: usize) -> u64 {
offset / (page_size as u64)
}

fn size_to_page_num(size: u32) -> u32 {
size / (PAGE_SIZE as u32) + if size % (PAGE_SIZE as u32) == 0 { 0 } else { 1 }
fn size_to_page_num(offset: u64, size: u32, page_size: usize) -> u32 {
let first_page = Self::offset_to_page_id(offset, page_size);
let last_page = Self::offset_to_page_id(offset + size as u64, page_size);
(last_page - first_page + 1) as u32
}

/// Ranges of first page.
/// For example, if offset is 1000 and size is 5000 and PAGE_SIZE is 4096, then the first page is 1000..4096.
fn offset_to_first_range(offset: u64, size: u32) -> Range<usize> {
(offset % (PAGE_SIZE as u64)) as usize..if size > PAGE_SIZE as u32 {
PAGE_SIZE
} else {
(offset % (PAGE_SIZE as u64) + size as u64) as usize
}
fn offset_to_first_range(offset: u64, size: u32, page_size: usize) -> Range<usize> {
(offset % (page_size as u64)) as usize
..if size > page_size as u32 - (offset % (page_size as u64)) as u32 {
page_size
} else {
(offset % (page_size as u64) + size as u64) as usize
}
}

/// Ranges of last page.
/// For example, if offset is 1000 and size is 5000 and PAGE_SIZE is 4096, then the last page is 0..904.
fn offset_to_last_range(_offset: u64, size: u32) -> Range<usize> {
0..(size % (PAGE_SIZE as u32)) as usize
fn offset_to_last_range(offset: u64, size: u32, page_size: usize) -> Range<usize> {
0..((size as u64 + offset) % (page_size as u64)) as usize
}

fn index(file_id: FileId, offset: u64, size: u32) -> Vec<Self> {
let page_id = Self::offset_to_page_id(offset);
let page_num = Self::size_to_page_num(size);
fn index(file_id: FileId, offset: u64, size: u32, page_size: usize) -> Vec<Self> {
let page_id = Self::offset_to_page_id(offset, page_size);
let page_num = Self::size_to_page_num(offset, size, page_size);
(0..page_num)
.map(|i| Self {
file_id,
Expand All @@ -196,11 +205,13 @@ pub struct InvertedIndexCache {
index_metadata: moka::sync::Cache<IndexKey, Arc<InvertedIndexMetas>>,
/// Cache for inverted index content.
index: moka::sync::Cache<IndexKey, Arc<Vec<u8>>>,
// Page size for index content.
page_size: usize,
}

impl InvertedIndexCache {
/// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`.
pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self {
pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: usize) -> Self {
common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}");
let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
.name("inverted_index_metadata")
Expand All @@ -225,6 +236,7 @@ impl InvertedIndexCache {
Self {
index_metadata,
index: index_cache,
page_size,
}
}
}
Expand Down Expand Up @@ -328,14 +340,12 @@ mod test {
let mut cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
reader,
Arc::new(InvertedIndexCache::new(8192, 8192)),
Arc::new(InvertedIndexCache::new(8192, 8192, 10)),
);

let metadata = cached_reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);

// tag0
let tag0 = metadata.metas.get("tag0").unwrap();
let stats0 = tag0.stats.as_ref().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ mod tests {

move |expr| {
let _d = &d;
let cache = Arc::new(InvertedIndexCache::new(10, 10));
let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),
Expand Down

0 comments on commit 57e1bc3

Please sign in to comment.