From 2ea40d03cc40a6c03f5751a00a5f7785c6461f43 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 19 Aug 2024 18:28:19 +0800 Subject: [PATCH] feat: collect filter metrics --- src/mito2/src/read.rs | 33 +++++++++++++++++++++++++++- src/mito2/src/read/scan_region.rs | 8 ++++--- src/mito2/src/read/seq_scan.rs | 4 ++-- src/mito2/src/read/unordered_scan.rs | 4 ++-- src/mito2/src/sst/parquet/reader.rs | 13 ++++++----- 5 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 04a34fc9ac56..6f6db6554f29 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -56,6 +56,7 @@ use crate::error::{ use crate::memtable::BoxedBatchIterator; use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::read::prune::PruneReader; +use crate::sst::parquet::reader::ReaderMetrics; /// Storage internal representation of a batch of rows for a primary key (time series). /// @@ -752,11 +753,27 @@ pub(crate) struct ScannerMetrics { num_batches: usize, /// Number of rows returned. num_rows: usize, + /// Number of row groups before filtering. + num_row_groups_before_filtering: usize, + /// Number of row groups filtered by fulltext index. + num_row_groups_fulltext_index_filtered: usize, + /// Number of row groups filtered by inverted index. + num_row_groups_inverted_index_filtered: usize, + /// Number of row groups filtered by min-max index. + num_row_groups_min_max_filtered: usize, + /// Number of rows filtered by precise filter. + num_rows_precise_filtered: usize, + /// Number of rows in row group before filtering. + num_rows_in_row_group_before_filtering: usize, + /// Number of rows in row group filtered by fulltext index. + num_rows_in_row_group_fulltext_index_filtered: usize, + /// Number of rows in row group filtered by inverted index. + num_rows_in_row_group_inverted_index_filtered: usize, } impl ScannerMetrics { /// Sets and observes metrics on initializing parts. 
- fn observe_init_part(&mut self, build_parts_cost: Duration) { + fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) { self.build_parts_cost = build_parts_cost; // Observes metrics. @@ -766,6 +783,20 @@ impl ScannerMetrics { READ_STAGE_ELAPSED .with_label_values(&["build_parts"]) .observe(self.build_parts_cost.as_secs_f64()); + + self.num_row_groups_before_filtering = reader_metrics.num_row_groups_before_filtering; + self.num_row_groups_fulltext_index_filtered = + reader_metrics.num_row_groups_fulltext_index_filtered; + self.num_row_groups_inverted_index_filtered = + reader_metrics.num_row_groups_inverted_index_filtered; + self.num_row_groups_min_max_filtered = reader_metrics.num_row_groups_min_max_filtered; + self.num_rows_precise_filtered = reader_metrics.num_rows_precise_filtered; + self.num_rows_in_row_group_before_filtering = + reader_metrics.num_rows_in_row_group_before_filtering; + self.num_rows_in_row_group_fulltext_index_filtered = + reader_metrics.num_rows_in_row_group_fulltext_index_filtered; + self.num_rows_in_row_group_inverted_index_filtered = + reader_metrics.num_rows_in_row_group_inverted_index_filtered; } /// Observes metrics on scanner finish. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5e27ebe6df77..d0bef554727f 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::parquet::file_range::FileRange; +use crate::sst::parquet::reader::ReaderMetrics; /// A scanner scans a region and returns a [SendableRecordBatchStream]. 
pub(crate) enum Scanner { @@ -606,8 +607,9 @@ impl ScanInput { pub(crate) async fn prune_file_ranges( &self, collector: &mut impl FileRangeCollector, - ) -> Result<()> { + ) -> Result<ReaderMetrics> { let mut file_prune_cost = Duration::ZERO; + let mut reader_metrics = ReaderMetrics::default(); for file in &self.files { let prune_start = Instant::now(); let res = self @@ -620,7 +622,7 @@ impl ScanInput { .inverted_index_applier(self.inverted_index_applier.clone()) .fulltext_index_applier(self.fulltext_index_applier.clone()) .expected_metadata(Some(self.mapper.metadata().clone())) - .build_reader_input() + .build_reader_input(&mut reader_metrics) .await; file_prune_cost += prune_start.elapsed(); let (mut file_range_ctx, row_groups) = match res { @@ -665,7 +667,7 @@ impl ScanInput { file_prune_cost ); - Ok(()) + Ok(reader_metrics) } /// Scans the input source in another task and sends batches to the sender. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 551d304f2f16..e808e2f22385 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -439,7 +439,7 @@ impl SeqScan { if part_list.0.is_none() { let now = Instant::now(); let mut distributor = SeqDistributor::default(); - input.prune_file_ranges(&mut distributor).await?; + let reader_metrics = input.prune_file_ranges(&mut distributor).await?; distributor.append_mem_ranges( &input.memtables, Some(input.mapper.column_ids()), @@ -451,7 +451,7 @@ impl SeqScan { let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; - metrics.observe_init_part(build_part_cost); + metrics.observe_init_part(build_part_cost, &reader_metrics); } else { // Updates the cost of building parts. 
metrics.build_parts_cost = part_list.1; diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 1d8a0cd8ebd3..0fcaf667afe4 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -263,7 +263,7 @@ async fn maybe_init_parts( if part_list.0.is_none() { let now = Instant::now(); let mut distributor = UnorderedDistributor::default(); - input.prune_file_ranges(&mut distributor).await?; + let reader_metrics = input.prune_file_ranges(&mut distributor).await?; distributor.append_mem_ranges( &input.memtables, Some(input.mapper.column_ids()), @@ -275,7 +275,7 @@ async fn maybe_init_parts( let build_part_cost = now.elapsed(); part_list.1 = build_part_cost; - metrics.observe_init_part(build_part_cost); + metrics.observe_init_part(build_part_cost, &reader_metrics); } else { // Updates the cost of building parts. metrics.build_parts_cost = part_list.1; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4cca1c1c29e2..1f049f7a0034 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -174,14 +174,19 @@ impl ParquetReaderBuilder { /// /// This needs to perform IO operation. pub async fn build(&self) -> Result<ParquetReader> { - let (context, row_groups) = self.build_reader_input().await?; + let mut metrics = ReaderMetrics::default(); + + let (context, row_groups) = self.build_reader_input(&mut metrics).await?; ParquetReader::new(Arc::new(context), row_groups).await } /// Builds a [FileRangeContext] and collects row groups to read. /// /// This needs to perform IO operation. 
- pub(crate) async fn build_reader_input(&self) -> Result<(FileRangeContext, RowGroupMap)> { + pub(crate) async fn build_reader_input( + &self, + metrics: &mut ReaderMetrics, + ) -> Result<(FileRangeContext, RowGroupMap)> { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); @@ -219,10 +224,8 @@ impl ParquetReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadParquetSnafu { path: &file_path })?; - let mut metrics = ReaderMetrics::default(); - let row_groups = self - .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) + .row_groups_to_read(&read_format, &parquet_meta, metrics) .await; let reader_builder = RowGroupReaderBuilder {