Skip to content

Commit

Permalink
fix failed test, add analyze info, fix compile error
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Oct 25, 2024
1 parent 333c57b commit 1aeb19a
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 20 deletions.
11 changes: 10 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};

use datafusion_physical_plan::joins::DynamicFilterInfo;
use datafusion_physical_plan::Metric;
use itertools::Itertools;
use log::debug;

Expand Down Expand Up @@ -802,7 +803,15 @@ impl ExecutionPlan for ParquetExec {
.clone()
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters {
dynamic_filter.final_predicate(self.predicate.clone())
let (final_expr, name) =
dynamic_filter.final_predicate(self.predicate.clone());
if let Some(_) = &final_expr {
self.metrics.register(Arc::new(Metric::new(
datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
None,
)));
}
final_expr
} else {
self.predicate.clone()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use std::sync::Arc;

#[cfg(feature = "parquet")]
use crate::datasource::physical_plan::ParquetExec;
use crate::physical_plan::ExecutionPlan;
use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
pub mod coalesce_batches;
pub mod enforce_distribution;
pub mod enforce_sorting;
#[cfg(feature = "parquet")]
pub mod join_filter_pushdown;
pub mod join_selection;
pub mod optimizer;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

//! Physical optimizer traits
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

#[cfg(feature = "parquet")]
use super::join_filter_pushdown::JoinFilterPushdown;
use super::projection_pushdown::ProjectionPushdown;
use super::update_aggr_exprs::OptimizeAggregateOrder;
Expand All @@ -34,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr
use crate::physical_optimizer::output_requirements::OutputRequirements;
use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
use crate::physical_optimizer::topk_aggregation::TopKAggregation;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use std::sync::Arc;

/// A rule-based physical optimizer.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -113,6 +113,7 @@ impl PhysicalOptimizer {
// given query plan; i.e. it only acts as a final
// gatekeeping rule.
Arc::new(SanityCheckPlan::new()),
#[cfg(feature = "parquet")]
Arc::new(JoinFilterPushdown::new()),
];

Expand Down
8 changes: 3 additions & 5 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, Extension, Filter, JoinType, RecursiveQuery, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp
};
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{Column, Literal};
Expand Down Expand Up @@ -1110,17 +1109,16 @@ impl DefaultPhysicalPlanner {
null_equals_null,
)?)
};

