From d0e4b97ae88a878af9a992d1dc4fb12b517fad28 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 19 Aug 2024 22:10:59 +0800 Subject: [PATCH] refactor: reuse ReaderFilterMetrics --- src/mito2/src/read.rs | 37 ++------- src/mito2/src/read/prune.rs | 4 +- src/mito2/src/sst/parquet/reader.rs | 113 +++++++++++++++++----------- 3 files changed, 76 insertions(+), 78 deletions(-) diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 6f6db6554f29..edde28dab80c 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -56,7 +56,7 @@ use crate::error::{ use crate::memtable::BoxedBatchIterator; use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::read::prune::PruneReader; -use crate::sst::parquet::reader::ReaderMetrics; +use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; /// Storage internal representation of a batch of rows for a primary key (time series). /// @@ -753,22 +753,8 @@ pub(crate) struct ScannerMetrics { num_batches: usize, /// Number of rows returned. num_rows: usize, - /// Number of row groups before filtering. - num_row_groups_before_filtering: usize, - /// Number of row groups filtered by fulltext index. - num_row_groups_fulltext_index_filtered: usize, - /// Number of row groups filtered by inverted index. - num_row_groups_inverted_index_filtered: usize, - /// Number of row groups filtered by min-max index. - num_row_groups_min_max_filtered: usize, - /// Number of rows filtered by precise filter. - num_rows_precise_filtered: usize, - /// Number of rows in row group before filtering. - num_rows_in_row_group_before_filtering: usize, - /// Number of rows in row group filtered by fulltext index. - num_rows_in_row_group_fulltext_index_filtered: usize, - /// Number of rows in row group filtered by inverted index. - num_rows_in_row_group_inverted_index_filtered: usize, + /// Filter related metrics for readers. 
+ filter_metrics: ReaderFilterMetrics, } impl ScannerMetrics { @@ -784,19 +770,10 @@ impl ScannerMetrics { .with_label_values(&["build_parts"]) .observe(self.build_parts_cost.as_secs_f64()); - self.num_row_groups_before_filtering = reader_metrics.num_row_groups_before_filtering; - self.num_row_groups_fulltext_index_filtered = - reader_metrics.num_row_groups_fulltext_index_filtered; - self.num_row_groups_inverted_index_filtered = - reader_metrics.num_row_groups_inverted_index_filtered; - self.num_row_groups_min_max_filtered = reader_metrics.num_row_groups_min_max_filtered; - self.num_rows_precise_filtered = reader_metrics.num_rows_precise_filtered; - self.num_rows_in_row_group_before_filtering = - reader_metrics.num_rows_in_row_group_before_filtering; - self.num_rows_in_row_group_fulltext_index_filtered = - reader_metrics.num_rows_in_row_group_fulltext_index_filtered; - self.num_rows_in_row_group_inverted_index_filtered = - reader_metrics.num_rows_in_row_group_inverted_index_filtered; + // We only call this once so we overwrite it directly. + self.filter_metrics = reader_metrics.filter_metrics; + // Observes filter metrics. + self.filter_metrics.observe(); } /// Observes metrics on scanner finish. diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 58c81a1815a8..0ab2f9e2b5d5 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -97,13 +97,13 @@ impl PruneReader { let num_rows_before_filter = batch.num_rows(); let Some(batch_filtered) = self.context.precise_filter(batch)? 
else { // the entire batch is filtered out - self.metrics.num_rows_precise_filtered += num_rows_before_filter; + self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter; return Ok(None); }; // update metric let filtered_rows = num_rows_before_filter - batch_filtered.num_rows(); - self.metrics.num_rows_precise_filtered += filtered_rows; + self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows; if !batch_filtered.is_empty() { Ok(Some(batch_filtered)) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1f049f7a0034..6174165206d0 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -225,7 +225,7 @@ impl ParquetReaderBuilder { .context(ReadParquetSnafu { path: &file_path })?; let row_groups = self - .row_groups_to_read(&read_format, &parquet_meta, metrics) + .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics) .await; let reader_builder = RowGroupReaderBuilder { @@ -339,7 +339,7 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - metrics: &mut ReaderMetrics, + metrics: &mut ReaderFilterMetrics, ) -> BTreeMap> { let num_row_groups = parquet_meta.num_row_groups(); let num_rows = parquet_meta.file_metadata().num_rows(); @@ -385,7 +385,7 @@ impl ParquetReaderBuilder { row_group_size: usize, parquet_meta: &ParquetMetaData, output: &mut BTreeMap>, - metrics: &mut ReaderMetrics, + metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.fulltext_index_applier else { return false; @@ -465,7 +465,7 @@ impl ParquetReaderBuilder { row_group_size: usize, parquet_meta: &ParquetMetaData, output: &mut BTreeMap>, - metrics: &mut ReaderMetrics, + metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(index_applier) = &self.inverted_index_applier else { return false; @@ -532,7 +532,7 @@ impl ParquetReaderBuilder { read_format: &ReadFormat, parquet_meta: &ParquetMetaData, 
output: &mut BTreeMap>, - metrics: &mut ReaderMetrics, + metrics: &mut ReaderFilterMetrics, ) -> bool { let Some(predicate) = &self.predicate else { return false; @@ -727,9 +727,9 @@ fn time_range_to_predicate( Ok(predicates) } -/// Parquet reader metrics. -#[derive(Debug, Default, Clone)] -pub(crate) struct ReaderMetrics { +/// Metrics of filtering row groups and rows. +#[derive(Debug, Default, Clone, Copy)] +pub(crate) struct ReaderFilterMetrics { /// Number of row groups before filtering. pub(crate) num_row_groups_before_filtering: usize, /// Number of row groups filtered by fulltext index. @@ -746,6 +746,57 @@ pub(crate) struct ReaderMetrics { pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize, /// Number of rows in row group filtered by inverted index. pub(crate) num_rows_in_row_group_inverted_index_filtered: usize, +} + +impl ReaderFilterMetrics { + /// Adds `other` metrics to this metrics. + pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) { + self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; + self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered; + self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; + self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; + self.num_rows_precise_filtered += other.num_rows_precise_filtered; + self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; + self.num_rows_in_row_group_fulltext_index_filtered += + other.num_rows_in_row_group_fulltext_index_filtered; + self.num_rows_in_row_group_inverted_index_filtered += + other.num_rows_in_row_group_inverted_index_filtered; + } + + /// Reports metrics.
+ pub(crate) fn observe(&self) { + READ_ROW_GROUPS_TOTAL + .with_label_values(&["before_filtering"]) + .inc_by(self.num_row_groups_before_filtering as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["fulltext_index_filtered"]) + .inc_by(self.num_row_groups_fulltext_index_filtered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["inverted_index_filtered"]) + .inc_by(self.num_row_groups_inverted_index_filtered as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["minmax_index_filtered"]) + .inc_by(self.num_row_groups_min_max_filtered as u64); + PRECISE_FILTER_ROWS_TOTAL + .with_label_values(&["parquet"]) + .inc_by(self.num_rows_precise_filtered as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["before_filtering"]) + .inc_by(self.num_rows_in_row_group_before_filtering as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["fulltext_index_filtered"]) + .inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64); + READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["inverted_index_filtered"]) + .inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64); + } +} + +/// Parquet reader metrics. +#[derive(Debug, Default, Clone)] +pub(crate) struct ReaderMetrics { + /// Filtered row groups and rows metrics. + pub(crate) filter_metrics: ReaderFilterMetrics, /// Duration to build the parquet reader. pub(crate) build_cost: Duration, /// Duration to scan the reader. @@ -761,16 +812,7 @@ pub(crate) struct ReaderMetrics { impl ReaderMetrics { /// Adds `other` metrics to this metrics. 
pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) { - self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; - self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered; - self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; - self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; - self.num_rows_precise_filtered += other.num_rows_precise_filtered; - self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; - self.num_rows_in_row_group_fulltext_index_filtered += - other.num_rows_in_row_group_fulltext_index_filtered; - self.num_rows_in_row_group_inverted_index_filtered += - other.num_rows_in_row_group_inverted_index_filtered; + self.filter_metrics.merge_from(&other.filter_metrics); self.build_cost += other.build_cost; self.scan_cost += other.scan_cost; self.num_record_batches += other.num_record_batches; @@ -1009,10 +1051,12 @@ impl Drop for ParquetReader { self.context.reader_builder().file_handle.region_id(), self.context.reader_builder().file_handle.file_id(), self.context.reader_builder().file_handle.time_range(), - metrics.num_row_groups_before_filtering - - metrics.num_row_groups_inverted_index_filtered - - metrics.num_row_groups_min_max_filtered, - metrics.num_row_groups_before_filtering, + metrics.filter_metrics.num_row_groups_before_filtering + - metrics + .filter_metrics + .num_row_groups_inverted_index_filtered + - metrics.filter_metrics.num_row_groups_min_max_filtered, + metrics.filter_metrics.num_row_groups_before_filtering, metrics ); @@ -1026,30 +1070,7 @@ impl Drop for ParquetReader { READ_ROWS_TOTAL .with_label_values(&["parquet"]) .inc_by(metrics.num_rows as u64); - READ_ROW_GROUPS_TOTAL - .with_label_values(&["before_filtering"]) - .inc_by(metrics.num_row_groups_before_filtering as u64); - READ_ROW_GROUPS_TOTAL - .with_label_values(&["fulltext_index_filtered"]) - 
.inc_by(metrics.num_row_groups_fulltext_index_filtered as u64); - READ_ROW_GROUPS_TOTAL - .with_label_values(&["inverted_index_filtered"]) - .inc_by(metrics.num_row_groups_inverted_index_filtered as u64); - READ_ROW_GROUPS_TOTAL - .with_label_values(&["minmax_index_filtered"]) - .inc_by(metrics.num_row_groups_min_max_filtered as u64); - PRECISE_FILTER_ROWS_TOTAL - .with_label_values(&["parquet"]) - .inc_by(metrics.num_rows_precise_filtered as u64); - READ_ROWS_IN_ROW_GROUP_TOTAL - .with_label_values(&["before_filtering"]) - .inc_by(metrics.num_rows_in_row_group_before_filtering as u64); - READ_ROWS_IN_ROW_GROUP_TOTAL - .with_label_values(&["fulltext_index_filtered"]) - .inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64); - READ_ROWS_IN_ROW_GROUP_TOTAL - .with_label_values(&["inverted_index_filtered"]) - .inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64); + metrics.filter_metrics.observe(); } }