From fee75a1fadfda2f98a496090158e99e4b93915f4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 12 Dec 2024 19:27:22 +0800 Subject: [PATCH] feat: collect reader metrics from prune reader (#5152) --- src/mito2/src/read/last_row.rs | 14 +++++++++++++- src/mito2/src/read/prune.rs | 16 +++++++++++++--- src/mito2/src/read/scan_util.rs | 5 +++-- src/mito2/src/sst/parquet/reader.rs | 4 ++-- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 79d035e03271..1e2a6a5844c6 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -27,7 +27,7 @@ use crate::cache::{ use crate::error::Result; use crate::read::{Batch, BatchReader, BoxedBatchReader}; use crate::sst::file::FileId; -use crate::sst::parquet::reader::RowGroupReader; +use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader}; /// Reader to keep the last row for each time series. /// It assumes that batches from the input reader are @@ -115,6 +115,14 @@ impl RowGroupLastRowCachedReader { } } + /// Gets the underlying reader metrics if uncached. + pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> { + match self { + RowGroupLastRowCachedReader::Hit(_) => None, + RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()), + } + } + /// Creates new Hit variant and updates metrics. fn new_hit(value: Arc) -> Self { selector_result_cache_hit(); @@ -234,6 +242,10 @@ impl RowGroupLastRowReader { }); cache.put_selector_result(self.key, value); } + + fn metrics(&self) -> &ReaderMetrics { + self.reader.metrics() + } } /// Push last row into `yielded_batches`. diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index cb0066e73472..500cd1430242 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -72,11 +72,21 @@ impl PruneReader { self.source = source; } - pub(crate) fn metrics(&mut self) -> &ReaderMetrics { + /// Merge metrics with the inner reader and return the merged metrics. + pub(crate) fn metrics(&self) -> ReaderMetrics { + let mut metrics = self.metrics.clone(); match &self.source { - Source::RowGroup(r) => r.metrics(), - Source::LastRow(_) => &self.metrics, + Source::RowGroup(r) => { + metrics.merge_from(r.metrics()); + } + Source::LastRow(r) => { + if let Some(inner_metrics) = r.metrics() { + metrics.merge_from(inner_metrics); + } + } } + + metrics } pub(crate) async fn next_batch(&mut self) -> Result> { diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index df790d191a4e..0bdf62e77e03 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -181,8 +181,9 @@ pub(crate) fn scan_file_ranges( } yield batch; } - if let Source::PruneReader(mut reader) = source { - reader_metrics.merge_from(reader.metrics()); + if let Source::PruneReader(reader) = source { + let prune_metrics = reader.metrics(); + reader_metrics.merge_from(&prune_metrics); } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 02c5c2cf3cba..335b09426eca 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -918,10 +918,10 @@ enum ReaderState { impl ReaderState { /// Returns the metrics of the reader. - fn metrics(&mut self) -> &ReaderMetrics { + fn metrics(&self) -> ReaderMetrics { match self { ReaderState::Readable(reader) => reader.metrics(), - ReaderState::Exhausted(m) => m, + ReaderState::Exhausted(m) => m.clone(), } } }