From 992e75ae39d6a7729eeb60e095bb92c6c32fe4fa Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 23 Dec 2024 10:51:24 +0000 Subject: [PATCH 1/5] feat(index-cache): abstract `IndexCache` to be shared by multi types of indexes Signed-off-by: Zhenchi --- src/index/src/inverted_index/format/reader.rs | 2 +- .../src/inverted_index/format/reader/blob.rs | 2 +- src/mito2/src/cache.rs | 2 +- src/mito2/src/cache/index.rs | 541 +++++------------- src/mito2/src/cache/index/inverted_index.rs | 369 ++++++++++++ src/mito2/src/metrics.rs | 8 +- .../src/sst/index/inverted_index/applier.rs | 2 +- .../index/inverted_index/applier/builder.rs | 2 +- .../src/sst/index/inverted_index/creator.rs | 2 +- 9 files changed, 515 insertions(+), 415 deletions(-) create mode 100644 src/mito2/src/cache/index/inverted_index.rs diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 21e5487d1e42..c914b9007a6a 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -31,7 +31,7 @@ mod footer; /// InvertedIndexReader defines an asynchronous reader of inverted index data #[mockall::automock] #[async_trait] -pub trait InvertedIndexReader: Send { +pub trait InvertedIndexReader: Send + Sync { /// Seeks to given offset and reads data with exact size as provided. async fn range_read(&mut self, offset: u64, size: u32) -> Result>; diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index fcaa63773d93..73d98835794a 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -51,7 +51,7 @@ impl InvertedIndexBlobReader { } #[async_trait] -impl InvertedIndexReader for InvertedIndexBlobReader { +impl InvertedIndexReader for InvertedIndexBlobReader { async fn range_read(&mut self, offset: u64, size: u32) -> Result> { let buf = self .source diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 03cf9136245a..a1864c55179e 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -37,7 +37,7 @@ use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector}; use crate::cache::cache_size::parquet_meta_size; use crate::cache::file_cache::{FileType, IndexKey}; -use crate::cache::index::{InvertedIndexCache, InvertedIndexCacheRef}; +use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef}; use crate::cache::write_cache::WriteCacheRef; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index de39ea3784b6..37ae6143378c 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -12,168 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod inverted_index; + +use std::future::Future; use std::ops::Range; use std::sync::Arc; -use api::v1::index::InvertedIndexMetas; -use async_trait::async_trait; use bytes::Bytes; -use common_base::BitVec; -use index::inverted_index::error::DecodeFstSnafu; -use index::inverted_index::format::reader::InvertedIndexReader; -use index::inverted_index::FstMap; use object_store::Buffer; -use prost::Message; -use snafu::ResultExt; use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; -use crate::sst::file::FileId; /// Metrics for index metadata. 
const INDEX_METADATA_TYPE: &str = "index_metadata"; /// Metrics for index content. const INDEX_CONTENT_TYPE: &str = "index_content"; -/// Inverted index blob reader with cache. -pub struct CachedInvertedIndexBlobReader { - file_id: FileId, - file_size: u64, - inner: R, - cache: InvertedIndexCacheRef, -} - -impl CachedInvertedIndexBlobReader { - pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self { - Self { - file_id, - file_size, - inner, - cache, - } - } -} - -impl CachedInvertedIndexBlobReader -where - R: InvertedIndexReader, -{ - /// Gets given range of index data from cache, and loads from source if the file - /// is not already cached. - async fn get_or_load( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result> { - let keys = - IndexDataPageKey::generate_page_keys(self.file_id, offset, size, self.cache.page_size); - // Size is 0, return empty data. - if keys.is_empty() { - return Ok(Vec::new()); - } - let mut data = Vec::with_capacity(keys.len()); - data.resize(keys.len(), Bytes::new()); - let mut cache_miss_range = vec![]; - let mut cache_miss_idx = vec![]; - let last_index = keys.len() - 1; - // TODO: Avoid copy as much as possible. - for (i, index) in keys.iter().enumerate() { - match self.cache.get_index(index) { - Some(page) => { - CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); - data[i] = page; - } - None => { - CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); - let base_offset = index.page_id * self.cache.page_size; - let pruned_size = if i == last_index { - prune_size(&keys, self.file_size, self.cache.page_size) - } else { - self.cache.page_size - }; - cache_miss_range.push(base_offset..base_offset + pruned_size); - cache_miss_idx.push(i); - } - } - } - if !cache_miss_range.is_empty() { - let pages = self.inner.read_vec(&cache_miss_range).await?; - for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { - let key = keys[i].clone(); - data[i] = page.clone(); - self.cache.put_index(key, page.clone()); - } - } - let buffer = Buffer::from_iter(data.into_iter()); - Ok(buffer - .slice(IndexDataPageKey::calculate_range( - offset, - size, - self.cache.page_size, - )) - .to_vec()) - } -} - -#[async_trait] -impl InvertedIndexReader for CachedInvertedIndexBlobReader { - async fn range_read( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result> { - self.inner.range_read(offset, size).await - } - - async fn read_vec( - &mut self, - ranges: &[Range], - ) -> index::inverted_index::error::Result> { - self.inner.read_vec(ranges).await - } - - async fn metadata(&mut self) -> index::inverted_index::error::Result> { - if let Some(cached) = self.cache.get_index_metadata(self.file_id) { - CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); - Ok(cached) - } else { - let meta = self.inner.metadata().await?; - self.cache.put_index_metadata(self.file_id, meta.clone()); - CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); - Ok(meta) - } - } - - async fn fst( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result { - self.get_or_load(offset, size) - .await - .and_then(|r| FstMap::new(r).context(DecodeFstSnafu)) - } - - async fn bitmap( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result { - self.get_or_load(offset, size).await.map(BitVec::from_vec) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct IndexMetadataKey { - file_id: FileId, -} - -#[derive(Debug, Clone, PartialEq, 
Eq, Hash)] -pub struct IndexDataPageKey { - file_id: FileId, +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct PageKey { page_id: u64, } -impl IndexDataPageKey { +impl PageKey { /// Converts an offset to a page ID based on the page size. fn calculate_page_id(offset: u64, page_size: u64) -> u64 { offset / page_size @@ -200,50 +60,64 @@ impl IndexDataPageKey { } /// Generates a vector of IndexKey instances for the pages that a given offset and size span. - fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec { + fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator { let start_page = Self::calculate_page_id(offset, page_size); let total_pages = Self::calculate_page_count(offset, size, page_size); - (0..total_pages) - .map(|i| Self { - file_id, - page_id: start_page + i as u64, - }) - .collect() + (0..total_pages).map(move |i| Self { + page_id: start_page + i as u64, + }) } } -pub type InvertedIndexCacheRef = Arc; - -pub struct InvertedIndexCache { - /// Cache for inverted index metadata - index_metadata: moka::sync::Cache>, - /// Cache for inverted index content. - index: moka::sync::Cache, +/// Cache for index metadata and content. +pub struct IndexCache { + /// Cache for index metadata + index_metadata: moka::sync::Cache>, + /// Cache for index content. + index: moka::sync::Cache<(K, PageKey), Bytes>, // Page size for index content. page_size: u64, + + /// Index type for telemetry. + index_type: &'static str, + + /// Weighter for metadata. + weight_of_metadata: fn(&K, &Arc) -> u32, + /// Weighter for content. + weight_of_content: fn(&(K, PageKey), &Bytes) -> u32, } -impl InvertedIndexCache { - /// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`. 
- pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self { - common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}"); +impl IndexCache +where + K: std::hash::Hash + Eq + Send + Sync + 'static, + M: Send + Sync + 'static, +{ + pub fn new_with_weighter( + index_metadata_cap: u64, + index_content_cap: u64, + page_size: u64, + index_type: &'static str, + weight_of_metadata: fn(&K, &Arc) -> u32, + weight_of_content: fn(&(K, PageKey), &Bytes) -> u32, + ) -> Self { + common_telemetry::debug!("Building IndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}, page size: {page_size}, index type: {index_type}"); let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap) - .name("inverted_index_metadata") - .weigher(index_metadata_weight) - .eviction_listener(|k, v, _cause| { - let size = index_metadata_weight(&k, &v); + .name(&format!("index_metadata_{}", index_type)) + .weigher(weight_of_metadata) + .eviction_listener(move |k, v, _cause| { + let size = weight_of_metadata(&k, &v); CACHE_BYTES - .with_label_values(&[INDEX_METADATA_TYPE]) + .with_label_values(&[INDEX_METADATA_TYPE, index_type]) .sub(size.into()); }) .build(); let index_cache = moka::sync::CacheBuilder::new(index_content_cap) - .name("inverted_index_content") - .weigher(index_content_weight) - .eviction_listener(|k, v, _cause| { - let size = index_content_weight(&k, &v); + .name(&format!("index_content_{}", index_type)) + .weigher(weight_of_content) + .eviction_listener(move |k, v, _cause| { + let size = weight_of_content(&k, &v); CACHE_BYTES - .with_label_values(&[INDEX_CONTENT_TYPE]) + .with_label_values(&[INDEX_CONTENT_TYPE, index_type]) .sub(size.into()); }) .build(); @@ -251,259 +125,114 @@ impl InvertedIndexCache { index_metadata, index: index_cache, page_size, + index_type, + weight_of_content, + weight_of_metadata, } } } -impl InvertedIndexCache { - pub fn get_index_metadata(&self, file_id: FileId) -> Option> { - self.index_metadata.get(&IndexMetadataKey { file_id }) +impl IndexCache +where + K: std::hash::Hash + Eq + Clone + Copy + Send + Sync + 'static, + M: Send + Sync + 'static, +{ + pub fn get_index_metadata(&self, key: K) -> Option> { + self.index_metadata.get(&key) } - pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc) { - let key = IndexMetadataKey { file_id }; + pub fn put_index_metadata(&self, key: K, metadata: Arc) { CACHE_BYTES - .with_label_values(&[INDEX_METADATA_TYPE]) - .add(index_metadata_weight(&key, &metadata).into()); + .with_label_values(&[INDEX_METADATA_TYPE, self.index_type]) + .add((self.weight_of_metadata)(&key, &metadata).into()); self.index_metadata.insert(key, metadata) } - pub fn get_index(&self, key: &IndexDataPageKey) -> Option { - self.index.get(key) + pub fn get_index(&self, key: K, page_key: PageKey) -> Option { + self.index.get(&(key, page_key)) } - pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) { + pub fn put_index(&self, key: K, page_key: PageKey, value: Bytes) { CACHE_BYTES - .with_label_values(&[INDEX_CONTENT_TYPE]) - .add(index_content_weight(&key, &value).into()); - self.index.insert(key, value); + .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) + .add((self.weight_of_content)(&(key, page_key), &value).into()); + self.index.insert((key, page_key), value); } -} - -/// Calculates weight for index metadata. 
-fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc) -> u32 { - (k.file_id.as_bytes().len() + v.encoded_len()) as u32 -} -/// Calculates weight for index content. -fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 { - (k.file_id.as_bytes().len() + v.len()) as u32 + /// Gets given range of index data from cache, and loads from source if the file + /// is not already cached. + async fn get_or_load( + &self, + key: K, + file_size: u64, + offset: u64, + size: u32, + load: F, + ) -> std::result::Result, E> + where + F: FnOnce(Vec>) -> Fut, + Fut: Future, E>>, + E: std::error::Error, + { + let page_keys = + PageKey::generate_page_keys(offset, size, self.page_size).collect::>(); + // Size is 0, return empty data. + if page_keys.is_empty() { + return Ok(Vec::new()); + } + let mut data = Vec::with_capacity(page_keys.len()); + data.resize(page_keys.len(), Bytes::new()); + let mut cache_miss_range = vec![]; + let mut cache_miss_idx = vec![]; + let last_index = page_keys.len() - 1; + // TODO: Avoid copy as much as possible. + for (i, page_key) in page_keys.iter().enumerate() { + match self.get_index(key, *page_key) { + Some(page) => { + CACHE_HIT + .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) + .inc(); + data[i] = page; + } + None => { + CACHE_MISS + .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) + .inc(); + let base_offset = page_key.page_id * self.page_size; + let pruned_size = if i == last_index { + prune_size(page_keys.iter(), file_size, self.page_size) + } else { + self.page_size + }; + cache_miss_range.push(base_offset..base_offset + pruned_size); + cache_miss_idx.push(i); + } + } + } + if !cache_miss_range.is_empty() { + let pages = load(cache_miss_range).await?; + for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { + let page_key = page_keys[i]; + data[i] = page.clone(); + self.put_index(key, page_key, page.clone()); + } + } + let buffer = Buffer::from_iter(data.into_iter()); + Ok(buffer + .slice(PageKey::calculate_range(offset, size, self.page_size)) + .to_vec()) + } } /// Prunes the size of the last page based on the indexes. /// We have following cases: /// 1. The rest file size is less than the page size, read to the end of the file. /// 2. Otherwise, read the page size. -fn prune_size(indexes: &[IndexDataPageKey], file_size: u64, page_size: u64) -> u64 { +fn prune_size<'a>( + indexes: impl Iterator, + file_size: u64, + page_size: u64, +) -> u64 { let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0); page_size.min(file_size - last_page_start) } - -#[cfg(test)] -mod test { - use std::num::NonZeroUsize; - - use common_base::BitVec; - use futures::stream; - use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; - use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter}; - use index::inverted_index::Bytes; - use prometheus::register_int_counter_vec; - use rand::{Rng, RngCore}; - - use super::*; - use crate::sst::index::store::InstrumentedStore; - use crate::test_util::TestEnv; - - // Repeat times for following little fuzz tests. 
- const FUZZ_REPEAT_TIMES: usize = 100; - - // Fuzz test for index data page key - #[test] - fn fuzz_index_calculation() { - // randomly generate a large u8 array - let mut rng = rand::thread_rng(); - let mut data = vec![0u8; 1024 * 1024]; - rng.fill_bytes(&mut data); - let file_id = FileId::random(); - - for _ in 0..FUZZ_REPEAT_TIMES { - let offset = rng.gen_range(0..data.len() as u64); - let size = rng.gen_range(0..data.len() as u32 - offset as u32); - let page_size: usize = rng.gen_range(1..1024); - - let indexes = - IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64); - let page_num = indexes.len(); - let mut read = Vec::with_capacity(size as usize); - for key in indexes.into_iter() { - let start = key.page_id as usize * page_size; - let page = if start + page_size < data.len() { - &data[start..start + page_size] - } else { - &data[start..] - }; - read.extend_from_slice(page); - } - let expected_range = offset as usize..(offset + size as u64 as u64) as usize; - let read = - read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec(); - if read != data.get(expected_range).unwrap() { - panic!( - "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}", - offset, size, page_size, read.len(), size as usize, - IndexDataPageKey::calculate_range(offset, size, page_size as u64), - page_num - ); - } - } - } - - fn unpack(fst_value: u64) -> [u32; 2] { - bytemuck::cast::(fst_value) - } - - async fn create_inverted_index_blob() -> Vec { - let mut blob = Vec::new(); - let mut writer = InvertedIndexBlobWriter::new(&mut blob); - writer - .add_index( - "tag0".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - Box::new(stream::iter(vec![ - Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), - ])), - ) - .await - .unwrap(); - writer - .add_index( - "tag1".to_string(), - BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), - Box::new(stream::iter(vec![ - Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), - Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), - Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), - ])), - ) - .await - .unwrap(); - writer - .finish(8, NonZeroUsize::new(1).unwrap()) - .await - .unwrap(); - - blob - } - - #[tokio::test] - async fn test_inverted_index_cache() { - let blob = create_inverted_index_blob().await; - - // Init a test range reader in local fs. 
- let mut env = TestEnv::new(); - let file_size = blob.len() as u64; - let store = env.init_object_store_manager(); - let temp_path = "data"; - store.write(temp_path, blob).await.unwrap(); - let store = InstrumentedStore::new(store); - let metric = - register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap(); - let counter = metric.with_label_values(&["test"]); - let range_reader = store - .range_reader("data", &counter, &counter) - .await - .unwrap(); - - let reader = InvertedIndexBlobReader::new(range_reader); - let mut cached_reader = CachedInvertedIndexBlobReader::new( - FileId::random(), - file_size, - reader, - Arc::new(InvertedIndexCache::new(8192, 8192, 50)), - ); - let metadata = cached_reader.metadata().await.unwrap(); - assert_eq!(metadata.total_row_count, 8); - assert_eq!(metadata.segment_row_count, 1); - assert_eq!(metadata.metas.len(), 2); - // tag0 - let tag0 = metadata.metas.get("tag0").unwrap(); - let stats0 = tag0.stats.as_ref().unwrap(); - assert_eq!(stats0.distinct_count, 3); - assert_eq!(stats0.null_count, 1); - assert_eq!(stats0.min_value, Bytes::from("a")); - assert_eq!(stats0.max_value, Bytes::from("c")); - let fst0 = cached_reader - .fst( - tag0.base_offset + tag0.relative_fst_offset as u64, - tag0.fst_size, - ) - .await - .unwrap(); - assert_eq!(fst0.len(), 3); - let [offset, size] = unpack(fst0.get(b"a").unwrap()); - let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); - let [offset, size] = unpack(fst0.get(b"b").unwrap()); - let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); - let [offset, size] = unpack(fst0.get(b"c").unwrap()); - let bitmap = cached_reader - .bitmap(tag0.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); - - // tag1 - let tag1 = metadata.metas.get("tag1").unwrap(); - let stats1 = tag1.stats.as_ref().unwrap(); - assert_eq!(stats1.distinct_count, 3); - assert_eq!(stats1.null_count, 1); - assert_eq!(stats1.min_value, Bytes::from("x")); - assert_eq!(stats1.max_value, Bytes::from("z")); - let fst1 = cached_reader - .fst( - tag1.base_offset + tag1.relative_fst_offset as u64, - tag1.fst_size, - ) - .await - .unwrap(); - assert_eq!(fst1.len(), 3); - let [offset, size] = unpack(fst1.get(b"x").unwrap()); - let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); - let [offset, size] = unpack(fst1.get(b"y").unwrap()); - let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); - let [offset, size] = unpack(fst1.get(b"z").unwrap()); - let bitmap = cached_reader - .bitmap(tag1.base_offset + offset as u64, size) - .await - .unwrap(); - assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); - - // fuzz test - let mut rng = rand::thread_rng(); - for _ in 0..FUZZ_REPEAT_TIMES { - let offset = rng.gen_range(0..file_size); - let size = rng.gen_range(0..file_size as u32 - offset as u32); - let expected = cached_reader.range_read(offset, size).await.unwrap(); - let read = cached_reader.get_or_load(offset, size).await.unwrap(); - assert_eq!(read, expected); - } - } -} diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs new file 
mode 100644 index 000000000000..8bb088df789b --- /dev/null +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -0,0 +1,369 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Range; +use std::sync::Arc; + +use api::v1::index::InvertedIndexMetas; +use async_trait::async_trait; +use bytes::Bytes; +use common_base::BitVec; +use index::inverted_index::error::DecodeFstSnafu; +use index::inverted_index::format::reader::InvertedIndexReader; +use index::inverted_index::FstMap; +use prost::Message; +use snafu::ResultExt; + +use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE}; +use crate::metrics::{CACHE_HIT, CACHE_MISS}; +use crate::sst::file::FileId; + +/// Metrics for inverted index. +const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index"; + +/// Cache for inverted index. +pub type InvertedIndexCache = IndexCache; +pub type InvertedIndexCacheRef = Arc; + +impl InvertedIndexCache { + /// Creates a new inverted index cache. + pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self { + Self::new_with_weighter( + index_metadata_cap, + index_content_cap, + page_size, + INDEX_TYPE_INVERTED_INDEX, + inverted_index_metadata_weight, + inverted_index_content_weight, + ) + } +} + +/// Calculates weight for inveretd index metadata. +fn inverted_index_metadata_weight(k: &FileId, v: &Arc) -> u32 { + (k.as_bytes().len() + v.encoded_len()) as u32 +} + +/// Calculates weight for inveretd index content. +fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { + (k.as_bytes().len() + v.len()) as u32 +} + +/// Inverted index blob reader with cache. +pub struct CachedInvertedIndexBlobReader { + file_id: FileId, + file_size: u64, + inner: R, + cache: InvertedIndexCacheRef, +} + +impl CachedInvertedIndexBlobReader { + /// Creates a new inverted index blob reader with cache. 
+ pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self { + Self { + file_id, + file_size, + inner, + cache, + } + } +} + +#[async_trait] +impl InvertedIndexReader for CachedInvertedIndexBlobReader { + async fn range_read( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result> { + self.inner.range_read(offset, size).await + } + + async fn read_vec( + &mut self, + ranges: &[Range], + ) -> index::inverted_index::error::Result> { + self.inner.read_vec(ranges).await + } + + async fn metadata(&mut self) -> index::inverted_index::error::Result> { + if let Some(cached) = self.cache.get_index_metadata(self.file_id) { + CACHE_HIT + .with_label_values(&[INDEX_METADATA_TYPE, INDEX_TYPE_INVERTED_INDEX]) + .inc(); + Ok(cached) + } else { + let meta = self.inner.metadata().await?; + self.cache.put_index_metadata(self.file_id, meta.clone()); + CACHE_MISS + .with_label_values(&[INDEX_METADATA_TYPE, INDEX_TYPE_INVERTED_INDEX]) + .inc(); + Ok(meta) + } + } + + async fn fst( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result { + let inner = &mut self.inner; + self.cache + .get_or_load( + self.file_id, + self.file_size, + offset, + size, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await + .and_then(|r| FstMap::new(r).context(DecodeFstSnafu)) + } + + async fn bitmap( + &mut self, + offset: u64, + size: u32, + ) -> index::inverted_index::error::Result { + let inner = &mut self.inner; + self.cache + .get_or_load( + self.file_id, + self.file_size, + offset, + size, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await + .map(BitVec::from_vec) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use common_base::BitVec; + use futures::stream; + use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; + use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter}; + use index::inverted_index::Bytes; + use prometheus::register_int_counter_vec; + use rand::{Rng, RngCore}; + + use super::*; + use crate::sst::index::store::InstrumentedStore; + use crate::test_util::TestEnv; + + // Repeat times for following little fuzz tests. + const FUZZ_REPEAT_TIMES: usize = 100; + + // Fuzz test for index data page key + #[test] + fn fuzz_index_calculation() { + // randomly generate a large u8 array + let mut rng = rand::thread_rng(); + let mut data = vec![0u8; 1024 * 1024]; + rng.fill_bytes(&mut data); + + for _ in 0..FUZZ_REPEAT_TIMES { + let offset = rng.gen_range(0..data.len() as u64); + let size = rng.gen_range(0..data.len() as u32 - offset as u32); + let page_size: usize = rng.gen_range(1..1024); + + let indexes = + PageKey::generate_page_keys(offset, size, page_size as u64).collect::>(); + let page_num = indexes.len(); + let mut read = Vec::with_capacity(size as usize); + for key in indexes.into_iter() { + let start = key.page_id as usize * page_size; + let page = if start + page_size < data.len() { + &data[start..start + page_size] + } else { + &data[start..] 
+ }; + read.extend_from_slice(page); + } + let expected_range = offset as usize..(offset + size as u64 as u64) as usize; + let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec(); + if read != data.get(expected_range).unwrap() { + panic!( + "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}", + offset, size, page_size, read.len(), size as usize, + PageKey::calculate_range(offset, size, page_size as u64), + page_num + ); + } + } + } + + fn unpack(fst_value: u64) -> [u32; 2] { + bytemuck::cast::(fst_value) + } + + async fn create_inverted_index_blob() -> Vec { + let mut blob = Vec::new(); + let mut writer = InvertedIndexBlobWriter::new(&mut blob); + writer + .add_index( + "tag0".to_string(), + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Box::new(stream::iter(vec![ + Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))), + Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))), + ])), + ) + .await + .unwrap(); + writer + .add_index( + "tag1".to_string(), + BitVec::from_slice(&[0b0000_0001, 0b0000_0000]), + Box::new(stream::iter(vec![ + Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))), + Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))), + Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))), + ])), + ) + .await + .unwrap(); + writer + .finish(8, NonZeroUsize::new(1).unwrap()) + .await + .unwrap(); + + blob + } + + #[tokio::test] + async fn test_inverted_index_cache() { + let blob = create_inverted_index_blob().await; + + // Init a test range reader in local fs. + let mut env = TestEnv::new(); + let file_size = blob.len() as u64; + let store = env.init_object_store_manager(); + let temp_path = "data"; + store.write(temp_path, blob).await.unwrap(); + let store = InstrumentedStore::new(store); + let metric = + register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap(); + let counter = metric.with_label_values(&["test"]); + let range_reader = store + .range_reader("data", &counter, &counter) + .await + .unwrap(); + + let reader = InvertedIndexBlobReader::new(range_reader); + let mut cached_reader = CachedInvertedIndexBlobReader::new( + FileId::random(), + file_size, + reader, + Arc::new(InvertedIndexCache::new(8192, 8192, 50)), + ); + let metadata = cached_reader.metadata().await.unwrap(); + assert_eq!(metadata.total_row_count, 8); + assert_eq!(metadata.segment_row_count, 1); + assert_eq!(metadata.metas.len(), 2); + // tag0 + let tag0 = metadata.metas.get("tag0").unwrap(); + let stats0 = tag0.stats.as_ref().unwrap(); + assert_eq!(stats0.distinct_count, 3); + assert_eq!(stats0.null_count, 1); + assert_eq!(stats0.min_value, Bytes::from("a")); + assert_eq!(stats0.max_value, Bytes::from("c")); + let fst0 = cached_reader + .fst( + tag0.base_offset + tag0.relative_fst_offset as u64, + tag0.fst_size, + ) + .await + .unwrap(); + assert_eq!(fst0.len(), 3); + let [offset, size] = unpack(fst0.get(b"a").unwrap()); + let bitmap = cached_reader + .bitmap(tag0.base_offset + offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + let [offset, size] = unpack(fst0.get(b"b").unwrap()); + let bitmap = cached_reader + .bitmap(tag0.base_offset + offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + let [offset, size] = unpack(fst0.get(b"c").unwrap()); + let bitmap = cached_reader + .bitmap(tag0.base_offset 
+ offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + + // tag1 + let tag1 = metadata.metas.get("tag1").unwrap(); + let stats1 = tag1.stats.as_ref().unwrap(); + assert_eq!(stats1.distinct_count, 3); + assert_eq!(stats1.null_count, 1); + assert_eq!(stats1.min_value, Bytes::from("x")); + assert_eq!(stats1.max_value, Bytes::from("z")); + let fst1 = cached_reader + .fst( + tag1.base_offset + tag1.relative_fst_offset as u64, + tag1.fst_size, + ) + .await + .unwrap(); + assert_eq!(fst1.len(), 3); + let [offset, size] = unpack(fst1.get(b"x").unwrap()); + let bitmap = cached_reader + .bitmap(tag1.base_offset + offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + let [offset, size] = unpack(fst1.get(b"y").unwrap()); + let bitmap = cached_reader + .bitmap(tag1.base_offset + offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000])); + let [offset, size] = unpack(fst1.get(b"z").unwrap()); + let bitmap = cached_reader + .bitmap(tag1.base_offset + offset as u64, size) + .await + .unwrap(); + assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001])); + + // fuzz test + let mut rng = rand::thread_rng(); + for _ in 0..FUZZ_REPEAT_TIMES { + let offset = rng.gen_range(0..file_size); + let size = rng.gen_range(0..file_size as u32 - offset as u32); + let expected = cached_reader.range_read(offset, size).await.unwrap(); + let inner = &mut cached_reader.inner; + let read = cached_reader + .cache + .get_or_load( + cached_reader.file_id, + file_size, + offset, + size, + |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await + .unwrap(); + assert_eq!(read, expected); + } + } +} diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 5a5d76da4c0b..e51062a053fb 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -28,6 +28,8 @@ pub const FILE_TYPE_LABEL: &str = "file_type"; pub const WORKER_LABEL: &str = "worker"; /// Partition label. pub const PARTITION_LABEL: &str = "partition"; +/// Index type label. +pub const INDEX_TYPE_LABEL: &str = "index_type"; lazy_static! { /// Global write buffer size in bytes. @@ -195,21 +197,21 @@ lazy_static! { pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!( "greptime_mito_cache_hit", "mito cache hit", - &[TYPE_LABEL] + &[TYPE_LABEL, INDEX_TYPE_LABEL] ) .unwrap(); /// Cache miss counter. pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!( "greptime_mito_cache_miss", "mito cache miss", - &[TYPE_LABEL] + &[TYPE_LABEL, INDEX_TYPE_LABEL] ) .unwrap(); /// Cache size in bytes. pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!( "greptime_mito_cache_bytes", "mito cache bytes", - &[TYPE_LABEL] + &[TYPE_LABEL, INDEX_TYPE_LABEL] ) .unwrap(); /// Download bytes counter in the write cache. 
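
To make the page mapping above concrete: `PageKey::calculate_page_id`, `calculate_page_count`, and `calculate_range` carve a `(offset, size)` read into fixed-size pages, and `get_or_load` stitches the fetched pages back together before slicing out the requested bytes. The standalone sketch below spells out that arithmetic; the exact method bodies are elided from the hunk context, so these formulas are inferred from `generate_page_keys`, `prune_size`, and the fuzz test, and edge-case handling may differ from the real implementation.

// Minimal, self-contained sketch of the page arithmetic assumed above.
// Formulas are inferred from the surrounding diff, not copied from it.
fn page_id(offset: u64, page_size: u64) -> u64 {
    offset / page_size
}

fn page_count(offset: u64, size: u32, page_size: u64) -> u32 {
    if size == 0 {
        return 0; // a zero-sized read touches no pages
    }
    let first = page_id(offset, page_size);
    let last = page_id(offset + size as u64 - 1, page_size);
    (last - first + 1) as u32
}

// Range of the requested bytes inside the concatenated page buffer.
fn range_in_pages(offset: u64, size: u32, page_size: u64) -> std::ops::Range<usize> {
    let start = (offset % page_size) as usize;
    start..start + size as usize
}

fn main() {
    // A 100-byte read at offset 120 with 64-byte pages touches pages 1..=3
    // and starts 56 bytes into the first fetched page.
    assert_eq!(page_id(120, 64), 1);
    assert_eq!(page_count(120, 100, 64), 3);
    assert_eq!(range_in_pages(120, 100, 64), 56..156);
}
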
diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 0542fd7a59ea..6ad116dae035 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -29,7 +29,7 @@ use snafu::ResultExt; use store_api::storage::RegionId; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; -use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; +use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; use crate::error::{ ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, }; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 653679b9fca8..c2f90b293003 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -34,7 +34,7 @@ use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; use crate::cache::file_cache::FileCacheRef; -use crate::cache::index::InvertedIndexCacheRef; +use crate::cache::index::inverted_index::InvertedIndexCacheRef; use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::inverted_index::applier::InvertedIndexApplier; diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 15cba55c4437..0076322fccbd 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -316,7 +316,7 @@ mod tests { use store_api::storage::RegionId; use super::*; - use crate::cache::index::InvertedIndexCache; + use crate::cache::index::inverted_index::InvertedIndexCache; use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; From b99ddb6221006165cfcb44b29c53ee39c8f13401 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 23 Dec 2024 12:56:04 +0000 Subject: [PATCH 2/5] fix typo Signed-off-by: Zhenchi --- src/mito2/src/cache/index/inverted_index.rs | 4 ++-- src/mito2/src/read/scan_region.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 8bb088df789b..aa0ed5ce05a7 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -50,12 +50,12 @@ impl InvertedIndexCache { } } -/// Calculates weight for inveretd index metadata. +/// Calculates weight for inverted index metadata. fn inverted_index_metadata_weight(k: &FileId, v: &Arc) -> u32 { (k.as_bytes().len() + v.encoded_len()) as u32 } -/// Calculates weight for inveretd index content. +/// Calculates weight for inverted index content. fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 { (k.as_bytes().len() + v.len()) as u32 } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 091b9bc48c14..2dfa22f9f1c9 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -399,7 +399,7 @@ impl ScanRegion { }); } - /// Use the latest schema to build the inveretd index applier. + /// Use the latest schema to build the inverted index applier. 
fn build_invereted_index_applier(&self) -> Option { if self.ignore_inverted_index { return None; From 8f5aa18af367b33ec7e7a98711b2ea7cd8b60c86 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 24 Dec 2024 04:26:12 +0000 Subject: [PATCH 3/5] fix: remove added label Signed-off-by: Zhenchi --- src/mito2/src/cache/index.rs | 31 ++++++++------------- src/mito2/src/cache/index/inverted_index.rs | 9 ++---- src/mito2/src/metrics.rs | 8 ++---- 3 files changed, 17 insertions(+), 31 deletions(-) diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 37ae6143378c..7b7416c19104 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -15,6 +15,7 @@ pub mod inverted_index; use std::future::Future; +use std::hash::Hash; use std::ops::Range; use std::sync::Arc; @@ -59,7 +60,7 @@ impl PageKey { start..end } - /// Generates a vector of IndexKey instances for the pages that a given offset and size span. + /// Generates a iterator of `IndexKey` for the pages that a given offset and size span. fn generate_page_keys(offset: u64, size: u32, page_size: u64) -> impl Iterator { let start_page = Self::calculate_page_id(offset, page_size); let total_pages = Self::calculate_page_count(offset, size, page_size); @@ -78,9 +79,6 @@ pub struct IndexCache { // Page size for index content. page_size: u64, - /// Index type for telemetry. - index_type: &'static str, - /// Weighter for metadata. weight_of_metadata: fn(&K, &Arc) -> u32, /// Weighter for content. @@ -89,7 +87,7 @@ pub struct IndexCache { impl IndexCache where - K: std::hash::Hash + Eq + Send + Sync + 'static, + K: Hash + Eq + Send + Sync + 'static, M: Send + Sync + 'static, { pub fn new_with_weighter( @@ -107,7 +105,7 @@ where .eviction_listener(move |k, v, _cause| { let size = weight_of_metadata(&k, &v); CACHE_BYTES - .with_label_values(&[INDEX_METADATA_TYPE, index_type]) + .with_label_values(&[INDEX_METADATA_TYPE]) .sub(size.into()); }) .build(); @@ -117,7 +115,7 @@ where .eviction_listener(move |k, v, _cause| { let size = weight_of_content(&k, &v); CACHE_BYTES - .with_label_values(&[INDEX_CONTENT_TYPE, index_type]) + .with_label_values(&[INDEX_CONTENT_TYPE]) .sub(size.into()); }) .build(); @@ -125,7 +123,6 @@ where index_metadata, index: index_cache, page_size, - index_type, weight_of_content, weight_of_metadata, } @@ -134,7 +131,7 @@ where impl IndexCache where - K: std::hash::Hash + Eq + Clone + Copy + Send + Sync + 'static, + K: Hash + Eq + Clone + Copy + Send + Sync + 'static, M: Send + Sync + 'static, { pub fn get_index_metadata(&self, key: K) -> Option> { @@ -143,7 +140,7 @@ where pub fn put_index_metadata(&self, key: K, metadata: Arc) { CACHE_BYTES - .with_label_values(&[INDEX_METADATA_TYPE, self.index_type]) + .with_label_values(&[INDEX_METADATA_TYPE]) .add((self.weight_of_metadata)(&key, &metadata).into()); self.index_metadata.insert(key, metadata) } @@ -154,7 +151,7 @@ where pub fn put_index(&self, key: K, page_key: PageKey, value: Bytes) { CACHE_BYTES - .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) + .with_label_values(&[INDEX_CONTENT_TYPE]) .add((self.weight_of_content)(&(key, page_key), &value).into()); self.index.insert((key, page_key), value); } @@ -168,10 +165,10 @@ where offset: u64, size: u32, load: F, - ) -> std::result::Result, E> + ) -> Result, E> where F: FnOnce(Vec>) -> Fut, - Fut: Future, E>>, + Fut: Future, E>>, E: std::error::Error, { let page_keys = @@ -189,15 +186,11 @@ where for (i, page_key) in page_keys.iter().enumerate() { match self.get_index(key, *page_key) { 
Some(page) => { - CACHE_HIT - .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) - .inc(); + CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); data[i] = page; } None => { - CACHE_MISS - .with_label_values(&[INDEX_CONTENT_TYPE, self.index_type]) - .inc(); + CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); let base_offset = page_key.page_id * self.page_size; let pruned_size = if i == last_index { prune_size(page_keys.iter(), file_size, self.page_size) diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index aa0ed5ce05a7..5f6030ebdd1f 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -29,7 +29,6 @@ use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; use crate::sst::file::FileId; -/// Metrics for inverted index. const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index"; /// Cache for inverted index. @@ -99,16 +98,12 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead async fn metadata(&mut self) -> index::inverted_index::error::Result> { if let Some(cached) = self.cache.get_index_metadata(self.file_id) { - CACHE_HIT - .with_label_values(&[INDEX_METADATA_TYPE, INDEX_TYPE_INVERTED_INDEX]) - .inc(); + CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(cached) } else { let meta = self.inner.metadata().await?; self.cache.put_index_metadata(self.file_id, meta.clone()); - CACHE_MISS - .with_label_values(&[INDEX_METADATA_TYPE, INDEX_TYPE_INVERTED_INDEX]) - .inc(); + CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index e51062a053fb..5a5d76da4c0b 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -28,8 +28,6 @@ pub const FILE_TYPE_LABEL: &str = "file_type"; pub const WORKER_LABEL: &str = "worker"; /// Partition label. pub const PARTITION_LABEL: &str = "partition"; -/// Index type label. -pub const INDEX_TYPE_LABEL: &str = "index_type"; lazy_static! { /// Global write buffer size in bytes. @@ -197,21 +195,21 @@ lazy_static! { pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!( "greptime_mito_cache_hit", "mito cache hit", - &[TYPE_LABEL, INDEX_TYPE_LABEL] + &[TYPE_LABEL] ) .unwrap(); /// Cache miss counter. pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!( "greptime_mito_cache_miss", "mito cache miss", - &[TYPE_LABEL, INDEX_TYPE_LABEL] + &[TYPE_LABEL] ) .unwrap(); /// Cache size in bytes. pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!( "greptime_mito_cache_bytes", "mito cache bytes", - &[TYPE_LABEL, INDEX_TYPE_LABEL] + &[TYPE_LABEL] ) .unwrap(); /// Download bytes counter in the write cache. 
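
With the per-index-type metric label reverted, `index_type` survives only as the moka cache name, and the genericity of `IndexCache<K, M>` is what carries the series' goal of sharing the cache across index types. A second index type would instantiate it roughly as in the hypothetical sketch below (crate context assumed; `BloomFilterMeta`, the capacities, and the weight functions are invented for illustration — this series only wires up the inverted index):

use std::sync::Arc;

use bytes::Bytes;

use crate::cache::index::{IndexCache, PageKey};
use crate::sst::file::FileId;

/// Hypothetical per-file metadata for a second index type.
struct BloomFilterMeta {
    row_count: u64,
}

type BloomFilterIndexCache = IndexCache<FileId, BloomFilterMeta>;

fn build_bloom_filter_cache() -> Arc<BloomFilterIndexCache> {
    Arc::new(IndexCache::new_with_weighter(
        32 * 1024 * 1024,  // metadata capacity in bytes (arbitrary)
        128 * 1024 * 1024, // content capacity in bytes (arbitrary)
        64 * 1024,         // page size (arbitrary)
        "bloom_filter_index",
        // Metadata entries weigh their key plus a rough struct size.
        |k: &FileId, _v: &Arc<BloomFilterMeta>| {
            (k.as_bytes().len() + std::mem::size_of::<BloomFilterMeta>()) as u32
        },
        // Content pages weigh their key plus the page bytes.
        |(k, _): &(FileId, PageKey), v: &Bytes| (k.as_bytes().len() + v.len()) as u32,
    ))
}
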
From 29853c0ca6c5942696643697fcc312f55b304564 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 24 Dec 2024 04:38:09 +0000 Subject: [PATCH 4/5] refactor: simplify cached reader impl Signed-off-by: Zhenchi --- src/index/src/inverted_index/format/reader.rs | 11 +++- src/mito2/src/cache/index/inverted_index.rs | 66 ++++--------------- 2 files changed, 22 insertions(+), 55 deletions(-) diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index c914b9007a6a..24f21504d001 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -36,7 +36,16 @@ pub trait InvertedIndexReader: Send + Sync { async fn range_read(&mut self, offset: u64, size: u32) -> Result>; /// Reads the bytes in the given ranges. - async fn read_vec(&mut self, ranges: &[Range]) -> Result>; + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + let mut result = Vec::with_capacity(ranges.len()); + for range in ranges { + let data = self + .range_read(range.start, (range.end - range.start) as u32) + .await?; + result.push(Bytes::from(data)); + } + Ok(result) + } /// Retrieves metadata of all inverted indices stored within the blob. async fn metadata(&mut self) -> Result>; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 5f6030ebdd1f..6267383b6630 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -12,18 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Range; use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; use bytes::Bytes; -use common_base::BitVec; -use index::inverted_index::error::DecodeFstSnafu; +use index::inverted_index::error::Result; use index::inverted_index::format::reader::InvertedIndexReader; -use index::inverted_index::FstMap; use prost::Message; -use snafu::ResultExt; use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE}; use crate::metrics::{CACHE_HIT, CACHE_MISS}; @@ -81,38 +77,7 @@ impl CachedInvertedIndexBlobReader { #[async_trait] impl InvertedIndexReader for CachedInvertedIndexBlobReader { - async fn range_read( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result> { - self.inner.range_read(offset, size).await - } - - async fn read_vec( - &mut self, - ranges: &[Range], - ) -> index::inverted_index::error::Result> { - self.inner.read_vec(ranges).await - } - - async fn metadata(&mut self) -> index::inverted_index::error::Result> { - if let Some(cached) = self.cache.get_index_metadata(self.file_id) { - CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); - Ok(cached) - } else { - let meta = self.inner.metadata().await?; - self.cache.put_index_metadata(self.file_id, meta.clone()); - CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); - Ok(meta) - } - } - - async fn fst( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result { + async fn range_read(&mut self, offset: u64, size: u32) -> Result> { let inner = &mut self.inner; self.cache .get_or_load( @@ -123,25 +88,18 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead move |ranges| async move { inner.read_vec(&ranges).await }, ) .await - .and_then(|r| FstMap::new(r).context(DecodeFstSnafu)) } - async fn bitmap( - &mut self, - offset: u64, - size: u32, - ) -> index::inverted_index::error::Result { - let inner = &mut 
self.inner; - self.cache - .get_or_load( - self.file_id, - self.file_size, - offset, - size, - move |ranges| async move { inner.read_vec(&ranges).await }, - ) - .await - .map(BitVec::from_vec) + async fn metadata(&mut self) -> Result> { + if let Some(cached) = self.cache.get_index_metadata(self.file_id) { + CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok(cached) + } else { + let meta = self.inner.metadata().await?; + self.cache.put_index_metadata(self.file_id, meta.clone()); + CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); + Ok(meta) + } } } From 1eb73ec1bb426082a5d414f02cf9d2666121f930 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 24 Dec 2024 04:47:36 +0000 Subject: [PATCH 5/5] rename func Signed-off-by: Zhenchi --- src/mito2/src/cache/index.rs | 30 ++++++++++----------- src/mito2/src/cache/index/inverted_index.rs | 4 +-- src/mito2/src/test_util/memtable_util.rs | 5 ++-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index 7b7416c19104..c8bd7a8f329b 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -134,28 +134,17 @@ where K: Hash + Eq + Clone + Copy + Send + Sync + 'static, M: Send + Sync + 'static, { - pub fn get_index_metadata(&self, key: K) -> Option> { + pub fn get_metadata(&self, key: K) -> Option> { self.index_metadata.get(&key) } - pub fn put_index_metadata(&self, key: K, metadata: Arc) { + pub fn put_metadata(&self, key: K, metadata: Arc) { CACHE_BYTES .with_label_values(&[INDEX_METADATA_TYPE]) .add((self.weight_of_metadata)(&key, &metadata).into()); self.index_metadata.insert(key, metadata) } - pub fn get_index(&self, key: K, page_key: PageKey) -> Option { - self.index.get(&(key, page_key)) - } - - pub fn put_index(&self, key: K, page_key: PageKey, value: Bytes) { - CACHE_BYTES - .with_label_values(&[INDEX_CONTENT_TYPE]) - .add((self.weight_of_content)(&(key, page_key), &value).into()); - self.index.insert((key, page_key), value); - } - /// Gets given range of index data from cache, and loads from source if the file /// is not already cached. async fn get_or_load( @@ -184,7 +173,7 @@ where let last_index = page_keys.len() - 1; // TODO: Avoid copy as much as possible. for (i, page_key) in page_keys.iter().enumerate() { - match self.get_index(key, *page_key) { + match self.get_page(key, *page_key) { Some(page) => { CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); data[i] = page; @@ -207,7 +196,7 @@ where for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { let page_key = page_keys[i]; data[i] = page.clone(); - self.put_index(key, page_key, page.clone()); + self.put_page(key, page_key, page.clone()); } } let buffer = Buffer::from_iter(data.into_iter()); @@ -215,6 +204,17 @@ where .slice(PageKey::calculate_range(offset, size, self.page_size)) .to_vec()) } + + fn get_page(&self, key: K, page_key: PageKey) -> Option { + self.index.get(&(key, page_key)) + } + + fn put_page(&self, key: K, page_key: PageKey, value: Bytes) { + CACHE_BYTES + .with_label_values(&[INDEX_CONTENT_TYPE]) + .add((self.weight_of_content)(&(key, page_key), &value).into()); + self.index.insert((key, page_key), value); + } } /// Prunes the size of the last page based on the indexes. 
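
The pruning rule that the comment above describes is easy to check in isolation. Below is a standalone mirror of `prune_size`, with the `PageKey` iterator replaced by a plain slice of page ids (illustrative only):

fn prune_size(page_ids: &[u64], file_size: u64, page_size: u64) -> u64 {
    let last_page_start = page_ids.last().map(|id| id * page_size).unwrap_or(0);
    page_size.min(file_size - last_page_start)
}

fn main() {
    // 1000-byte file, 64-byte pages: the final page (id 15, bytes 960..1000)
    // is truncated to its remaining 40 bytes ...
    assert_eq!(prune_size(&[14, 15], 1000, 64), 40);
    // ... while a read ending on an interior page keeps the full page size.
    assert_eq!(prune_size(&[3, 4], 1000, 64), 64);
}
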
diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 6267383b6630..3c399a37cc82 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -91,12 +91,12 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead } async fn metadata(&mut self) -> Result> { - if let Some(cached) = self.cache.get_index_metadata(self.file_id) { + if let Some(cached) = self.cache.get_metadata(self.file_id) { CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(cached) } else { let meta = self.inner.metadata().await?; - self.cache.put_index_metadata(self.file_id, meta.clone()); + self.cache.put_metadata(self.file_id, meta.clone()); CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc(); Ok(meta) } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 1a0eacecf823..72e32c3f0ae5 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -14,7 +14,6 @@ //! Memtable test utilities. -use std::collections::BTreeMap; use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; @@ -34,8 +33,8 @@ use crate::error::Result; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ - BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, - MemtableRanges, MemtableRef, MemtableStats, + BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRanges, + MemtableRef, MemtableStats, }; use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
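
After the whole series, callers see a small surface: wrap any `InvertedIndexReader` in `CachedInvertedIndexBlobReader`, and because `range_read` itself now goes through the page cache, everything layered on top of it benefits as well. A minimal usage sketch, assuming the crate context — the function name `cached_metadata_roundtrip` is made up, and the capacities reuse the test's arbitrary values (8192-byte caches, 50-byte pages):

use std::sync::Arc;

use index::inverted_index::format::reader::InvertedIndexReader;

use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCache};
use crate::sst::file::FileId;

// Reads the metadata twice: the first call misses and populates the cache,
// the second is served from moka without touching `inner`.
async fn cached_metadata_roundtrip<R: InvertedIndexReader>(
    inner: R,
    file_id: FileId,
    file_size: u64,
) -> index::inverted_index::error::Result<u64> {
    let cache = Arc::new(InvertedIndexCache::new(8192, 8192, 50));
    let mut reader = CachedInvertedIndexBlobReader::new(file_id, file_size, inner, cache);
    let first = reader.metadata().await?; // CACHE_MISS path: fetched from `inner`
    let second = reader.metadata().await?; // CACHE_HIT path: no source read
    debug_assert_eq!(first.total_row_count, second.total_row_count);
    Ok(second.total_row_count)
}
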