// build dynamic filter
if join.support_dynamic_filter() {
if join.support_dynamic_filter() && dynamic_pushdown_columns.as_ref().is_some_and(|columns| !columns.is_empty()) {
let physical_dynamic_filter_info: Option<Arc<DynamicFilterInfo>> =
if let Some(dynamic_columns) = dynamic_pushdown_columns {
let columns_and_types_and_names: Vec<(Arc<Column>, String)> =
dynamic_columns
.iter()
.map(|dynamic_column| {
let column = dynamic_column.column();
let index = join_schema.index_of_column(column)?;
let index = join.schema().index_of(column.name())?;
let physical_column = Arc::new(
datafusion_physical_expr::expressions::Column::new(
&column.name,
Expand Down
10 changes: 8 additions & 2 deletions datafusion/optimizer/src/join_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ impl OptimizerRule for JoinFilterPushdown {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
if !config.options().optimizer.dynamic_join_pushdown {
if !config.options().optimizer.dynamic_join_pushdown
//|| !config.options().execution.parquet.pushdown_filters
{
return Ok(Transformed::no(plan));
}

Expand Down Expand Up @@ -116,7 +118,11 @@ impl OptimizerRule for JoinFilterPushdown {
fn unsupported_join_type(join_type: &JoinType) -> bool {
matches!(
join_type,
JoinType::Left | JoinType::RightSemi | JoinType::RightAnti
JoinType::Left
| JoinType::RightSemi
| JoinType::RightAnti
| JoinType::LeftSemi
| JoinType::LeftAnti
)
}

Expand Down
23 changes: 15 additions & 8 deletions datafusion/physical-plan/src/joins/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,26 @@ impl DynamicFilterInfo {
pub fn final_predicate(
&self,
predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
) -> (Option<Arc<dyn PhysicalExpr>>, String) {
let inner = self.inner.lock();

match (inner.final_expr.clone(), predicate) {
(Some(self_expr), Some(input_expr)) => Some(Arc::new(BinaryExpr::new(
self_expr,
Operator::And,
input_expr,
))),
let result = match (inner.final_expr.clone(), predicate) {
(Some(self_expr), Some(input_expr)) => {
Some(
Arc::new(BinaryExpr::new(self_expr, Operator::And, input_expr))
as Arc<dyn PhysicalExpr>,
)
}
(Some(self_expr), None) => Some(self_expr),
(None, Some(input_expr)) => Some(input_expr),
(None, None) => None,
}
};
let debug_info = inner
.final_expr
.as_ref()
.map(|expr| format!("{}", expr))
.map_or("".to_string(), |name| name);
(result, debug_info)
}

// used for adding partition numbers
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl MetricsSet {
MetricValue::Gauge { name, .. } => name == metric_name,
MetricValue::StartTimestamp(_) => false,
MetricValue::EndTimestamp(_) => false,
MetricValue::DynamicFilter(_) => false,
})
}

Expand Down
11 changes: 11 additions & 0 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ pub enum MetricValue {
StartTimestamp(Timestamp),
/// The time at which execution ended
EndTimestamp(Timestamp),

/// Dynamic filters
DynamicFilter(String),
}

impl MetricValue {
Expand All @@ -417,6 +420,7 @@ impl MetricValue {
Self::Time { name, .. } => name.borrow(),
Self::StartTimestamp(_) => "start_timestamp",
Self::EndTimestamp(_) => "end_timestamp",
Self::DynamicFilter(_) => "dynamic_filters",
}
}

Expand All @@ -442,6 +446,7 @@ impl MetricValue {
.and_then(|ts| ts.timestamp_nanos_opt())
.map(|nanos| nanos as usize)
.unwrap_or(0),
Self::DynamicFilter(_) => 1,
}
}

Expand Down Expand Up @@ -469,6 +474,7 @@ impl MetricValue {
},
Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()),
Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()),
Self::DynamicFilter(name) => Self::DynamicFilter(name.clone()),
}
}

Expand Down Expand Up @@ -515,6 +521,7 @@ impl MetricValue {
(Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => {
timestamp.update_to_max(other_timestamp);
}
(Self::DynamicFilter(_), _) => {}
m @ (_, _) => {
panic!(
"Mismatched metric types. Can not aggregate {:?} with value {:?}",
Expand All @@ -539,6 +546,7 @@ impl MetricValue {
Self::Time { .. } => 8,
Self::StartTimestamp(_) => 9, // show timestamps last
Self::EndTimestamp(_) => 10,
Self::DynamicFilter(_) => 11,
}
}

Expand Down Expand Up @@ -574,6 +582,9 @@ impl Display for MetricValue {
Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => {
write!(f, "{timestamp}")
}
Self::DynamicFilter(filter) => {
write!(f, "dynamic_filter: {filter}")
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
logical_plan after join_filter_pushdown SAME TEXT AS ABOVE
logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
logical_plan after simplify_expressions SAME TEXT AS ABOVE
logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
Expand Down Expand Up @@ -234,6 +235,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
logical_plan after optimize_projections SAME TEXT AS ABOVE
logical_plan after join_filter_pushdown SAME TEXT AS ABOVE
logical_plan TableScan: simple_explain_test projection=[a, b, c]
initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
initial_physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
Expand All @@ -255,6 +257,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true
physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
physical_plan_with_schema CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N]
Expand Down Expand Up @@ -331,6 +334,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]

Expand Down Expand Up @@ -371,6 +375,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10
physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N]
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ datafusion.explain.show_sizes true
datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.dynamic_join_pushdown true
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
Expand Down Expand Up @@ -314,6 +315,7 @@ datafusion.explain.show_sizes true When set to true, the explain statement will
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
datafusion.optimizer.dynamic_join_pushdown true when set to true, datafusion would try to push the build side statistic to probe phase
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
Expand Down

0 comments on commit 1aeb19a

Please sign in to comment.