diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 47ffe0b1c66b..a2863d3ef44c 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -617,6 +617,10 @@ config_namespace! {
         /// then the output will be coerced to a non-view.
         /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
         pub expand_views_at_output: bool, default = false
+
+        /// When set to true, DataFusion will try to push build-side statistics
+        /// from a hash join down to the probe-side scan
+        pub dynamic_join_pushdown: bool, default = true
     }
 }
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 743dd5896986..1d24e00649f5 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -42,6 +42,7 @@ use crate::{
 
 use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::joins::DynamicFilterInfo;
 
 use itertools::Itertools;
 use log::debug;
@@ -282,6 +283,8 @@ pub struct ParquetExec {
     table_parquet_options: TableParquetOptions,
     /// Optional user defined schema adapter
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    /// Dynamic filters (e.g. join filters) pushed down at runtime
+    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
 }
 
 impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +294,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
 }
 
 /// [`ParquetExecBuilder`], builder for [`ParquetExec`].
-///
 /// See example on [`ParquetExec`].
 pub struct ParquetExecBuilder {
     file_scan_config: FileScanConfig,
@@ -463,6 +465,7 @@ impl ParquetExecBuilder {
             cache,
             table_parquet_options,
             schema_adapter_factory,
+            dynamic_filters: None,
         }
     }
 }
@@ -515,6 +518,7 @@ impl ParquetExec {
             cache: _,
             table_parquet_options,
             schema_adapter_factory,
+            ..
         } = self;
         ParquetExecBuilder {
             file_scan_config: base_config,
@@ -579,6 +583,15 @@ impl ParquetExec {
         self
     }
 
+    /// Set the dynamic filter (e.g. a join filter pushed down at runtime)
+    pub fn with_dynamic_filter(
+        mut self,
+        dynamic_filter: Option<Arc<DynamicFilterInfo>>,
+    ) -> Self {
+        self.dynamic_filters = dynamic_filter;
+        self
+    }
+
     /// If true, the predicate will be used during the parquet scan.
     /// Defaults to false
     ///
@@ -711,10 +724,15 @@ impl DisplayAs for ParquetExec {
                         )
                     })
                     .unwrap_or_default();
-
+                let dynamic_filter_string = self
+                    .dynamic_filters
+                    .as_ref()
+                    .map(|filter| format!(", dynamic_filter: {filter:?}"))
+                    .unwrap_or_default();
                 write!(f, "ParquetExec: ")?;
                 self.base_config.fmt_as(t, f)?;
-                write!(f, "{}{}", predicate_string, pruning_predicate_string,)
+                write!(
+                    f,
+                    "{}{}{}",
+                    predicate_string, pruning_predicate_string, dynamic_filter_string
+                )
             }
         }
     }
@@ -798,13 +816,18 @@ impl ExecutionPlan for ParquetExec {
             .schema_adapter_factory
             .clone()
             .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
+        let final_predicate = if let Some(dynamic_filter) = &self.dynamic_filters {
+            dynamic_filter.final_predicate(self.predicate.clone())
+        } else {
+            self.predicate.clone()
+        };
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
             batch_size: ctx.session_config().batch_size(),
             limit: self.base_config.limit,
-            predicate: self.predicate.clone(),
+            predicate: final_predicate,
             pruning_predicate: self.pruning_predicate.clone(),
             page_pruning_predicate: self.page_pruning_predicate.clone(),
             table_schema: self.base_config.file_schema.clone(),
@@ -862,6 +885,7 @@ impl ExecutionPlan for ParquetExec {
             cache: self.cache.clone(),
             table_parquet_options: self.table_parquet_options.clone(),
             schema_adapter_factory: self.schema_adapter_factory.clone(),
+            dynamic_filters: self.dynamic_filters.clone(),
         }))
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index a818a8850284..c31429cedd40 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -160,7 +160,6 @@ impl FileOpener for ParquetOpener {
             &file_metrics,
             Arc::clone(&schema_mapping),
         );
-
         match row_filter {
             Ok(Some(filter)) => {
                 builder = builder.with_row_filter(filter);
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index c971e6150633..07a273e94aef 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,11 +52,10 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
 };
-use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
-use datafusion_physical_plan::ExecutionPlanProperties;
-
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
+use datafusion_physical_plan::ExecutionPlanProperties;
 
 use itertools::izip;
 
 /// The `EnforceDistribution` rule ensures that distribution requirements are
diff --git a/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs
new file mode 100644
index 000000000000..2bee14c01347
--- /dev/null
+++ b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Push dynamic join filters down to the scan execution, if there are any
+
+use std::sync::Arc;
+
+use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
+
+use crate::datasource::physical_plan::ParquetExec;
+use crate::physical_plan::ExecutionPlan;
+use datafusion_common::tree_node::{Transformed, TransformedResult};
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::joins::DynamicFilterInfo;
+
+/// This rule pushes build-side statistics from a hash join down to the probe-side scan
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for JoinFilterPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.optimizer.dynamic_join_pushdown {
+            return Ok(plan);
+        }
+        optimize_impl(plan, &mut None).data()
+    }
+
+    fn name(&self) -> &str {
+        "JoinFilterPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn optimize_impl(
+    plan: Arc<dyn ExecutionPlan>,
+    join_filters: &mut Option<Arc<DynamicFilterInfo>>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
+        let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
+        if new_right.transformed {
+            let new_hash_join = HashJoinExec::try_new(
+                hashjoin_exec.left().clone(),
+                new_right.data,
+                hashjoin_exec.on.clone(),
+                hashjoin_exec.filter().cloned(),
+                hashjoin_exec.join_type(),
+                hashjoin_exec.projection.clone(),
+                *hashjoin_exec.partition_mode(),
+                hashjoin_exec.null_equals_null(),
+            )?
+            .with_dynamic_filter_info(hashjoin_exec.dynamic_filters_pushdown.clone());
+            return Ok(Transformed::yes(Arc::new(new_hash_join)));
+        }
+        Ok(Transformed::no(plan))
+    } else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
+        if let Some(dynamic_filters) = join_filters {
+            let final_exec = parquet_exec
+                .clone()
+                .with_dynamic_filter(Some(dynamic_filters.clone()));
+            return Ok(Transformed::yes(Arc::new(final_exec)));
+        }
+        Ok(Transformed::no(plan))
+    } else {
+        let children = plan.children();
+        let mut new_children = Vec::with_capacity(children.len());
+        let mut transformed = false;
+
+        for child in children {
+            let new_child = optimize_impl(child.clone(), join_filters)?;
+            if new_child.transformed {
+                transformed = true;
+            }
+            new_children.push(new_child.data);
+        }
+
+        if transformed {
+            let new_plan = plan.with_new_children(new_children)?;
+            Ok(Transformed::yes(new_plan))
+        } else {
+            Ok(Transformed::no(plan))
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 1c63df1f0281..adf1aeb3fcc6 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -383,16 +383,19 @@ fn try_collect_left(
     {
         Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
     } else {
-        Ok(Some(Arc::new(HashJoinExec::try_new(
-            Arc::clone(left),
-            Arc::clone(right),
-            hash_join.on().to_vec(),
-            hash_join.filter().cloned(),
-            hash_join.join_type(),
-            hash_join.projection.clone(),
-            PartitionMode::CollectLeft,
-            hash_join.null_equals_null(),
-        )?)))
+        Ok(Some(Arc::new(
+            HashJoinExec::try_new(
+                Arc::clone(left),
+                Arc::clone(right),
+                hash_join.on().to_vec(),
+                hash_join.filter().cloned(),
+                hash_join.join_type(),
+                hash_join.projection.clone(),
+                PartitionMode::CollectLeft,
+                hash_join.null_equals_null(),
+            )?
+            .with_dynamic_filter_info(hash_join.dynamic_filters_pushdown.clone()),
+        )))
     }
 }
 (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index efdd3148d03f..7b9ef85b9e4c 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -24,6 +24,7 @@ pub mod coalesce_batches;
 pub mod enforce_distribution;
 pub mod enforce_sorting;
+pub mod join_filter_pushdown;
 pub mod join_selection;
 pub mod optimizer;
 pub mod projection_pushdown;
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 7a6f991121ef..3c37365e8217 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -20,6 +20,7 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use std::sync::Arc;
 
+use super::join_filter_pushdown::JoinFilterPushdown;
 use super::projection_pushdown::ProjectionPushdown;
 use super::update_aggr_exprs::OptimizeAggregateOrder;
 use crate::physical_optimizer::aggregate_statistics::AggregateStatistics;
@@ -112,6 +113,7 @@ impl PhysicalOptimizer {
             // given query plan; i.e. it only acts as a final
             // gatekeeping rule.
             Arc::new(SanityCheckPlan::new()),
+            Arc::new(JoinFilterPushdown::new()),
         ];
 
         Self::with_rules(rules)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 918ebccbeb70..c31998ad052d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -17,10 +17,6 @@
 
 //! Planner for [`LogicalPlan`] to [`ExecutionPlan`]
 
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::sync::Arc;
-
 use crate::datasource::file_format::file_type_to_format;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::FileSinkConfig;
@@ -61,6 +57,11 @@ use crate::physical_plan::{
     displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode,
     Partitioning, PhysicalExpr, WindowExpr,
 };
+use arrow_schema::DataType;
+use datafusion_physical_plan::joins::DynamicFilterInfo;
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::sync::Arc;
 
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Schema, SchemaRef};
@@ -82,7 +83,7 @@ use datafusion_expr::{
     StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
 };
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
-use datafusion_physical_expr::expressions::Literal;
+use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_physical_plan::unnest::ListUnnest;
@@ -343,6 +344,7 @@ impl DefaultPhysicalPlanner {
             );
         }
         let plan = outputs.pop().unwrap();
+
         Ok(plan)
     }
 
@@ -370,6 +372,7 @@ impl DefaultPhysicalPlanner {
                 ChildrenContainer::None,
             )
             .await?;
+
         let mut current_index = leaf_starter_index;
         // parent_index is None only for root
         while let Some(parent_index) = node.parent_index {
@@ -848,6 +851,7 @@ impl DefaultPhysicalPlanner {
                 join_type,
                 null_equals_null,
                 schema: join_schema,
+                dynamic_pushdown_columns,
                 ..
             }) => {
                 let null_equals_null = *null_equals_null;
 
@@ -1040,6 +1044,53 @@
                     }
                     _ => None,
                 };
+                // Build the dynamic filters, if there are any
+                let physical_dynamic_filter_info: Option<Arc<DynamicFilterInfo>> =
+                    if let Some(dynamic_columns) = dynamic_pushdown_columns {
+                        let columns_and_types_and_names: Vec<(
+                            Arc<Column>,
+                            &DataType,
+                            String,
+                        )> = dynamic_columns
+                            .iter()
+                            .map(|dynamic_column| {
+                                let column = dynamic_column.column();
+                                let index = join_schema.index_of_column(column)?;
+                                let physical_column = Arc::new(
+                                    datafusion_physical_expr::expressions::Column::new(
+                                        &column.name,
+                                        index,
+                                    ),
+                                );
+                                let data_type =
+                                    join_schema.field_from_column(column)?.data_type();
+
+                                let build_side_name =
+                                    dynamic_column.build_name().to_owned();
+                                Ok((physical_column, data_type, build_side_name))
+                            })
+                            .collect::<Result<Vec<_>>>()?;
+
+                        let (physical_columns, data_types, build_side_names) =
+                            columns_and_types_and_names.into_iter().fold(
+                                (Vec::new(), Vec::new(), Vec::new()),
+                                |(mut cols, mut types, mut names), (col, ty, name)| {
+                                    cols.push(col);
+                                    types.push(ty);
+                                    names.push(name);
+                                    (cols, types, names)
+                                },
+                            );
+
+                        Some(Arc::new(DynamicFilterInfo::try_new(
+                            physical_columns,
+                            build_side_names,
+                            data_types,
+                            session_state.config_options().execution.target_partitions,
+                        )?))
+                    } else {
+                        None
+                    };
 
                 let prefer_hash_join =
                     session_state.config_options().optimizer.prefer_hash_join;
@@ -1085,27 +1136,33 @@
                         PartitionMode::Partitioned
                     }
                 };
-                Arc::new(HashJoinExec::try_new(
-                    physical_left,
-                    physical_right,
-                    join_on,
-                    join_filter,
-                    join_type,
-                    None,
-                    partition_mode,
-                    null_equals_null,
-                )?)
+                Arc::new(
+                    HashJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_on,
+                        join_filter,
+                        join_type,
+                        None,
+                        partition_mode,
+                        null_equals_null,
+                    )?
+                    .with_dynamic_filter_info(physical_dynamic_filter_info),
+                )
             } else {
-                Arc::new(HashJoinExec::try_new(
-                    physical_left,
-                    physical_right,
-                    join_on,
-                    join_filter,
-                    join_type,
-                    None,
-                    PartitionMode::CollectLeft,
-                    null_equals_null,
-                )?)
+                Arc::new(
+                    HashJoinExec::try_new(
+                        physical_left,
+                        physical_right,
+                        join_on,
+                        join_filter,
+                        join_type,
+                        None,
+                        PartitionMode::CollectLeft,
+                        null_equals_null,
+                    )?
+                    .with_dynamic_filter_info(physical_dynamic_filter_info),
+                )
             };
 
             // If plan was mutated previously then need to create the ExecutionPlan
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index d7dc1afe4d50..ab7091686f29 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -54,6 +54,7 @@ serde_json = { workspace = true }
 sqlparser = { workspace = true }
 strum = { version = "0.26.1", features = ["derive"] }
 strum_macros = "0.26.0"
+parking_lot = { workspace = true }
 
 [dev-dependencies]
 ctor = { workspace = true }
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 21304068a8ab..0dff11789c66 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -877,6 +877,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null,
+            dynamic_pushdown_columns: None,
         })))
     }
 
@@ -941,6 +942,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::Using,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null: false,
+            dynamic_pushdown_columns: None,
         })))
     }
 }
@@ -958,6 +960,7 @@
             join_constraint: JoinConstraint::On,
             null_equals_null: false,
             schema: DFSchemaRef::new(join_schema),
+            dynamic_pushdown_columns: None,
         })))
     }
 
@@ -1169,6 +1172,7 @@
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null: false,
+            dynamic_pushdown_columns: None,
         })))
     }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 72d8f7158be2..a52d31bda3c8 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -36,7 +36,7 @@ use crate::logical_plan::{DmlStatement, Statement};
 use crate::utils::{
     enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
     find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
-    split_conjunction,
+    split_conjunction, DynamicFilterColumn,
 };
 use crate::{
     build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
@@ -671,6 +671,7 @@ impl LogicalPlan {
                 on,
                 schema: _,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => {
                 let schema =
                     build_join_schema(left.schema(), right.schema(), &join_type)?;
@@ -692,6 +693,7 @@ impl LogicalPlan {
                     filter,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null,
+                    dynamic_pushdown_columns,
                 }))
             }
             LogicalPlan::CrossJoin(CrossJoin {
@@ -956,6 +958,7 @@ impl LogicalPlan {
                     filter: filter_expr,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null: *null_equals_null,
+                    dynamic_pushdown_columns: None,
                 }))
             }
             LogicalPlan::CrossJoin(_) => {
@@ -3240,6 +3243,8 @@ pub struct Join {
     pub schema: DFSchemaRef,
     /// If null_equals_null is true, null == null else null != null
     pub null_equals_null: bool,
+    /// Columns which we may try to push down to the scan dynamically
+    pub dynamic_pushdown_columns: Option<Vec<DynamicFilterColumn>>,
 }
 
 impl Join {
@@ -3273,8 +3278,17 @@ impl Join {
             join_constraint: original_join.join_constraint,
             schema: Arc::new(join_schema),
             null_equals_null: original_join.null_equals_null,
+            dynamic_pushdown_columns: None,
         })
     }
+
+    /// Set the columns to try to push down to the scan dynamically
+    pub fn with_dynamic_pushdown_columns(
+        mut self,
+        pushdown_columns: Vec<DynamicFilterColumn>,
+    ) -> Self {
+        self.dynamic_pushdown_columns = Some(pushdown_columns);
+        self
+    }
 }
 
 // Manual implementation needed because of `schema` field. Comparison excludes this field.
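The new `Join::with_dynamic_pushdown_columns` API above, together with the `DynamicFilterColumn` helper added to `utils.rs` below, is enough to tag a join by hand. A minimal sketch (not part of the patch; the free function is hypothetical):

    use datafusion_expr::logical_plan::Join;
    use datafusion_expr::utils::DynamicFilterColumn;
    use datafusion_expr::Expr;

    /// Collect equi-join keys where both sides are plain columns and record
    /// them on the join for later physical pushdown.
    fn record_pushdown_candidates(join: Join) -> Join {
        let dynamic_columns: Vec<DynamicFilterColumn> = join
            .on
            .iter()
            .filter_map(|(l, r)| match (l, r) {
                // Build-side name from the left key, probe-side column from
                // the right key, mirroring the optimizer rule in this patch
                (Expr::Column(l), Expr::Column(r)) => {
                    Some(DynamicFilterColumn::new(l.name.clone(), r.clone()))
                }
                _ => None,
            })
            .collect();
        join.with_dynamic_pushdown_columns(dynamic_columns)
    }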
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 606868e75abf..c5bd6943417f 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -142,6 +142,7 @@ impl TreeNode for LogicalPlan {
                 join_constraint,
                 schema,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => map_until_stop_and_collect!(
                 rewrite_arc(left, &mut f),
                 right,
@@ -157,6 +158,7 @@ impl TreeNode for LogicalPlan {
                     join_constraint,
                     schema,
                     null_equals_null,
+                    dynamic_pushdown_columns,
                 })
             }),
             LogicalPlan::CrossJoin(CrossJoin {
@@ -643,6 +645,7 @@ impl LogicalPlan {
                 join_constraint,
                 schema,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => map_until_stop_and_collect!(
                 on.into_iter().map_until_stop_and_collect(
                     |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
@@ -662,6 +665,7 @@ impl LogicalPlan {
                     join_constraint,
                     schema,
                     null_equals_null,
+                    dynamic_pushdown_columns,
                 })
             }),
             LogicalPlan::Sort(Sort { expr, input, fetch }) => {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 9ee13f1e06d3..ab9a877138f1 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -1343,6 +1343,25 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
     format!("{name}[{state_name}]")
 }
 
+/// Pairs the build-side (left) column name with the probe-side column
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct DynamicFilterColumn {
+    /// Name of the column on the build side
+    pub build_name: String,
+    /// The probe-side column the filter applies to
+    pub column: Column,
+}
+
+impl DynamicFilterColumn {
+    /// Create a new `DynamicFilterColumn`
+    pub fn new(build_name: String, column: Column) -> Self {
+        Self { build_name, column }
+    }
+    /// The probe-side column
+    pub fn column(&self) -> &Column {
+        &self.column
+    }
+    /// The build-side column name
+    pub fn build_name(&self) -> &str {
+        &self.build_name
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs
index 8102d0e4794b..f2dd0bc10f98 100644
--- a/datafusion/functions-aggregate/src/min_max.rs
+++ b/datafusion/functions-aggregate/src/min_max.rs
@@ -887,7 +887,7 @@ macro_rules! min_max {
 }
 
 /// An accumulator to compute the maximum value
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct MaxAccumulator {
     max: ScalarValue,
 }
@@ -1194,7 +1194,7 @@ fn get_min_doc() -> &'static Documentation {
 }
 
 /// An accumulator to compute the minimum value
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct MinAccumulator {
     min: ScalarValue,
 }
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 79a5bb24e918..6c5763e7eebd 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -40,6 +40,7 @@ arrow = { workspace = true }
 async-trait = { workspace = true }
 chrono = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 hashbrown = { workspace = true }
diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs
index bce5c77ca674..cdffcc59fbb3 100644
--- a/datafusion/optimizer/src/eliminate_cross_join.rs
+++ b/datafusion/optimizer/src/eliminate_cross_join.rs
@@ -338,6 +338,7 @@ fn find_inner_join(
                 filter: None,
                 schema: join_schema,
                 null_equals_null: false,
+                dynamic_pushdown_columns: None,
             }));
         }
    }
@@ -360,6 +361,7 @@ fn find_inner_join(
         join_type: JoinType::Inner,
         join_constraint: JoinConstraint::On,
         null_equals_null: false,
+        dynamic_pushdown_columns: None,
     }))
 }
 
diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs
index 1ecb32ca2a43..a23f4239acbe 100644
--- a/datafusion/optimizer/src/eliminate_outer_join.rs
+++ b/datafusion/optimizer/src/eliminate_outer_join.rs
@@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin {
             filter: join.filter.clone(),
             schema: Arc::clone(&join.schema),
             null_equals_null: join.null_equals_null,
+            dynamic_pushdown_columns: None,
         }));
         Filter::try_new(filter.predicate, new_join)
             .map(|f| Transformed::yes(LogicalPlan::Filter(f)))
diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs
index 48191ec20631..efdeb6d197e8 100644
--- a/datafusion/optimizer/src/extract_equijoin_predicate.rs
+++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs
@@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
                 join_constraint,
                 schema,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => {
                 let left_schema = left.schema();
                 let right_schema = right.schema();
@@ -93,6 +94,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
                         join_constraint,
                         schema,
                         null_equals_null,
+                        dynamic_pushdown_columns,
                     })))
                 } else {
                     Ok(Transformed::no(LogicalPlan::Join(Join {
@@ -104,6 +106,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
                         join_constraint,
                         schema,
                         null_equals_null,
+                        dynamic_pushdown_columns,
                     })))
                 }
             }
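The `Clone` derives above expose `MaxAccumulator` and `MinAccumulator` for reuse outside aggregation; the new `DynamicFilterInfo` further below drives them directly. A minimal sketch of that accumulator API (not part of the patch; values are illustrative):

    use arrow::array::{ArrayRef, Int32Array};
    use arrow::datatypes::DataType;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::Accumulator;
    use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
    use std::sync::Arc;

    fn main() -> Result<()> {
        let mut max_acc = MaxAccumulator::try_new(&DataType::Int32)?;
        let mut min_acc = MinAccumulator::try_new(&DataType::Int32)?;
        let column: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 4, 1, 5]));
        // `update_batch` takes one ArrayRef per accumulator argument
        max_acc.update_batch(&[Arc::clone(&column)])?;
        min_acc.update_batch(&[column])?;
        // `evaluate` yields the running extreme as a ScalarValue
        assert_eq!(max_acc.evaluate()?, ScalarValue::Int32(Some(5)));
        assert_eq!(min_acc.evaluate()?, ScalarValue::Int32(Some(1)));
        Ok(())
    }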
diff --git a/datafusion/optimizer/src/join_filter_pushdown.rs b/datafusion/optimizer/src/join_filter_pushdown.rs
new file mode 100644
index 000000000000..6fa3daf45633
--- /dev/null
+++ b/datafusion/optimizer/src/join_filter_pushdown.rs
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`JoinFilterPushdown`] pushes join filters down to the scan dynamically
+
+use datafusion_common::{tree_node::Transformed, DataFusionError};
+use datafusion_expr::{utils::DynamicFilterColumn, Expr, JoinType, LogicalPlan};
+
+use crate::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule};
+
+/// Optimizer rule that records, on each eligible join, which probe-side
+/// columns a dynamic (runtime) filter can be pushed down to
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for JoinFilterPushdown {
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        if !config.options().optimizer.dynamic_join_pushdown {
+            return Ok(Transformed::no(plan));
+        }
+
+        match plan {
+            LogicalPlan::Join(mut join) => {
+                if unsupported_join_type(&(join.join_type)) {
+                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                }
+
+                let mut columns = Vec::new();
+                let mut build_side_names = Vec::new();
+                // Iterate over the on clause and collect the filter info
+                for (left, right) in join.on.iter() {
+                    // Only equi-pairs where both sides are plain columns are supported
+                    if let (Expr::Column(l), Expr::Column(r)) = (left, right) {
+                        columns.push(r.clone());
+                        build_side_names.push(l.name.clone());
+                    }
+                }
+
+                let mut probe = join.right.as_ref();
+                // Walk down the probe side to make sure the filter can reach the scan
+                loop {
+                    if matches!(probe, LogicalPlan::TableScan(_)) {
+                        break;
+                    }
+                    match probe {
+                        LogicalPlan::Limit(_)
+                        | LogicalPlan::Filter(_)
+                        | LogicalPlan::Sort(_)
+                        | LogicalPlan::Distinct(_) => {
+                            probe = probe.inputs()[0];
+                        }
+                        LogicalPlan::Projection(project) => {
+                            for column in &columns {
+                                if !project.schema.has_column(column) {
+                                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                                }
+                            }
+                            probe = probe.inputs()[0];
+                        }
+                        _ => return Ok(Transformed::no(LogicalPlan::Join(join))),
+                    }
+                }
+                let dynamic_columns = columns
+                    .into_iter()
+                    .zip(build_side_names)
+                    .map(|(column, name)| DynamicFilterColumn::new(name, column))
+                    .collect::<Vec<_>>();
+                // Record the pushdown columns on the join
+                join = join.with_dynamic_pushdown_columns(dynamic_columns);
+                Ok(Transformed::yes(LogicalPlan::Join(join)))
+            }
+            _ => Ok(Transformed::no(plan)),
+        }
+    }
+
+    fn name(&self) -> &str {
+        "join_filter_pushdown"
+    }
+}
+
+fn unsupported_join_type(join_type: &JoinType) -> bool {
+    matches!(
+        join_type,
+        JoinType::Left | JoinType::RightSemi | JoinType::RightAnti
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Column;
+    use datafusion_expr::{
+        col, logical_plan::table_scan, JoinType, LogicalPlan, LogicalPlanBuilder,
+    };
+    use std::sync::Arc;
+
+    fn schema() -> Schema {
+        Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ])
+    }
+
+    fn get_optimized_plan(plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
+        Ok(generate_optimized_plan_with_rules(
+            vec![Arc::new(JoinFilterPushdown::new())],
+            plan,
+        ))
+    }
+
+    #[test]
+    fn test_inner_join_with_pushdown() -> Result<(), DataFusionError> {
+        let t1 = table_scan(Some("t1"), &schema(), None)?.build()?;
+        let t2 = table_scan(Some("t2"), &schema(), None)?.build()?;
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .build()?;
+        let optimized_plan = get_optimized_plan(plan)?;
+        if let LogicalPlan::Join(join) = optimized_plan {
+            assert!(join.dynamic_pushdown_columns.is_some());
+            assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1);
+            assert_eq!(
+                join.dynamic_pushdown_columns.as_ref().unwrap()[0]
+                    .column
+                    .name,
+                "a"
+            );
+        } else {
+            panic!("Expected Join operation");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_left_join_no_pushdown() -> Result<(), DataFusionError> {
+        let t1 = table_scan(Some("t1"), &schema(), None)?.build()?;
+        let t2 = table_scan(Some("t2"), &schema(), None)?.build()?;
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Left,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .build()?;
+
+        let optimized_plan = get_optimized_plan(plan)?;
+        if let LogicalPlan::Join(join) = optimized_plan {
+            assert!(join.dynamic_pushdown_columns.is_none());
+        } else {
+            panic!("Expected Join operation");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_with_projection() -> Result<(), DataFusionError> {
+        let t1 = table_scan(Some("t1"), &schema(), None)?.build()?;
+        let t2 = table_scan(Some("t2"), &schema(), None)?.build()?;
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .project(vec![col("t1.a"), col("t2.b")])?
+            .build()?;
+
+        let optimized_plan = get_optimized_plan(plan)?;
+        if let LogicalPlan::Projection(projection) = optimized_plan {
+            if let LogicalPlan::Join(join) = projection.input.as_ref() {
+                assert!(join.dynamic_pushdown_columns.is_some());
+                assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1);
+                assert_eq!(
+                    join.dynamic_pushdown_columns.as_ref().unwrap()[0]
+                        .column
+                        .name,
+                    "a"
+                );
+            } else {
+                panic!("Expected Join operation under Projection");
+            }
+        } else {
+            panic!("Expected Projection operation");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_with_multiple_keys() -> Result<(), DataFusionError> {
+        let t1 = table_scan(Some("t1"), &schema(), None)?.build()?;
+        let t2 = table_scan(Some("t2"), &schema(), None)?.build()?;
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Inner,
+                (
+                    vec![Column::from_name("a"), Column::from_name("b")],
+                    vec![Column::from_name("a"), Column::from_name("b")],
+                ),
+                None,
+            )?
+            .build()?;
+
+        let optimized_plan = get_optimized_plan(plan)?;
+        if let LogicalPlan::Join(join) = optimized_plan {
+            assert!(join.dynamic_pushdown_columns.is_some());
+            assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 2);
+            assert_eq!(
+                join.dynamic_pushdown_columns.as_ref().unwrap()[0]
+                    .column
+                    .name,
+                "a"
+            );
+            assert_eq!(
+                join.dynamic_pushdown_columns.as_ref().unwrap()[1]
+                    .column
+                    .name,
+                "b"
+            );
+        } else {
+            panic!("Expected Join operation");
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_join_with_filter() -> Result<(), DataFusionError> {
+        let t1 = table_scan(Some("t1"), &schema(), None)?.build()?;
+        let t2 = table_scan(Some("t2"), &schema(), None)?.build()?;
+
+        let plan = LogicalPlanBuilder::from(t1)
+            .join(
+                t2,
+                JoinType::Inner,
+                (vec![Column::from_name("a")], vec![Column::from_name("a")]),
+                None,
+            )?
+            .filter(col("t1.b").gt(col("t1.c")))?
+            .build()?;
+
+        let optimized_plan = get_optimized_plan(plan)?;
+        if let LogicalPlan::Filter(filter) = optimized_plan {
+            if let LogicalPlan::Join(join) = filter.input.as_ref() {
+                assert!(join.dynamic_pushdown_columns.is_some());
+                assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1);
+                assert_eq!(
+                    join.dynamic_pushdown_columns.as_ref().unwrap()[0]
+                        .column
+                        .name,
+                    "a"
+                );
+            } else {
+                panic!("Expected Join operation under Filter");
+            }
+        } else {
+            panic!("Expected Filter operation");
+        }
+        Ok(())
+    }
+}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index 3b1df3510d2a..c983ac8ffcb4 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -45,6 +45,7 @@ pub mod eliminate_one_union;
 pub mod eliminate_outer_join;
 pub mod extract_equijoin_predicate;
 pub mod filter_null_join_keys;
+pub mod join_filter_pushdown;
 pub mod optimize_projections;
 pub mod optimizer;
 pub mod propagate_empty_relation;
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 08dcefa22f08..5602f992aa26 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -45,6 +45,7 @@ use crate::eliminate_one_union::EliminateOneUnion;
 use crate::eliminate_outer_join::EliminateOuterJoin;
 use crate::extract_equijoin_predicate::ExtractEquijoinPredicate;
 use crate::filter_null_join_keys::FilterNullJoinKeys;
+use crate::join_filter_pushdown::JoinFilterPushdown;
 use crate::optimize_projections::OptimizeProjections;
 use crate::plan_signature::LogicalPlanSignature;
 use crate::propagate_empty_relation::PropagateEmptyRelation;
@@ -277,6 +278,7 @@ impl Optimizer {
             Arc::new(CommonSubexprEliminate::new()),
             Arc::new(EliminateGroupByConstant::new()),
             Arc::new(OptimizeProjections::new()),
+            Arc::new(JoinFilterPushdown::new()),
         ];
 
         Self::with_rules(rules)
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 2e3bca5b0bbd..fc4e5b3d982f 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1128,6 +1128,7 @@ fn convert_cross_join_to_inner_join(cross_join: CrossJoin) -> Result<Join> {
         filter: None,
         schema: DFSchemaRef::new(join_schema),
         null_equals_null: false,
+        dynamic_pushdown_columns: None,
     })
 }
 
diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs
index cabeafd8e7de..d712e795bf91 100644
--- a/datafusion/optimizer/src/test/mod.rs
+++ b/datafusion/optimizer/src/test/mod.rs
@@ -195,7 +195,7 @@ pub fn assert_optimized_plan_eq(
     Ok(())
 }
 
-fn generate_optimized_plan_with_rules(
+pub fn generate_optimized_plan_with_rules(
     rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
     plan: LogicalPlan,
 ) -> LogicalPlan {
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 47b04a876b37..cf67ceec79ad 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -24,6 +24,7 @@ use crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::physical_expr::down_cast_any_ref;
 use crate::PhysicalExpr;
 
+use crate::expressions::binary::kernels::concat_elements_utf8view;
 use arrow::array::*;
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
 use arrow::compute::kernels::cmp::*;
@@ -39,8 +40,6 @@ use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::{ColumnarValue, Operator};
 use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
-
-use crate::expressions::binary::kernels::concat_elements_utf8view;
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
     bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 7fcd719539ec..9bdbd0792301 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -55,6 +55,7 @@ datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 futures = { workspace = true }
 half = { workspace = true }
 hashbrown = { workspace = true }
diff --git a/datafusion/physical-plan/src/joins/dynamic_filters.rs b/datafusion/physical-plan/src/joins/dynamic_filters.rs
new file mode 100644
index 000000000000..eaa1936b221e
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/dynamic_filters.rs
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
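+
+//! Runtime ("dynamic") join filters.
+//!
+//! [`DynamicFilterInfo`] accumulates per-column min/max statistics from the
+//! build side of a hash join. Once every build partition has reported a
+//! batch, the statistics are frozen into a `min <= col AND col <= max`
+//! predicate that the probe-side scan merges into its own predicate via
+//! [`DynamicFilterInfo::final_predicate`].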
+
+use arrow::array::AsArray;
+use arrow::compute::filter_record_batch;
+use arrow::datatypes::DataType;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{exec_err, DataFusionError};
+use datafusion_expr::Accumulator;
+use datafusion_expr::Operator;
+use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
+use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
+use datafusion_physical_expr::PhysicalExpr;
+use hashbrown::HashSet;
+use parking_lot::Mutex;
+use std::fmt;
+use std::sync::Arc;
+
+/// Min/max statistics collected from the build side of a hash join,
+/// used to construct a pushed-down probe-side filter
+pub struct DynamicFilterInfo {
+    columns: Vec<Arc<Column>>,
+    build_side_names: Vec<String>,
+    inner: Mutex<DynamicFilterInfoInner>,
+}
+
+struct DynamicFilterInfoInner {
+    max_accumulators: Vec<MaxAccumulator>,
+    min_accumulators: Vec<MinAccumulator>,
+    final_expr: Option<Arc<dyn PhysicalExpr>>,
+    batch_count: usize,
+    processed_partitions: HashSet<usize>,
+}
+
+impl fmt::Debug for DynamicFilterInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DynamicFilterInfo")
+            .field("columns", &self.columns)
+            .field("inner", &"")
+            .finish()
+    }
+}
+
+impl DynamicFilterInfo {
+    /// Create a new `DynamicFilterInfo` instance
+    pub fn try_new(
+        columns: Vec<Arc<Column>>,
+        build_side_names: Vec<String>,
+        data_types: Vec<&DataType>,
+        total_batches: usize,
+    ) -> Result<Self, DataFusionError> {
+        let (max_accumulators, min_accumulators) = data_types
+            .into_iter()
+            .try_fold::<_, _, Result<_, DataFusionError>>(
+                (Vec::new(), Vec::new()),
+                |(mut max_acc, mut min_acc), data_type| {
+                    max_acc.push(MaxAccumulator::try_new(data_type)?);
+                    min_acc.push(MinAccumulator::try_new(data_type)?);
+                    Ok((max_acc, min_acc))
+                },
+            )?;
+
+        Ok(Self {
+            columns,
+            build_side_names,
+            inner: Mutex::new(DynamicFilterInfoInner {
+                max_accumulators,
+                min_accumulators,
+                final_expr: None,
+                batch_count: total_batches,
+                processed_partitions: HashSet::with_capacity(total_batches),
+            }),
+        })
+    }
+
+    /// Set the final expression for the filter
+    pub fn with_final_expr(self, expr: Arc<dyn PhysicalExpr>) -> Self {
+        self.inner.lock().final_expr = Some(expr);
+        self
+    }
+
+    /// Merge a batch of build-side data into the min/max accumulators and
+    /// report whether the filter is now finalized
+    pub fn merge_batch_and_check_finalized(
+        &self,
+        records: &RecordBatch,
+        partition: usize,
+    ) -> Result<bool, DataFusionError> {
+        let mut inner = self.inner.lock();
+
+        if inner.final_expr.is_some() {
+            return Ok(true);
+        }
+
+        if !inner.processed_partitions.insert(partition) {
+            return Ok(false);
+        }
+
+        inner.batch_count = inner.batch_count.saturating_sub(1);
+        if records.num_rows() == 0 {
+            return Ok(false);
+        }
+
+        let finalize = inner.batch_count == 0;
+
+        let schema = records.schema();
+        let columns = records.columns();
+        for (i, _) in self.columns.iter().enumerate() {
+            let index = schema.index_of(&self.build_side_names[i])?;
+            let column_data = &columns[index];
+            inner.max_accumulators[i].update_batch(&[Arc::clone(column_data)])?;
+            inner.min_accumulators[i].update_batch(&[Arc::clone(column_data)])?;
+        }
+
+        if finalize {
+            drop(inner);
+            self.finalize_filter()?;
+            return Ok(true);
+        }
+
+        Ok(false)
+    }
+
+    /// Finalize the filter by building the final expression from the
+    /// accumulated min/max values
+    fn finalize_filter(&self) -> Result<(), DataFusionError> {
+        let mut inner = self.inner.lock();
+        let filter_expr =
+            self.columns.iter().enumerate().try_fold::<_, _, Result<
+                Option<Arc<dyn PhysicalExpr>>,
+                DataFusionError,
+            >>(None, |acc, (i, column)| {
+                let max_value = inner.max_accumulators[i].evaluate()?;
+                let min_value = inner.min_accumulators[i].evaluate()?;
+
+                let max_expr: Arc<dyn PhysicalExpr> =
+                    Arc::new(Literal::new(max_value));
+                let min_expr: Arc<dyn PhysicalExpr> =
+                    Arc::new(Literal::new(min_value));
+
+                let range_condition: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+                    Arc::new(BinaryExpr::new(
+                        min_expr,
+                        Operator::LtEq,
+                        Arc::clone(column),
+                    )),
+                    Operator::And,
+                    Arc::new(BinaryExpr::new(
+                        Arc::clone(column),
+                        Operator::LtEq,
+                        max_expr,
+                    )),
+                ));
+
+                match acc {
+                    Some(expr) => Ok(Some(Arc::new(BinaryExpr::new(
+                        expr,
+                        Operator::And,
+                        range_condition,
+                    )) as Arc<dyn PhysicalExpr>)),
+                    None => Ok(Some(range_condition)),
+                }
+            })?;
+
+        let filter_expr = filter_expr.expect("Filter expression should be built");
+        inner.final_expr = Some(filter_expr);
+        Ok(())
+    }
+
+    /// Apply the finalized filter to a batch of records
+    pub fn filter_batch(
+        &self,
+        records: &RecordBatch,
+    ) -> Result<RecordBatch, DataFusionError> {
+        let filter_expr = match self.inner.lock().final_expr.as_ref() {
+            Some(expr) => Arc::clone(expr),
+            None => {
+                return exec_err!(
+                    "Filter expression should have been created before calling filter_batch"
+                )
+            }
+        };
+
+        let boolean_array = filter_expr
+            .evaluate(records)?
+            .into_array(records.num_rows())?;
+        let filtered_batch =
+            filter_record_batch(records, boolean_array.as_ref().as_boolean())?;
+
+        Ok(filtered_batch)
+    }
+
+    /// Return true once the final filter expression has been created
+    pub fn is_empty(&self) -> bool {
+        self.inner.lock().final_expr.is_some()
+    }
+
+    /// Return true once the final filter expression has been created
+    pub fn has_final_expr(&self) -> bool {
+        let inner = self.inner.lock();
+        inner.final_expr.is_some()
+    }
+
+    /// Merge the dynamic filter expression with an existing predicate, if any
+    pub fn final_predicate(
+        &self,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    ) -> Option<Arc<dyn PhysicalExpr>> {
+        let inner = self.inner.lock();
+
+        match (inner.final_expr.clone(), predicate) {
+            (Some(self_expr), Some(input_expr)) => Some(Arc::new(BinaryExpr::new(
+                self_expr,
+                Operator::And,
+                input_expr,
+            ))),
+            (Some(self_expr), None) => Some(self_expr),
+            (None, Some(input_expr)) => Some(input_expr),
+            (None, None) => None,
+        }
+    }
+}
+
+/// Per-partition handle to a shared [`DynamicFilterInfo`], used in
+/// partitioned mode
+pub struct PartitionedDynamicFilterInfo {
+    partition: usize,
+    dynamic_filter_info: Arc<DynamicFilterInfo>,
+}
+
+impl PartitionedDynamicFilterInfo {
+    /// Create a new per-partition handle
+    pub fn new(partition: usize, dynamic_filter_info: Arc<DynamicFilterInfo>) -> Self {
+        Self {
+            partition,
+            dynamic_filter_info,
+        }
+    }
+
+    /// Merge a build-side batch for this partition and report whether the
+    /// filter is finalized
+    pub fn merge_batch_and_check_finalized(
+        &self,
+        records: &RecordBatch,
+    ) -> Result<bool, DataFusionError> {
+        self.dynamic_filter_info
+            .merge_batch_and_check_finalized(records, self.partition)
+    }
+}
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 3b730c01291c..9488d2aabca9 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use std::task::Poll;
 use std::{any::Any, vec};
 
+use super::dynamic_filters::{DynamicFilterInfo, PartitionedDynamicFilterInfo};
 use super::utils::asymmetric_join_output_partitioning;
 use super::{
     utils::{OnceAsync, OnceFut},
@@ -326,6 +327,8 @@ pub struct HashJoinExec {
     pub null_equals_null: bool,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
+    /// The dynamic filter which should be pushed down to the probe side
+    pub dynamic_filters_pushdown: Option<Arc<DynamicFilterInfo>>,
 }
 
 impl HashJoinExec {
@@ -387,6 +390,7 @@ impl HashJoinExec {
             column_indices,
             null_equals_null,
             cache,
+            dynamic_filters_pushdown: None,
         })
     }
 
@@ -478,7 +482,14 @@ impl HashJoinExec {
             self.null_equals_null,
         )
     }
-
+    /// Set the dynamic filter info to push down to the probe side
+    pub fn with_dynamic_filter_info(
+        mut self,
+        dynamic_filter_info: Option<Arc<DynamicFilterInfo>>,
+    ) -> Self {
+        self.dynamic_filters_pushdown = dynamic_filter_info;
+        self
+    }
     /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
     fn compute_properties(
         left: &Arc<dyn ExecutionPlan>,
@@ -656,16 +667,19 @@ impl ExecutionPlan for HashJoinExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(HashJoinExec::try_new(
-            Arc::clone(&children[0]),
-            Arc::clone(&children[1]),
-            self.on.clone(),
-            self.filter.clone(),
-            &self.join_type,
-            self.projection.clone(),
-            self.mode,
-            self.null_equals_null,
-        )?))
+        Ok(Arc::new(
+            HashJoinExec::try_new(
+                Arc::clone(&children[0]),
+                Arc::clone(&children[1]),
+                self.on.clone(),
+                self.filter.clone(),
+                &self.join_type,
+                self.projection.clone(),
+                self.mode,
+                self.null_equals_null,
+            )?
+            .with_dynamic_filter_info(self.dynamic_filters_pushdown.clone()),
+        ))
     }
 
     fn execute(
@@ -751,6 +765,16 @@ impl ExecutionPlan for HashJoinExec {
             None => self.column_indices.clone(),
         };
 
+        let partitioned_dynamic_info =
+            self.dynamic_filters_pushdown
+                .as_ref()
+                .map(|dynamic_filters| {
+                    PartitionedDynamicFilterInfo::new(
+                        partition,
+                        Arc::clone(dynamic_filters),
+                    )
+                });
+
         Ok(Box::pin(HashJoinStream {
             schema: self.schema(),
             on_left,
@@ -767,6 +791,8 @@ impl ExecutionPlan for HashJoinExec {
             batch_size,
             hashes_buffer: vec![],
             right_side_ordered: self.right.output_ordering().is_some(),
+            dynamic_filter_info: partitioned_dynamic_info,
         }))
     }
 
@@ -855,7 +881,6 @@ async fn collect_left_input(
     reservation.try_grow(estimated_hashtable_size)?;
     metrics.build_mem_used.add(estimated_hashtable_size);
-
     let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
@@ -879,7 +904,6 @@ async fn collect_left_input(
     }
     // Merge all batches into a single batch, so we can directly index into the arrays
     let single_batch = concat_batches(&schema, batches_iter)?;
-
     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
         let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
@@ -1097,6 +1121,8 @@ struct HashJoinStream {
     hashes_buffer: Vec<u64>,
     /// Specifies whether the right side has an ordering to potentially preserve
     right_side_ordered: bool,
+    /// Dynamic filter populated from the build side, if any
+    dynamic_filter_info: Option<PartitionedDynamicFilterInfo>,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -1294,7 +1320,6 @@ impl HashJoinStream {
     }
 
     /// Collects build-side data by polling `OnceFut` future from initialized build-side
-    ///
     /// Updates build-side to `Ready`, and state to `FetchProbeSide`
     fn collect_build_side(
         &mut self,
@@ -1308,10 +1333,20 @@ impl HashJoinStream {
             .left_fut
             .get_shared(cx))?;
         build_timer.done();
+        // Merge this batch into the dynamic filter (if there is one) and
+        // check whether the filter is finalized
+        let filter_finalized = if let Some(filter_info) = &self.dynamic_filter_info {
+            filter_info.merge_batch_and_check_finalized(&left_data.batch)?
+        } else {
+            true // With no dynamic filter, treat the build side as finalized
+        };
+        // If the filter is not yet finalized after this merge, wait for the
+        // remaining build partitions before fetching the probe side
+        if !filter_finalized {
+            cx.waker().wake_by_ref();
+            return Poll::Pending;
+        }
 
         self.state = HashJoinStreamState::FetchProbeBatch;
         self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
-
         Poll::Ready(Ok(StatefulStreamResult::Continue))
     }
 
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs
index 6ddf19c51193..472631ffcfee 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -21,9 +21,11 @@ pub use cross_join::CrossJoinExec;
 pub use hash_join::HashJoinExec;
 pub use nested_loop_join::NestedLoopJoinExec;
 // Note: SortMergeJoin is not used in plans yet
+pub use dynamic_filters::DynamicFilterInfo;
 pub use sort_merge_join::SortMergeJoinExec;
 pub use symmetric_hash_join::SymmetricHashJoinExec;
 
 mod cross_join;
+mod dynamic_filters;
 mod hash_join;
 mod nested_loop_join;
 mod sort_merge_join;
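Finally, a quick way to exercise the feature end to end (not part of the patch): the parquet paths and the `id` join key are placeholders, and the option key assumes the `datafusion.optimizer` namespace used in `config.rs` above.

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // dynamic_join_pushdown defaults to true; set it explicitly so the
        // plans can be compared by flipping the flag.
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.dynamic_join_pushdown", true);
        let ctx = SessionContext::new_with_config(config);

        ctx.register_parquet("build", "build.parquet", ParquetReadOptions::default())
            .await?;
        ctx.register_parquet("probe", "probe.parquet", ParquetReadOptions::default())
            .await?;

        // The smaller build side feeds min/max statistics into the probe-side
        // ParquetExec, which should report a `dynamic_filter:` in EXPLAIN output.
        let df = ctx
            .sql("SELECT p.* FROM build b JOIN probe p ON b.id = p.id")
            .await?;
        df.explain(false, false)?.show().await?;
        Ok(())
    }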