From b821929194c0ae3d3d5e862e0f82a0e5dc55702f Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Wed, 25 Sep 2024 12:24:58 +0200 Subject: [PATCH] parquet: Add finer metrics on operations covered by `time_elapsed_opening` (#12585) * Rename 'pushdown_eval_time' to 'row_pushdown_eval_time' As it does not measure evaluation of row group-level pushdown filters * Add statistics_eval_time and bloom_filter_eval_time metrics * Add metadata_load_time metric * Add docs * Move timers to the callees --- .../physical_plan/parquet/metrics.rs | 30 +++++++++++++++---- .../datasource/physical_plan/parquet/mod.rs | 12 ++++++-- .../physical_plan/parquet/opener.rs | 3 ++ .../physical_plan/parquet/row_filter.rs | 2 +- .../physical_plan/parquet/row_group_filter.rs | 6 ++++ docs/source/user-guide/explain-usage.md | 4 ++- 6 files changed, 48 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs index 20dbc94ecf1d..f1b5f71530dc 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs @@ -43,14 +43,20 @@ pub struct ParquetFileMetrics { pub pushdown_rows_pruned: Count, /// Total rows passed predicates pushed into parquet scan pub pushdown_rows_matched: Count, - /// Total time spent evaluating pushdown filters - pub pushdown_eval_time: Time, + /// Total time spent evaluating row-level pushdown filters + pub row_pushdown_eval_time: Time, + /// Total time spent evaluating row group-level statistics filters + pub statistics_eval_time: Time, + /// Total time spent evaluating row group Bloom Filters + pub bloom_filter_eval_time: Time, /// Total rows filtered out by parquet page index pub page_index_rows_pruned: Count, /// Total rows passed through the parquet page index pub page_index_rows_matched: Count, /// Total time spent evaluating parquet page index filters pub page_index_eval_time: Time, + /// Total time spent reading and parsing metadata from the footer + pub metadata_load_time: Time, } impl ParquetFileMetrics { @@ -91,9 +97,16 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .counter("pushdown_rows_matched", partition); - let pushdown_eval_time = MetricBuilder::new(metrics) + let row_pushdown_eval_time = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) - .subset_time("pushdown_eval_time", partition); + .subset_time("row_pushdown_eval_time", partition); + let statistics_eval_time = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .subset_time("statistics_eval_time", partition); + let bloom_filter_eval_time = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .subset_time("bloom_filter_eval_time", partition); + let page_index_rows_pruned = MetricBuilder::new(metrics) .with_new_label("filename", filename.to_string()) .counter("page_index_rows_pruned", partition); @@ -105,6 +118,10 @@ impl ParquetFileMetrics { .with_new_label("filename", filename.to_string()) .subset_time("page_index_eval_time", partition); + let metadata_load_time = MetricBuilder::new(metrics) + .with_new_label("filename", filename.to_string()) + .subset_time("metadata_load_time", partition); + Self { predicate_evaluation_errors, row_groups_matched_bloom_filter, @@ -114,10 +131,13 @@ impl ParquetFileMetrics { bytes_scanned, pushdown_rows_pruned, pushdown_rows_matched, - pushdown_eval_time, + row_pushdown_eval_time, page_index_rows_pruned, page_index_rows_matched, + statistics_eval_time, + bloom_filter_eval_time, page_index_eval_time, + metadata_load_time, } } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index ce679bfa76c5..6afb66cc7c02 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -1873,8 +1873,16 @@ mod tests { assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5); assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2); assert!( - get_value(&metrics, "pushdown_eval_time") > 0, - "no eval time in metrics: {metrics:#?}" + get_value(&metrics, "row_pushdown_eval_time") > 0, + "no pushdown eval time in metrics: {metrics:#?}" + ); + assert!( + get_value(&metrics, "statistics_eval_time") > 0, + "no statistics eval time in metrics: {metrics:#?}" + ); + assert!( + get_value(&metrics, "bloom_filter_eval_time") > 0, + "no Bloom Filter eval time in metrics: {metrics:#?}" ); } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 9880c30ddb6b..a818a8850284 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -118,6 +118,7 @@ impl FileOpener for ParquetOpener { Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); + let mut metadata_timer = file_metrics.metadata_load_time.timer(); let metadata = ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?; let mut schema = metadata.schema().clone(); @@ -133,6 +134,8 @@ impl FileOpener for ParquetOpener { let metadata = ArrowReaderMetadata::try_new(metadata.metadata().clone(), options)?; + metadata_timer.stop(); + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index a8e3e72c11f3..e876f840d1eb 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -531,7 +531,7 @@ pub fn build_row_filter( ) -> Result> { let rows_pruned = &file_metrics.pushdown_rows_pruned; let rows_matched = &file_metrics.pushdown_rows_matched; - let time = &file_metrics.pushdown_eval_time; + let time = &file_metrics.row_pushdown_eval_time; // Split into conjuncts: // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`] diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs index 4cdcb005018e..a1d74cb54355 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_group_filter.rs @@ -109,6 +109,9 @@ impl RowGroupAccessPlanFilter { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { + // scoped timer updates on drop + let _timer_guard = metrics.statistics_eval_time.timer(); + assert_eq!(groups.len(), self.access_plan.len()); // Indexes of row groups still to scan let row_group_indexes = self.access_plan.row_group_indexes(); @@ -158,6 +161,9 @@ impl RowGroupAccessPlanFilter { predicate: &PruningPredicate, metrics: &ParquetFileMetrics, ) { + // scoped timer updates on drop + let _timer_guard = metrics.bloom_filter_eval_time.timer(); + assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len()); for idx in 0..self.access_plan.len() { if !self.access_plan.should_scan(idx) { diff --git a/docs/source/user-guide/explain-usage.md b/docs/source/user-guide/explain-usage.md index b376c2eedb53..2eb03aad2ef9 100644 --- a/docs/source/user-guide/explain-usage.md +++ b/docs/source/user-guide/explain-usage.md @@ -235,7 +235,9 @@ When predicate pushdown is enabled, `ParquetExec` gains the following metrics: - `pushdown_rows_pruned`: rows that were tested by any of the above filtered, and did not pass one of them (this should be sum of `page_index_rows_matched`, `row_groups_pruned_bloom_filter`, and `row_groups_pruned_statistics`) - `predicate_evaluation_errors`: number of times evaluating the filter expression failed (expected to be zero in normal operation) - `num_predicate_creation_errors`: number of errors creating predicates (expected to be zero in normal operation) -- `pushdown_eval_time`: time spent evaluating these filters +- `bloom_filter_eval_time`: time spent parsing and evaluating Bloom Filters +- `statistics_eval_time`: time spent parsing and evaluating row group-level statistics +- `row_pushdown_eval_time`: time spent evaluating row-level filters - `page_index_eval_time`: time required to evaluate the page index filters ## Partitions and Execution