From 85f92ef6be1d7364857a2c13fbb026d9e45406ea Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Nov 2024 07:07:47 -0500 Subject: [PATCH] Apply projection to `Statistics` in `FilterExec` (#13187) * Apply projection to `Statistics` in `FilterExec` * Use Statistics::project in HashJoin --- datafusion/common/src/stats.rs | 20 ++++++++ datafusion/physical-plan/src/filter.rs | 7 ++- .../physical-plan/src/joins/hash_join.rs | 13 +---- .../sqllogictest/test_files/parquet.slt | 49 +++++++++++++++++++ 4 files changed, 77 insertions(+), 12 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index e669c674f78a..1aa42705e7f8 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -258,6 +258,26 @@ impl Statistics { self } + /// Project the statistics to the given column indices. + /// + /// For example, if we had statistics for columns `{"a", "b", "c"}`, + /// projecting to `vec![2, 1]` would return statistics for columns `{"c", + /// "b"}`. + pub fn project(mut self, projection: Option<&Vec>) -> Self { + let Some(projection) = projection else { + return self; + }; + + // todo: it would be nice to avoid cloning column statistics if + // possible (e.g. if the projection did not contain duplicates) + self.column_statistics = projection + .iter() + .map(|&i| self.column_statistics[i].clone()) + .collect(); + + self + } + /// Calculates the statistics after `fetch` and `skip` operations apply. /// Here, `self` denotes per-partition statistics. Use the `n_partitions` /// parameter to compute global statistics in a multi-partition setting. diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 97d8159137f4..07898e8d22d8 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -371,7 +371,12 @@ impl ExecutionPlan for FilterExec { /// The output statistics of a filtering operation can be estimated if the /// predicate's selectivity value can be determined for the incoming data. fn statistics(&self) -> Result { - Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity) + let stats = Self::statistics_helper( + &self.input, + self.predicate(), + self.default_selectivity, + )?; + Ok(stats.project(self.projection.as_ref())) } fn cardinality_effect(&self) -> CardinalityEffect { diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 57d8a9ce7b35..ae872e13a9f6 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -785,7 +785,7 @@ impl ExecutionPlan for HashJoinExec { // TODO stats: it is not possible in general to know the output size of joins // There are some special cases though, for example: // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - let mut stats = estimate_join_statistics( + let stats = estimate_join_statistics( Arc::clone(&self.left), Arc::clone(&self.right), self.on.clone(), @@ -793,16 +793,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - if let Some(projection) = &self.projection { - stats.column_statistics = stats - .column_statistics - .into_iter() - .enumerate() - .filter(|(i, _)| projection.contains(i)) - .map(|(_, s)| s) - .collect(); - } - Ok(stats) + Ok(stats.project(self.projection.as_ref())) } } diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index ed963466fca6..253ebb9ea0ac 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -549,3 +549,52 @@ FixedSizeBinary(16) 0166ce1d46129ad104fa4990c6057c91 statement ok DROP TABLE test_non_utf8_binary; + + +## Tests for https://github.com/apache/datafusion/issues/13186 +statement ok +create table cpu (time timestamp, usage_idle float, usage_user float, cpu int); + +statement ok +insert into cpu values ('1970-01-01 00:00:00', 1.0, 2.0, 3); + +# must put it into a parquet file to get statistics +statement ok +copy (select * from cpu) to 'test_files/scratch/parquet/cpu.parquet'; + +# Run queries against parquet files +statement ok +create external table cpu_parquet +stored as parquet +location 'test_files/scratch/parquet/cpu.parquet'; + +# Double filtering +# +# Expect 1 row for both queries +query PI +select time, rn +from ( + select time, row_number() OVER (ORDER BY usage_idle, time) as rn + from cpu + where cpu = 3 +) where rn > 0; +---- +1970-01-01T00:00:00 1 + +query PI +select time, rn +from ( + select time, row_number() OVER (ORDER BY usage_idle, time) as rn + from cpu_parquet + where cpu = 3 +) where rn > 0; +---- +1970-01-01T00:00:00 1 + + +# Clean up +statement ok +drop table cpu; + +statement ok +drop table cpu_parquet;