From 8762266dc9ab10a15a5a7684f7551393f520c05e Mon Sep 17 00:00:00 2001 From: Lordworms Date: Thu, 14 Nov 2024 18:12:36 -0800 Subject: [PATCH] init --- datafusion-cli/Cargo.lock | 2 + datafusion/common/src/config.rs | 4 + .../datasource/physical_plan/file_stream.rs | 29 +- .../datasource/physical_plan/parquet/mod.rs | 50 +- .../enforce_distribution.rs | 3 + .../join_filter_pushdown.rs | 136 +++++ .../src/physical_optimizer/join_selection.rs | 13 + datafusion/core/src/physical_optimizer/mod.rs | 2 + .../core/src/physical_optimizer/optimizer.rs | 9 +- .../physical_optimizer/projection_pushdown.rs | 2 + .../replace_with_order_preserving_variants.rs | 1 + .../core/src/physical_optimizer/test_utils.rs | 1 + datafusion/core/src/physical_planner.rs | 67 ++- datafusion/core/tests/fuzz_cases/join_fuzz.rs | 1 + datafusion/expr/src/logical_plan/builder.rs | 4 + datafusion/expr/src/logical_plan/plan.rs | 16 +- datafusion/expr/src/logical_plan/tree_node.rs | 4 + datafusion/expr/src/utils.rs | 19 + datafusion/functions-aggregate/src/min_max.rs | 4 +- datafusion/optimizer/Cargo.toml | 1 + .../optimizer/src/eliminate_cross_join.rs | 2 + .../optimizer/src/eliminate_outer_join.rs | 1 + .../src/extract_equijoin_predicate.rs | 3 + .../optimizer/src/join_filter_pushdown.rs | 316 +++++++++++ datafusion/optimizer/src/lib.rs | 1 + datafusion/optimizer/src/optimizer.rs | 2 + datafusion/optimizer/src/test/mod.rs | 2 +- .../physical-expr/src/expressions/binary.rs | 3 +- datafusion/physical-plan/Cargo.toml | 1 + .../physical-plan/src/execution_plan.rs | 14 + .../src/joins/dynamic_filters.rs | 524 ++++++++++++++++++ .../physical-plan/src/joins/hash_join.rs | 74 ++- datafusion/physical-plan/src/joins/mod.rs | 2 + .../physical-plan/src/joins/test_utils.rs | 1 + datafusion/physical-plan/src/joins/utils.rs | 56 +- datafusion/physical-plan/src/metrics/mod.rs | 1 + datafusion/physical-plan/src/metrics/value.rs | 11 + datafusion/proto/src/physical_plan/mod.rs | 1 + .../tests/cases/roundtrip_physical_plan.rs | 1 + .../sqllogictest/test_files/explain.slt | 5 + .../test_files/information_schema.slt | 2 + docs/source/user-guide/configs.md | 1 + 42 files changed, 1353 insertions(+), 39 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/join_filter_pushdown.rs create mode 100644 datafusion/optimizer/src/join_filter_pushdown.rs create mode 100644 datafusion/physical-plan/src/joins/dynamic_filters.rs diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 02bd01a49905..e0b90185708c 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1478,6 +1478,7 @@ dependencies = [ "chrono", "datafusion-common", "datafusion-expr", + "datafusion-functions-aggregate", "datafusion-physical-expr", "hashbrown 0.14.5", "indexmap", @@ -1559,6 +1560,7 @@ dependencies = [ "datafusion-common-runtime", "datafusion-execution", "datafusion-expr", + "datafusion-functions-aggregate", "datafusion-functions-aggregate-common", "datafusion-functions-window-common", "datafusion-physical-expr", diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1ad10d164868..21bb8058c99f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -636,6 +636,10 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. 
pub expand_views_at_output: bool, default = false
+
+        /// When set to true, DataFusion will try to push build-side statistics
+        /// from a hash join down to the probe side to dynamically filter scans
+        pub dynamic_join_pushdown: bool, default = true
     }
 }
 
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 18cda4524ab2..a1919c8888c1 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -21,12 +21,6 @@
 //! Note: Most traits here need to be marked `Sync + Send` to be
 //! compliant with the `SendableRecordBatchStream` trait.
 
-use std::collections::VecDeque;
-use std::mem;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
 use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
@@ -35,6 +29,11 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 use crate::physical_plan::RecordBatchStream;
+use std::collections::VecDeque;
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
@@ -42,6 +41,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::instant::Instant;
 use datafusion_common::ScalarValue;
 
+use datafusion_physical_plan::joins::DynamicFilterInfo;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
@@ -96,6 +96,8 @@ pub struct FileStream<F: FileOpener> {
     baseline_metrics: BaselineMetrics,
     /// Describes the behavior of the `FileStream` if file opening or scanning fails
     on_error: OnError,
+    /// Dynamic filters (e.g. join filters) applied to batches as they are scanned
+    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
 }
 
 /// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -273,6 +275,7 @@ impl<F: FileOpener> FileStream<F> {
             file_stream_metrics: FileStreamMetrics::new(metrics, partition),
             baseline_metrics: BaselineMetrics::new(metrics, partition),
             on_error: OnError::Fail,
+            dynamic_filters: None,
         })
     }
 
@@ -284,6 +287,14 @@ impl<F: FileOpener> FileStream<F> {
         self.on_error = on_error;
         self
     }
+    /// Set the dynamic filters to apply to scanned batches
+    pub fn with_dynamic_filter(
+        mut self,
+        dynamic_filter: Option<Arc<DynamicFilterInfo>>,
+    ) -> Self {
+        self.dynamic_filters = dynamic_filter;
+        self
+    }
 
     /// Begin opening the next file in parallel while decoding the current file in FileStream.
     ///
@@ -391,7 +402,11 @@ impl<F: FileOpener> FileStream<F> {
                     }
                 }
                 match ready!(reader.poll_next_unpin(cx)) {
-                    Some(Ok(batch)) => {
+                    Some(Ok(mut batch)) => {
+                        // If a dynamic filter was pushed down to this scan, apply it to the batch
+                        if let Some(dynamic_filters) = &self.dynamic_filters {
+                            batch = dynamic_filters.filter_batch(&batch)?
+                        }
                         self.file_stream_metrics.time_scanning_until_data.stop();
                         self.file_stream_metrics.time_scanning_total.stop();
                         let result = self
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 9dd0b9e206a9..9633d61936f2 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -42,6 +42,8 @@ use crate::{
 
 use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::joins::DynamicFilterInfo;
+use datafusion_physical_plan::Metric;
 
 use itertools::Itertools;
 use log::debug;
@@ -282,6 +284,8 @@ pub struct ParquetExec {
     table_parquet_options: TableParquetOptions,
     /// Optional user defined schema adapter
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    /// dynamic filters (like join filters)
+    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
 }
 
 impl From<ParquetExec> for ParquetExecBuilder {
@@ -291,7 +295,6 @@ impl From<ParquetExec> for ParquetExecBuilder {
 }
 
 /// [`ParquetExecBuilder`], builder for [`ParquetExec`].
-///
 /// See example on [`ParquetExec`].
 pub struct ParquetExecBuilder {
     file_scan_config: FileScanConfig,
@@ -463,6 +466,7 @@ impl ParquetExecBuilder {
             cache,
             table_parquet_options,
             schema_adapter_factory,
+            dynamic_filters: None,
         }
     }
 }
@@ -515,6 +519,7 @@ impl ParquetExec {
             cache: _,
             table_parquet_options,
             schema_adapter_factory,
+            ..
         } = self;
         ParquetExecBuilder {
             file_scan_config: base_config,
@@ -711,10 +716,9 @@ impl DisplayAs for ParquetExec {
                         )
                     })
                     .unwrap_or_default();
-
                 write!(f, "ParquetExec: ")?;
                 self.base_config.fmt_as(t, f)?;
-                write!(f, "{}{}", predicate_string, pruning_predicate_string,)
+                write!(f, "{}{}", predicate_string, pruning_predicate_string)
             }
         }
     }
@@ -798,7 +802,16 @@ impl ExecutionPlan for ParquetExec {
             .schema_adapter_factory
             .clone()
             .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
-
+        if let Some(dynamic_filter) = &self.dynamic_filters {
+            let (final_expr, name) =
+                dynamic_filter.final_predicate(self.predicate.clone());
+            if final_expr.is_some() {
+                self.metrics.register(Arc::new(Metric::new(
+                    datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name),
+                    None,
+                )));
+            }
+        }
         let opener = ParquetOpener {
             partition_index,
             projection: Arc::from(projection),
@@ -817,9 +830,10 @@ impl ExecutionPlan for ParquetExec {
             enable_bloom_filter: self.bloom_filter_on_read(),
             schema_adapter_factory,
         };
-        let stream =
-            FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?;
+        let stream =
+            FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?
+ .with_dynamic_filter(self.dynamic_filters.clone()); Ok(Box::pin(stream)) } @@ -862,8 +875,33 @@ impl ExecutionPlan for ParquetExec { cache: self.cache.clone(), table_parquet_options: self.table_parquet_options.clone(), schema_adapter_factory: self.schema_adapter_factory.clone(), + dynamic_filters: self.dynamic_filters.clone(), })) } + + fn support_dynamic_filter(&self) -> bool { + true + } + + fn with_dynamic_filter( + &self, + dynamic_filters: Option>, + ) -> Result>> { + Ok(Some(Arc::new(ParquetExec { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + metrics: self.metrics.clone(), + predicate: self.predicate.clone(), + pruning_predicate: self.pruning_predicate.clone(), + page_pruning_predicate: self.page_pruning_predicate.clone(), + metadata_size_hint: self.metadata_size_hint, + parquet_file_reader_factory: self.parquet_file_reader_factory.clone(), + cache: self.cache.clone(), + table_parquet_options: self.table_parquet_options.clone(), + schema_adapter_factory: self.schema_adapter_factory.clone(), + dynamic_filters, + }))) + } } fn should_enable_page_index( diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 82fde60de090..12fc176836eb 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -304,6 +304,7 @@ fn adjust_input_keys_ordering( projection.clone(), PartitionMode::Partitioned, *null_equals_null, + None, ) .map(|e| Arc::new(e) as _) }; @@ -632,6 +633,7 @@ pub(crate) fn reorder_join_keys_to_inputs( projection.clone(), PartitionMode::Partitioned, *null_equals_null, + None, )?)); } } @@ -1696,6 +1698,7 @@ pub(crate) mod tests { None, PartitionMode::Partitioned, false, + None, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs new file mode 100644 index 000000000000..4bd706547471 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Push dynamic join filters down to the scan execution plans, if any exist
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use crate::datasource::physical_plan::ParquetExec;
+use crate::physical_plan::ExecutionPlan;
+use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
+use datafusion_common::tree_node::{Transformed, TransformedResult};
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::joins::DynamicFilterInfo;
+
+/// This rule pushes build-side statistics of a hash join down to the probe-side scan
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for JoinFilterPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.optimizer.dynamic_join_pushdown {
+            return Ok(plan);
+        }
+
+        let mut filters_stack = VecDeque::new();
+        optimize_impl(plan, &mut filters_stack).data()
+    }
+
+    fn name(&self) -> &str {
+        "JoinFilterPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn optimize_impl(
+    plan: Arc<dyn ExecutionPlan>,
+    join_filters_stack: &mut VecDeque<Arc<DynamicFilterInfo>>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        // Push current join's filters to the stack if they exist
+        if let Some(filters) = &hashjoin_exec.dynamic_filters_pushdown {
+            join_filters_stack.push_back(Arc::clone(filters));
+        }
+
+        // Optimize both sides; visit the probe (right) side first so this join's filter reaches its own probe-side scan
+        let new_right = optimize_impl(
+            Arc::<dyn ExecutionPlan>::clone(&hashjoin_exec.right),
+            join_filters_stack,
+        )?;
+
+        let new_left = optimize_impl(
+            Arc::<dyn ExecutionPlan>::clone(&hashjoin_exec.left),
+            join_filters_stack,
+        )?;
+
+        // Pop the filters after processing both sides
+        if hashjoin_exec.dynamic_filters_pushdown.is_some() {
+            join_filters_stack.pop_back();
+        }
+
+        if new_left.transformed || new_right.transformed {
+            let new_hash_join = Arc::new(HashJoinExec::try_new(
+                new_left.data,
+                new_right.data,
+                hashjoin_exec.on.clone(),
+                hashjoin_exec.filter().cloned(),
+                hashjoin_exec.join_type(),
+                hashjoin_exec.projection.clone(),
+                *hashjoin_exec.partition_mode(),
+                hashjoin_exec.null_equals_null(),
+                hashjoin_exec.dynamic_filters_pushdown.clone(),
+            )?);
+            return Ok(Transformed::yes(new_hash_join));
+        }
+        Ok(Transformed::no(plan))
+    } else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
+        if let Some(filter) = join_filters_stack.pop_front() {
+            let final_exec = parquet_exec.clone().with_dynamic_filter(Some(filter))?;
+            if let Some(new_plan) = final_exec {
+                return Ok(Transformed::yes(new_plan));
+            }
+        }
+        Ok(Transformed::no(plan))
+    } else {
+        let children = plan.children();
+        let mut new_children = Vec::with_capacity(children.len());
+        let mut transformed = false;
+
+        for child in children {
+            let new_child = optimize_impl(
+                Arc::<dyn ExecutionPlan>::clone(child),
+                join_filters_stack,
+            )?;
+            if new_child.transformed {
+                transformed = true;
+            }
+            new_children.push(new_child.data);
+        }
+
+        if transformed {
+            let new_plan = plan.with_new_children(new_children)?;
+            Ok(Transformed::yes(new_plan))
+        } else {
+            Ok(Transformed::no(plan))
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 9b2402c6bb87..e72779f9f43a 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -199,6 +199,7 @@ pub fn swap_hash_join(
         ),
         partition_mode,
         hash_join.null_equals_null(),
+        None,
     )?;
     // 
In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again if matches!( @@ -396,6 +397,7 @@ fn try_collect_left( hash_join.projection.clone(), PartitionMode::CollectLeft, hash_join.null_equals_null(), + hash_join.dynamic_filters_pushdown.clone(), )?))) } } @@ -408,6 +410,7 @@ fn try_collect_left( hash_join.projection.clone(), PartitionMode::CollectLeft, hash_join.null_equals_null(), + hash_join.dynamic_filters_pushdown.clone(), )?))), (false, true) => { if supports_swap(*hash_join.join_type()) { @@ -436,6 +439,7 @@ fn partitioned_hash_join(hash_join: &HashJoinExec) -> Result = Arc::new(ProjectionExec::try_new( vec![ diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 7fc3adf784e2..a53ee030c36c 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1452,6 +1452,7 @@ mod tests { None, PartitionMode::Partitioned, false, + None, ) .unwrap(), ) diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 88bb0b6fef23..ac83d0922d90 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -233,6 +233,7 @@ pub fn hash_join_exec( None, PartitionMode::Partitioned, true, + None, )?)) } diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 26f6b12908a7..10a074efc383 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -17,10 +17,6 @@ //! Planner for [`LogicalPlan`] to [`ExecutionPlan`] -use std::borrow::Cow; -use std::collections::HashMap; -use std::sync::Arc; - use crate::datasource::file_format::file_type_to_format; use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::FileSinkConfig; @@ -60,6 +56,10 @@ use crate::physical_plan::{ displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, WindowExpr, }; +use datafusion_physical_plan::joins::DynamicFilterInfo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::Arc; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; @@ -81,7 +81,7 @@ use datafusion_expr::{ SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::expressions::Literal; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; @@ -342,6 +342,7 @@ impl DefaultPhysicalPlanner { ); } let plan = outputs.pop().unwrap(); + Ok(plan) } @@ -369,6 +370,7 @@ impl DefaultPhysicalPlanner { ChildrenContainer::None, ) .await?; + let mut current_index = leaf_starter_index; // parent_index is None only for root while let Some(parent_index) = node.parent_index { @@ -861,6 +863,7 @@ impl DefaultPhysicalPlanner { join_type, null_equals_null, schema: join_schema, + dynamic_pushdown_columns, .. 
            }) => {
                 let null_equals_null = *null_equals_null;
 
@@ -1063,8 +1066,8 @@ impl DefaultPhysicalPlanner {
                 let prefer_hash_join =
                     session_state.config_options().optimizer.prefer_hash_join;
-
-                let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
+                let right_schema = physical_right.schema();
+                let mut join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
                     if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
                         // cross join if there is no join conditions and no join filter set
                         Arc::new(CrossJoinExec::new(physical_left, physical_right))
@@ -1114,6 +1117,7 @@ impl DefaultPhysicalPlanner {
                         None,
                         partition_mode,
                         null_equals_null,
+                        None,
                     )?)
                 } else {
                     Arc::new(HashJoinExec::try_new(
@@ -1125,9 +1129,58 @@ impl DefaultPhysicalPlanner {
                         None,
                         PartitionMode::CollectLeft,
                         null_equals_null,
+                        None,
                     )?)
                 };
 
+                // Build the dynamic filter from the pushdown columns, if any
+                if join.support_dynamic_filter()
+                    && dynamic_pushdown_columns
+                        .as_ref()
+                        .is_some_and(|columns| !columns.is_empty())
+                {
+                    let physical_dynamic_filter_info: Option<Arc<DynamicFilterInfo>> =
+                        if let Some(dynamic_columns) = dynamic_pushdown_columns {
+                            let columns_and_names: Vec<(Arc<Column>, String)> =
+                                dynamic_columns
+                                    .iter()
+                                    .map(|dynamic_column| {
+                                        let column = dynamic_column.column();
+                                        let index =
+                                            right_schema.index_of(column.name())?;
+                                        let physical_column =
+                                            Arc::new(Column::new(&column.name, index));
+                                        let build_side_name =
+                                            dynamic_column.build_name().to_owned();
+                                        Ok((physical_column, build_side_name))
+                                    })
+                                    .collect::<Result<Vec<_>>>()?;
+
+                            let (physical_columns, build_side_names) =
+                                columns_and_names.into_iter().fold(
+                                    (Vec::new(), Vec::new()),
+                                    |(mut cols, mut names), (col, name)| {
+                                        cols.push(col);
+                                        names.push(name);
+                                        (cols, names)
+                                    },
+                                );
+
+                            Some(Arc::new(DynamicFilterInfo::try_new(
+                                physical_columns,
+                                build_side_names,
+                            )?))
+                        } else {
+                            None
+                        };
+                    log::debug!(
+                        "physical dynamic filter is {:?}",
+                        physical_dynamic_filter_info
+                    );
+                    join = join
+                        .with_dynamic_filter(physical_dynamic_filter_info)?
+                        .unwrap_or(join);
+                }
 
                 // If plan was mutated previously then need to create the ExecutionPlan
                 // for the new Projection that was applied on top.
                if let Some((input, expr)) = new_project {
 
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index cf1742a30e66..5f9cbad84bc1 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -447,6 +447,7 @@ impl JoinFuzzTestCase {
                     None,
                     PartitionMode::Partitioned,
                     false,
+                    None,
                 )
                 .unwrap(),
             )
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 90235e3f84c4..75f5dc66099e 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -976,6 +976,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null,
+            dynamic_pushdown_columns: None,
         })))
     }
 
@@ -1040,6 +1041,7 @@ impl LogicalPlanBuilder {
                 join_constraint: JoinConstraint::Using,
                 schema: DFSchemaRef::new(join_schema),
                 null_equals_null: false,
+                dynamic_pushdown_columns: None,
             })))
         }
     }
@@ -1057,6 +1059,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             null_equals_null: false,
             schema: DFSchemaRef::new(join_schema),
+            dynamic_pushdown_columns: None,
         })))
     }
 
@@ -1275,6 +1278,7 @@
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null: false,
+            dynamic_pushdown_columns: None,
         })))
     }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 6ee99b22c7f3..1fce5f2df57a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -36,7 +36,7 @@ use crate::logical_plan::{DmlStatement, Statement};
 use crate::utils::{
     enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
     find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
-    split_conjunction,
+    split_conjunction, DynamicFilterColumn,
 };
 use crate::{
     build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
@@ -632,6 +632,7 @@ impl LogicalPlan {
                 on,
                 schema: _,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => {
                 let schema =
                     build_join_schema(left.schema(), right.schema(), &join_type)?;
@@ -653,6 +654,7 @@ impl LogicalPlan {
                     filter,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null,
+                    dynamic_pushdown_columns,
                 }))
             }
             LogicalPlan::Subquery(_) => Ok(self),
@@ -906,6 +908,7 @@ impl LogicalPlan {
                     filter: filter_expr,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null: *null_equals_null,
+                    dynamic_pushdown_columns: None,
                 }))
             }
             LogicalPlan::Subquery(Subquery {
@@ -3248,6 +3251,8 @@ pub struct Join {
     pub schema: DFSchemaRef,
     /// If null_equals_null is true, null == null else null != null
     pub null_equals_null: bool,
+    /// Columns that may be pushed down to the probe-side scan as dynamic filters
+    pub dynamic_pushdown_columns: Option<Vec<DynamicFilterColumn>>,
 }
 
 impl Join {
@@ -3281,8 +3286,18 @@ impl Join {
             join_constraint: original_join.join_constraint,
             schema: Arc::new(join_schema),
             null_equals_null: original_join.null_equals_null,
+            dynamic_pushdown_columns: None,
         })
     }
+
+    /// Set the columns to consider for dynamic filter pushdown
+    pub fn with_dynamic_pushdown_columns(
+        mut self,
+        pushdown_columns: Vec<DynamicFilterColumn>,
+    ) -> Self {
+        self.dynamic_pushdown_columns = Some(pushdown_columns);
+        self
+    }
 }
 
 // Manual implementation needed because of `schema` field. Comparison excludes this field.
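Note: the `dynamic_pushdown_columns` field and the `with_dynamic_pushdown_columns` builder above are the hook the logical optimizer rule uses to annotate a join. A minimal sketch of attaching the annotation by hand, assuming only the types added in this diff (`annotate_join` itself is a hypothetical helper, not part of the patch):

    use datafusion_common::Column;
    use datafusion_expr::{logical_plan::Join, utils::DynamicFilterColumn, LogicalPlan};

    /// Annotate `join` so the probe-side column "a" can be filtered using the
    /// values collected from the build-side column also named "a".
    fn annotate_join(join: Join) -> LogicalPlan {
        let column = DynamicFilterColumn::new("a".to_string(), Column::from_name("a"));
        LogicalPlan::Join(join.with_dynamic_pushdown_columns(vec![column]))
    }
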
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index e7dfe8791924..25bc283d06bf 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -143,6 +143,7 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => map_until_stop_and_collect!( rewrite_arc(left, &mut f), right, @@ -158,6 +159,7 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) }), LogicalPlan::Limit(Limit { skip, fetch, input }) => rewrite_arc(input, f)? @@ -621,6 +623,7 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => map_until_stop_and_collect!( on.into_iter().map_until_stop_and_collect( |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1)) @@ -640,6 +643,7 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) }), LogicalPlan::Sort(Sort { expr, input, fetch }) => { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index c22ee244fe28..45b403d8cb7f 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -1395,6 +1395,25 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } +/// contain the left column name(build side) and actual column side(probe side) +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DynamicFilterColumn { + pub build_name: String, + pub column: Column, +} + +impl DynamicFilterColumn { + pub fn new(build_name: String, column: Column) -> Self { + Self { build_name, column } + } + pub fn column(&self) -> &Column { + &self.column + } + pub fn build_name(&self) -> &str { + &self.build_name + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index b497953bc591..c69678e5849f 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -889,7 +889,7 @@ macro_rules! 
min_max { } /// An accumulator to compute the maximum value -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MaxAccumulator { max: ScalarValue, } @@ -1196,7 +1196,7 @@ fn get_min_doc() -> &'static Documentation { } /// An accumulator to compute the minimum value -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MinAccumulator { min: ScalarValue, } diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 34e35c66107a..456b526afb9d 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -41,6 +41,7 @@ async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions-aggregate = { workspace = true } datafusion-physical-expr = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 32b7ce44a63a..b1aa5123e5ab 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -330,6 +330,7 @@ fn find_inner_join( filter: None, schema: join_schema, null_equals_null: false, + dynamic_pushdown_columns: None, })); } } @@ -352,6 +353,7 @@ fn find_inner_join( join_type: JoinType::Inner, join_constraint: JoinConstraint::On, null_equals_null: false, + dynamic_pushdown_columns: None, })) } diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 1ecb32ca2a43..a23f4239acbe 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin { filter: join.filter.clone(), schema: Arc::clone(&join.schema), null_equals_null: join.null_equals_null, + dynamic_pushdown_columns: None, })); Filter::try_new(filter.predicate, new_join) .map(|f| Transformed::yes(LogicalPlan::Filter(f))) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 48191ec20631..efdeb6d197e8 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => { let left_schema = left.schema(); let right_schema = right.schema(); @@ -93,6 +94,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }))) } else { Ok(Transformed::no(LogicalPlan::Join(Join { @@ -104,6 +106,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }))) } } diff --git a/datafusion/optimizer/src/join_filter_pushdown.rs b/datafusion/optimizer/src/join_filter_pushdown.rs new file mode 100644 index 000000000000..accd8aa7699e --- /dev/null +++ b/datafusion/optimizer/src/join_filter_pushdown.rs @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`JoinFilterPushdown`] pushes join filters down to scans dynamically
+
+use arrow::datatypes::DataType;
+use datafusion_common::{tree_node::Transformed, DataFusionError, ExprSchema};
+use datafusion_expr::{utils::DynamicFilterColumn, Expr, JoinType, LogicalPlan};
+
+use crate::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule};
+
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for JoinFilterPushdown {
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        if !config.options().optimizer.dynamic_join_pushdown
+        //|| !config.options().execution.parquet.pushdown_filters
+        {
+            return Ok(Transformed::no(plan));
+        }
+
+        match plan {
+            LogicalPlan::Join(mut join) => {
+                if unsupported_join_type(&(join.join_type)) {
+                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                }
+
+                let mut columns = Vec::new();
+                let mut build_side_names = Vec::new();
+                // Iterate over the on clause and generate the filter info
+                for (left, right) in join.on.iter() {
+                    // Only equi-join keys where both sides are plain columns are supported
+                    if let (Expr::Column(l), Expr::Column(r)) = (left, right) {
+                        let data_type = join.schema.data_type(l)?;
+                        // TODO: currently only numeric and Utf8 types are supported
+                        if data_type.is_numeric() || matches!(data_type, DataType::Utf8) {
+                            columns.push(r.clone());
+                            build_side_names.push(l.name().to_owned());
+                        }
+                    }
+                }
+
+                let mut probe = join.right.as_ref();
+                // Walk down the probe side to make sure the filter can reach the scan
+                loop {
+                    if matches!(probe, LogicalPlan::TableScan(_)) {
+                        break;
+                    }
+                    match probe {
+                        LogicalPlan::Limit(_)
+                        | LogicalPlan::Filter(_)
+                        | LogicalPlan::Sort(_)
+                        | LogicalPlan::Distinct(_) => {
+                            probe = probe.inputs()[0];
+                        }
+                        LogicalPlan::Projection(project) => {
+                            for column in &columns {
+                                if !project.schema.has_column(column) {
+                                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                                }
+                            }
+                            probe = probe.inputs()[0];
+                        }
+                        _ => return Ok(Transformed::no(LogicalPlan::Join(join))),
+                    }
+                }
+                let dynamic_columns = columns
+                    .into_iter()
+                    .zip(build_side_names)
+                    .map(|(column, name)| DynamicFilterColumn::new(name, column))
+                    .collect::<Vec<_>>();
+                // Attach the dynamic pushdown columns to the join
+                join = join.with_dynamic_pushdown_columns(dynamic_columns);
+                Ok(Transformed::yes(LogicalPlan::Join(join)))
+            }
+            _ => Ok(Transformed::no(plan)),
+        }
+    }
+    fn name(&self) -> &str {
+        "join_filter_pushdown"
+    }
+}
+
+fn unsupported_join_type(join_type: &JoinType) -> bool {
+    matches!(
+        join_type,
+        JoinType::Left
+            | JoinType::RightSemi
+            | JoinType::RightAnti
+            | JoinType::LeftSemi
+            | JoinType::LeftAnti
+            | JoinType::LeftMark
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Column;
+    use datafusion_expr::{
+        col, logical_plan::table_scan, JoinType, LogicalPlan, LogicalPlanBuilder,
+    };
use std::sync::Arc; + + fn schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]) + } + + fn get_optimized_plan(plan: LogicalPlan) -> Result { + Ok(generate_optimized_plan_with_rules( + vec![Arc::new(JoinFilterPushdown::new())], + plan, + )) + } + + #[test] + fn test_inner_join_with_pushdown() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .build()?; + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name, + "a" + ); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_left_join_no_pushdown() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_none()); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_projection() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.a"), col("t2.b")])? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Projection(projection) = optimized_plan { + if let LogicalPlan::Join(join) = projection.input.as_ref() { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name, + "a" + ); + } else { + panic!("Expected Join operation under Projection"); + } + } else { + panic!("Expected Projection operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_multiple_keys() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + ( + vec![Column::from_name("a"), Column::from_name("b")], + vec![Column::from_name("a"), Column::from_name("b")], + ), + None, + )? 
+ .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 2); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name(), + "a" + ); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[1] + .column + .name(), + "b" + ); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_filter() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .filter(col("t1.b").gt(col("t1.c")))? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Filter(filter) = optimized_plan { + if let LogicalPlan::Join(join) = filter.input.as_ref() { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name(), + "a" + ); + } else { + panic!("Expected Join operation under Filter"); + } + } else { + panic!("Expected Filter operation"); + } + Ok(()) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 263770b81fcd..23c27b773c0c 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -46,6 +46,7 @@ pub mod eliminate_one_union; pub mod eliminate_outer_join; pub mod extract_equijoin_predicate; pub mod filter_null_join_keys; +pub mod join_filter_pushdown; pub mod optimize_projections; pub mod optimizer; pub mod propagate_empty_relation; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 975150cd6122..b37c9a4d7bc3 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -44,6 +44,7 @@ use crate::eliminate_one_union::EliminateOneUnion; use crate::eliminate_outer_join::EliminateOuterJoin; use crate::extract_equijoin_predicate::ExtractEquijoinPredicate; use crate::filter_null_join_keys::FilterNullJoinKeys; +use crate::join_filter_pushdown::JoinFilterPushdown; use crate::optimize_projections::OptimizeProjections; use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; @@ -270,6 +271,7 @@ impl Optimizer { Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateGroupByConstant::new()), Arc::new(OptimizeProjections::new()), + Arc::new(JoinFilterPushdown::new()), ]; Self::with_rules(rules) diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 94d07a0791b3..6e37e83d68f1 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -181,7 +181,7 @@ pub fn assert_optimized_plan_eq( Ok(()) } -fn generate_optimized_plan_with_rules( +pub fn generate_optimized_plan_with_rules( rules: Vec>, plan: LogicalPlan, ) -> LogicalPlan { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index ae2bfe5b0bd4..83d15b3675fe 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -23,6 +23,7 @@ use std::{any::Any, sync::Arc}; use 
crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::PhysicalExpr;
 
+use crate::expressions::binary::kernels::concat_elements_utf8view;
 use arrow::array::*;
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
 use arrow::compute::kernels::cmp::*;
@@ -38,8 +39,6 @@ use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::{ColumnarValue, Operator};
 use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
-
-use crate::expressions::binary::kernels::concat_elements_utf8view;
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
     bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 64fd0f49a233..b1dcd2d96bf4 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -51,6 +51,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 7220e7594ea6..100bde188339 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
+use crate::joins::DynamicFilterInfo;
 pub use crate::metrics::Metric;
 use crate::metrics::MetricsSet;
 pub use crate::ordering::InputOrderMode;
@@ -422,6 +423,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Unknown
     }
+
+    /// Returns `true` if this operator supports dynamic filter pushdown
+    fn support_dynamic_filter(&self) -> bool {
+        false
+    }
+
+    /// Returns a new plan with the given dynamic filter applied, or `None` if unsupported
+    fn with_dynamic_filter(
+        &self,
+        _dynamic_filters: Option<Arc<DynamicFilterInfo>>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        Ok(None)
+    }
 }
 
 /// Extension trait provides an easy API to fetch various properties of
diff --git a/datafusion/physical-plan/src/joins/dynamic_filters.rs b/datafusion/physical-plan/src/joins/dynamic_filters.rs
new file mode 100644
index 000000000000..d93dd01b1112
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/dynamic_filters.rs
@@ -0,0 +1,524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::utils::JoinHashMap; +use arrow::array::{AsArray, BooleanBuilder}; +use arrow::array::{ + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + UInt16Array, UInt32Array, UInt64Array, UInt8Array, +}; +use arrow::compute::filter_record_batch; +use arrow::compute::kernels::aggregate::{max, max_string, min, min_string}; + +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use arrow_array::StringArray; +use arrow_array::{Array, ArrayRef}; +use datafusion_common::{arrow_err, DataFusionError, ScalarValue}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, InListExpr, IsNullExpr, Literal, +}; +use datafusion_physical_expr::PhysicalExpr; +use hashbrown::HashSet; +use parking_lot::Mutex; +use std::fmt; +use std::sync::Arc; + +const UNIQUE_VALUES_THRESHOLD: usize = 0; + +macro_rules! process_unique_values { + ($array:expr, $array_type:ty, $scalar_type:ident, $unique_set:expr, $should_track:expr) => {{ + if let Some(array) = $array.as_any().downcast_ref::<$array_type>() { + if $should_track { + let mut has_null = false; + for value in array.iter() { + match value { + Some(value) => { + $unique_set.insert(ScalarValue::$scalar_type(Some(value))); + if $unique_set.len() > UNIQUE_VALUES_THRESHOLD { + $unique_set.clear(); + return Ok(false); + } + } + None => { + has_null = true; + } + } + } + if has_null { + $unique_set.insert(ScalarValue::$scalar_type(None)); + } + } + } + Ok(true) + }}; +} +macro_rules! 
process_min_max { + ($array:expr, $array_type:ty, $scalar_type:ident) => {{ + if let Some(array) = $array.as_any().downcast_ref::<$array_type>() { + let min = min(array) + .ok_or_else(|| DataFusionError::Internal("Empty array".to_string()))?; + let max = max(array) + .ok_or_else(|| DataFusionError::Internal("Empty array".to_string()))?; + Ok(( + ScalarValue::$scalar_type(Some(min)), + ScalarValue::$scalar_type(Some(max)), + )) + } else { + Err(DataFusionError::Internal("Invalid array type".to_string())) + } + }}; +} + +struct DynamicFilterInfoInner { + unique_values: Vec>, + value_ranges: Vec>, + batches: Vec>>, + final_expr: Option>, + batch_count: usize, + processed_partitions: HashSet, + should_track_unique: Vec, +} + +pub struct DynamicFilterInfo { + columns: Vec>, + build_side_names: Vec, + inner: Mutex, +} + +impl DynamicFilterInfo { + pub fn try_new( + columns: Vec>, + build_side_names: Vec, + ) -> Result { + let col_count = columns.len(); + Ok(Self { + columns, + build_side_names, + inner: Mutex::new(DynamicFilterInfoInner { + unique_values: vec![HashSet::new(); col_count], + value_ranges: vec![None; col_count], + batches: vec![Vec::new(); col_count], + final_expr: None, + batch_count: 0, + processed_partitions: HashSet::new(), + should_track_unique: vec![true; col_count], + }), + }) + } + + pub fn merge_batch_and_check_finalized( + &self, + records: &RecordBatch, + partition: usize, + ) -> Result { + let mut inner = self.inner.lock(); + if inner.final_expr.is_some() + || (inner.processed_partitions.contains(&partition) + && records.num_rows() == 0) + { + return Ok(true); + } + if !inner.processed_partitions.insert(partition) { + return Ok(false); + } + + inner.batch_count = inner.batch_count.saturating_sub(1); + let finalize = inner.batch_count == 0; + + let schema = records.schema(); + let columns = records.columns(); + + for (i, _) in self.columns.iter().enumerate() { + let index = schema.index_of(&self.build_side_names[i])?; + let column_data = &columns[index]; + + let should_track = inner.should_track_unique[i]; + + if should_track { + let still_tracking = update_unique_values( + &mut inner.unique_values[i], + column_data, + should_track, + )?; + + if !still_tracking { + inner.should_track_unique[i] = false; + } + } + + update_range_stats(&mut inner.value_ranges[i], column_data)?; + inner.batches[i].push(Arc::clone(column_data)); + } + + if finalize { + drop(inner); + self.finalize_filter()?; + return Ok(true); + } + + Ok(false) + } + + fn finalize_filter(&self) -> Result<(), DataFusionError> { + let mut inner = self.inner.lock(); + let filter_expr = + self.columns.iter().enumerate().try_fold::<_, _, Result< + Option>, + DataFusionError, + >>(None, |acc, (i, column)| { + let unique_values = &inner.unique_values[i]; + let value_range = &inner.value_ranges[i]; + let should_track_unique = inner.should_track_unique[i]; + + let use_unique_list = should_track_unique + && !unique_values.is_empty() + && !is_continuous_type(inner.batches[i][0].data_type()); + + let column_expr = if use_unique_list { + let values: Vec> = unique_values + .iter() + .cloned() + .map(|value| { + Arc::new(Literal::new(value)) as Arc + }) + .collect(); + Arc::new(InListExpr::new( + Arc::::clone(column), + values, + false, + None, + )) as Arc + } else { + match value_range { + Some((min_value, max_value)) => Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(min_value.clone())), + Operator::LtEq, + Arc::::clone(column), + )), + Operator::And, + Arc::new(BinaryExpr::new( + 
                                Arc::<Column>::clone(column),
+                                Operator::LtEq,
+                                Arc::new(Literal::new(max_value.clone())),
+                            )),
+                        )) as Arc<dyn PhysicalExpr>,
+                        None => Arc::new(BinaryExpr::new(
+                            Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
+                            Operator::Eq,
+                            Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
+                        )) as Arc<dyn PhysicalExpr>,
+                    }
+                };
+
+                match acc {
+                    Some(expr) => Ok(Some(Arc::new(BinaryExpr::new(
+                        expr,
+                        Operator::And,
+                        column_expr,
+                    )))),
+                    None => Ok(Some(column_expr)),
+                }
+            })?;
+        let final_expr = match filter_expr {
+            Some(expr) => Arc::new(BinaryExpr::new(
+                expr,
+                Operator::Or,
+                Arc::new(IsNullExpr::new(Arc::<Column>::clone(&self.columns[0]))),
+            )) as Arc<dyn PhysicalExpr>,
+            None => Arc::new(BinaryExpr::new(
+                Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
+            )) as Arc<dyn PhysicalExpr>,
+        };
+
+        inner.final_expr = Some(final_expr);
+        Ok(())
+    }
+
+    pub fn add_count(&self) -> Result<(), DataFusionError> {
+        let mut inner = self.inner.lock();
+        inner.batch_count = inner.batch_count.saturating_add(1);
+        Ok(())
+    }
+
+    pub fn filter_batch(
+        &self,
+        records: &RecordBatch,
+    ) -> Result<RecordBatch, DataFusionError> {
+        let filter_expr = match self.inner.lock().final_expr.as_ref() {
+            Some(expr) => Arc::clone(expr),
+            None => return Err(DataFusionError::Internal(
+                "Filter expression should have been created before calling filter_batch"
+                    .to_string(),
+            )),
+        };
+        let boolean_array = filter_expr
+            .evaluate(records)?
+            .into_array(records.num_rows())?;
+
+        Ok(filter_record_batch(
+            records,
+            boolean_array.as_ref().as_boolean(),
+        )?)
+    }
+
+    pub fn has_final_expr(&self) -> bool {
+        self.inner.lock().final_expr.is_some()
+    }
+
+    pub fn final_predicate(
+        &self,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    ) -> (Option<Arc<dyn PhysicalExpr>>, String) {
+        let inner = self.inner.lock();
+
+        let result = match (inner.final_expr.clone(), predicate) {
+            (Some(self_expr), Some(input_expr)) => {
+                Some(
+                    Arc::new(BinaryExpr::new(self_expr, Operator::And, input_expr))
+                        as Arc<dyn PhysicalExpr>,
+                )
+            }
+            (Some(self_expr), None) => Some(self_expr),
+            (None, Some(input_expr)) => Some(input_expr),
+            (None, None) => None,
+        };
+
+        let debug_info = inner
+            .final_expr
+            .as_ref()
+            .map(|expr| format!("{}", expr))
+            .unwrap_or_default();
+
+        (result, debug_info)
+    }
+}
+
+fn update_unique_values(
+    unique_set: &mut HashSet<ScalarValue>,
+    array: &dyn Array,
+    should_track: bool,
+) -> Result<bool, DataFusionError> {
+    if !should_track {
+        return Ok(false);
+    }
+
+    match array.data_type() {
+        DataType::Int8 => {
+            process_unique_values!(array, Int8Array, Int8, unique_set, should_track)
+        }
+        DataType::Int16 => {
+            process_unique_values!(array, Int16Array, Int16, unique_set, should_track)
+        }
+        DataType::Int32 => {
+            process_unique_values!(array, Int32Array, Int32, unique_set, should_track)
+        }
+        DataType::Int64 => {
+            process_unique_values!(array, Int64Array, Int64, unique_set, should_track)
+        }
+        DataType::UInt8 => {
+            process_unique_values!(array, UInt8Array, UInt8, unique_set, should_track)
+        }
+        DataType::UInt16 => {
+            process_unique_values!(array, UInt16Array, UInt16, unique_set, should_track)
+        }
+        DataType::UInt32 => {
+            process_unique_values!(array, UInt32Array, UInt32, unique_set, should_track)
+        }
+        DataType::UInt64 => {
+            process_unique_values!(array, UInt64Array, UInt64, unique_set, should_track)
+        }
+        DataType::Float32 => {
+            process_unique_values!(array, Float32Array, Float32, unique_set, should_track)
+        }
+        DataType::Float64 => {
+            process_unique_values!(array, Float64Array, Float64, unique_set, should_track)
+        }
+        DataType::Utf8 => {
+            if let Some(array) = array.as_any().downcast_ref::<StringArray>() {
+ if should_track { + for value in array.iter().flatten() { + unique_set.insert(ScalarValue::Utf8(Some(value.to_string()))); + if unique_set.len() > UNIQUE_VALUES_THRESHOLD { + unique_set.clear(); + return Ok(false); + } + } + } + } + Ok(true) + } + _ => Err(DataFusionError::NotImplemented(format!( + "Unique value tracking not implemented for type {}", + array.data_type() + ))), + } +} + +fn compute_min_max( + array: &dyn Array, +) -> Result<(ScalarValue, ScalarValue), DataFusionError> { + match array.data_type() { + DataType::Int8 => process_min_max!(array, Int8Array, Int8), + DataType::Int16 => process_min_max!(array, Int16Array, Int16), + DataType::Int32 => process_min_max!(array, Int32Array, Int32), + DataType::Int64 => process_min_max!(array, Int64Array, Int64), + DataType::UInt8 => process_min_max!(array, UInt8Array, UInt8), + DataType::UInt16 => process_min_max!(array, UInt16Array, UInt16), + DataType::UInt32 => process_min_max!(array, UInt32Array, UInt32), + DataType::UInt64 => process_min_max!(array, UInt64Array, UInt64), + DataType::Float32 => process_min_max!(array, Float32Array, Float32), + DataType::Float64 => process_min_max!(array, Float64Array, Float64), + DataType::Utf8 => { + if let Some(array) = array.as_any().downcast_ref::() { + let min = min_string(array).ok_or_else(|| { + DataFusionError::Internal("Empty array".to_string()) + })?; + let max = max_string(array).ok_or_else(|| { + DataFusionError::Internal("Empty array".to_string()) + })?; + Ok(( + ScalarValue::Utf8(Some(min.to_string())), + ScalarValue::Utf8(Some(max.to_string())), + )) + } else { + Err(DataFusionError::Internal("Invalid array type".to_string())) + } + } + _ => Err(DataFusionError::NotImplemented(format!( + "Min/Max not implemented for type {}", + array.data_type() + ))), + } +} + +fn update_range_stats( + range: &mut Option<(ScalarValue, ScalarValue)>, + array: &dyn Array, +) -> Result<(), DataFusionError> { + if array.is_empty() { + return Ok(()); + } + let (min, max) = compute_min_max(array)?; + + *range = match range.take() { + Some((curr_min, curr_max)) => { + let min_value = match curr_min.partial_cmp(&min) { + Some(std::cmp::Ordering::Less) | Some(std::cmp::Ordering::Equal) => { + curr_min + } + _ => min, + }; + let max_value = match curr_max.partial_cmp(&max) { + Some(std::cmp::Ordering::Greater) | Some(std::cmp::Ordering::Equal) => { + curr_max + } + _ => max, + }; + Some((min_value, max_value)) + } + None => Some((min, max)), + }; + + Ok(()) +} + +fn is_continuous_type(data_type: &DataType) -> bool { + matches!(data_type, DataType::Float32 | DataType::Float64) +} + +pub struct PartitionedDynamicFilterInfo { + partition: usize, + dynamic_filter_info: Arc, +} + +impl PartitionedDynamicFilterInfo { + pub fn new(partition: usize, dynamic_filter_info: Arc) -> Self { + Self { + partition, + dynamic_filter_info, + } + } + + pub fn merge_batch_and_check_finalized( + &self, + records: &RecordBatch, + ) -> Result { + self.dynamic_filter_info + .merge_batch_and_check_finalized(records, self.partition) + } + + pub fn _filter_probe_batch( + &self, + batch: &RecordBatch, + hashes: &[u64], + hash_map: &JoinHashMap, + ) -> Result<(RecordBatch, Vec), DataFusionError> { + let left_hash_set = hash_map.extract_unique_keys(); + + let mut mask_builder = BooleanBuilder::new(); + for hash in hashes.iter() { + mask_builder.append_value(left_hash_set.contains(hash)); + } + let mask = mask_builder.finish(); + + let filtered_columns = batch + .columns() + .iter() + .map(|col| match arrow::compute::filter(col, &mask) { + 
Ok(array) => Ok(array), + Err(e) => arrow_err!(e), + }) + .collect::, DataFusionError>>()?; + + let filtered_batch = RecordBatch::try_new(batch.schema(), filtered_columns)?; + + let filtered_hashes = hashes + .iter() + .zip(mask.iter()) + .filter_map(|(hash, keep)| { + keep.and_then(|k| if k { Some(*hash) } else { None }) + }) + .collect(); + + Ok((filtered_batch, filtered_hashes)) + } +} + +impl fmt::Debug for PartitionedDynamicFilterInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PartitionedDynamicFilterInfo") + .field("partition", &self.partition) + .field("dynamic_filter_info", &"") + .finish() + } +} + +impl fmt::Debug for DynamicFilterInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DynamicFilterInfo") + .field("columns", &self.columns) + .field("build_side_names", &self.build_side_names) + .field("inner", &"") + .finish() + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8ab292c14269..633a9a9206c4 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use std::task::Poll; use std::{any::Any, vec}; +use super::dynamic_filters::{DynamicFilterInfo, PartitionedDynamicFilterInfo}; use super::utils::asymmetric_join_output_partitioning; use super::{ utils::{OnceAsync, OnceFut}, @@ -338,6 +339,8 @@ pub struct HashJoinExec { pub null_equals_null: bool, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// The dynamic filter which should be pushed to probe side + pub dynamic_filters_pushdown: Option>, } impl HashJoinExec { @@ -355,6 +358,7 @@ impl HashJoinExec { projection: Option>, partition_mode: PartitionMode, null_equals_null: bool, + dynamic_filters: Option>, ) -> Result { let left_schema = left.schema(); let right_schema = right.schema(); @@ -399,6 +403,7 @@ impl HashJoinExec { column_indices, null_equals_null, cache, + dynamic_filters_pushdown: dynamic_filters, }) } @@ -479,7 +484,7 @@ impl HashJoinExec { }, None => None, }; - Self::try_new( + let hash_join = Self::try_new( Arc::clone(&self.left), Arc::clone(&self.right), self.on.clone(), @@ -488,7 +493,9 @@ impl HashJoinExec { projection, self.mode, self.null_equals_null, - ) + self.dynamic_filters_pushdown.clone(), + )?; + Ok(hash_join) } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
@@ -613,6 +620,27 @@ impl ExecutionPlan for HashJoinExec {
         self
     }
 
+    fn support_dynamic_filter(&self) -> bool {
+        true
+    }
+
+    fn with_dynamic_filter(
+        &self,
+        dynamic_filter_info: Option<Arc<DynamicFilterInfo>>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let plan = HashJoinExec::try_new(
+            Arc::clone(&self.left),
+            Arc::clone(&self.right),
+            self.on.clone(),
+            self.filter.clone(),
+            &self.join_type,
+            self.projection.clone(),
+            self.mode,
+            self.null_equals_null,
+            dynamic_filter_info,
+        )?;
+        Ok(Some(Arc::new(plan)))
+    }
     fn properties(&self) -> &PlanProperties {
         &self.cache
     }
@@ -669,7 +697,7 @@
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(HashJoinExec::try_new(
+        let plan = HashJoinExec::try_new(
             Arc::clone(&children[0]),
             Arc::clone(&children[1]),
             self.on.clone(),
@@ -678,7 +706,9 @@
             self.projection.clone(),
             self.mode,
             self.null_equals_null,
-        )?))
+            self.dynamic_filters_pushdown.clone(),
+        )?;
+        Ok(Arc::new(plan))
     }
 
     fn execute(
@@ -696,6 +726,10 @@
             .iter()
             .map(|on| Arc::clone(&on.1))
             .collect::<Vec<_>>();
+        // if there is a dynamic filter, register this execution with its counter
+        if let Some(dynamic_filters) = &self.dynamic_filters_pushdown {
+            dynamic_filters.add_count()?
+        }
 
         let left_partitions = self.left.output_partitioning().partition_count();
         let right_partitions = self.right.output_partitioning().partition_count();
@@ -764,6 +798,16 @@
             None => self.column_indices.clone(),
         };
 
+        let partitioned_dynamic_info =
+            self.dynamic_filters_pushdown
+                .as_ref()
+                .map(|dynamic_filters| {
+                    PartitionedDynamicFilterInfo::new(
+                        partition,
+                        Arc::<DynamicFilterInfo>::clone(dynamic_filters),
+                    )
+                });
+
         Ok(Box::pin(HashJoinStream {
             schema: self.schema(),
             on_left,
@@ -780,6 +824,7 @@
             batch_size,
             hashes_buffer: vec![],
             right_side_ordered: self.right.output_ordering().is_some(),
+            dynamic_filter_info: partitioned_dynamic_info,
         }))
     }
 
@@ -859,7 +904,6 @@ async fn collect_left_input(
     reservation.try_grow(estimated_hashtable_size)?;
     metrics.build_mem_used.add(estimated_hashtable_size);
-
     let mut hashmap = JoinHashMap::with_capacity(num_rows);
     let mut hashes_buffer = Vec::new();
     let mut offset = 0;
@@ -883,7 +927,6 @@
     }
     // Merge all batches into a single batch, so we can directly index into the arrays
     let single_batch = concat_batches(&schema, batches_iter)?;
-
     // Reserve additional memory for visited indices bitmap and create shared builder
     let visited_indices_bitmap = if with_visited_indices_bitmap {
         let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
@@ -1101,6 +1144,8 @@ struct HashJoinStream {
     hashes_buffer: Vec<u64>,
     /// Specifies whether the right side has an ordering to potentially preserve
     right_side_ordered: bool,
+    /// Dynamic filter for this partition, populated once the build side is collected
+    dynamic_filter_info: Option<PartitionedDynamicFilterInfo>,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -1298,7 +1343,6 @@
     }
 
     /// Collects build-side data by polling `OnceFut` future from initialized build-side
-    ///
     /// Updates build-side to `Ready`, and state to `FetchProbeSide`
     fn collect_build_side(
         &mut self,
@@ -1312,10 +1356,20 @@
             .left_fut
             .get_shared(cx))?;
         build_timer.done();
+        // Merge the build side into the dynamic filter (if any) and check whether it is finalized
+        let filter_finalized = if let Some(filter_info) = &self.dynamic_filter_info {
+            filter_info.merge_batch_and_check_finalized(left_data.batch())?
+        } else {
+            true // If there's no dynamic filter, we consider it "finalized"
+        };
+        // If the filter is not finalized after this merge, we need to wait
+        if !filter_finalized {
+            cx.waker().wake_by_ref();
+            return Poll::Pending;
+        }
 
         self.state = HashJoinStreamState::FetchProbeBatch;
         self.build_side = BuildSide::Ready(BuildSideReadyState { left_data });
-
         Poll::Ready(Ok(StatefulStreamResult::Continue))
     }
 
@@ -1598,6 +1652,7 @@ mod tests {
             None,
             PartitionMode::CollectLeft,
             null_equals_null,
+            None,
         )
     }
 
@@ -1618,6 +1673,7 @@
             None,
             PartitionMode::CollectLeft,
             null_equals_null,
+            None,
         )
     }
 
@@ -1715,6 +1771,7 @@
             None,
             partition_mode,
             null_equals_null,
+            None,
         )?;
 
         let columns = columns(&join.schema());
@@ -3965,6 +4022,7 @@
             None,
             PartitionMode::Partitioned,
             false,
+            None,
         )?;
 
         let stream = join.execute(1, task_ctx)?;
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs
index 6ddf19c51193..472631ffcfee 100644
--- a/datafusion/physical-plan/src/joins/mod.rs
+++ b/datafusion/physical-plan/src/joins/mod.rs
@@ -21,9 +21,11 @@ pub use cross_join::CrossJoinExec;
 pub use hash_join::HashJoinExec;
 pub use nested_loop_join::NestedLoopJoinExec;
+pub use dynamic_filters::DynamicFilterInfo;
 // Note: SortMergeJoin is not used in plans yet
 pub use sort_merge_join::SortMergeJoinExec;
 pub use symmetric_hash_join::SymmetricHashJoinExec;
 
 mod cross_join;
+mod dynamic_filters;
 mod hash_join;
 mod nested_loop_join;
 mod sort_merge_join;
diff --git a/datafusion/physical-plan/src/joins/test_utils.rs b/datafusion/physical-plan/src/joins/test_utils.rs
index 421fd0da808c..17ae4af774b2 100644
--- a/datafusion/physical-plan/src/joins/test_utils.rs
+++ b/datafusion/physical-plan/src/joins/test_utils.rs
@@ -153,6 +153,7 @@ pub async fn partitioned_hash_join_with_filter(
         None,
         PartitionMode::Partitioned,
         null_equals_null,
+        None,
     )?);
 
     let mut batches = vec![];
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 0366c9fa5e46..0b777949b554 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -140,6 +140,18 @@ impl JoinHashMap {
             next: vec![0; capacity],
         }
     }
+
+    /// Extract all unique hash keys stored in this join hash map
+    pub fn extract_unique_keys(&self) -> HashSet<u64> {
+        let mut unique_keys = HashSet::new();
+        unsafe {
+            self.map.iter().for_each(|entry| {
+                let (hash, _) = entry.as_ref();
+                unique_keys.insert(hash.to_owned());
+            })
+        };
+        unique_keys
+    }
 }
 
 // Type of offsets for obtaining indices from JoinHashMap.
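The `Poll::Pending` loop in `collect_build_side` above works because `merge_batch_and_check_finalized` only reports `true` once every partition registered via `add_count` has merged its build-side batch. Below is a minimal sketch of that bookkeeping; `FilterProgress`, `register`, and `report_and_check` are hypothetical stand-ins for the counters kept inside `DynamicFilterInfo` (the real type also merges per-column statistics such as the min/max ranges built above):

    use std::sync::atomic::{AtomicUsize, Ordering};

    struct FilterProgress {
        registered: AtomicUsize, // partitions expected, bumped like `add_count`
        reported: AtomicUsize,   // partitions that have merged their batch
    }

    impl FilterProgress {
        fn new() -> Self {
            Self {
                registered: AtomicUsize::new(0),
                reported: AtomicUsize::new(0),
            }
        }

        fn register(&self) {
            self.registered.fetch_add(1, Ordering::SeqCst);
        }

        // Returns true once every registered partition has reported.
        fn report_and_check(&self) -> bool {
            let done = self.reported.fetch_add(1, Ordering::SeqCst) + 1;
            done >= self.registered.load(Ordering::SeqCst)
        }
    }

    fn main() {
        let progress = FilterProgress::new();
        progress.register();
        progress.register();
        assert!(!progress.report_and_check()); // first partition: keep waiting
        assert!(progress.report_and_check()); // second partition: finalized
    }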
@@ -371,8 +383,48 @@ impl JoinHashMapType for JoinHashMap { } impl Debug for JoinHashMap { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { - Ok(()) + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "JoinHashMap {{")?; + writeln!(f, " map:")?; + writeln!(f, " ----------")?; + + let mut entries: Vec<_> = unsafe { self.map.iter().collect() }; + entries.sort_by_key(|bucket| unsafe { bucket.as_ref().0 }); + + for bucket in entries { + let mut indices = Vec::new(); + let mut curr_idx = unsafe { bucket.as_ref().1 }; + + while curr_idx > 0 { + indices.push(curr_idx - 1); + curr_idx = self.next[(curr_idx - 1) as usize]; + } + + indices.reverse(); + + writeln!( + f, + " | {:3} | {} | -> {:?}", + unsafe { bucket.as_ref().0 }, + unsafe { bucket.as_ref().1 }, + indices + )?; + } + + writeln!(f, " ----------")?; + writeln!(f, "\n next:")?; + writeln!(f, " ---------------------")?; + write!(f, " |")?; + for &next_idx in self.next.iter() { + write!(f, " {:2} |", next_idx)?; + } + writeln!(f)?; + write!(f, " |")?; + for i in 0..self.next.len() { + write!(f, " {:2} |", i)?; + } + writeln!(f, "\n ---------------------")?; + writeln!(f, "}}") } } diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 4712729bdaf5..c19927dceefe 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -263,6 +263,7 @@ impl MetricsSet { MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, + MetricValue::DynamicFilter(_) => false, }) } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 2eb01914ee0a..0a49713eb484 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -400,6 +400,9 @@ pub enum MetricValue { StartTimestamp(Timestamp), /// The time at which execution ended EndTimestamp(Timestamp), + + /// Dynamic filters + DynamicFilter(String), } impl MetricValue { @@ -417,6 +420,7 @@ impl MetricValue { Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", + Self::DynamicFilter(_) => "dynamic_filters", } } @@ -442,6 +446,7 @@ impl MetricValue { .and_then(|ts| ts.timestamp_nanos_opt()) .map(|nanos| nanos as usize) .unwrap_or(0), + Self::DynamicFilter(_) => 1, } } @@ -469,6 +474,7 @@ impl MetricValue { }, Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), + Self::DynamicFilter(name) => Self::DynamicFilter(name.clone()), } } @@ -515,6 +521,7 @@ impl MetricValue { (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { timestamp.update_to_max(other_timestamp); } + (Self::DynamicFilter(_), _) => {} m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -539,6 +546,7 @@ impl MetricValue { Self::Time { .. 
} => 8, Self::StartTimestamp(_) => 9, // show timestamps last Self::EndTimestamp(_) => 10, + Self::DynamicFilter(_) => 11, } } @@ -574,6 +582,9 @@ impl Display for MetricValue { Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { write!(f, "{timestamp}") } + Self::DynamicFilter(filter) => { + write!(f, "{filter}") + } } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 64e462d1695f..f65945a3a251 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -663,6 +663,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { projection, partition_mode, hashjoin.null_equals_null, + None, )?)) } PhysicalPlanType::SymmetricHashJoin(sym_join) => { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index aab63dd8bd66..54594bf6d9ae 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -229,6 +229,7 @@ fn roundtrip_hash_join() -> Result<()> { None, *partition_mode, false, + None, )?))?; } } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index f3fee4f1fca6..2573d4ab4359 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -205,6 +205,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c] +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE @@ -230,6 +231,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections SAME TEXT AS ABOVE +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true initial_physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -251,6 +253,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] physical_plan_with_schema CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N] @@ -327,6 +330,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -367,6 +371,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, 
None);N] diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 4d51a61c8a52..9b35e2c4008d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -237,6 +237,7 @@ datafusion.explain.show_sizes true datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 +datafusion.optimizer.dynamic_join_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true @@ -330,6 +331,7 @@ datafusion.explain.show_sizes true When set to true, the explain statement will datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.dynamic_join_pushdown true when set to true, datafusion would try to push the build side statistic to probe phase datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6a49fda668a9..ebcd2f806a90 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -115,6 +115,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. 
| +| datafusion.optimizer.dynamic_join_pushdown | true | when set to true, datafusion would try to push the build side statistic to probe phase | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
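As a usage note, the new option behaves like any other optimizer flag: it can be disabled per session with `SET datafusion.optimizer.dynamic_join_pushdown = false;` in SQL, or programmatically when building the session config. A small sketch, assuming the usual tokio runtime and with a placeholder query:

    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Opt out of dynamic join pushdown for this session only.
        let config = SessionConfig::new()
            .set_bool("datafusion.optimizer.dynamic_join_pushdown", false);
        let ctx = SessionContext::new_with_config(config);

        // Register tables and run queries as usual.
        let df = ctx.sql("SELECT 1").await?;
        df.show().await?;
        Ok(())
    }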