feat: introduce Buffer for non-continuous bytes (#5164)
* feat: introduce Buffer for non-continuous bytes

* Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu <[email protected]>

* chore: apply review comments

* refactor: use opendal::Buffer

---------

Co-authored-by: Weny Xu <[email protected]>
CookiePieWw and WenyXu authored Dec 18, 2024
1 parent 18e8c45 commit 7d1bcc9
Showing 5 changed files with 46 additions and 64 deletions.
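The core of this change is swapping page-by-page `Vec<u8>` copies for `bytes::Bytes` pages assembled into an `opendal::Buffer` (re-exported in this repo as `object_store::Buffer`), a container for non-contiguous byte segments that can be sliced without concatenating them first. Below is a minimal stand-alone sketch of that pattern; it uses only the `Buffer` calls that appear in the diff (`from_iter`, `slice`, `to_vec`), while the data and segment boundaries are made up for illustration:

use bytes::Bytes;
use opendal::Buffer; // imported as `object_store::Buffer` inside mito2

fn main() {
    // Three non-contiguous segments, e.g. pages returned by a vectored range read.
    let pages = vec![
        Bytes::from_static(b"hello "),
        Bytes::from_static(b"non-contiguous "),
        Bytes::from_static(b"world"),
    ];

    // `Buffer` keeps the segments as-is; nothing is copied here.
    let buffer = Buffer::from_iter(pages);

    // Slicing stays zero-copy; only `to_vec` materializes contiguous bytes.
    let middle = buffer.slice(6..21);
    assert_eq!(middle.to_vec(), b"non-contiguous ".to_vec());
}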
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default, so the Cargo.lock diff is omitted here.

1 change: 1 addition & 0 deletions src/common/base/Cargo.toml
@@ -17,6 +17,7 @@ common-macro.workspace = true
futures.workspace = true
paste = "1.0"
pin-project.workspace = true
rand.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true
3 changes: 2 additions & 1 deletion src/index/src/inverted_index/format/reader.rs
@@ -16,6 +16,7 @@ use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::BitVec;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::ResultExt;
@@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send {
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;

/// Reads the bytes in the given ranges.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>>;
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;

/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
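With the trait returning `Vec<Bytes>` instead of `Vec<Vec<u8>>`, a caller that both caches a page and hands it back to the reader no longer needs two owned copies: cloning a `Bytes` only bumps a reference count. A small illustrative sketch (the `share_page` helper is hypothetical, not part of this crate):

use bytes::Bytes;

// Hypothetical helper: keep one copy for a cache and one for the caller.
// `Bytes::clone` is O(1) and shares the same underlying allocation.
fn share_page(page: Bytes) -> (Bytes, Bytes) {
    let for_cache = page.clone();
    (for_cache, page)
}

fn main() {
    let page = Bytes::from(vec![0u8; 4096]);
    let (cached, returned) = share_page(page);
    assert_eq!(cached.len(), returned.len());
}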
6 changes: 3 additions & 3 deletions src/index/src/inverted_index/format/reader/blob.rs
@@ -16,6 +16,7 @@ use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};
@@ -60,9 +61,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
Ok(buf.into())
}

async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>> {
let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
Ok(bufs.into_iter().map(|buf| buf.into()).collect())
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.source.read_vec(ranges).await.context(CommonIoSnafu)
}

async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
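`InvertedIndexBlobReader::read_vec` previously converted each buffer into a fresh `Vec<u8>`; it now forwards the `Vec<Bytes>` produced by `common_base::range_read::RangeReader::read_vec` untouched. The sketch below shows the shape of such a vectored read over an in-memory blob; the `InMemoryBlob` type is illustrative only (the real trait is async and fallible):

use std::ops::Range;

use bytes::Bytes;

// Illustrative stand-in for a range reader backed by one in-memory blob.
struct InMemoryBlob {
    data: Bytes,
}

impl InMemoryBlob {
    // Each returned `Bytes` is a zero-copy view into the same backing buffer.
    fn read_vec(&self, ranges: &[Range<u64>]) -> Vec<Bytes> {
        ranges
            .iter()
            .map(|r| self.data.slice(r.start as usize..r.end as usize))
            .collect()
    }
}

fn main() {
    let blob = InMemoryBlob {
        data: Bytes::from_static(b"0123456789"),
    };
    let parts = blob.read_vec(&[0..3, 7..10]);
    assert_eq!(&parts[0][..], b"012");
    assert_eq!(&parts[1][..], b"789");
}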
99 changes: 39 additions & 60 deletions src/mito2/src/cache/index.rs
@@ -17,10 +17,12 @@ use std::sync::Arc;

use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use bytes::Bytes;
use common_base::BitVec;
use index::inverted_index::error::DecodeFstSnafu;
use index::inverted_index::format::reader::InvertedIndexReader;
use index::inverted_index::FstMap;
use object_store::Buffer;
use prost::Message;
use snafu::ResultExt;

@@ -68,15 +70,14 @@
if keys.is_empty() {
return Ok(Vec::new());
}
// TODO: Can be replaced by an uncontinuous structure like opendal::Buffer.
let mut data = Vec::with_capacity(keys.len());
data.resize(keys.len(), Arc::new(Vec::new()));
data.resize(keys.len(), Bytes::new());
let mut cache_miss_range = vec![];
let mut cache_miss_idx = vec![];
let last_index = keys.len() - 1;
// TODO: Avoid copy as much as possible.
for (i, index) in keys.clone().into_iter().enumerate() {
match self.cache.get_index(&index) {
for (i, index) in keys.iter().enumerate() {
match self.cache.get_index(index) {
Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
data[i] = page;
@@ -97,24 +98,19 @@
if !cache_miss_range.is_empty() {
let pages = self.inner.read_vec(&cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page = Arc::new(page);
let key = keys[i].clone();
data[i] = page.clone();
self.cache.put_index(key, page.clone());
}
}
let mut result = Vec::with_capacity(size as usize);
data.iter().enumerate().for_each(|(i, page)| {
let range = if i == 0 {
IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
} else if i == last_index {
IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
} else {
0..self.cache.page_size as usize
};
result.extend_from_slice(&page[range]);
});
Ok(result)
let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer
.slice(IndexDataPageKey::calculate_range(
offset,
size,
self.cache.page_size,
))
.to_vec())
}
}

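The read path above now collects one `Bytes` per page, whether it came from the cache or from the single vectored read for the misses, builds a `Buffer` over them, and slices it with `calculate_range` instead of trimming the first and last pages by hand. A stand-alone sketch of that assembly step, assuming `pages` already holds the full pages covering the request in order (the function and variable names are illustrative, not taken from the changed file):

use bytes::Bytes;
use opendal::Buffer;

// Cut `[offset, offset + size)` out of whole pages without concatenating them.
fn assemble(pages: Vec<Bytes>, offset: u64, size: u32, page_size: u64) -> Vec<u8> {
    let start = (offset % page_size) as usize; // same arithmetic as calculate_range
    let end = start + size as usize;
    Buffer::from_iter(pages).slice(start..end).to_vec()
}

fn main() {
    // Pages [4, 8) and [8, 12) of a blob holding b"0123456789ab", page size 4.
    let pages = vec![Bytes::from_static(b"4567"), Bytes::from_static(b"89ab")];
    // Reading offset 5, size 6 yields bytes 5..11 of the blob.
    assert_eq!(assemble(pages, 5, 6, 4), b"56789a".to_vec());
}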
@@ -131,7 +127,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
async fn read_vec(
&mut self,
ranges: &[Range<u64>],
) -> index::inverted_index::error::Result<Vec<Vec<u8>>> {
) -> index::inverted_index::error::Result<Vec<Bytes>> {
self.inner.read_vec(ranges).await
}

@@ -190,31 +186,19 @@ impl IndexDataPageKey {
(end_page + 1 - start_page) as u32
}

/// Computes the byte range in the first page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
/// Calculates the byte range for data retrieval based on the specified offset and size.
///
/// This function determines the starting and ending byte positions required for reading data.
/// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
/// the resulting byte range will be 904..5904. This indicates that:
/// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
/// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let start = (offset % page_size) as usize;
let end = if size > page_size as u32 - start as u32 {
page_size as usize
} else {
start + size as usize
};
let end = start + size as usize;
start..end
}

/// Computes the byte range in the last page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let offset = offset as usize;
let size = size as usize;
let page_size = page_size as usize;
if (offset + size) % page_size == 0 {
0..page_size
} else {
0..((offset + size) % page_size)
}
}

/// Generates a vector of IndexKey instances for the pages that a given offset and size span.
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<Self> {
let start_page = Self::calculate_page_id(offset, page_size);
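The doc example for `calculate_range` above can be checked directly: with an offset of 5000, a size of 5000, and a 4096-byte page, the pages fetched cover [4096, 12288), and within that concatenated buffer the wanted bytes sit at 904..5904. A quick stand-alone check that mirrors the function body (a copy for illustration, not the method itself):

use std::ops::Range;

// Mirrors the body of `calculate_range` for a quick sanity check.
fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
    let start = (offset % page_size) as usize;
    start..start + size as usize
}

fn main() {
    assert_eq!(calculate_range(5000, 5000, 4096), 904..5904);
}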
@@ -234,7 +218,7 @@ pub struct InvertedIndexCache {
/// Cache for inverted index metadata
index_metadata: moka::sync::Cache<IndexMetadataKey, Arc<InvertedIndexMetas>>,
/// Cache for inverted index content.
index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
index: moka::sync::Cache<IndexDataPageKey, Bytes>,
// Page size for index content.
page_size: u64,
}
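Storing `Bytes` directly in the moka cache fits well: moka clones the value on every `get`, which is a cheap reference-count bump for `Bytes`, and a weigher can still charge the cache by payload size, as the `index_content_weight` helper further below is designed to do. A minimal stand-alone sketch with an illustrative `u64` key in place of `IndexDataPageKey`:

use bytes::Bytes;
use moka::sync::Cache;

fn main() {
    // Bound the cache by total bytes held, not by entry count.
    let cache: Cache<u64, Bytes> = Cache::builder()
        .max_capacity(8 * 1024 * 1024)
        .weigher(|_key: &u64, value: &Bytes| value.len() as u32)
        .build();

    cache.insert(1, Bytes::from_static(b"page content"));
    // `get` clones the stored `Bytes`, which does not copy the payload.
    assert_eq!(cache.get(&1).map(|b| b.len()), Some(12));
}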
@@ -284,11 +268,11 @@ impl InvertedIndexCache {
self.index_metadata.insert(key, metadata)
}

pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Arc<Vec<u8>>> {
pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Bytes> {
self.index.get(key)
}

pub fn put_index(&self, key: IndexDataPageKey, value: Arc<Vec<u8>>) {
pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) {
CACHE_BYTES
.with_label_values(&[INDEX_CONTENT_TYPE])
.add(index_content_weight(&key, &value).into());
@@ -302,7 +286,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc<InvertedIndexMetas>) -> u
}

/// Calculates weight for index content.
fn index_content_weight(k: &IndexDataPageKey, v: &Arc<Vec<u8>>) -> u32 {
fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 {
(k.file_id.as_bytes().len() + v.len()) as u32
}

@@ -331,6 +315,9 @@ mod test {
use crate::sst::index::store::InstrumentedStore;
use crate::test_util::TestEnv;

// Repeat times for following little fuzz tests.
const FUZZ_REPEAT_TIMES: usize = 100;

// Fuzz test for index data page key
#[test]
fn fuzz_index_calculation() {
@@ -340,7 +327,7 @@
rng.fill_bytes(&mut data);
let file_id = FileId::random();

for _ in 0..100 {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.gen_range(0..data.len() as u64);
let size = rng.gen_range(0..data.len() as u32 - offset as u32);
let page_size: usize = rng.gen_range(1..1024);
@@ -349,32 +336,24 @@
IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
let page_num = indexes.len();
let mut read = Vec::with_capacity(size as usize);
let last_index = indexes.len() - 1;
for (i, key) in indexes.into_iter().enumerate() {
for key in indexes.into_iter() {
let start = key.page_id as usize * page_size;
let page = if start + page_size < data.len() {
&data[start..start + page_size]
} else {
&data[start..]
};
let range = if i == 0 {
// first page range
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
} else if i == last_index {
// last page range. when the first page is the last page, the range is not used.
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
} else {
0..page_size
};
read.extend_from_slice(&page[range]);
read.extend_from_slice(page);
}
let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
let read =
read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec();
if read != data.get(expected_range).unwrap() {
panic!(
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
offset, size, page_size, read.len(), size as usize,
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
IndexDataPageKey::calculate_range(offset, size, page_size as u64),
page_num
);
}
}
@@ -519,7 +498,7 @@ mod test {

// fuzz test
let mut rng = rand::thread_rng();
for _ in 0..100 {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.gen_range(0..file_size);
let size = rng.gen_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap();
