diff --git a/Cargo.lock b/Cargo.lock
index 534b8c465ae6..b60615c8e54c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6643,6 +6643,7 @@ dependencies = [
"async-channel 1.9.0",
"async-stream",
"async-trait",
+ "bytemuck",
"bytes",
"common-base",
"common-config",
diff --git a/config/config.md b/config/config.md
index 6a500a5b4a34..d3353930b163 100644
--- a/config/config.md
+++ b/config/config.md
@@ -150,6 +150,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
+| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never |
@@ -475,6 +476,9 @@
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
+| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
+| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
+| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 0ba80a9f7d92..90a4d69b2e89 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -543,6 +543,15 @@ mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
+## Cache size for inverted index metadata.
+metadata_cache_size = "64MiB"
+
+## Cache size for inverted index content.
+content_cache_size = "128MiB"
+
+## Page size for inverted index content cache.
+content_cache_page_size = "8MiB"
+
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 8eae532d6166..b73246d37f0a 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -588,6 +588,9 @@ metadata_cache_size = "64MiB"
## Cache size for inverted index content.
content_cache_size = "128MiB"
+## Page size for inverted index content cache.
+content_cache_page_size = "8MiB"
+
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs
index 61f28cb629fd..53c26eeebdee 100644
--- a/src/common/base/src/range_read.rs
+++ b/src/common/base/src/range_read.rs
@@ -205,9 +205,7 @@ impl RangeReader for Vec {
})
}
- async fn read(&mut self, mut range: Range) -> io::Result {
- range.end = range.end.min(self.len() as u64);
-
+ async fn read(&mut self, range: Range) -> io::Result {
let bytes = Bytes::copy_from_slice(&self[range.start as usize..range.end as usize]);
Ok(bytes)
}
diff --git a/src/index/src/inverted_index/format/reader.rs b/src/index/src/inverted_index/format/reader.rs
index a6fb0cecbfcd..904681d5f40a 100644
--- a/src/index/src/inverted_index/format/reader.rs
+++ b/src/index/src/inverted_index/format/reader.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
@@ -30,23 +31,23 @@ mod footer;
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader: Send {
- /// Reads all data to dest.
- async fn read_all(&mut self, dest: &mut Vec) -> Result;
-
/// Seeks to given offset and reads data with exact size as provided.
- async fn seek_read(&mut self, offset: u64, size: u32) -> Result>;
+ async fn range_read(&mut self, offset: u64, size: u32) -> Result>;
+
+ /// Reads the bytes in the given ranges.
+ async fn read_vec(&mut self, ranges: &[Range]) -> Result>>;
/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result>;
/// Retrieves the finite state transducer (FST) map from the given offset and size.
async fn fst(&mut self, offset: u64, size: u32) -> Result {
- let fst_data = self.seek_read(offset, size).await?;
+ let fst_data = self.range_read(offset, size).await?;
FstMap::new(fst_data).context(DecodeFstSnafu)
}
/// Retrieves the bitmap from the given offset and size.
async fn bitmap(&mut self, offset: u64, size: u32) -> Result {
- self.seek_read(offset, size).await.map(BitVec::from_vec)
+ self.range_read(offset, size).await.map(BitVec::from_vec)
}
}
diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs
index de34cd36f849..371655d535f3 100644
--- a/src/index/src/inverted_index/format/reader/blob.rs
+++ b/src/index/src/inverted_index/format/reader/blob.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
@@ -50,16 +51,7 @@ impl InvertedIndexBlobReader {
#[async_trait]
impl InvertedIndexReader for InvertedIndexBlobReader {
- async fn read_all(&mut self, dest: &mut Vec) -> Result {
- let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
- self.source
- .read_into(0..metadata.content_length, dest)
- .await
- .context(CommonIoSnafu)?;
- Ok(metadata.content_length as usize)
- }
-
- async fn seek_read(&mut self, offset: u64, size: u32) -> Result> {
+ async fn range_read(&mut self, offset: u64, size: u32) -> Result> {
let buf = self
.source
.read(offset..offset + size as u64)
@@ -68,6 +60,11 @@ impl InvertedIndexReader for InvertedIndexBlobReader {
Ok(buf.into())
}
+ async fn read_vec(&mut self, ranges: &[Range]) -> Result>> {
+ let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
+ Ok(bufs.into_iter().map(|buf| buf.into()).collect())
+ }
+
async fn metadata(&mut self) -> Result> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length;
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index eedf6ae636d8..eecb79440a2e 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -17,6 +17,7 @@ aquamarine.workspace = true
async-channel = "1.9"
async-stream.workspace = true
async-trait = "0.1"
+bytemuck.workspace = true
bytes.workspace = true
common-base.workspace = true
common-config.workspace = true
diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index 7018b039d62e..03cf9136245a 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -244,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
+ index_content_page_size: u64,
puffin_metadata_size: u64,
write_cache: Option,
selector_result_cache_size: u64,
@@ -286,6 +287,12 @@ impl CacheManagerBuilder {
self
}
+ /// Sets page size for index content.
+ pub fn index_content_page_size(mut self, bytes: u64) -> Self {
+ self.index_content_page_size = bytes;
+ self
+ }
+
/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
@@ -352,8 +359,11 @@ impl CacheManagerBuilder {
})
.build()
});
- let inverted_index_cache =
- InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
+ let inverted_index_cache = InvertedIndexCache::new(
+ self.index_metadata_size,
+ self.index_content_size,
+ self.index_content_page_size,
+ );
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs
index 4e6e4deee260..e25fb22dcbf5 100644
--- a/src/mito2/src/cache/index.rs
+++ b/src/mito2/src/cache/index.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::ops::Range;
use std::sync::Arc;
use api::v1::index::InvertedIndexMetas;
@@ -34,14 +35,16 @@ const INDEX_CONTENT_TYPE: &str = "index_content";
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader {
file_id: FileId,
+ file_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
}
impl CachedInvertedIndexBlobReader {
- pub fn new(file_id: FileId, inner: R, cache: InvertedIndexCacheRef) -> Self {
+ pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
Self {
file_id,
+ file_size,
inner,
cache,
}
@@ -59,43 +62,77 @@ where
offset: u64,
size: u32,
) -> index::inverted_index::error::Result> {
- let range = offset as usize..(offset + size as u64) as usize;
- if let Some(cached) = self.cache.get_index(IndexKey {
- file_id: self.file_id,
- }) {
- CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
- Ok(cached[range].to_vec())
- } else {
- let mut all_data = Vec::with_capacity(1024 * 1024);
- self.inner.read_all(&mut all_data).await?;
- let result = all_data[range].to_vec();
- self.cache.put_index(
- IndexKey {
- file_id: self.file_id,
- },
- Arc::new(all_data),
- );
- CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
- Ok(result)
+ let keys =
+ IndexDataPageKey::generate_page_keys(self.file_id, offset, size, self.cache.page_size);
+ // Size is 0, return empty data.
+ if keys.is_empty() {
+ return Ok(Vec::new());
+ }
+ // TODO: Can be replaced by an uncontinuous structure like opendal::Buffer.
+ let mut data = Vec::with_capacity(keys.len());
+ data.resize(keys.len(), Arc::new(Vec::new()));
+ let mut cache_miss_range = vec![];
+ let mut cache_miss_idx = vec![];
+ let last_index = keys.len() - 1;
+ // TODO: Avoid copy as much as possible.
+ for (i, index) in keys.clone().into_iter().enumerate() {
+ match self.cache.get_index(&index) {
+ Some(page) => {
+ CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+ data[i] = page;
+ }
+ None => {
+ CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+ let base_offset = index.page_id * self.cache.page_size;
+ let pruned_size = if i == last_index {
+ prune_size(&keys, self.file_size, self.cache.page_size)
+ } else {
+ self.cache.page_size
+ };
+ cache_miss_range.push(base_offset..base_offset + pruned_size);
+ cache_miss_idx.push(i);
+ }
+ }
}
+ if !cache_miss_range.is_empty() {
+ let pages = self.inner.read_vec(&cache_miss_range).await?;
+ for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
+ let page = Arc::new(page);
+ let key = keys[i].clone();
+ data[i] = page.clone();
+ self.cache.put_index(key, page.clone());
+ }
+ }
+ let mut result = Vec::with_capacity(size as usize);
+ data.iter().enumerate().for_each(|(i, page)| {
+ let range = if i == 0 {
+ IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
+ } else if i == last_index {
+ IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
+ } else {
+ 0..self.cache.page_size as usize
+ };
+ result.extend_from_slice(&page[range]);
+ });
+ Ok(result)
}
}
#[async_trait]
impl InvertedIndexReader for CachedInvertedIndexBlobReader {
- async fn read_all(
- &mut self,
- dest: &mut Vec,
- ) -> index::inverted_index::error::Result {
- self.inner.read_all(dest).await
- }
-
- async fn seek_read(
+ async fn range_read(
&mut self,
offset: u64,
size: u32,
) -> index::inverted_index::error::Result> {
- self.inner.seek_read(offset, size).await
+ self.inner.range_read(offset, size).await
+ }
+
+ async fn read_vec(
+ &mut self,
+ ranges: &[Range],
+ ) -> index::inverted_index::error::Result>> {
+ self.inner.read_vec(ranges).await
}
async fn metadata(&mut self) -> index::inverted_index::error::Result> {
@@ -130,22 +167,81 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct IndexKey {
+pub struct IndexMetadataKey {
+ file_id: FileId,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct IndexDataPageKey {
file_id: FileId,
+ page_id: u64,
+}
+
+impl IndexDataPageKey {
+ /// Converts an offset to a page ID based on the page size.
+ fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
+ offset / page_size
+ }
+
+ /// Calculates the total number of pages that a given size spans, starting from a specific offset.
+ fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
+ let start_page = Self::calculate_page_id(offset, page_size);
+ let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
+ (end_page + 1 - start_page) as u32
+ }
+
+ /// Computes the byte range in the first page based on the offset and size.
+ /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
+ fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range {
+ let start = (offset % page_size) as usize;
+ let end = if size > page_size as u32 - start as u32 {
+ page_size as usize
+ } else {
+ start + size as usize
+ };
+ start..end
+ }
+
+ /// Computes the byte range in the last page based on the offset and size.
+ /// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
+ fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range {
+ let offset = offset as usize;
+ let size = size as usize;
+ let page_size = page_size as usize;
+ if (offset + size) % page_size == 0 {
+ 0..page_size
+ } else {
+ 0..((offset + size) % page_size)
+ }
+ }
+
+ /// Generates a vector of IndexKey instances for the pages that a given offset and size span.
+ fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec {
+ let start_page = Self::calculate_page_id(offset, page_size);
+ let total_pages = Self::calculate_page_count(offset, size, page_size);
+ (0..total_pages)
+ .map(|i| Self {
+ file_id,
+ page_id: start_page + i as u64,
+ })
+ .collect()
+ }
}
pub type InvertedIndexCacheRef = Arc;
pub struct InvertedIndexCache {
/// Cache for inverted index metadata
- index_metadata: moka::sync::Cache>,
+ index_metadata: moka::sync::Cache>,
/// Cache for inverted index content.
- index: moka::sync::Cache>>,
+ index: moka::sync::Cache>>,
+ // Page size for index content.
+ page_size: u64,
}
impl InvertedIndexCache {
/// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`.
- pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self {
+ pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}");
let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
.name("inverted_index_metadata")
@@ -170,29 +266,29 @@ impl InvertedIndexCache {
Self {
index_metadata,
index: index_cache,
+ page_size,
}
}
}
impl InvertedIndexCache {
pub fn get_index_metadata(&self, file_id: FileId) -> Option> {
- self.index_metadata.get(&IndexKey { file_id })
+ self.index_metadata.get(&IndexMetadataKey { file_id })
}
pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc) {
- let key = IndexKey { file_id };
+ let key = IndexMetadataKey { file_id };
CACHE_BYTES
.with_label_values(&[INDEX_METADATA_TYPE])
.add(index_metadata_weight(&key, &metadata).into());
self.index_metadata.insert(key, metadata)
}
- // todo(hl): align index file content to pages with size like 4096 bytes.
- pub fn get_index(&self, key: IndexKey) -> Option>> {
- self.index.get(&key)
+ pub fn get_index(&self, key: &IndexDataPageKey) -> Option>> {
+ self.index.get(key)
}
- pub fn put_index(&self, key: IndexKey, value: Arc>) {
+ pub fn put_index(&self, key: IndexDataPageKey, value: Arc>) {
CACHE_BYTES
.with_label_values(&[INDEX_CONTENT_TYPE])
.add(index_content_weight(&key, &value).into());
@@ -201,11 +297,234 @@ impl InvertedIndexCache {
}
/// Calculates weight for index metadata.
-fn index_metadata_weight(k: &IndexKey, v: &Arc) -> u32 {
+fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc) -> u32 {
(k.file_id.as_bytes().len() + v.encoded_len()) as u32
}
/// Calculates weight for index content.
-fn index_content_weight(k: &IndexKey, v: &Arc>) -> u32 {
+fn index_content_weight(k: &IndexDataPageKey, v: &Arc>) -> u32 {
(k.file_id.as_bytes().len() + v.len()) as u32
}
+
+/// Prunes the size of the last page based on the indexes.
+/// We have following cases:
+/// 1. The rest file size is less than the page size, read to the end of the file.
+/// 2. Otherwise, read the page size.
+fn prune_size(indexes: &[IndexDataPageKey], file_size: u64, page_size: u64) -> u64 {
+ let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
+ page_size.min(file_size - last_page_start)
+}
+
+#[cfg(test)]
+mod test {
+ use std::num::NonZeroUsize;
+
+ use common_base::BitVec;
+ use futures::stream;
+ use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
+ use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
+ use index::inverted_index::Bytes;
+ use prometheus::register_int_counter_vec;
+ use rand::{Rng, RngCore};
+
+ use super::*;
+ use crate::sst::index::store::InstrumentedStore;
+ use crate::test_util::TestEnv;
+
+ // Fuzz test for index data page key
+ #[test]
+ fn fuzz_index_calculation() {
+ // randomly generate a large u8 array
+ let mut rng = rand::thread_rng();
+ let mut data = vec![0u8; 1024 * 1024];
+ rng.fill_bytes(&mut data);
+ let file_id = FileId::random();
+
+ for _ in 0..100 {
+ let offset = rng.gen_range(0..data.len() as u64);
+ let size = rng.gen_range(0..data.len() as u32 - offset as u32);
+ let page_size: usize = rng.gen_range(1..1024);
+
+ let indexes =
+ IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
+ let page_num = indexes.len();
+ let mut read = Vec::with_capacity(size as usize);
+ let last_index = indexes.len() - 1;
+ for (i, key) in indexes.into_iter().enumerate() {
+ let start = key.page_id as usize * page_size;
+ let page = if start + page_size < data.len() {
+ &data[start..start + page_size]
+ } else {
+ &data[start..]
+ };
+ let range = if i == 0 {
+ // first page range
+ IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
+ } else if i == last_index {
+ // last page range. when the first page is the last page, the range is not used.
+ IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
+ } else {
+ 0..page_size
+ };
+ read.extend_from_slice(&page[range]);
+ }
+ let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
+ if read != data.get(expected_range).unwrap() {
+ panic!(
+ "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
+ offset, size, page_size, read.len(), size as usize,
+ IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
+ IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
+ );
+ }
+ }
+ }
+
+ fn unpack(fst_value: u64) -> [u32; 2] {
+ bytemuck::cast::(fst_value)
+ }
+
+ async fn create_inverted_index_blob() -> Vec {
+ let mut blob = Vec::new();
+ let mut writer = InvertedIndexBlobWriter::new(&mut blob);
+ writer
+ .add_index(
+ "tag0".to_string(),
+ BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
+ Box::new(stream::iter(vec![
+ Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
+ Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
+ Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
+ ])),
+ )
+ .await
+ .unwrap();
+ writer
+ .add_index(
+ "tag1".to_string(),
+ BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
+ Box::new(stream::iter(vec![
+ Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
+ Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
+ Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
+ ])),
+ )
+ .await
+ .unwrap();
+ writer
+ .finish(8, NonZeroUsize::new(1).unwrap())
+ .await
+ .unwrap();
+
+ blob
+ }
+
+ #[tokio::test]
+ async fn test_inverted_index_cache() {
+ let blob = create_inverted_index_blob().await;
+
+ // Init a test range reader in local fs.
+ let mut env = TestEnv::new();
+ let file_size = blob.len() as u64;
+ let store = env.init_object_store_manager();
+ let temp_path = "data";
+ store.write(temp_path, blob).await.unwrap();
+ let store = InstrumentedStore::new(store);
+ let metric =
+ register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
+ let counter = metric.with_label_values(&["test"]);
+ let range_reader = store
+ .range_reader("data", &counter, &counter)
+ .await
+ .unwrap();
+
+ let reader = InvertedIndexBlobReader::new(range_reader);
+ let mut cached_reader = CachedInvertedIndexBlobReader::new(
+ FileId::random(),
+ file_size,
+ reader,
+ Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
+ );
+ let metadata = cached_reader.metadata().await.unwrap();
+ assert_eq!(metadata.total_row_count, 8);
+ assert_eq!(metadata.segment_row_count, 1);
+ assert_eq!(metadata.metas.len(), 2);
+ // tag0
+ let tag0 = metadata.metas.get("tag0").unwrap();
+ let stats0 = tag0.stats.as_ref().unwrap();
+ assert_eq!(stats0.distinct_count, 3);
+ assert_eq!(stats0.null_count, 1);
+ assert_eq!(stats0.min_value, Bytes::from("a"));
+ assert_eq!(stats0.max_value, Bytes::from("c"));
+ let fst0 = cached_reader
+ .fst(
+ tag0.base_offset + tag0.relative_fst_offset as u64,
+ tag0.fst_size,
+ )
+ .await
+ .unwrap();
+ assert_eq!(fst0.len(), 3);
+ let [offset, size] = unpack(fst0.get(b"a").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag0.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
+ let [offset, size] = unpack(fst0.get(b"b").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag0.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
+ let [offset, size] = unpack(fst0.get(b"c").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag0.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
+
+ // tag1
+ let tag1 = metadata.metas.get("tag1").unwrap();
+ let stats1 = tag1.stats.as_ref().unwrap();
+ assert_eq!(stats1.distinct_count, 3);
+ assert_eq!(stats1.null_count, 1);
+ assert_eq!(stats1.min_value, Bytes::from("x"));
+ assert_eq!(stats1.max_value, Bytes::from("z"));
+ let fst1 = cached_reader
+ .fst(
+ tag1.base_offset + tag1.relative_fst_offset as u64,
+ tag1.fst_size,
+ )
+ .await
+ .unwrap();
+ assert_eq!(fst1.len(), 3);
+ let [offset, size] = unpack(fst1.get(b"x").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag1.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
+ let [offset, size] = unpack(fst1.get(b"y").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag1.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
+ let [offset, size] = unpack(fst1.get(b"z").unwrap());
+ let bitmap = cached_reader
+ .bitmap(tag1.base_offset + offset as u64, size)
+ .await
+ .unwrap();
+ assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
+
+ // fuzz test
+ let mut rng = rand::thread_rng();
+ for _ in 0..100 {
+ let offset = rng.gen_range(0..file_size);
+ let size = rng.gen_range(0..file_size as u32 - offset as u32);
+ let expected = cached_reader.range_read(offset, size).await.unwrap();
+ let read = cached_reader.get_or_load(offset, size).await.unwrap();
+ assert_eq!(read, expected);
+ }
+ }
+}
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index dda3f4271059..963089c60aed 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -416,6 +416,8 @@ pub struct InvertedIndexConfig {
pub metadata_cache_size: ReadableSize,
/// Cache size for inverted index content. Setting it to 0 to disable the cache.
pub content_cache_size: ReadableSize,
+ /// Page size for inverted index content.
+ pub content_cache_page_size: ReadableSize,
}
impl InvertedIndexConfig {
@@ -441,6 +443,7 @@ impl Default for InvertedIndexConfig {
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
+ content_cache_page_size: ReadableSize::mb(8),
};
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index d5e47d213657..f6d1dbafeec9 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -893,6 +893,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
+ #[snafu(display("Failed to read file metadata"))]
+ Metadata {
+ #[snafu(source)]
+ error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result = std::result::Result;
@@ -965,7 +973,8 @@ impl ErrorExt for Error {
| CreateDir { .. }
| ReadDataPart { .. }
| CorruptedEntry { .. }
- | BuildEntry { .. } => StatusCode::Internal,
+ | BuildEntry { .. }
+ | Metadata { .. } => StatusCode::Internal,
OpenRegion { source, .. } => source.status_code(),
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index a4f4ab9e446b..1972f3d7abb6 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -18,7 +18,7 @@ pub(crate) mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
mod statistics;
-mod store;
+pub(crate) mod store;
use std::num::NonZeroUsize;
diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs
index d060d4bec17b..0542fd7a59ea 100644
--- a/src/mito2/src/sst/index/inverted_index/applier.rs
+++ b/src/mito2/src/sst/index/inverted_index/applier.rs
@@ -16,6 +16,7 @@ pub mod builder;
use std::sync::Arc;
+use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
@@ -29,7 +30,9 @@ use store_api::storage::RegionId;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
-use crate::error::{ApplyInvertedIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
+use crate::error::{
+ ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
+};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
@@ -123,7 +126,7 @@ impl InvertedIndexApplier {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
- let blob = match self.cached_blob_reader(file_id).await {
+ let mut blob = match self.cached_blob_reader(file_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
@@ -134,8 +137,14 @@ impl InvertedIndexApplier {
};
if let Some(index_cache) = &self.inverted_index_cache {
+ let file_size = if let Some(file_size) = file_size_hint {
+ file_size
+ } else {
+ blob.metadata().await.context(MetadataSnafu)?.content_length
+ };
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id,
+ file_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 43cf54fa2811..15cba55c4437 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -448,7 +448,7 @@ mod tests {
move |expr| {
let _d = &d;
- let cache = Arc::new(InvertedIndexCache::new(10, 10));
+ let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index dec175e76ff6..314e886ba9ca 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -35,8 +35,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
-use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache};
-use common_meta::key::schema_name::{SchemaName, SchemaNameValue};
+use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -49,7 +48,7 @@ use datatypes::schema::ColumnSchema;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
-use moka::future::{Cache, CacheBuilder};
+use moka::future::CacheBuilder;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::ObjectStore;
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index f8ab9c3f4edb..233ab9f056b1 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -170,6 +170,7 @@ impl WorkerGroup {
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
+ .index_content_page_size(config.inverted_index.content_cache_page_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index ab2ec4ea6777..4843b81e9142 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -946,6 +946,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
+content_cache_page_size = "8MiB"
[region_engine.mito.fulltext_index]
create_on_flush = "auto"