diff --git a/Cargo.lock b/Cargo.lock index 311caafcb2fe..e57a6542afbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8883,6 +8883,7 @@ dependencies = [ "lz4_flex 0.11.3", "moka", "pin-project", + "prometheus", "serde", "serde_json", "sha2", diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 7d977a328ca1..7018b039d62e 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -32,6 +32,7 @@ use moka::notification::RemovalCause; use moka::sync::Cache; use parquet::column::page::Page; use parquet::file::metadata::ParquetMetaData; +use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef}; use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector}; use crate::cache::cache_size::parquet_meta_size; @@ -68,6 +69,8 @@ pub struct CacheManager { write_cache: Option, /// Cache for inverted index. index_cache: Option, + /// Puffin metadata cache. + puffin_metadata_cache: Option, /// Cache for time series selectors. selector_result_cache: Option, } @@ -217,6 +220,10 @@ impl CacheManager { pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> { self.index_cache.as_ref() } + + pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { + self.puffin_metadata_cache.as_ref() + } } /// Increases selector cache miss metrics. @@ -237,6 +244,7 @@ pub struct CacheManagerBuilder { page_cache_size: u64, index_metadata_size: u64, index_content_size: u64, + puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, } @@ -278,6 +286,12 @@ impl CacheManagerBuilder { self } + /// Sets cache size for puffin metadata. + pub fn puffin_metadata_size(mut self, bytes: u64) -> Self { + self.puffin_metadata_size = bytes; + self + } + /// Sets selector result cache size. pub fn selector_result_cache_size(mut self, bytes: u64) -> Self { self.selector_result_cache_size = bytes; @@ -340,6 +354,8 @@ impl CacheManagerBuilder { }); let inverted_index_cache = InvertedIndexCache::new(self.index_metadata_size, self.index_content_size); + let puffin_metadata_cache = + PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { Cache::builder() .max_capacity(self.selector_result_cache_size) @@ -361,6 +377,7 @@ impl CacheManagerBuilder { page_cache, write_cache: self.write_cache, index_cache: Some(Arc::new(inverted_index_cache)), + puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, } } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 9b113027a41b..dda3f4271059 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -304,6 +304,9 @@ pub struct IndexConfig { /// Write buffer size for creating the index. pub write_buffer_size: ReadableSize, + + /// Cache size for metadata of puffin files. Setting it to 0 to disable the cache. + pub metadata_cache_size: ReadableSize, } impl Default for IndexConfig { @@ -312,6 +315,7 @@ impl Default for IndexConfig { aux_path: String::new(), staging_size: ReadableSize::gb(2), write_buffer_size: ReadableSize::mb(8), + metadata_cache_size: ReadableSize::mb(64), } } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 19324f119f3e..8ea51d7a435e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -413,6 +413,12 @@ impl ScanRegion { .and_then(|c| c.index_cache()) .cloned(); + let puffin_metadata_cache = self + .cache_manager + .as_ref() + .and_then(|c| c.puffin_metadata_cache()) + .cloned(); + InvertedIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), @@ -428,6 +434,7 @@ impl ScanRegion { .iter(), ), self.access_layer.puffin_manager_factory().clone(), + puffin_metadata_cache, ) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build invereted index applier")) diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 451ec44f1cd2..4353ae55e3e9 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -149,6 +149,7 @@ impl FileMeta { pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) } + pub fn fulltext_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::FulltextIndex) } diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index cac3ffedd74c..42be7f7756b2 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -22,6 +22,7 @@ use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -60,11 +61,16 @@ pub(crate) struct InvertedIndexApplier { /// In-memory cache for inverted index. inverted_index_cache: Option, + + /// Puffin metadata cache. + puffin_metadata_cache: Option, } pub(crate) type InvertedIndexApplierRef = Arc; impl InvertedIndexApplier { + // TODO(weny): remove this after refactoring. + #[allow(clippy::too_many_arguments)] /// Creates a new `InvertedIndexApplier`. pub fn new( region_dir: String, @@ -72,6 +78,7 @@ impl InvertedIndexApplier { store: ObjectStore, file_cache: Option, index_cache: Option, + puffin_metadata_cache: Option, index_applier: Box, puffin_manager_factory: PuffinManagerFactory, ) -> Self { @@ -85,6 +92,7 @@ impl InvertedIndexApplier { index_applier, puffin_manager_factory, inverted_index_cache: index_cache, + puffin_metadata_cache, } } @@ -105,6 +113,7 @@ impl InvertedIndexApplier { if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } + self.remote_blob_reader(file_id).await? } }; @@ -157,7 +166,10 @@ impl InvertedIndexApplier { /// Creates a blob reader from the remote index file. async fn remote_blob_reader(&self, file_id: FileId) -> Result { - let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); + let puffin_manager = self + .puffin_manager_factory + .build(self.store.clone()) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager .reader(&file_path) @@ -221,6 +233,7 @@ mod tests { object_store, None, None, + None, Box::new(mock_index_applier), puffin_manager_factory, ); @@ -263,6 +276,7 @@ mod tests { object_store, None, None, + None, Box::new(mock_index_applier), puffin_manager_factory, ); diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 603cf5aa23fd..131325816fb6 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -28,6 +28,7 @@ use datatypes::value::Value; use index::inverted_index::search::index_apply::PredicatesIndexApplier; use index::inverted_index::search::predicate::Predicate; use object_store::ObjectStore; +use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadata; use store_api::storage::ColumnId; @@ -65,9 +66,14 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> { /// Cache for inverted index. index_cache: Option, + + /// Cache for puffin metadata. + puffin_metadata_cache: Option, } impl<'a> InvertedIndexApplierBuilder<'a> { + // TODO(weny): remove this after refactoring. + #[allow(clippy::too_many_arguments)] /// Creates a new [`InvertedIndexApplierBuilder`]. pub fn new( region_dir: String, @@ -77,6 +83,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> { metadata: &'a RegionMetadata, indexed_column_ids: HashSet, puffin_manager_factory: PuffinManagerFactory, + puffin_metadata_cache: Option, ) -> Self { Self { region_dir, @@ -87,6 +94,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> { output: HashMap::default(), index_cache, puffin_manager_factory, + puffin_metadata_cache, } } @@ -114,6 +122,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> { self.object_store, self.file_cache, self.index_cache, + self.puffin_metadata_cache, Box::new(applier.context(BuildIndexApplierSnafu)?), self.puffin_manager_factory, ))) @@ -327,6 +336,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let expr = Expr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index 0a196e6f1ac6..0a82577b73fc 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -80,6 +80,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let between = Between { @@ -123,6 +124,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let between = Between { @@ -149,6 +151,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let between = Between { @@ -192,6 +195,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let between = Between { @@ -219,6 +223,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let between = Between { diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index cdaec9f94e95..033b55e082e9 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -236,6 +236,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); for ((left, op, right), _) in &cases { @@ -265,6 +266,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); @@ -285,6 +287,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -320,6 +323,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let res = builder.collect_comparison_expr( diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 1d07cca48724..11ce7b69e600 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -142,6 +142,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -180,6 +181,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -209,6 +211,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); @@ -229,6 +232,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); @@ -249,6 +253,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { @@ -308,6 +313,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { @@ -346,6 +352,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index 6a520ba401d3..e9c3928f924d 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -73,6 +73,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let in_list = InList { @@ -106,6 +107,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let in_list = InList { @@ -131,6 +133,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let in_list = InList { @@ -164,6 +167,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let in_list = InList { @@ -191,6 +195,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let in_list = InList { diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs index 7fdf7f3de55c..1c5fa4b3a9d5 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs @@ -67,6 +67,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -96,6 +97,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -125,6 +127,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); builder @@ -147,6 +150,7 @@ mod tests { &metadata, HashSet::from_iter([1, 2, 3]), facotry, + None, ); let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 6db1ef6e0b7b..3ca6a8f5dc61 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -310,12 +310,14 @@ mod tests { use futures::future::BoxFuture; use object_store::services::Memory; use object_store::ObjectStore; + use puffin::puffin_manager::cache::PuffinMetadataCache; use puffin::puffin_manager::PuffinManager; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; use super::*; use crate::cache::index::InvertedIndexCache; + use crate::metrics::CACHE_BYTES; use crate::read::BatchColumn; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; @@ -447,6 +449,7 @@ mod tests { move |expr| { let _d = &d; let cache = Arc::new(InvertedIndexCache::new(10, 10)); + let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES)); let applier = InvertedIndexApplierBuilder::new( region_dir.clone(), object_store.clone(), @@ -455,6 +458,7 @@ mod tests { ®ion_metadata, indexed_column_ids.clone(), factory.clone(), + Some(puffin_metadata_cache), ) .build(&[expr]) .unwrap() diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 33d26c8196df..f8ab9c3f4edb 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -170,6 +170,7 @@ impl WorkerGroup { .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) .index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes()) .index_content_size(config.inverted_index.content_cache_size.as_bytes()) + .puffin_metadata_size(config.index.metadata_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index e4e6c74a5c9b..8c43473103b6 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -25,12 +25,13 @@ futures.workspace = true lz4_flex = "0.11" moka = { workspace = true, features = ["future", "sync"] } pin-project.workspace = true +prometheus.workspace = true serde.workspace = true serde_json.workspace = true sha2 = "0.10.8" snafu.workspace = true -tokio.workspace = true tokio-util.workspace = true +tokio.workspace = true uuid.workspace = true [dev-dependencies] diff --git a/src/puffin/src/blob_metadata.rs b/src/puffin/src/blob_metadata.rs index bb2475bfa336..67eb62c5ff1b 100644 --- a/src/puffin/src/blob_metadata.rs +++ b/src/puffin/src/blob_metadata.rs @@ -68,6 +68,20 @@ pub struct BlobMetadata { pub properties: HashMap, } +impl BlobMetadata { + /// Calculates the memory usage of the blob metadata in bytes. + pub fn memory_usage(&self) -> usize { + self.blob_type.len() + + self.input_fields.len() * std::mem::size_of::() + + self + .properties + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() + + std::mem::size_of::() + } +} + /// Compression codec used to compress the blob #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 3736ed5d2d8d..7898ac4cf75b 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -51,6 +51,11 @@ impl PuffinFileReader { } } + pub fn with_metadata(mut self, metadata: Option) -> Self { + self.metadata = metadata; + self + } + fn validate_file_size(file_size: u64) -> Result<()> { ensure!( file_size >= MIN_FILE_SIZE, diff --git a/src/puffin/src/file_metadata.rs b/src/puffin/src/file_metadata.rs index 74eea3aa08f3..4804c65be495 100644 --- a/src/puffin/src/file_metadata.rs +++ b/src/puffin/src/file_metadata.rs @@ -33,6 +33,22 @@ pub struct FileMetadata { pub properties: HashMap, } +impl FileMetadata { + /// Calculates the memory usage of the file metadata in bytes. + pub fn memory_usage(&self) -> usize { + self.blobs + .iter() + .map(|blob| blob.memory_usage()) + .sum::() + + self + .properties + .iter() + .map(|(k, v)| k.len() + v.len()) + .sum::() + + std::mem::size_of::() + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 7bd5e9039d03..17101b1662e8 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod cache; pub mod file_accessor; pub mod fs_puffin_manager; pub mod stager; diff --git a/src/puffin/src/puffin_manager/cache.rs b/src/puffin/src/puffin_manager/cache.rs new file mode 100644 index 000000000000..66fcb36bf9c2 --- /dev/null +++ b/src/puffin/src/puffin_manager/cache.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use prometheus::IntGaugeVec; + +use crate::file_metadata::FileMetadata; +/// Metrics for index metadata. +const PUFFIN_METADATA_TYPE: &str = "puffin_metadata"; + +pub type PuffinMetadataCacheRef = Arc; + +/// A cache for storing the metadata of the index files. +pub struct PuffinMetadataCache { + cache: moka::sync::Cache>, +} + +fn puffin_metadata_weight(k: &String, v: &Arc) -> u32 { + (k.as_bytes().len() + v.memory_usage()) as u32 +} + +impl PuffinMetadataCache { + pub fn new(capacity: u64, cache_bytes: &'static IntGaugeVec) -> Self { + common_telemetry::debug!("Building PuffinMetadataCache with capacity: {capacity}"); + Self { + cache: moka::sync::CacheBuilder::new(capacity) + .name("puffin_metadata") + .weigher(puffin_metadata_weight) + .eviction_listener(|k, v, _cause| { + let size = puffin_metadata_weight(&k, &v); + cache_bytes + .with_label_values(&[PUFFIN_METADATA_TYPE]) + .sub(size.into()); + }) + .build(), + } + } + + /// Gets the metadata from the cache. + pub fn get_metadata(&self, file_id: &str) -> Option> { + self.cache.get(file_id) + } + + /// Puts the metadata into the cache. + pub fn put_metadata(&self, file_id: String, metadata: Arc) { + self.cache.insert(file_id, metadata); + } +} diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 976eb239979a..52190f92fb28 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -21,6 +21,7 @@ pub use reader::FsPuffinReader; pub use writer::FsPuffinWriter; use crate::error::Result; +use crate::puffin_manager::cache::PuffinMetadataCacheRef; use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::stager::Stager; use crate::puffin_manager::PuffinManager; @@ -31,16 +32,29 @@ pub struct FsPuffinManager { stager: S, /// The puffin file accessor. puffin_file_accessor: F, + /// The puffin metadata cache. + puffin_metadata_cache: Option, } impl FsPuffinManager { - /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`. + /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`, + /// and optionally with a `puffin_metadata_cache`. pub fn new(stager: S, puffin_file_accessor: F) -> Self { Self { stager, puffin_file_accessor, + puffin_metadata_cache: None, } } + + /// Sets the puffin metadata cache. + pub fn with_puffin_metadata_cache( + mut self, + puffin_metadata_cache: Option, + ) -> Self { + self.puffin_metadata_cache = puffin_metadata_cache; + self + } } #[async_trait] @@ -57,6 +71,7 @@ where puffin_file_name.to_string(), self.stager.clone(), self.puffin_file_accessor.clone(), + self.puffin_metadata_cache.clone(), )) } diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 3de27fdb77b0..2e1ae594adc6 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -14,6 +14,7 @@ use std::io; use std::ops::Range; +use std::sync::Arc; use async_compression::futures::bufread::ZstdDecoder; use async_trait::async_trait; @@ -23,12 +24,14 @@ use futures::io::BufReader; use futures::{AsyncRead, AsyncWrite}; use snafu::{ensure, OptionExt, ResultExt}; +use super::PuffinMetadataCacheRef; use crate::blob_metadata::{BlobMetadata, CompressionCodec}; use crate::error::{ BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu, MetadataSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, }; use crate::file_format::reader::{AsyncReader, PuffinFileReader}; +use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; @@ -45,14 +48,23 @@ pub struct FsPuffinReader { /// The puffin file accessor. puffin_file_accessor: F, + + /// The puffin file metadata cache. + puffin_file_metadata_cache: Option, } impl FsPuffinReader { - pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self { + pub(crate) fn new( + puffin_file_name: String, + stager: S, + puffin_file_accessor: F, + puffin_file_metadata_cache: Option, + ) -> Self { Self { puffin_file_name, stager, puffin_file_accessor, + puffin_file_metadata_cache, } } } @@ -73,13 +85,13 @@ where .await?; let mut file = PuffinFileReader::new(reader); - // TODO(zhongzc): cache the metadata. - let metadata = file.metadata().await?; + let metadata = self.get_puffin_file_metadata(&mut file).await?; let blob_metadata = metadata .blobs - .into_iter() + .iter() .find(|m| m.blob_type == key) - .context(BlobNotFoundSnafu { blob: key })?; + .context(BlobNotFoundSnafu { blob: key })? + .clone(); let blob = if blob_metadata.compression_codec.is_none() { // If the blob is not compressed, we can directly read it from the puffin file. @@ -133,6 +145,23 @@ where S: Stager, F: PuffinFileAccessor + Clone, { + async fn get_puffin_file_metadata( + &self, + reader: &mut PuffinFileReader, + ) -> Result> { + if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { + if let Some(metadata) = cache.get_metadata(&self.puffin_file_name) { + return Ok(metadata); + } + } + + let metadata = Arc::new(reader.metadata().await?); + if let Some(cache) = self.puffin_file_metadata_cache.as_ref() { + cache.put_metadata(self.puffin_file_name.to_string(), metadata.clone()); + } + Ok(metadata) + } + async fn init_blob_to_stager( reader: PuffinFileReader, blob_metadata: BlobMetadata,