Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce PuffinMetadataCache #5148

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};

use crate::cache::cache_size::parquet_meta_size;
Expand Down Expand Up @@ -68,6 +69,8 @@ pub struct CacheManager {
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
}
Expand Down Expand Up @@ -217,6 +220,10 @@ impl CacheManager {
pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
self.index_cache.as_ref()
}

pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
}

/// Increases selector cache miss metrics.
Expand All @@ -237,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
}
Expand Down Expand Up @@ -278,6 +286,12 @@ impl CacheManagerBuilder {
self
}

/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
self
}

/// Sets selector result cache size.
pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
self.selector_result_cache_size = bytes;
Expand Down Expand Up @@ -340,6 +354,8 @@ impl CacheManagerBuilder {
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.selector_result_cache_size)
Expand All @@ -361,6 +377,7 @@ impl CacheManagerBuilder {
page_cache,
write_cache: self.write_cache,
index_cache: Some(Arc::new(inverted_index_cache)),
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ pub struct IndexConfig {

/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,

/// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
pub metadata_cache_size: ReadableSize,
}

impl Default for IndexConfig {
Expand All @@ -312,6 +315,7 @@ impl Default for IndexConfig {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,15 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
index_cache,
self.version.metadata.as_ref(),
self.version.metadata.inverted_indexed_column_ids(
self.version
Expand All @@ -429,6 +433,9 @@ impl ScanRegion {
),
self.access_layer.puffin_manager_factory().clone(),
)
.with_file_cache(file_cache)
.with_index_cache(index_cache)
.with_puffin_metadata_cache(puffin_metadata_cache)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
.ok()
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}

pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
Expand Down
42 changes: 33 additions & 9 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -60,6 +61,9 @@ pub(crate) struct InvertedIndexApplier {

/// In-memory cache for inverted index.
inverted_index_cache: Option<InvertedIndexCacheRef>,

/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
Expand All @@ -70,8 +74,6 @@ impl InvertedIndexApplier {
region_dir: String,
region_id: RegionId,
store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Expand All @@ -81,13 +83,35 @@ impl InvertedIndexApplier {
region_dir,
region_id,
store,
file_cache,
file_cache: None,
index_applier,
puffin_manager_factory,
inverted_index_cache: index_cache,
inverted_index_cache: None,
puffin_metadata_cache: None,
}
}

/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}

/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.inverted_index_cache = index_cache;
self
}

/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}

/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
Expand All @@ -105,6 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}

self.remote_blob_reader(file_id).await?
}
};
Expand Down Expand Up @@ -157,7 +182,10 @@ impl InvertedIndexApplier {

/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
Expand Down Expand Up @@ -219,8 +247,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down Expand Up @@ -261,8 +287,6 @@ mod tests {
region_dir.clone(),
RegionId::new(0, 0),
object_store,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down
55 changes: 40 additions & 15 deletions src/mito2/src/sst/index/inverted_index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
Expand Down Expand Up @@ -65,31 +66,54 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {

/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,

/// Cache for puffin metadata.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl<'a> InvertedIndexApplierBuilder<'a> {
/// Creates a new [`InvertedIndexApplierBuilder`].
pub fn new(
region_dir: String,
object_store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
metadata: &'a RegionMetadata,
indexed_column_ids: HashSet<ColumnId>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
object_store,
file_cache,
metadata,
indexed_column_ids,
output: HashMap::default(),
index_cache,
puffin_manager_factory,
file_cache: None,
index_cache: None,
puffin_metadata_cache: None,
}
}

/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self
}

/// Sets the puffin metadata cache.
pub fn with_puffin_metadata_cache(
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self
}

/// Sets the index cache.
pub fn with_index_cache(mut self, index_cache: Option<InvertedIndexCacheRef>) -> Self {
self.index_cache = index_cache;
self
}

/// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
/// the expressions provided. If no predicates match, returns `None`.
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
Expand All @@ -108,15 +132,18 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);

Ok(Some(InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
self.file_cache,
self.index_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)))
Ok(Some(
InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)
.with_file_cache(self.file_cache)
.with_puffin_metadata_cache(self.puffin_metadata_cache)
.with_index_cache(self.index_cache),
))
}

/// Recursively traverses expressions to collect predicates.
Expand Down Expand Up @@ -322,8 +349,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down Expand Up @@ -118,8 +116,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand All @@ -144,8 +140,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down Expand Up @@ -187,8 +181,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand All @@ -214,8 +206,6 @@ mod tests {
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
None,
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
Expand Down
Loading
Loading