From df51632b2ac2a70eceaa98363cb14adeb26420a5 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 16:04:27 +0000 Subject: [PATCH] feat(index): add file_size_hint for remote blob reader --- src/common/base/src/range_read.rs | 17 +++++++++++++++ src/mito2/src/sst/file.rs | 21 +++++++++++++++++++ .../src/sst/index/inverted_index/applier.rs | 18 +++++++++------- .../src/sst/index/inverted_index/creator.rs | 2 +- src/mito2/src/sst/index/puffin_manager.rs | 2 +- src/mito2/src/sst/index/store.rs | 21 +++++++++++++++---- src/mito2/src/sst/parquet/reader.rs | 7 +++++-- src/puffin/src/partial_reader/async.rs | 4 ++++ src/puffin/src/puffin_manager.rs | 2 +- .../fs_puffin_manager/reader.rs | 12 +++++++++-- src/puffin/src/puffin_manager/tests.rs | 2 +- 11 files changed, 89 insertions(+), 19 deletions(-) diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs index 91f865d17ef6..98a29c0b4bb8 100644 --- a/src/common/base/src/range_read.rs +++ b/src/common/base/src/range_read.rs @@ -36,6 +36,11 @@ pub struct Metadata { /// `RangeReader` reads a range of bytes from a source. #[async_trait] pub trait RangeReader: Send + Unpin { + /// Sets the file size hint for the reader. + /// + /// It's used to optimize the reading process by reducing the number of remote requests. + fn with_file_size_hint(&mut self, file_size_hint: Option); + /// Returns the metadata of the source. async fn metadata(&mut self) -> io::Result; @@ -70,6 +75,10 @@ pub trait RangeReader: Send + Unpin { #[async_trait] impl RangeReader for &mut R { + fn with_file_size_hint(&mut self, file_size_hint: Option) { + (*self).with_file_size_hint(file_size_hint) + } + async fn metadata(&mut self) -> io::Result { (*self).metadata().await } @@ -186,6 +195,10 @@ impl AsyncRead for AsyncReadAdapter { #[async_trait] impl RangeReader for Vec { + fn with_file_size_hint(&mut self, _file_size_hint: Option) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.len() as u64, @@ -222,6 +235,10 @@ impl FileReader { #[async_trait] impl RangeReader for FileReader { + fn with_file_size_hint(&mut self, _file_size_hint: Option) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.content_length, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 451ec44f1cd2..17c38caa6ece 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -146,12 +146,33 @@ pub enum IndexType { } impl FileMeta { + /// Returns true if the file has an inverted index pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) } + + /// Returns true if the file has a fulltext index pub fn fulltext_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::FulltextIndex) } + + /// Return the size of the inverted index file + pub fn inverted_index_size(&self) -> Option { + if self.available_indexes.len() == 1 && self.inverted_index_available() { + Some(self.index_file_size) + } else { + None + } + } + + /// Returns the size of the fulltext index file + pub fn fulltext_index_size(&self) -> Option { + if self.available_indexes.len() == 1 && self.fulltext_index_available() { + Some(self.index_file_size) + } else { + None + } + } } /// Handle to a SST file. diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index cac3ffedd74c..b796524d0e2a 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -89,7 +89,7 @@ impl InvertedIndexApplier { } /// Applies predicates to the provided SST file id and returns the relevant row group ids - pub async fn apply(&self, file_id: FileId) -> Result { + pub async fn apply(&self, file_id: FileId, file_size_hint: Option) -> Result { let _timer = INDEX_APPLY_ELAPSED .with_label_values(&[TYPE_INVERTED_INDEX]) .start_timer(); @@ -105,7 +105,7 @@ impl InvertedIndexApplier { if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } - self.remote_blob_reader(file_id).await? + self.remote_blob_reader(file_id, file_size_hint).await? } }; @@ -146,7 +146,7 @@ impl InvertedIndexApplier { .reader(&puffin_file_name) .await .context(PuffinBuildReaderSnafu)? - .blob(INDEX_BLOB_TYPE) + .blob(INDEX_BLOB_TYPE, None) .await .context(PuffinReadBlobSnafu)? .reader() @@ -156,14 +156,18 @@ impl InvertedIndexApplier { } /// Creates a blob reader from the remote index file. - async fn remote_blob_reader(&self, file_id: FileId) -> Result { + async fn remote_blob_reader( + &self, + file_id: FileId, + file_size_hint: Option, + ) -> Result { let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager .reader(&file_path) .await .context(PuffinBuildReaderSnafu)? - .blob(INDEX_BLOB_TYPE) + .blob(INDEX_BLOB_TYPE, file_size_hint) .await .context(PuffinReadBlobSnafu)? .reader() @@ -224,7 +228,7 @@ mod tests { Box::new(mock_index_applier), puffin_manager_factory, ); - let output = sst_index_applier.apply(file_id).await.unwrap(); + let output = sst_index_applier.apply(file_id, None).await.unwrap(); assert_eq!( output, ApplyOutput { @@ -266,7 +270,7 @@ mod tests { Box::new(mock_index_applier), puffin_manager_factory, ); - let res = sst_index_applier.apply(file_id).await; + let res = sst_index_applier.apply(file_id, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 6db1ef6e0b7b..9fcc8b3be7c2 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -461,7 +461,7 @@ mod tests { .unwrap(); Box::pin(async move { applier - .apply(sst_file_id) + .apply(sst_file_id, None) .await .unwrap() .matched_segment_ids diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 8bfa25de1e87..2075a0f5135e 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -190,7 +190,7 @@ mod tests { writer.finish().await.unwrap(); let reader = manager.reader(file_name).await.unwrap(); - let blob_guard = reader.blob(blob_key).await.unwrap(); + let blob_guard = reader.blob(blob_key, None).await.unwrap(); let mut blob_reader = blob_guard.reader().await.unwrap(); let meta = blob_reader.metadata().await.unwrap(); let bs = blob_reader.read(0..meta.content_length).await.unwrap(); diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 2750c69fc249..f634c39a78e0 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -68,6 +68,7 @@ impl InstrumentedStore { path: path.to_string(), read_byte_count, read_count, + file_size_hint: None, }) } @@ -262,15 +263,27 @@ pub(crate) struct InstrumentedRangeReader<'a> { path: String, read_byte_count: &'a IntCounter, read_count: &'a IntCounter, + file_size_hint: Option, } #[async_trait] impl RangeReader for InstrumentedRangeReader<'_> { + fn with_file_size_hint(&mut self, file_size_hint: Option) { + self.file_size_hint = file_size_hint; + } + async fn metadata(&mut self) -> io::Result { - let stat = self.store.stat(&self.path).await?; - Ok(Metadata { - content_length: stat.content_length(), - }) + match self.file_size_hint { + Some(file_size_hint) => Ok(Metadata { + content_length: file_size_hint, + }), + None => { + let stat = self.store.stat(&self.path).await?; + Ok(Metadata { + content_length: stat.content_length(), + }) + } + } } async fn read(&mut self, range: Range) -> io::Result { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b73026a7a6e3..02c5c2cf3cba 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -475,8 +475,11 @@ impl ParquetReaderBuilder { if !self.file_handle.meta_ref().inverted_index_available() { return false; } - - let apply_output = match index_applier.apply(self.file_handle.file_id()).await { + let file_size_hint = self.file_handle.meta_ref().inverted_index_size(); + let apply_output = match index_applier + .apply(self.file_handle.file_id(), file_size_hint) + .await + { Ok(output) => output, Err(err) => { if cfg!(any(test, feature = "test")) { diff --git a/src/puffin/src/partial_reader/async.rs b/src/puffin/src/partial_reader/async.rs index 3de40cb3a190..3787c64f8018 100644 --- a/src/puffin/src/partial_reader/async.rs +++ b/src/puffin/src/partial_reader/async.rs @@ -23,6 +23,10 @@ use crate::partial_reader::PartialReader; #[async_trait] impl RangeReader for PartialReader { + fn with_file_size_hint(&mut self, _file_size_hint: Option) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.size, diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 7bd5e9039d03..ce4bb739a9e7 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -81,7 +81,7 @@ pub trait PuffinReader { /// /// The returned `BlobGuard` is used to access the blob data. /// Users should hold the `BlobGuard` until they are done with the blob data. - async fn blob(&self, key: &str) -> Result; + async fn blob(&self, key: &str, file_size_hint: Option) -> Result; /// Reads a directory from the Puffin file. /// diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 3de27fdb77b0..072c46fc29e0 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -66,11 +66,12 @@ where type Blob = Either, S::Blob>; type Dir = S::Dir; - async fn blob(&self, key: &str) -> Result { - let reader = self + async fn blob(&self, key: &str, file_size_hint: Option) -> Result { + let mut reader = self .puffin_file_accessor .reader(&self.puffin_file_name) .await?; + reader.with_file_size_hint(file_size_hint); let mut file = PuffinFileReader::new(reader); // TODO(zhongzc): cache the metadata. @@ -274,6 +275,13 @@ where A: RangeReader, B: RangeReader, { + fn with_file_size_hint(&mut self, file_size_hint: Option) { + match self { + Either::L(a) => a.with_file_size_hint(file_size_hint), + Either::R(b) => b.with_file_size_hint(file_size_hint), + } + } + async fn metadata(&mut self) -> io::Result { match self { Either::L(a) => a.metadata().await, diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 37b3ce7c65af..62dc96bb3291 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -296,7 +296,7 @@ async fn check_blob( puffin_reader: &impl PuffinReader, compressed: bool, ) { - let blob = puffin_reader.blob(key).await.unwrap(); + let blob = puffin_reader.blob(key, None).await.unwrap(); let mut reader = blob.reader().await.unwrap(); let meta = reader.metadata().await.unwrap(); let bs = reader.read(0..meta.content_length).await.unwrap();