diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 0416b581a44d2..cab61c5091e39 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -43,6 +43,7 @@ use arrow::datatypes::SchemaRef; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; use datafusion_physical_plan::joins::DynamicFilterInfo; +use datafusion_physical_plan::Metric; use itertools::Itertools; use log::debug; @@ -802,7 +803,15 @@ impl ExecutionPlan for ParquetExec { .clone() .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters { - dynamic_filter.final_predicate(self.predicate.clone()) + let (final_expr, name) = + dynamic_filter.final_predicate(self.predicate.clone()); + if let Some(_) = &final_expr { + self.metrics.register(Arc::new(Metric::new( + datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name), + None, + ))); + } + final_expr } else { self.predicate.clone() }; diff --git a/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs index d01da64286a71..3bf02d2d1bbf1 100644 --- a/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs @@ -19,7 +19,6 @@ use std::sync::Arc; -#[cfg(feature = "parquet")] use crate::datasource::physical_plan::ParquetExec; use crate::physical_plan::ExecutionPlan; use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec}; diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index 7b9ef85b9e4c1..f9d65141d9476 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -24,6 +24,7 @@ pub mod coalesce_batches; pub mod enforce_distribution; pub mod enforce_sorting; +#[cfg(feature = "parquet")] pub mod join_filter_pushdown; pub mod join_selection; pub mod optimizer; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 3c37365e82174..412c20458e090 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -17,9 +17,7 @@ //! Physical optimizer traits -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use std::sync::Arc; - +#[cfg(feature = "parquet")] use super::join_filter_pushdown::JoinFilterPushdown; use super::projection_pushdown::ProjectionPushdown; use super::update_aggr_exprs::OptimizeAggregateOrder; @@ -34,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_optimizer::topk_aggregation::TopKAggregation; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use std::sync::Arc; /// A rule-based physical optimizer. #[derive(Clone, Debug)] @@ -113,6 +113,7 @@ impl PhysicalOptimizer { // given query plan; i.e. it only acts as a final // gatekeeping rule. Arc::new(SanityCheckPlan::new()), + #[cfg(feature = "parquet")] Arc::new(JoinFilterPushdown::new()), ]; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 74ca505c7d068..1902521aedf69 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1121,9 +1121,8 @@ impl DefaultPhysicalPlanner { null_equals_null, )?) }; - // build dynamic filter - if join.support_dynamic_filter() { + if join.support_dynamic_filter() && dynamic_pushdown_columns.as_ref().is_some_and(|columns| !columns.is_empty()) { let physical_dynamic_filter_info: Option> = if let Some(dynamic_columns) = dynamic_pushdown_columns { let columns_and_types_and_names: Vec<(Arc, String)> = @@ -1131,7 +1130,7 @@ impl DefaultPhysicalPlanner { .iter() .map(|dynamic_column| { let column = dynamic_column.column(); - let index = join_schema.index_of_column(column)?; + let index = join.schema().index_of(column.name())?; let physical_column = Arc::new( datafusion_physical_expr::expressions::Column::new( &column.name, diff --git a/datafusion/optimizer/src/join_filter_pushdown.rs b/datafusion/optimizer/src/join_filter_pushdown.rs index d13457499a7af..c7ab8ddb837ef 100644 --- a/datafusion/optimizer/src/join_filter_pushdown.rs +++ b/datafusion/optimizer/src/join_filter_pushdown.rs @@ -47,7 +47,9 @@ impl OptimizerRule for JoinFilterPushdown { plan: LogicalPlan, config: &dyn OptimizerConfig, ) -> Result, DataFusionError> { - if !config.options().optimizer.dynamic_join_pushdown { + if !config.options().optimizer.dynamic_join_pushdown + //|| !config.options().execution.parquet.pushdown_filters + { return Ok(Transformed::no(plan)); } @@ -116,7 +118,11 @@ impl OptimizerRule for JoinFilterPushdown { fn unsupported_join_type(join_type: &JoinType) -> bool { matches!( join_type, - JoinType::Left | JoinType::RightSemi | JoinType::RightAnti + JoinType::Left + | JoinType::RightSemi + | JoinType::RightAnti + | JoinType::LeftSemi + | JoinType::LeftAnti ) } diff --git a/datafusion/physical-plan/src/joins/dynamic_filters.rs b/datafusion/physical-plan/src/joins/dynamic_filters.rs index c25c55ed00b22..ab174f38c556c 100644 --- a/datafusion/physical-plan/src/joins/dynamic_filters.rs +++ b/datafusion/physical-plan/src/joins/dynamic_filters.rs @@ -230,19 +230,26 @@ impl DynamicFilterInfo { pub fn final_predicate( &self, predicate: Option>, - ) -> Option> { + ) -> (Option>, String) { let inner = self.inner.lock(); - match (inner.final_expr.clone(), predicate) { - (Some(self_expr), Some(input_expr)) => Some(Arc::new(BinaryExpr::new( - self_expr, - Operator::And, - input_expr, - ))), + let result = match (inner.final_expr.clone(), predicate) { + (Some(self_expr), Some(input_expr)) => { + Some( + Arc::new(BinaryExpr::new(self_expr, Operator::And, input_expr)) + as Arc, + ) + } (Some(self_expr), None) => Some(self_expr), (None, Some(input_expr)) => Some(input_expr), (None, None) => None, - } + }; + let debug_info = inner + .final_expr + .as_ref() + .map(|expr| format!("{}", expr)) + .map_or("".to_string(), |name| name); + (result, debug_info) } // used for adding partition numbers diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index ead0ca3369387..13cfb339a8245 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -263,6 +263,7 @@ impl MetricsSet { MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, + MetricValue::DynamicFilter(_) => false, }) } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 5a335d9f99cd2..3481d668caf67 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -400,6 +400,9 @@ pub enum MetricValue { StartTimestamp(Timestamp), /// The time at which execution ended EndTimestamp(Timestamp), + + /// Dynamic filters + DynamicFilter(String), } impl MetricValue { @@ -417,6 +420,7 @@ impl MetricValue { Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", + Self::DynamicFilter(_) => "dynamic_filters", } } @@ -442,6 +446,7 @@ impl MetricValue { .and_then(|ts| ts.timestamp_nanos_opt()) .map(|nanos| nanos as usize) .unwrap_or(0), + Self::DynamicFilter(_) => 1, } } @@ -469,6 +474,7 @@ impl MetricValue { }, Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), + Self::DynamicFilter(name) => Self::DynamicFilter(name.clone()), } } @@ -515,6 +521,7 @@ impl MetricValue { (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { timestamp.update_to_max(other_timestamp); } + (Self::DynamicFilter(_), _) => {} m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -539,6 +546,7 @@ impl MetricValue { Self::Time { .. } => 8, Self::StartTimestamp(_) => 9, // show timestamps last Self::EndTimestamp(_) => 10, + Self::DynamicFilter(_) => 11, } } @@ -574,6 +582,9 @@ impl Display for MetricValue { Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { write!(f, "{timestamp}") } + Self::DynamicFilter(filter) => { + write!(f, "dynamic_filter: {filter}") + } } } } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 1340fd490e06f..6bcd919b09376 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -205,6 +205,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c] +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE @@ -230,6 +231,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections SAME TEXT AS ABOVE +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true initial_physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -251,6 +253,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] physical_plan_with_schema CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N] @@ -327,6 +330,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -367,6 +371,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 57bf029a63c1b..ca47e1cd7543f 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -223,6 +223,7 @@ datafusion.explain.show_sizes true datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 +datafusion.optimizer.dynamic_join_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true @@ -314,6 +315,7 @@ datafusion.explain.show_sizes true When set to true, the explain statement will datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.dynamic_join_pushdown true when set to true, datafusion would try to push the build side statistic to probe phase datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible