Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce Buffer for non-continuous bytes #5164

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-macro.workspace = true
futures.workspace = true
paste = "1.0"
pin-project.workspace = true
rand.workspace = true
serde = { version = "1.0", features = ["derive"] }
snafu.workspace = true
tokio.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion src/index/src/inverted_index/format/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::BitVec;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::ResultExt;
Expand All @@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send {
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;

/// Reads the bytes in the given ranges.
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>>;
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;

/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
Expand Down
6 changes: 3 additions & 3 deletions src/index/src/inverted_index/format/reader/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use common_base::range_read::RangeReader;
use greptime_proto::v1::index::InvertedIndexMetas;
use snafu::{ensure, ResultExt};
Expand Down Expand Up @@ -60,9 +61,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
Ok(buf.into())
}

async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>> {
let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
Ok(bufs.into_iter().map(|buf| buf.into()).collect())
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.source.read_vec(ranges).await.context(CommonIoSnafu)
}

async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
Expand Down
99 changes: 39 additions & 60 deletions src/mito2/src/cache/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ use std::sync::Arc;

use api::v1::index::InvertedIndexMetas;
use async_trait::async_trait;
use bytes::Bytes;
use common_base::BitVec;
use index::inverted_index::error::DecodeFstSnafu;
use index::inverted_index::format::reader::InvertedIndexReader;
use index::inverted_index::FstMap;
use object_store::Buffer;
use prost::Message;
use snafu::ResultExt;

Expand Down Expand Up @@ -68,15 +70,14 @@ where
if keys.is_empty() {
return Ok(Vec::new());
}
// TODO: Can be replaced by an uncontinuous structure like opendal::Buffer.
let mut data = Vec::with_capacity(keys.len());
data.resize(keys.len(), Arc::new(Vec::new()));
data.resize(keys.len(), Bytes::new());
let mut cache_miss_range = vec![];
let mut cache_miss_idx = vec![];
let last_index = keys.len() - 1;
// TODO: Avoid copy as much as possible.
for (i, index) in keys.clone().into_iter().enumerate() {
match self.cache.get_index(&index) {
for (i, index) in keys.iter().enumerate() {
match self.cache.get_index(index) {
Some(page) => {
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
data[i] = page;
Expand All @@ -97,24 +98,19 @@ where
if !cache_miss_range.is_empty() {
let pages = self.inner.read_vec(&cache_miss_range).await?;
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
let page = Arc::new(page);
let key = keys[i].clone();
data[i] = page.clone();
self.cache.put_index(key, page.clone());
}
}
let mut result = Vec::with_capacity(size as usize);
data.iter().enumerate().for_each(|(i, page)| {
let range = if i == 0 {
IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
} else if i == last_index {
IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
} else {
0..self.cache.page_size as usize
};
result.extend_from_slice(&page[range]);
});
Ok(result)
let buffer = Buffer::from_iter(data.into_iter());
Ok(buffer
.slice(IndexDataPageKey::calculate_range(
offset,
size,
self.cache.page_size,
))
.to_vec())
}
}

Expand All @@ -131,7 +127,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
async fn read_vec(
&mut self,
ranges: &[Range<u64>],
) -> index::inverted_index::error::Result<Vec<Vec<u8>>> {
) -> index::inverted_index::error::Result<Vec<Bytes>> {
self.inner.read_vec(ranges).await
}

Expand Down Expand Up @@ -190,31 +186,19 @@ impl IndexDataPageKey {
(end_page + 1 - start_page) as u32
}

/// Computes the byte range in the first page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
/// Calculates the byte range for data retrieval based on the specified offset and size.
///
/// This function determines the starting and ending byte positions required for reading data.
/// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
/// the resulting byte range will be 904..5904. This indicates that:
/// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
/// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let start = (offset % page_size) as usize;
let end = if size > page_size as u32 - start as u32 {
page_size as usize
} else {
start + size as usize
};
let end = start + size as usize;
start..end
}

/// Computes the byte range in the last page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let offset = offset as usize;
let size = size as usize;
let page_size = page_size as usize;
if (offset + size) % page_size == 0 {
0..page_size
} else {
0..((offset + size) % page_size)
}
}

/// Generates a vector of IndexKey instances for the pages that a given offset and size span.
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<Self> {
let start_page = Self::calculate_page_id(offset, page_size);
Expand All @@ -234,7 +218,7 @@ pub struct InvertedIndexCache {
/// Cache for inverted index metadata
index_metadata: moka::sync::Cache<IndexMetadataKey, Arc<InvertedIndexMetas>>,
/// Cache for inverted index content.
index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
index: moka::sync::Cache<IndexDataPageKey, Bytes>,
// Page size for index content.
page_size: u64,
}
Expand Down Expand Up @@ -284,11 +268,11 @@ impl InvertedIndexCache {
self.index_metadata.insert(key, metadata)
}

pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Arc<Vec<u8>>> {
pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Bytes> {
self.index.get(key)
}

pub fn put_index(&self, key: IndexDataPageKey, value: Arc<Vec<u8>>) {
pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) {
CACHE_BYTES
.with_label_values(&[INDEX_CONTENT_TYPE])
.add(index_content_weight(&key, &value).into());
Expand All @@ -302,7 +286,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc<InvertedIndexMetas>) -> u
}

/// Calculates weight for index content.
fn index_content_weight(k: &IndexDataPageKey, v: &Arc<Vec<u8>>) -> u32 {
fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 {
(k.file_id.as_bytes().len() + v.len()) as u32
}

Expand Down Expand Up @@ -331,6 +315,9 @@ mod test {
use crate::sst::index::store::InstrumentedStore;
use crate::test_util::TestEnv;

// Repeat times for following little fuzz tests.
const FUZZ_REPEAT_TIMES: usize = 100;

// Fuzz test for index data page key
#[test]
fn fuzz_index_calculation() {
Expand All @@ -340,7 +327,7 @@ mod test {
rng.fill_bytes(&mut data);
let file_id = FileId::random();

for _ in 0..100 {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.gen_range(0..data.len() as u64);
let size = rng.gen_range(0..data.len() as u32 - offset as u32);
let page_size: usize = rng.gen_range(1..1024);
Expand All @@ -349,32 +336,24 @@ mod test {
IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
let page_num = indexes.len();
let mut read = Vec::with_capacity(size as usize);
let last_index = indexes.len() - 1;
for (i, key) in indexes.into_iter().enumerate() {
for key in indexes.into_iter() {
let start = key.page_id as usize * page_size;
let page = if start + page_size < data.len() {
&data[start..start + page_size]
} else {
&data[start..]
};
let range = if i == 0 {
// first page range
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
} else if i == last_index {
// last page range. when the first page is the last page, the range is not used.
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
} else {
0..page_size
};
read.extend_from_slice(&page[range]);
read.extend_from_slice(page);
}
let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
let read =
read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec();
if read != data.get(expected_range).unwrap() {
panic!(
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
offset, size, page_size, read.len(), size as usize,
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
IndexDataPageKey::calculate_range(offset, size, page_size as u64),
page_num
);
}
}
Expand Down Expand Up @@ -519,7 +498,7 @@ mod test {

// fuzz test
let mut rng = rand::thread_rng();
for _ in 0..100 {
for _ in 0..FUZZ_REPEAT_TIMES {
let offset = rng.gen_range(0..file_size);
let size = rng.gen_range(0..file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap();
Expand Down
Loading