refactor: cache inverted index with fixed-size page #5114

Merged
merged 14 commits into from
Dec 13, 2024

fix: add unit test and fix bugs
CookiePieWw committed Dec 12, 2024
commit 6abe813e6a9788ff3a18594cdca3876375f50cf0
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
@@ -18,6 +18,7 @@ async-channel = "1.9"
 async-stream.workspace = true
 async-trait = "0.1"
 bytes.workspace = true
+bytemuck.workspace = true
 common-base.workspace = true
 common-config.workspace = true
 common-datasource.workspace = true
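The new bytemuck dependency serves the test module added in src/mito2/src/cache/index.rs, which splits packed u64 FST values into two u32 halves, along these lines:

    let [offset, size] = bytemuck::cast::<u64, [u32; 2]>(fst_value);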
204 changes: 174 additions & 30 deletions src/mito2/src/cache/index.rs
@@ -62,29 +62,36 @@ where
        size: u32,
    ) -> index::inverted_index::error::Result<Vec<u8>> {
        let indexes = IndexKey::index(self.file_id, offset, size);
+       // Size is 0, return empty data.
+       if indexes.is_empty() {
+           return Ok(Vec::new());
+       }
        let mut data = Vec::with_capacity(size as usize);
+       // Safety: indexes is not empty.
        let first_page_id = indexes[0].page_id;
-       let last_page_id = indexes.last().unwrap().page_id;
+       let last_page_id = indexes[indexes.len() - 1].page_id;
        for index in indexes {
-           if let Some(cached) = self.cache.get_index(index.clone()) {
-               CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
-               data.extend_from_slice(&cached);
-           } else {
-               CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
-               let buf = self
-                   .inner
-                   .seek_read(index.page_id * (PAGE_SIZE as u64), PAGE_SIZE as u32)
-                   .await?;
-               let first = IndexKey::offset_to_first_range(offset, size);
-               let last = IndexKey::offset_to_last_range(offset, size);
-               if index.page_id == first_page_id {
-                   data.extend_from_slice(&buf[first.clone()]);
-               } else if index.page_id == last_page_id {
-                   data.extend_from_slice(&buf[last.clone()]);
-               } else {
-                   data.extend_from_slice(&buf);
-               }
-               self.cache.put_index(index, Arc::new(buf));
-           }
+           let page = match self.cache.get_index(&index) {
+               Some(page) => {
+                   CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+                   page
+               }
+               None => {
+                   let page = self
+                       .inner
+                       .seek_read(index.page_id * (PAGE_SIZE as u64), PAGE_SIZE as u32)
+                       .await?;
+                   self.cache.put_index(index.clone(), Arc::new(page.clone()));
+                   CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+                   Arc::new(page)
+               }
+           };
+           if index.page_id == first_page_id {
+               data.extend_from_slice(&page[IndexKey::offset_to_first_range(offset, size)]);
+           } else if index.page_id == last_page_id {
+               data.extend_from_slice(&page[IndexKey::offset_to_last_range(offset, size)]);
+           } else {
+               data.extend_from_slice(&page);
+           }
        }
        Ok(data)
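For orientation, here is a minimal self-contained sketch (an illustration, not the code under review) of how a byte range decomposes into fixed-size pages; unlike the helpers below, it derives the page span from offset + size, and `read_through_pages` and `fetch_page` are hypothetical names:

    const PAGE_SIZE: usize = 4096;

    /// Assembles `size` bytes starting at `offset` out of whole pages fetched by id.
    fn read_through_pages(fetch_page: impl Fn(usize) -> Vec<u8>, offset: usize, size: usize) -> Vec<u8> {
        if size == 0 {
            return Vec::new();
        }
        let first_page = offset / PAGE_SIZE;
        let last_page = (offset + size - 1) / PAGE_SIZE; // page holding the final byte
        let mut data = Vec::with_capacity(size);
        for page_id in first_page..=last_page {
            let page = fetch_page(page_id);
            let start = if page_id == first_page { offset % PAGE_SIZE } else { 0 };
            let end = if page_id == last_page { (offset + size - 1) % PAGE_SIZE + 1 } else { PAGE_SIZE };
            data.extend_from_slice(&page[start..end]);
        }
        data
    }

For example, offset 0 with size 5000 touches pages 0 and 1, taking all of page 0 and bytes 0..904 of page 1.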
@@ -147,25 +154,27 @@ pub struct IndexKey {

impl IndexKey {
    fn offset_to_page_id(offset: u64) -> u64 {
-       (offset / (PAGE_SIZE as u64)) as u64
+       offset / (PAGE_SIZE as u64)
    }

    fn size_to_page_num(size: u32) -> u32 {
-       size / (PAGE_SIZE as u32)
+       size / (PAGE_SIZE as u32) + if size % (PAGE_SIZE as u32) == 0 { 0 } else { 1 }
    }

    /// Ranges of first page.
-   /// For example, if offset is 1000 and size is 2000, then the first page is 1000..4096.
-   fn offset_to_first_range(offset: u64, _size: u32) -> Range<usize> {
-       let first = (offset % (PAGE_SIZE as u64)) as usize..PAGE_SIZE;
-       first
+   /// For example, if offset is 1000 and size is 5000 and PAGE_SIZE is 4096, then the first page is 1000..4096.
+   fn offset_to_first_range(offset: u64, size: u32) -> Range<usize> {
+       (offset % (PAGE_SIZE as u64)) as usize..if size > PAGE_SIZE as u32 {
+           PAGE_SIZE
+       } else {
+           (offset % (PAGE_SIZE as u64) + size as u64) as usize
+       }
    }

    /// Ranges of last page.
    /// For example, if offset is 1000 and size is 2000, then the last page is 0..904.
    fn offset_to_last_range(_offset: u64, size: u32) -> Range<usize> {
-       let last = 0..(size % (PAGE_SIZE as u32)) as usize;
-       last
+       0..(size % (PAGE_SIZE as u32)) as usize
    }

    pub fn index(file_id: FileId, offset: u64, size: u32) -> Vec<Self> {
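A quick worked check of the helpers above, using a page-aligned read so the numbers are easy to follow (PAGE_SIZE assumed to be 4096; the helpers are private, so these asserts would live inside the same module):

    // Reading 5000 bytes at offset 0 spans two 4096-byte pages.
    assert_eq!(IndexKey::offset_to_page_id(0), 0);                 // starts in page 0
    assert_eq!(IndexKey::size_to_page_num(5000), 2);               // ceil(5000 / 4096)
    assert_eq!(IndexKey::offset_to_first_range(0, 5000), 0..4096); // all of page 0
    assert_eq!(IndexKey::offset_to_last_range(0, 5000), 0..904);   // 5000 % 4096 bytes of page 1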
@@ -239,9 +248,8 @@ impl InvertedIndexCache {
        self.index_metadata.insert(key, metadata)
    }

-   // todo(hl): align index file content to pages with size like 4096 bytes.
-   pub fn get_index(&self, key: IndexKey) -> Option<Arc<Vec<u8>>> {
-       self.index.get(&key)
+   pub fn get_index(&self, key: &IndexKey) -> Option<Arc<Vec<u8>>> {
+       self.index.get(key)
    }

    pub fn put_index(&self, key: IndexKey, value: Arc<Vec<u8>>) {
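Worth noting: get_index now borrows its key, so the hot read path probes the cache without cloning, and only a miss pays for a clone at insertion. A sketch of the calling pattern this enables (hypothetical variable names, `load_page` is a stand-in loader):

    if let Some(page) = cache.get_index(&key) {
        return page;                            // hit: no key clone, Arc shared
    }
    let page = Arc::new(load_page());           // hypothetical page loader
    cache.put_index(key.clone(), page.clone()); // miss: clone only on insert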
@@ -261,3 +269,139 @@ fn index_metadata_weight(k: &IndexKey, v: &Arc<InvertedIndexMetas>) -> u32 {
fn index_content_weight(k: &IndexKey, v: &Arc<Vec<u8>>) -> u32 {
    (k.file_id.as_bytes().len() + v.len()) as u32
}
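A back-of-the-envelope check of what this weigher means for capacity (a sketch under two assumptions: FileId is a 16-byte UUID, and IndexKey holds just file_id and page_id):

    // One full page weighs key + payload: 16 + 4096 = 4112 units, so the
    // 8192-unit content cache in the test below fits at most one full page.
    let key = IndexKey { file_id: FileId::random(), page_id: 0 };
    let page = Arc::new(vec![0u8; PAGE_SIZE]);
    assert_eq!(index_content_weight(&key, &page), 4112);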

#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use common_base::BitVec;
    use futures::stream;
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use index::inverted_index::Bytes;

    use super::*;

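    // An FST value is a u64 that packs a bitmap's offset and size as two u32
    // halves; `bytemuck::cast` reinterprets the raw bytes to split it back
    // apart, matching how the writer packed them.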
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
                Box::new(stream::iter(vec![
                    Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
                    Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
                    Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
                ])),
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
                Box::new(stream::iter(vec![
                    Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
                    Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
                    Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
                ])),
            )
            .await
            .unwrap();
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

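    // End-to-end check: everything written above is read back through the
    // page-based cache, so both the miss path (first read) and the sliced
    // page reassembly get exercised.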
    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;
        let reader = InvertedIndexBlobReader::new(blob);
        let mut cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192)),
        );

        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        // tag0
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        let [offset, size] = unpack(fst0.get(b"a").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
        let [offset, size] = unpack(fst0.get(b"b").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
        let [offset, size] = unpack(fst0.get(b"c").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));

        // tag1
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        let [offset, size] = unpack(fst1.get(b"x").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
        let [offset, size] = unpack(fst1.get(b"y").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
        let [offset, size] = unpack(fst1.get(b"z").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size)
            .await
            .unwrap();
        assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
    }
}
5 changes: 2 additions & 3 deletions src/mito2/src/test_util.rs
@@ -35,8 +35,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_base::Plugins;
 use common_datasource::compression::CompressionType;
-use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache};
-use common_meta::key::schema_name::{SchemaName, SchemaNameValue};
+use common_meta::cache::{new_schema_cache, new_table_schema_cache};
 use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
 use common_meta::kv_backend::memory::MemoryKvBackend;
 use common_meta::kv_backend::KvBackendRef;
@@ -49,7 +48,7 @@ use datatypes::schema::ColumnSchema;
 use log_store::kafka::log_store::KafkaLogStore;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use log_store::test_util::log_store_util;
-use moka::future::{Cache, CacheBuilder};
+use moka::future::CacheBuilder;
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::services::Fs;
 use object_store::ObjectStore;