Skip to content

Commit

Permalink
feat: introduce PuffinMetadataCache
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 11, 2024
1 parent a801214 commit 590adb0
Show file tree
Hide file tree
Showing 22 changed files with 233 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};

use crate::cache::cache_size::parquet_meta_size;
Expand Down Expand Up @@ -68,6 +69,8 @@ pub struct CacheManager {
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,
/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
}
Expand Down Expand Up @@ -217,6 +220,10 @@ impl CacheManager {
pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
self.index_cache.as_ref()
}

pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
self.puffin_metadata_cache.as_ref()
}
}

/// Increases selector cache miss metrics.
Expand All @@ -237,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
}
Expand Down Expand Up @@ -278,6 +286,12 @@ impl CacheManagerBuilder {
self
}

/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
self
}

/// Sets selector result cache size.
pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
self.selector_result_cache_size = bytes;
Expand Down Expand Up @@ -340,6 +354,8 @@ impl CacheManagerBuilder {
});
let inverted_index_cache =
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.selector_result_cache_size)
Expand All @@ -361,6 +377,7 @@ impl CacheManagerBuilder {
page_cache,
write_cache: self.write_cache,
index_cache: Some(Arc::new(inverted_index_cache)),
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ pub struct IndexConfig {

/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,

/// Cache size for metadata of puffin files. Setting it to 0 to disable the cache.
pub metadata_cache_size: ReadableSize,
}

impl Default for IndexConfig {
Expand All @@ -312,6 +315,7 @@ impl Default for IndexConfig {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();

let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
Expand All @@ -428,6 +434,7 @@ impl ScanRegion {
.iter(),
),
self.access_layer.puffin_manager_factory().clone(),
puffin_metadata_cache,
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}

pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
Expand Down
16 changes: 15 additions & 1 deletion src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
Expand Down Expand Up @@ -60,18 +61,24 @@ pub(crate) struct InvertedIndexApplier {

/// In-memory cache for inverted index.
inverted_index_cache: Option<InvertedIndexCacheRef>,

/// Puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;

impl InvertedIndexApplier {
// TODO(weny): remove this after refactoring.
#[allow(clippy::too_many_arguments)]
/// Creates a new `InvertedIndexApplier`.
pub fn new(
region_dir: String,
region_id: RegionId,
store: ObjectStore,
file_cache: Option<FileCacheRef>,
index_cache: Option<InvertedIndexCacheRef>,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
index_applier: Box<dyn IndexApplier>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Expand All @@ -85,6 +92,7 @@ impl InvertedIndexApplier {
index_applier,
puffin_manager_factory,
inverted_index_cache: index_cache,
puffin_metadata_cache,
}
}

Expand All @@ -105,6 +113,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}

self.remote_blob_reader(file_id).await?
}
};
Expand Down Expand Up @@ -157,7 +166,10 @@ impl InvertedIndexApplier {

/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
Expand Down Expand Up @@ -221,6 +233,7 @@ mod tests {
object_store,
None,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down Expand Up @@ -263,6 +276,7 @@ mod tests {
object_store,
None,
None,
None,
Box::new(mock_index_applier),
puffin_manager_factory,
);
Expand Down
10 changes: 10 additions & 0 deletions src/mito2/src/sst/index/inverted_index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datatypes::value::Value;
use index::inverted_index::search::index_apply::PredicatesIndexApplier;
use index::inverted_index::search::predicate::Predicate;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::ColumnId;
Expand Down Expand Up @@ -65,9 +66,14 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {

/// Cache for inverted index.
index_cache: Option<InvertedIndexCacheRef>,

/// Cache for puffin metadata.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}

impl<'a> InvertedIndexApplierBuilder<'a> {
// TODO(weny): remove this after refactoring.
#[allow(clippy::too_many_arguments)]
/// Creates a new [`InvertedIndexApplierBuilder`].
pub fn new(
region_dir: String,
Expand All @@ -77,6 +83,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
metadata: &'a RegionMetadata,
indexed_column_ids: HashSet<ColumnId>,
puffin_manager_factory: PuffinManagerFactory,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
Self {
region_dir,
Expand All @@ -87,6 +94,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
output: HashMap::default(),
index_cache,
puffin_manager_factory,
puffin_metadata_cache,
}
}

Expand Down Expand Up @@ -114,6 +122,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
self.object_store,
self.file_cache,
self.index_cache,
self.puffin_metadata_cache,
Box::new(applier.context(BuildIndexApplierSnafu)?),
self.puffin_manager_factory,
)))
Expand Down Expand Up @@ -327,6 +336,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let expr = Expr::BinaryExpr(BinaryExpr {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let between = Between {
Expand Down Expand Up @@ -123,6 +124,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let between = Between {
Expand All @@ -149,6 +151,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let between = Between {
Expand Down Expand Up @@ -192,6 +195,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let between = Between {
Expand Down Expand Up @@ -219,6 +223,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let between = Between {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

for ((left, op, right), _) in &cases {
Expand Down Expand Up @@ -265,6 +266,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10));
Expand All @@ -285,6 +287,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

builder
Expand Down Expand Up @@ -320,6 +323,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let res = builder.collect_comparison_expr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

builder
Expand Down Expand Up @@ -180,6 +181,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

builder
Expand Down Expand Up @@ -209,6 +211,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
Expand All @@ -229,6 +232,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let res = builder.collect_eq(&tag_column(), &int64_lit(1));
Expand All @@ -249,6 +253,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
Expand Down Expand Up @@ -308,6 +313,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
Expand Down Expand Up @@ -346,6 +352,7 @@ mod tests {
&metadata,
HashSet::from_iter([1, 2, 3]),
facotry,
None,
);

let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
Expand Down
Loading

0 comments on commit 590adb0

Please sign in to comment.