diff --git a/Cargo.lock b/Cargo.lock index b60615c8e54c..a5c9386dc735 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1918,6 +1918,7 @@ dependencies = [ "futures", "paste", "pin-project", + "rand", "serde", "snafu 0.8.5", "tokio", diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 465599974dae..2d35ad5d31ad 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -17,6 +17,7 @@ common-macro.workspace = true futures.workspace = true paste = "1.0" pin-project.workspace = true +rand.workspace = true serde = { version = "1.0", features = ["derive"] } snafu.workspace = true tokio.workspace = true diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs index 904681d5f40a..21e5487d1e42 100644 --- a/src/index/src/inverted_index/format/reader.rs +++ b/src/index/src/inverted_index/format/reader.rs @@ -16,6 +16,7 @@ use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use common_base::BitVec; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::ResultExt; @@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send { async fn range_read(&mut self, offset: u64, size: u32) -> Result>; /// Reads the bytes in the given ranges. - async fn read_vec(&mut self, ranges: &[Range]) -> Result>>; + async fn read_vec(&mut self, ranges: &[Range]) -> Result>; /// Retrieves metadata of all inverted indices stored within the blob. async fn metadata(&mut self) -> Result>; diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 371655d535f3..fcaa63773d93 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -16,6 +16,7 @@ use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use common_base::range_read::RangeReader; use greptime_proto::v1::index::InvertedIndexMetas; use snafu::{ensure, ResultExt}; @@ -60,9 +61,8 @@ impl InvertedIndexReader for InvertedIndexBlobReader { Ok(buf.into()) } - async fn read_vec(&mut self, ranges: &[Range]) -> Result>> { - let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?; - Ok(bufs.into_iter().map(|buf| buf.into()).collect()) + async fn read_vec(&mut self, ranges: &[Range]) -> Result> { + self.source.read_vec(ranges).await.context(CommonIoSnafu) } async fn metadata(&mut self) -> Result> { diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index e25fb22dcbf5..de39ea3784b6 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; +use bytes::Bytes; use common_base::BitVec; use index::inverted_index::error::DecodeFstSnafu; use index::inverted_index::format::reader::InvertedIndexReader; use index::inverted_index::FstMap; +use object_store::Buffer; use prost::Message; use snafu::ResultExt; @@ -68,15 +70,14 @@ where if keys.is_empty() { return Ok(Vec::new()); } - // TODO: Can be replaced by an uncontinuous structure like opendal::Buffer. let mut data = Vec::with_capacity(keys.len()); - data.resize(keys.len(), Arc::new(Vec::new())); + data.resize(keys.len(), Bytes::new()); let mut cache_miss_range = vec![]; let mut cache_miss_idx = vec![]; let last_index = keys.len() - 1; // TODO: Avoid copy as much as possible. - for (i, index) in keys.clone().into_iter().enumerate() { - match self.cache.get_index(&index) { + for (i, index) in keys.iter().enumerate() { + match self.cache.get_index(index) { Some(page) => { CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc(); data[i] = page; @@ -97,24 +98,19 @@ where if !cache_miss_range.is_empty() { let pages = self.inner.read_vec(&cache_miss_range).await?; for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) { - let page = Arc::new(page); let key = keys[i].clone(); data[i] = page.clone(); self.cache.put_index(key, page.clone()); } } - let mut result = Vec::with_capacity(size as usize); - data.iter().enumerate().for_each(|(i, page)| { - let range = if i == 0 { - IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size) - } else if i == last_index { - IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size) - } else { - 0..self.cache.page_size as usize - }; - result.extend_from_slice(&page[range]); - }); - Ok(result) + let buffer = Buffer::from_iter(data.into_iter()); + Ok(buffer + .slice(IndexDataPageKey::calculate_range( + offset, + size, + self.cache.page_size, + )) + .to_vec()) } } @@ -131,7 +127,7 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead async fn read_vec( &mut self, ranges: &[Range], - ) -> index::inverted_index::error::Result>> { + ) -> index::inverted_index::error::Result> { self.inner.read_vec(ranges).await } @@ -190,31 +186,19 @@ impl IndexDataPageKey { (end_page + 1 - start_page) as u32 } - /// Computes the byte range in the first page based on the offset and size. - /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096. - fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range { + /// Calculates the byte range for data retrieval based on the specified offset and size. + /// + /// This function determines the starting and ending byte positions required for reading data. + /// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096, + /// the resulting byte range will be 904..5904. This indicates that: + /// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288). + /// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages. + fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range { let start = (offset % page_size) as usize; - let end = if size > page_size as u32 - start as u32 { - page_size as usize - } else { - start + size as usize - }; + let end = start + size as usize; start..end } - /// Computes the byte range in the last page based on the offset and size. - /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904. - fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range { - let offset = offset as usize; - let size = size as usize; - let page_size = page_size as usize; - if (offset + size) % page_size == 0 { - 0..page_size - } else { - 0..((offset + size) % page_size) - } - } - /// Generates a vector of IndexKey instances for the pages that a given offset and size span. fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec { let start_page = Self::calculate_page_id(offset, page_size); @@ -234,7 +218,7 @@ pub struct InvertedIndexCache { /// Cache for inverted index metadata index_metadata: moka::sync::Cache>, /// Cache for inverted index content. - index: moka::sync::Cache>>, + index: moka::sync::Cache, // Page size for index content. page_size: u64, } @@ -284,11 +268,11 @@ impl InvertedIndexCache { self.index_metadata.insert(key, metadata) } - pub fn get_index(&self, key: &IndexDataPageKey) -> Option>> { + pub fn get_index(&self, key: &IndexDataPageKey) -> Option { self.index.get(key) } - pub fn put_index(&self, key: IndexDataPageKey, value: Arc>) { + pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) { CACHE_BYTES .with_label_values(&[INDEX_CONTENT_TYPE]) .add(index_content_weight(&key, &value).into()); @@ -302,7 +286,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc) -> u } /// Calculates weight for index content. -fn index_content_weight(k: &IndexDataPageKey, v: &Arc>) -> u32 { +fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 { (k.file_id.as_bytes().len() + v.len()) as u32 } @@ -331,6 +315,9 @@ mod test { use crate::sst::index::store::InstrumentedStore; use crate::test_util::TestEnv; + // Repeat times for following little fuzz tests. + const FUZZ_REPEAT_TIMES: usize = 100; + // Fuzz test for index data page key #[test] fn fuzz_index_calculation() { @@ -340,7 +327,7 @@ mod test { rng.fill_bytes(&mut data); let file_id = FileId::random(); - for _ in 0..100 { + for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.gen_range(0..data.len() as u64); let size = rng.gen_range(0..data.len() as u32 - offset as u32); let page_size: usize = rng.gen_range(1..1024); @@ -349,32 +336,24 @@ mod test { IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64); let page_num = indexes.len(); let mut read = Vec::with_capacity(size as usize); - let last_index = indexes.len() - 1; - for (i, key) in indexes.into_iter().enumerate() { + for key in indexes.into_iter() { let start = key.page_id as usize * page_size; let page = if start + page_size < data.len() { &data[start..start + page_size] } else { &data[start..] }; - let range = if i == 0 { - // first page range - IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64) - } else if i == last_index { - // last page range. when the first page is the last page, the range is not used. - IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64) - } else { - 0..page_size - }; - read.extend_from_slice(&page[range]); + read.extend_from_slice(page); } let expected_range = offset as usize..(offset + size as u64 as u64) as usize; + let read = + read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec(); if read != data.get(expected_range).unwrap() { panic!( - "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}", + "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}", offset, size, page_size, read.len(), size as usize, - IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64), - IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num + IndexDataPageKey::calculate_range(offset, size, page_size as u64), + page_num ); } } @@ -519,7 +498,7 @@ mod test { // fuzz test let mut rng = rand::thread_rng(); - for _ in 0..100 { + for _ in 0..FUZZ_REPEAT_TIMES { let offset = rng.gen_range(0..file_size); let size = rng.gen_range(0..file_size as u32 - offset as u32); let expected = cached_reader.range_read(offset, size).await.unwrap();