From e3dc155a492a098d0c1d562be4a06e5df6e7c6cd Mon Sep 17 00:00:00 2001
From: Mrinal Paliwal
Date: Sat, 11 Jan 2025 11:14:55 +0530
Subject: [PATCH 1/4] chore: move into crate

---
 datafusion/core/src/physical_optimizer/mod.rs   |  2 --
 .../core/src/physical_optimizer/test_utils.rs   | 18 +++++++++---------
 datafusion/physical-optimizer/Cargo.toml        |  2 ++
 datafusion/physical-optimizer/src/lib.rs        |  1 +
 .../src}/sanity_checker.rs                      | 10 +++++-----
 5 files changed, 17 insertions(+), 16 deletions(-)
 rename datafusion/{core/src/physical_optimizer => physical-optimizer/src}/sanity_checker.rs (99%)

diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs
index 000c27effdb6..a01f6f518653 100644
--- a/datafusion/core/src/physical_optimizer/mod.rs
+++ b/datafusion/core/src/physical_optimizer/mod.rs
@@ -28,8 +28,6 @@ pub mod join_selection;
 pub mod optimizer;
 pub mod projection_pushdown;
 pub mod replace_with_order_preserving_variants;
-pub mod sanity_checker;
-#[cfg(test)]
 pub mod test_utils;
 
 mod sort_pushdown;

diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 9156301393c0..8b863c62f3e5 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -98,8 +98,8 @@ pub trait SqlTestCase {
 
 /// [UnaryTestCase] is designed for single input [ExecutionPlan]s.
 pub struct UnaryTestCase {
-    pub(crate) source_type: SourceType,
-    pub(crate) expect_fail: bool,
+    pub source_type: SourceType,
+    pub expect_fail: bool,
 }
 
 #[async_trait]
@@ -116,8 +116,8 @@ impl SqlTestCase for UnaryTestCase {
 }
 /// [BinaryTestCase] is designed for binary input [ExecutionPlan]s.
 pub struct BinaryTestCase {
-    pub(crate) source_types: (SourceType, SourceType),
-    pub(crate) expect_fail: bool,
+    pub source_types: (SourceType, SourceType),
+    pub expect_fail: bool,
 }
 
 #[async_trait]
@@ -136,14 +136,14 @@ impl SqlTestCase for BinaryTestCase {
 }
 
 pub struct QueryCase {
-    pub(crate) sql: String,
-    pub(crate) cases: Vec<Arc<dyn SqlTestCase>>,
-    pub(crate) error_operator: String,
+    pub sql: String,
+    pub cases: Vec<Arc<dyn SqlTestCase>>,
+    pub error_operator: String,
 }
 
 impl QueryCase {
     /// Run the test cases
-    pub(crate) async fn run(&self) -> Result<()> {
+    pub async fn run(&self) -> Result<()> {
         for case in &self.cases {
             let ctx = SessionContext::new();
             case.register_table(&ctx).await?;
@@ -215,7 +215,7 @@ pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execu
     Arc::new(CoalescePartitionsExec::new(input))
 }
 
-pub(crate) fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
+pub fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
     Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
 }
 
diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml
index 3454209445dc..e19f0b3b8aa6 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -40,12 +40,14 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr-common = { workspace = true, default-features = true }
 datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
 recursive = { workspace = true, optional = true }
 
 [dev-dependencies]
+datafusion = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-nested = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index c4f5fa74e122..75f47ccce745 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -26,6 +26,7 @@ pub mod limited_distinct_aggregation;
 mod optimizer;
 pub mod output_requirements;
 pub mod pruning;
+pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
 
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
similarity index 99%
rename from datafusion/core/src/physical_optimizer/sanity_checker.rs
rename to datafusion/physical-optimizer/src/sanity_checker.rs
index 8e8787aec96b..aa637d4497f9 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -23,8 +23,8 @@
 
 use std::sync::Arc;
 
-use crate::error::Result;
-use crate::physical_plan::ExecutionPlan;
+use datafusion_common::Result;
+use datafusion_physical_plan::ExecutionPlan;
 use datafusion_common::config::{ConfigOptions, OptimizerOptions};
 use datafusion_common::plan_err;
@@ -34,8 +34,8 @@
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
+use crate::PhysicalOptimizerRule;
 use datafusion_physical_expr_common::sort_expr::format_physical_sort_requirement_list;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use itertools::izip;
 
 /// The SanityCheckPlan rule rejects the following query plans:
@@ -109,7 +109,7 @@ pub fn check_finiteness_requirements(
 /// all involved [`PhysicalExpr`]s, [`Operator`]s and data types support
 /// interval calculations.
 ///
-/// [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
+/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
 /// [`Operator`]: datafusion_expr::Operator
 fn is_prunable(join: &SymmetricHashJoinExec) -> bool {
     join.filter().is_some_and(|filter| {
@@ -171,7 +171,7 @@ pub fn check_plan_sanity(
 mod tests {
     use super::*;
 
-    use crate::physical_optimizer::test_utils::{
+    use datafusion::physical_optimizer::test_utils::{
         bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec,
         repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec,
         BinaryTestCase, QueryCase, SourceType, UnaryTestCase,

From 193c2ff4c337ae9749343f49bda24c0d2da61001 Mon Sep 17 00:00:00 2001
From: Mrinal Paliwal
Date: Wed, 15 Jan 2025 20:16:07 +0530
Subject: [PATCH 2/4] chore: move SanityChecker tests out to
 datafusion/core/tests

---
 datafusion/core/Cargo.toml                      |   1 +
 .../enforce_distribution.rs                     |   6 +-
 .../src/physical_optimizer/enforce_sorting.rs   |  16 +-
 datafusion/core/src/physical_optimizer/mod.rs   |   1 +
 .../replace_with_order_preserving_variants.rs   |   2 +-
 .../core/src/physical_optimizer/test_utils.rs   | 430 +-------------
 .../core/tests/physical_optimizer/mod.rs        |   3 +-
 .../physical_optimizer/sanity_checker.rs        | 536 ++++++++++++++++++
 .../tests/physical_optimizer/test_util.rs       | 119 ++++
 datafusion/physical-optimizer/Cargo.toml        |   4 +-
 datafusion/physical-optimizer/src/lib.rs        |   1 +
 .../physical-optimizer/src/sanity_checker.rs    | 516 -----------------
 .../physical-optimizer/src/test_utils.rs        | 336 +++++++++++
 13 files changed, 1014 insertions(+), 957 deletions(-)
 create mode 100644 datafusion/core/tests/physical_optimizer/sanity_checker.rs
 create mode 100644 datafusion/physical-optimizer/src/test_utils.rs

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 97b88a0b0c3d..e341816b2b8a 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -141,6 +141,7 @@ async-trait = { workspace = true }
 criterion = { version = "0.5", features = ["async_tokio"] }
 ctor = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
+datafusion-physical-optimizer = { workspace = true }
 doc-comment = { workspace = true }
 env_logger = { workspace = true }
 paste = "^1.0"
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index c44200a492eb..fbadceba0948 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1416,9 +1416,6 @@ pub(crate) mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
     use crate::physical_optimizer::enforce_sorting::EnforceSorting;
-    use crate::physical_optimizer::test_utils::{
-        check_integrity, coalesce_partitions_exec, repartition_exec,
-    };
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::filter::FilterExec;
@@ -1427,6 +1424,9 @@ pub(crate) mod tests {
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
     use datafusion_physical_optimizer::output_requirements::OutputRequirements;
+    use datafusion_physical_optimizer::test_utils::{
+        check_integrity, coalesce_partitions_exec, repartition_exec,
+    };
 
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::ScalarValue;
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index dd8e9d900b7d..167f9d6d45e7 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -658,17 +658,17 @@ fn get_sort_exprs(
 mod tests {
     use super::*;
     use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
-    use crate::physical_optimizer::test_utils::{
-        aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
-        coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
-        limit_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_sorted,
-        repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
-        sort_preserving_merge_exec, spr_repartition_exec, union_exec,
-        RequirementsTestExec,
-    };
+    use crate::physical_optimizer::test_utils::{parquet_exec, parquet_exec_sorted};
     use crate::physical_plan::{displayable, get_plan_string, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};
+    use datafusion_physical_optimizer::test_utils::{
+        aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
+        coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
+        limit_exec, local_limit_exec, memory_exec, repartition_exec, sort_exec,
+        sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
+        spr_repartition_exec, union_exec, RequirementsTestExec,
+    };
 
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index
84490c2a33ba..63fe115e602c 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -27,6 +27,7 @@ pub mod enforce_sorting; pub mod optimizer; pub mod projection_pushdown; pub mod replace_with_order_preserving_variants; +#[cfg(test)] pub mod test_utils; mod sort_pushdown; diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 9f5afc7abc2e..1bbe9d483cbd 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -286,7 +286,6 @@ mod tests { use super::*; use crate::execution::TaskContext; - use crate::physical_optimizer::test_utils::check_integrity; use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::{HashJoinExec, PartitionMode}; @@ -296,6 +295,7 @@ mod tests { }; use crate::prelude::{SessionConfig, SessionContext}; use crate::test::TestStreamPartition; + use datafusion_physical_optimizer::test_utils::check_integrity; use arrow::array::{ArrayRef, Int32Array}; use arrow::compute::SortOptions; diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 8b863c62f3e5..aba24309b2a0 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -19,268 +19,15 @@ #![allow(missing_docs)] -use std::any::Any; -use std::fmt::Formatter; use std::sync::Arc; use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; -use crate::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; -use crate::error::Result; -use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; -use crate::physical_plan::coalesce_batches::CoalesceBatchesExec; -use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use crate::physical_plan::filter::FilterExec; -use crate::physical_plan::joins::utils::{JoinFilter, JoinOn}; -use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; -use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use crate::physical_plan::memory::MemoryExec; -use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; -use crate::physical_plan::union::UnionExec; -use crate::physical_plan::windows::create_window_expr; -use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning}; -use crate::prelude::{CsvReadOptions, SessionContext}; +use crate::physical_plan::ExecutionPlan; -use arrow_schema::{Schema, SchemaRef, SortOptions}; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::JoinType; +use arrow_schema::SchemaRef; use datafusion_execution::object_store::ObjectStoreUrl; -use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; -use datafusion_functions_aggregate::count::count_udaf; -use datafusion_physical_expr::expressions::col; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use datafusion_physical_plan::tree_node::PlanContext; -use datafusion_physical_plan::{ - displayable, DisplayAs, DisplayFormatType, PlanProperties, 
-}; - -use async_trait::async_trait; -use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; - -async fn register_current_csv( - ctx: &SessionContext, - table_name: &str, - infinite: bool, -) -> Result<()> { - let testdata = crate::test_util::arrow_test_data(); - let schema = crate::test_util::aggr_test_schema(); - let path = format!("{testdata}/csv/aggregate_test_100.csv"); - - match infinite { - true => { - let source = FileStreamProvider::new_file(schema, path.into()); - let config = StreamConfig::new(Arc::new(source)); - ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; - } - false => { - ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema)) - .await?; - } - } - - Ok(()) -} - -#[derive(Eq, PartialEq, Debug)] -pub enum SourceType { - Unbounded, - Bounded, -} - -#[async_trait] -pub trait SqlTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()>; - fn expect_fail(&self) -> bool; -} - -/// [UnaryTestCase] is designed for single input [ExecutionPlan]s. -pub struct UnaryTestCase { - pub source_type: SourceType, - pub expect_fail: bool, -} - -#[async_trait] -impl SqlTestCase for UnaryTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()> { - let table_is_infinite = self.source_type == SourceType::Unbounded; - register_current_csv(ctx, "test", table_is_infinite).await?; - Ok(()) - } - - fn expect_fail(&self) -> bool { - self.expect_fail - } -} -/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s. -pub struct BinaryTestCase { - pub source_types: (SourceType, SourceType), - pub expect_fail: bool, -} - -#[async_trait] -impl SqlTestCase for BinaryTestCase { - async fn register_table(&self, ctx: &SessionContext) -> Result<()> { - let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded; - let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded; - register_current_csv(ctx, "left", left_table_is_infinite).await?; - register_current_csv(ctx, "right", right_table_is_infinite).await?; - Ok(()) - } - - fn expect_fail(&self) -> bool { - self.expect_fail - } -} - -pub struct QueryCase { - pub sql: String, - pub cases: Vec>, - pub error_operator: String, -} - -impl QueryCase { - /// Run the test cases - pub async fn run(&self) -> Result<()> { - for case in &self.cases { - let ctx = SessionContext::new(); - case.register_table(&ctx).await?; - let error = if case.expect_fail() { - Some(&self.error_operator) - } else { - None - }; - self.run_case(ctx, error).await?; - } - Ok(()) - } - async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> { - let dataframe = ctx.sql(self.sql.as_str()).await?; - let plan = dataframe.create_physical_plan().await; - if let Some(error) = error { - let plan_error = plan.unwrap_err(); - assert!( - plan_error.to_string().contains(error.as_str()), - "plan_error: {:?} doesn't contain message: {:?}", - plan_error, - error.as_str() - ); - } else { - assert!(plan.is_ok()) - } - Ok(()) - } -} - -pub fn sort_merge_join_exec( - left: Arc, - right: Arc, - join_on: &JoinOn, - join_type: &JoinType, -) -> Arc { - Arc::new( - SortMergeJoinExec::try_new( - left, - right, - join_on.clone(), - None, - *join_type, - vec![SortOptions::default(); join_on.len()], - false, - ) - .unwrap(), - ) -} - -/// make PhysicalSortExpr with default options -pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { - 
sort_expr_options(name, schema, SortOptions::default()) -} - -/// PhysicalSortExpr with specified options -pub fn sort_expr_options( - name: &str, - schema: &Schema, - options: SortOptions, -) -> PhysicalSortExpr { - PhysicalSortExpr { - expr: col(name, schema).unwrap(), - options, - } -} - -pub fn coalesce_partitions_exec(input: Arc) -> Arc { - Arc::new(CoalescePartitionsExec::new(input)) -} - -pub fn memory_exec(schema: &SchemaRef) -> Arc { - Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap()) -} - -pub fn hash_join_exec( - left: Arc, - right: Arc, - on: JoinOn, - filter: Option, - join_type: &JoinType, -) -> Result> { - Ok(Arc::new(HashJoinExec::try_new( - left, - right, - on, - filter, - join_type, - None, - PartitionMode::Partitioned, - true, - )?)) -} - -pub fn bounded_window_exec( - col_name: &str, - sort_exprs: impl IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs: LexOrdering = sort_exprs.into_iter().collect(); - let schema = input.schema(); - - Arc::new( - crate::physical_plan::windows::BoundedWindowAggExec::try_new( - vec![create_window_expr( - &WindowFunctionDefinition::AggregateUDF(count_udaf()), - "count".to_owned(), - &[col(col_name, &schema).unwrap()], - &[], - sort_exprs.as_ref(), - Arc::new(WindowFrame::new(Some(false))), - schema.as_ref(), - false, - ) - .unwrap()], - input.clone(), - vec![], - InputOrderMode::Sorted, - ) - .unwrap(), - ) -} - -pub fn filter_exec( - predicate: Arc, - input: Arc, -) -> Arc { - Arc::new(FilterExec::try_new(predicate, input).unwrap()) -} - -pub fn sort_preserving_merge_exec( - sort_exprs: impl IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortPreservingMergeExec::new(sort_exprs, input)) -} +use datafusion_physical_expr::PhysicalSortExpr; /// Create a non sorted parquet exec pub fn parquet_exec(schema: &SchemaRef) -> Arc { @@ -305,174 +52,3 @@ pub fn parquet_exec_sorted( ) .build_arc() } - -pub fn union_exec(input: Vec>) -> Arc { - Arc::new(UnionExec::new(input)) -} - -pub fn limit_exec(input: Arc) -> Arc { - global_limit_exec(local_limit_exec(input)) -} - -pub fn local_limit_exec(input: Arc) -> Arc { - Arc::new(LocalLimitExec::new(input, 100)) -} - -pub fn global_limit_exec(input: Arc) -> Arc { - Arc::new(GlobalLimitExec::new(input, 0, Some(100))) -} - -pub fn repartition_exec(input: Arc) -> Arc { - Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap()) -} - -pub fn spr_repartition_exec(input: Arc) -> Arc { - Arc::new( - RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)) - .unwrap() - .with_preserve_order(), - ) -} - -pub fn aggregate_exec(input: Arc) -> Arc { - let schema = input.schema(); - Arc::new( - AggregateExec::try_new( - AggregateMode::Final, - PhysicalGroupBy::default(), - vec![], - vec![], - input, - schema, - ) - .unwrap(), - ) -} - -pub fn coalesce_batches_exec(input: Arc) -> Arc { - Arc::new(CoalesceBatchesExec::new(input, 128)) -} - -pub fn sort_exec( - sort_exprs: impl IntoIterator, - input: Arc, -) -> Arc { - let sort_exprs = sort_exprs.into_iter().collect(); - Arc::new(SortExec::new(sort_exprs, input)) -} - -/// A test [`ExecutionPlan`] whose requirements can be configured. 
-#[derive(Debug)] -pub struct RequirementsTestExec { - required_input_ordering: LexOrdering, - maintains_input_order: bool, - input: Arc, -} - -impl RequirementsTestExec { - pub fn new(input: Arc) -> Self { - Self { - required_input_ordering: LexOrdering::default(), - maintains_input_order: true, - input, - } - } - - /// sets the required input ordering - pub fn with_required_input_ordering( - mut self, - required_input_ordering: LexOrdering, - ) -> Self { - self.required_input_ordering = required_input_ordering; - self - } - - /// set the maintains_input_order flag - pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self { - self.maintains_input_order = maintains_input_order; - self - } - - /// returns this ExecutionPlan as an Arc - pub fn into_arc(self) -> Arc { - Arc::new(self) - } -} - -impl DisplayAs for RequirementsTestExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "RequiredInputOrderingExec") - } -} - -impl ExecutionPlan for RequirementsTestExec { - fn name(&self) -> &str { - "RequiredInputOrderingExec" - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn properties(&self) -> &PlanProperties { - self.input.properties() - } - - fn required_input_ordering(&self) -> Vec> { - let requirement = LexRequirement::from(self.required_input_ordering.clone()); - vec![Some(requirement)] - } - - fn maintains_input_order(&self) -> Vec { - vec![self.maintains_input_order] - } - - fn children(&self) -> Vec<&Arc> { - vec![&self.input] - } - - fn with_new_children( - self: Arc, - children: Vec>, - ) -> Result> { - assert_eq!(children.len(), 1); - Ok(RequirementsTestExec::new(children[0].clone()) - .with_required_input_ordering(self.required_input_ordering.clone()) - .with_maintains_input_order(self.maintains_input_order) - .into_arc()) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> Result { - unimplemented!("Test exec does not support execution") - } -} - -/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after -/// untested mutable operations. It is crucial that there be no discrepancies between a plan -/// associated with the root node and the plan generated after traversing all nodes -/// within the [`PlanContext`] tree. In addition to verifying the plans resulting from optimizer -/// rules, it is essential to ensure that the overall tree structure corresponds with the plans -/// contained within the node contexts. -/// TODO: Once [`ExecutionPlan`] implements [`PartialEq`], string comparisons should be -/// replaced with direct plan equality checks. 
-pub fn check_integrity(context: PlanContext) -> Result> { - context - .transform_up(|node| { - let children_plans = node.plan.children(); - assert_eq!(node.children.len(), children_plans.len()); - for (child_plan, child_node) in - children_plans.iter().zip(node.children.iter()) - { - assert_eq!( - displayable(child_plan.as_ref()).one_line().to_string(), - displayable(child_node.plan.as_ref()).one_line().to_string() - ); - } - Ok(Transformed::no(node)) - }) - .data() -} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index efe377891128..1fac68e2505c 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -17,4 +17,5 @@ mod combine_partial_final_agg; mod limited_distinct_aggregation; -mod test_util; +mod sanity_checker; +pub(crate) mod test_util; diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs new file mode 100644 index 000000000000..538f0e443ddb --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -0,0 +1,536 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for [`SanityCheckPlan`] physical optimizer rule +//! +//! Note these tests are not in the same module as the optimizer pass because +//! they rely on `ParquetExec` which is in the core crate. + +use crate::physical_optimizer::test_util::{ + BinaryTestCase, QueryCase, SourceType, UnaryTestCase, +}; +use arrow::compute::SortOptions; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; +use datafusion_expr::JoinType; +use datafusion_physical_expr::expressions::col; +use datafusion_physical_expr::Partitioning; +use datafusion_physical_optimizer::test_utils::{ + bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, + repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, +}; +use datafusion_physical_optimizer::{sanity_checker::*, PhysicalOptimizerRule}; +use datafusion_physical_plan::displayable; +use datafusion_physical_plan::repartition::RepartitionExec; +use datafusion_physical_plan::ExecutionPlan; +use std::sync::Arc; + +fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) +} + +fn create_test_schema2() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + ])) +} + +/// Check if sanity checker should accept or reject plans. 
+fn assert_sanity_check(plan: &Arc, is_sane: bool) { + let sanity_checker = SanityCheckPlan::new(); + let opts = ConfigOptions::default(); + assert_eq!( + sanity_checker.optimize(plan.clone(), &opts).is_ok(), + is_sane + ); +} + +/// Check if the plan we created is as expected by comparing the plan +/// formatted as a string. +fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { + let plan_str = displayable(plan).indent(true).to_string(); + let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); + assert_eq!(actual_lines, expected_lines); +} + +#[tokio::test] +async fn test_hash_left_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + // Left join for bounded build side and unbounded probe side can generate + // both incremental matched rows and final non-matched rows. + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_right_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_inner_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: false, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "Join Error".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_full_outer_join_swap() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + // Full join for bounded build side and unbounded probe side can generate + // both incremental matched rows and final non-matched rows. 
+ expect_fail: false, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" + .to_string(), + cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], + error_operator: "operator: HashJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_aggregate() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: AggregateExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_hash_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test + LIMIT 5".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_window_agg_single_partition() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: true, + }; + let case = QueryCase { + sql: "SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 + FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "operator: SortExec".to_string() + }; + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_hash_cross_join() -> Result<()> { + let test1 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Bounded), + expect_fail: true, + }; + let test2 = BinaryTestCase { + source_types: (SourceType::Unbounded, SourceType::Unbounded), + expect_fail: true, + }; + let test3 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Unbounded), + expect_fail: true, + }; + let test4 = BinaryTestCase { + source_types: (SourceType::Bounded, SourceType::Bounded), + expect_fail: false, + }; + let case = QueryCase { + sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), + cases: vec![ + Arc::new(test1), + Arc::new(test2), + Arc::new(test3), + Arc::new(test4), + ], + error_operator: "operator: CrossJoinExec".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +async fn test_analyzer() -> Result<()> { + let test1 = UnaryTestCase { + source_type: SourceType::Bounded, + expect_fail: false, + }; + let test2 = UnaryTestCase { + source_type: SourceType::Unbounded, + expect_fail: false, + }; + let case = QueryCase { + sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), + cases: vec![Arc::new(test1), Arc::new(test2)], + error_operator: "Analyze Error".to_string(), + }; + + case.run().await?; + Ok(()) +} + +#[tokio::test] +/// Tests that plan is valid when the sort requirements are satisfied. 
+async fn test_bounded_window_agg_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let sort = sort_exec(sort_exprs.clone(), source); + let bw = bounded_window_exec("c9", sort_exprs, sort); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + assert_sanity_check(&bw, true); + Ok(()) +} + +#[tokio::test] +/// Tests that plan is invalid when the sort requirements are not satisfied. +async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr_options( + "c9", + &source.schema(), + SortOptions { + descending: false, + nulls_first: false, + }, + )]; + let bw = bounded_window_exec("c9", sort_exprs, source); + assert_plan(bw.as_ref(), vec![ + "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", + " MemoryExec: partitions=1, partition_sizes=[0]" + ]); + // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&bw, false); + Ok(()) +} + +#[tokio::test] +/// A valid when a single partition requirement +/// is satisfied. +async fn test_global_limit_single_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// An invalid plan when a single partition requirement +/// is not satisfied. +async fn test_global_limit_multi_partition() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = global_limit_exec(repartition_exec(source)); + + assert_plan( + limit.as_ref(), + vec![ + "GlobalLimitExec: skip=0, fetch=100", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. + assert_sanity_check(&limit, false); + Ok(()) +} + +#[tokio::test] +/// A plan with no requirements should satisfy. +async fn test_local_limit() -> Result<()> { + let schema = create_test_schema(); + let source = memory_exec(&schema); + let limit = local_limit_exec(source); + + assert_plan( + limit.as_ref(), + vec![ + "LocalLimitExec: fetch=100", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&limit, true); + Ok(()) +} + +#[tokio::test] +/// Valid plan with multiple children satisfy both order and distribution. 
+async fn test_sort_merge_join_satisfied() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + assert_sanity_check(&smj, true); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the order is not satisfied by the 2nd +/// child. +async fn test_sort_merge_join_order_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let right = memory_exec(&schema2); + let sort_exprs1 = vec![sort_expr_options( + "c9", + &source1.schema(), + SortOptions::default(), + )]; + let left = sort_exec(sort_exprs1, source1); + // Missing sort of the right child here.. + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::Hash(vec![right_jcol.clone()], 10), + )?); + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} + +#[tokio::test] +/// Invalid case when the distribution is not satisfied by the 2nd +/// child. 
+async fn test_sort_merge_join_dist_missing() -> Result<()> { + let schema1 = create_test_schema(); + let schema2 = create_test_schema2(); + let source1 = memory_exec(&schema1); + let source2 = memory_exec(&schema2); + let sort_opts = SortOptions::default(); + let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; + let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; + let left = sort_exec(sort_exprs1, source1); + let right = sort_exec(sort_exprs2, source2); + let right = Arc::new(RepartitionExec::try_new( + right, + Partitioning::RoundRobinBatch(10), + )?); + let left_jcol = col("c9", &left.schema()).unwrap(); + let right_jcol = col("a", &right.schema()).unwrap(); + let left = Arc::new(RepartitionExec::try_new( + left, + Partitioning::Hash(vec![left_jcol.clone()], 10), + )?); + + // Missing hash partitioning on right child. + + let join_on = vec![(left_jcol as _, right_jcol as _)]; + let join_ty = JoinType::Inner; + let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); + + assert_plan( + smj.as_ref(), + vec![ + "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", + " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", + " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ], + ); + // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. + assert_sanity_check(&smj, false); + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/test_util.rs b/datafusion/core/tests/physical_optimizer/test_util.rs index 12cd08fb3db3..ea4b80a7899c 100644 --- a/datafusion/core/tests/physical_optimizer/test_util.rs +++ b/datafusion/core/tests/physical_optimizer/test_util.rs @@ -19,6 +19,11 @@ use std::sync::Arc; +use async_trait::async_trait; +use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; +use datafusion::error::Result; +use datafusion::prelude::{CsvReadOptions, SessionContext}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::{ listing::PartitionedFile, @@ -55,3 +60,117 @@ pub(crate) fn trim_plan_display(plan: &str) -> Vec<&str> { .filter(|s| !s.is_empty()) .collect() } + +async fn register_current_csv( + ctx: &SessionContext, + table_name: &str, + infinite: bool, +) -> Result<()> { + let testdata = datafusion::test_util::arrow_test_data(); + let schema = datafusion::test_util::aggr_test_schema(); + let path = format!("{testdata}/csv/aggregate_test_100.csv"); + + match infinite { + true => { + let source = FileStreamProvider::new_file(schema, path.into()); + let config = StreamConfig::new(Arc::new(source)); + ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?; + } + false => { + ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema)) + .await?; + } + } + + Ok(()) +} + +#[derive(Eq, PartialEq, Debug)] +pub enum SourceType { + Unbounded, + Bounded, +} + +#[async_trait] +pub trait SqlTestCase { + async fn register_table(&self, ctx: &SessionContext) -> Result<()>; + fn expect_fail(&self) -> bool; +} + +/// [UnaryTestCase] is designed for single input [ExecutionPlan]s. 
+pub struct UnaryTestCase { + pub source_type: SourceType, + pub expect_fail: bool, +} + +#[async_trait] +impl SqlTestCase for UnaryTestCase { + async fn register_table(&self, ctx: &SessionContext) -> Result<()> { + let table_is_infinite = self.source_type == SourceType::Unbounded; + register_current_csv(ctx, "test", table_is_infinite).await?; + Ok(()) + } + + fn expect_fail(&self) -> bool { + self.expect_fail + } +} +/// [BinaryTestCase] is designed for binary input [ExecutionPlan]s. +pub struct BinaryTestCase { + pub source_types: (SourceType, SourceType), + pub expect_fail: bool, +} + +#[async_trait] +impl SqlTestCase for BinaryTestCase { + async fn register_table(&self, ctx: &SessionContext) -> Result<()> { + let left_table_is_infinite = self.source_types.0 == SourceType::Unbounded; + let right_table_is_infinite = self.source_types.1 == SourceType::Unbounded; + register_current_csv(ctx, "left", left_table_is_infinite).await?; + register_current_csv(ctx, "right", right_table_is_infinite).await?; + Ok(()) + } + + fn expect_fail(&self) -> bool { + self.expect_fail + } +} + +pub struct QueryCase { + pub sql: String, + pub cases: Vec>, + pub error_operator: String, +} + +impl QueryCase { + /// Run the test cases + pub async fn run(&self) -> Result<()> { + for case in &self.cases { + let ctx = SessionContext::new(); + case.register_table(&ctx).await?; + let error = if case.expect_fail() { + Some(&self.error_operator) + } else { + None + }; + self.run_case(ctx, error).await?; + } + Ok(()) + } + async fn run_case(&self, ctx: SessionContext, error: Option<&String>) -> Result<()> { + let dataframe = ctx.sql(self.sql.as_str()).await?; + let plan = dataframe.create_physical_plan().await; + if let Some(error) = error { + let plan_error = plan.unwrap_err(); + assert!( + plan_error.to_string().contains(error.as_str()), + "plan_error: {:?} doesn't contain message: {:?}", + plan_error, + error.as_str() + ); + } else { + assert!(plan.is_ok()) + } + Ok(()) + } +} diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 4c911fee400a..338d37671f96 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -36,9 +36,12 @@ recursive_protection = ["dep:recursive"] [dependencies] arrow = { workspace = true } +arrow-schema = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } datafusion-expr-common = { workspace = true, default-features = true } +datafusion-functions-aggregate = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } @@ -48,7 +51,6 @@ log = { workspace = true } recursive = { workspace = true, optional = true } [dev-dependencies] -datafusion = { workspace = true } datafusion-expr = { workspace = true } datafusion-functions-aggregate = { workspace = true } datafusion-functions-nested = { workspace = true } diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index bf66e3eb4505..ccb18f679171 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -28,6 +28,7 @@ mod optimizer; pub mod output_requirements; pub mod pruning; pub mod sanity_checker; +pub mod test_utils; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs 
b/datafusion/physical-optimizer/src/sanity_checker.rs index aa637d4497f9..1cf89ed8d8a4 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -166,519 +166,3 @@ pub fn check_plan_sanity( Ok(Transformed::no(plan)) } - -#[cfg(test)] -mod tests { - use super::*; - - use datafusion::physical_optimizer::test_utils::{ - bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, - BinaryTestCase, QueryCase, SourceType, UnaryTestCase, - }; - - use arrow::compute::SortOptions; - use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; - use datafusion_common::Result; - use datafusion_expr::JoinType; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::Partitioning; - use datafusion_physical_plan::displayable; - use datafusion_physical_plan::repartition::RepartitionExec; - - fn create_test_schema() -> SchemaRef { - Arc::new(Schema::new(vec![Field::new("c9", DataType::Int32, true)])) - } - - fn create_test_schema2() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - ])) - } - - /// Check if sanity checker should accept or reject plans. - fn assert_sanity_check(plan: &Arc, is_sane: bool) { - let sanity_checker = SanityCheckPlan::new(); - let opts = ConfigOptions::default(); - assert_eq!( - sanity_checker.optimize(plan.clone(), &opts).is_ok(), - is_sane - ); - } - - /// Check if the plan we created is as expected by comparing the plan - /// formatted as a string. - fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) { - let plan_str = displayable(plan).indent(true).to_string(); - let actual_lines: Vec<&str> = plan_str.trim().lines().collect(); - assert_eq!(actual_lines, expected_lines); - } - - #[tokio::test] - async fn test_hash_left_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - // Left join for bounded build side and unbounded probe side can generate - // both incremental matched rows and final non-matched rows. 
- expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 LEFT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_right_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 RIGHT JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_inner_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: false, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "Join Error".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_full_outer_join_swap() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - // Full join for bounded build side and unbounded probe side can generate - // both incremental matched rows and final non-matched rows. 
- expect_fail: false, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 FULL JOIN right as t2 ON t1.c1 = t2.c1" - .to_string(), - cases: vec![Arc::new(test1), Arc::new(test2), Arc::new(test3)], - error_operator: "operator: HashJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_aggregate() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT c1, MIN(c4) FROM test GROUP BY c1".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: AggregateExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_hash_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(PARTITION BY c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test - LIMIT 5".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_window_agg_single_partition() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: true, - }; - let case = QueryCase { - sql: "SELECT - c9, - SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) as sum1 - FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "operator: SortExec".to_string() - }; - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_hash_cross_join() -> Result<()> { - let test1 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Bounded), - expect_fail: true, - }; - let test2 = BinaryTestCase { - source_types: (SourceType::Unbounded, SourceType::Unbounded), - expect_fail: true, - }; - let test3 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Unbounded), - expect_fail: true, - }; - let test4 = BinaryTestCase { - source_types: (SourceType::Bounded, SourceType::Bounded), - expect_fail: false, - }; - let case = QueryCase { - sql: "SELECT t2.c1 FROM left as t1 CROSS JOIN right as t2".to_string(), - cases: vec![ - Arc::new(test1), - Arc::new(test2), - Arc::new(test3), - Arc::new(test4), - ], - error_operator: "operator: CrossJoinExec".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - async fn test_analyzer() -> Result<()> { - let test1 = UnaryTestCase { - source_type: SourceType::Bounded, - expect_fail: false, - }; - let test2 = UnaryTestCase { - source_type: SourceType::Unbounded, - expect_fail: false, - }; - let case = QueryCase { - sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), - cases: vec![Arc::new(test1), Arc::new(test2)], - error_operator: "Analyze Error".to_string(), - }; - - case.run().await?; - Ok(()) - } - - #[tokio::test] - /// Tests that plan is valid when the sort requirements are satisfied. 
- async fn test_bounded_window_agg_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let sort = sort_exec(sort_exprs.clone(), source); - let bw = bounded_window_exec("c9", sort_exprs, sort); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - assert_sanity_check(&bw, true); - Ok(()) - } - - #[tokio::test] - /// Tests that plan is invalid when the sort requirements are not satisfied. - async fn test_bounded_window_agg_no_sort_requirement() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let sort_exprs = vec![sort_expr_options( - "c9", - &source.schema(), - SortOptions { - descending: false, - nulls_first: false, - }, - )]; - let bw = bounded_window_exec("c9", sort_exprs, source); - assert_plan(bw.as_ref(), vec![ - "BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]", - " MemoryExec: partitions=1, partition_sizes=[0]" - ]); - // Order requirement of the `BoundedWindowAggExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&bw, false); - Ok(()) - } - - #[tokio::test] - /// A valid when a single partition requirement - /// is satisfied. - async fn test_global_limit_single_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) - } - - #[tokio::test] - /// An invalid plan when a single partition requirement - /// is not satisfied. - async fn test_global_limit_multi_partition() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = global_limit_exec(repartition_exec(source)); - - assert_plan( - limit.as_ref(), - vec![ - "GlobalLimitExec: skip=0, fetch=100", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement of the `GlobalLimitExec` is not satisfied. We expect to receive error during sanity check. - assert_sanity_check(&limit, false); - Ok(()) - } - - #[tokio::test] - /// A plan with no requirements should satisfy. - async fn test_local_limit() -> Result<()> { - let schema = create_test_schema(); - let source = memory_exec(&schema); - let limit = local_limit_exec(source); - - assert_plan( - limit.as_ref(), - vec![ - "LocalLimitExec: fetch=100", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&limit, true); - Ok(()) - } - - #[tokio::test] - /// Valid plan with multiple children satisfy both order and distribution. 
- async fn test_sort_merge_join_satisfied() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - assert_sanity_check(&smj, true); - Ok(()) - } - - #[tokio::test] - /// Invalid case when the order is not satisfied by the 2nd - /// child. - async fn test_sort_merge_join_order_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let right = memory_exec(&schema2); - let sort_exprs1 = vec![sort_expr_options( - "c9", - &source1.schema(), - SortOptions::default(), - )]; - let left = sort_exec(sort_exprs1, source1); - // Missing sort of the right child here.. - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::Hash(vec![right_jcol.clone()], 10), - )?); - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Order requirement for the `SortMergeJoin` is not satisfied for right child. We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) - } - - #[tokio::test] - /// Invalid case when the distribution is not satisfied by the 2nd - /// child. 
- async fn test_sort_merge_join_dist_missing() -> Result<()> { - let schema1 = create_test_schema(); - let schema2 = create_test_schema2(); - let source1 = memory_exec(&schema1); - let source2 = memory_exec(&schema2); - let sort_opts = SortOptions::default(); - let sort_exprs1 = vec![sort_expr_options("c9", &source1.schema(), sort_opts)]; - let sort_exprs2 = vec![sort_expr_options("a", &source2.schema(), sort_opts)]; - let left = sort_exec(sort_exprs1, source1); - let right = sort_exec(sort_exprs2, source2); - let right = Arc::new(RepartitionExec::try_new( - right, - Partitioning::RoundRobinBatch(10), - )?); - let left_jcol = col("c9", &left.schema()).unwrap(); - let right_jcol = col("a", &right.schema()).unwrap(); - let left = Arc::new(RepartitionExec::try_new( - left, - Partitioning::Hash(vec![left_jcol.clone()], 10), - )?); - - // Missing hash partitioning on right child. - - let join_on = vec![(left_jcol as _, right_jcol as _)]; - let join_ty = JoinType::Inner; - let smj = sort_merge_join_exec(left, right, &join_on, &join_ty); - - assert_plan( - smj.as_ref(), - vec![ - "SortMergeJoin: join_type=Inner, on=[(c9@0, a@0)]", - " RepartitionExec: partitioning=Hash([c9@0], 10), input_partitions=1", - " SortExec: expr=[c9@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " MemoryExec: partitions=1, partition_sizes=[0]", - ], - ); - // Distribution requirement for the `SortMergeJoin` is not satisfied for right child (has round-robin partitioning). We expect to receive error during sanity check. - assert_sanity_check(&smj, false); - Ok(()) - } -} diff --git a/datafusion/physical-optimizer/src/test_utils.rs b/datafusion/physical-optimizer/src/test_utils.rs new file mode 100644 index 000000000000..b8c920d866e0 --- /dev/null +++ b/datafusion/physical-optimizer/src/test_utils.rs @@ -0,0 +1,336 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Collection of testing utility functions that are leveraged by the query optimizer rules
+
+use std::sync::Arc;
+
+use std::any::Any;
+use std::fmt::Formatter;
+
+use arrow_schema::{Schema, SchemaRef, SortOptions};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinType, Result};
+use datafusion_expr::{WindowFrame, WindowFunctionDefinition};
+use datafusion_functions_aggregate::count::count_udaf;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
+use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::joins::utils::{JoinFilter, JoinOn};
+use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+use datafusion_physical_plan::memory::MemoryExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::union::UnionExec;
+use datafusion_physical_plan::windows::{create_window_expr, BoundedWindowAggExec};
+use datafusion_physical_plan::{InputOrderMode, Partitioning};
+
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use datafusion_physical_plan::ExecutionPlan;
+
+use datafusion_physical_plan::tree_node::PlanContext;
+use datafusion_physical_plan::{
+    displayable, DisplayAs, DisplayFormatType, PlanProperties,
+};
+
+pub fn sort_merge_join_exec(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    join_on: &JoinOn,
+    join_type: &JoinType,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(
+        SortMergeJoinExec::try_new(
+            left,
+            right,
+            join_on.clone(),
+            None,
+            *join_type,
+            vec![SortOptions::default(); join_on.len()],
+            false,
+        )
+        .unwrap(),
+    )
+}
+
+/// make PhysicalSortExpr with default options
+pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+    sort_expr_options(name, schema, SortOptions::default())
+}
+
+/// PhysicalSortExpr with specified options
+pub fn sort_expr_options(
+    name: &str,
+    schema: &Schema,
+    options: SortOptions,
+) -> PhysicalSortExpr {
+    PhysicalSortExpr {
+        expr: col(name, schema).unwrap(),
+        options,
+    }
+}
+
+pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(CoalescePartitionsExec::new(input))
+}
+
+pub fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
+    Arc::new(MemoryExec::try_new(&[vec![]], Arc::clone(schema), None).unwrap())
+}
+
+pub fn hash_join_exec(
+    left: Arc<dyn ExecutionPlan>,
+    right: Arc<dyn ExecutionPlan>,
+    on: JoinOn,
+    filter: Option<JoinFilter>,
+    join_type: &JoinType,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    Ok(Arc::new(HashJoinExec::try_new(
+        left,
+        right,
+        on,
+        filter,
+        join_type,
+        None,
+        PartitionMode::Partitioned,
+        true,
+    )?))
+}
+
+pub fn bounded_window_exec(
+    col_name: &str,
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs: LexOrdering = sort_exprs.into_iter().collect();
+    let schema = input.schema();
+
+    Arc::new(
+        BoundedWindowAggExec::try_new(
+            vec![create_window_expr(
+                &WindowFunctionDefinition::AggregateUDF(count_udaf()),
+                "count".to_owned(),
+                &[col(col_name, &schema).unwrap()],
+                &[],
+                sort_exprs.as_ref(),
+                Arc::new(WindowFrame::new(Some(false))),
+                schema.as_ref(),
+                false,
+            )
+            .unwrap()],
+            Arc::clone(&input),
+            vec![],
+            InputOrderMode::Sorted,
+        )
+        .unwrap(),
+    )
+}
+
+pub fn filter_exec(
+    predicate: Arc<dyn PhysicalExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    Arc::new(FilterExec::try_new(predicate, input).unwrap())
+}
+
+pub fn sort_preserving_merge_exec(
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+    Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
+}
+
+pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(UnionExec::new(input))
+}
+
+pub fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    global_limit_exec(local_limit_exec(input))
+}
+
+pub fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(LocalLimitExec::new(input, 100))
+}
+
+pub fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(GlobalLimitExec::new(input, 0, Some(100)))
+}
+
+pub fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap())
+}
+
+pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(
+        RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
+            .unwrap()
+            .with_preserve_order(),
+    )
+}
+
+pub fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    let schema = input.schema();
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::default(),
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
+pub fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    Arc::new(CoalesceBatchesExec::new(input, 128))
+}
+
+pub fn sort_exec(
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    input: Arc<dyn ExecutionPlan>,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+    Arc::new(SortExec::new(sort_exprs, input))
+}
+
+/// A test [`ExecutionPlan`] whose requirements can be configured.
+#[derive(Debug)]
+pub struct RequirementsTestExec {
+    required_input_ordering: LexOrdering,
+    maintains_input_order: bool,
+    input: Arc<dyn ExecutionPlan>,
+}
+
+impl RequirementsTestExec {
+    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
+        Self {
+            required_input_ordering: LexOrdering::default(),
+            maintains_input_order: true,
+            input,
+        }
+    }
+
+    /// sets the required input ordering
+    pub fn with_required_input_ordering(
+        mut self,
+        required_input_ordering: LexOrdering,
+    ) -> Self {
+        self.required_input_ordering = required_input_ordering;
+        self
+    }
+
+    /// set the maintains_input_order flag
+    pub fn with_maintains_input_order(mut self, maintains_input_order: bool) -> Self {
+        self.maintains_input_order = maintains_input_order;
+        self
+    }
+
+    /// returns this ExecutionPlan as an Arc<dyn ExecutionPlan>
+    pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
+        Arc::new(self)
+    }
+}
+
+impl DisplayAs for RequirementsTestExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "RequiredInputOrderingExec")
+    }
+}
+
+impl ExecutionPlan for RequirementsTestExec {
+    fn name(&self) -> &str {
+        "RequiredInputOrderingExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.input.properties()
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
+        let requirement = LexRequirement::from(self.required_input_ordering.clone());
+        vec![Some(requirement)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![self.maintains_input_order]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert_eq!(children.len(), 1);
+        Ok(RequirementsTestExec::new(Arc::clone(&children[0]))
+            .with_required_input_ordering(self.required_input_ordering.clone())
+            .with_maintains_input_order(self.maintains_input_order)
+            .into_arc())
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        unimplemented!("Test exec does not support execution")
+    }
+}
+
+/// A [`PlanContext`] object is susceptible to being left in an inconsistent state after
+/// untested mutable operations. It is crucial that there be no discrepancies between a plan
+/// associated with the root node and the plan generated after traversing all nodes
+/// within the [`PlanContext`] tree. In addition to verifying the plans resulting from optimizer
+/// rules, it is essential to ensure that the overall tree structure corresponds with the plans
+/// contained within the node contexts.
+/// TODO: Once [`ExecutionPlan`] implements [`PartialEq`], string comparisons should be
+/// replaced with direct plan equality checks.
+pub fn check_integrity(context: PlanContext) -> Result> { + context + .transform_up(|node| { + let children_plans = node.plan.children(); + assert_eq!(node.children.len(), children_plans.len()); + for (child_plan, child_node) in + children_plans.iter().zip(node.children.iter()) + { + assert_eq!( + displayable(child_plan.as_ref()).one_line().to_string(), + displayable(child_node.plan.as_ref()).one_line().to_string() + ); + } + Ok(Transformed::no(node)) + }) + .data() +} From 2b2800c979a747ab30dd8d76534aded18e9da6ee Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 15 Jan 2025 11:48:12 -0500 Subject: [PATCH 3/4] chore: update datafusion-cli/Cargo.lock --- datafusion-cli/Cargo.lock | 105 +++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 48 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9be9e50cc737..09f09409da1b 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -123,11 +123,12 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "3.0.6" +version = "3.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" +checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" dependencies = [ "anstyle", + "once_cell", "windows-sys 0.59.0", ] @@ -531,7 +532,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json 0.61.2", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -553,7 +554,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json 0.61.2", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -575,7 +576,7 @@ dependencies = [ "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json 0.61.1", + "aws-smithy-json 0.61.2", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -613,9 +614,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.3" +version = "1.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427cb637d15d63d6f9aae26358e1c9a9c09d5aa490d64b09354c8217cfef0f28" +checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e" dependencies = [ "futures-util", "pin-project-lite", @@ -624,9 +625,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.11" +version = "0.60.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -653,9 +654,9 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.1" +version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095" +checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422" dependencies = [ "aws-smithy-types", ] @@ -672,9 +673,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.6" +version = "1.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a05dd41a70fc74051758ee75b5c4db2c0ca070ed9229c3df50e9475cda1cb985" +checksum = "865f7050bbc7107a6c98a397a9fcd9413690c27fa718446967cf03b2d3ac517e" dependencies = [ "aws-smithy-async", 
"aws-smithy-http", @@ -716,9 +717,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.11" +version = "1.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38ddc9bd6c28aeb303477170ddd183760a956a03e083b3902a990238a7e3792d" +checksum = "a28f6feb647fb5e0d5b50f0472c19a7db9462b74e2fec01bb0b44eedcc834e97" dependencies = [ "base64-simd", "bytes", @@ -822,9 +823,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.7.0" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1be3f42a67d6d345ecd59f675f3f012d6974981560836e938c22b424b85ce1be" +checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" [[package]] name = "blake2" @@ -1592,10 +1593,14 @@ name = "datafusion-physical-optimizer" version = "44.0.0" dependencies = [ "arrow", + "arrow-schema", "datafusion-common", "datafusion-execution", + "datafusion-expr", "datafusion-expr-common", + "datafusion-functions-aggregate", "datafusion-physical-expr", + "datafusion-physical-expr-common", "datafusion-physical-plan", "futures", "itertools 0.14.0", @@ -2459,9 +2464,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.76" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ "once_cell", "wasm-bindgen", @@ -2589,7 +2594,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "libc", ] @@ -2617,9 +2622,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" [[package]] name = "lz4_flex" @@ -2674,9 +2679,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "miniz_oxide" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ffbe83022cedc1d264172192511ae958937694cd57ce297164951b8b3568394" +checksum = "b8402cab7aefae129c6977bb0ff1b8fd9a04eb5b51efc50a70bea51cda0c7924" dependencies = [ "adler2", ] @@ -2707,7 +2712,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "cfg-if", "cfg_aliases 0.1.1", "libc", @@ -2869,9 +2874,9 @@ dependencies = [ [[package]] name = "outref" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" [[package]] name = "parking_lot" @@ -3234,7 +3239,7 @@ version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03a862b389f93e68874fbf580b9de08dd02facb9a788ebadaf4a3fd33cf58834" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", ] [[package]] @@ -3413,7 +3418,7 @@ version = "0.38.43" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a78891ee6bf2340288408954ac787aa063d8e8817e9f53abb37c695c6d834ef6" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "errno", "libc", "linux-raw-sys", @@ -3530,7 +3535,7 @@ version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "cfg-if", "clipboard-win", "fd-lock", @@ -3592,7 +3597,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -3605,7 +3610,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags 2.7.0", + "bitflags 2.8.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -4266,9 +4271,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.1" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b913a3b5fe84142e269d63cc62b64319ccaf89b748fc31fe025177f767a756c4" +checksum = "744018581f9a3454a9e15beb8a33b017183f1e7c0cd170232a2d1453b23a51c4" dependencies = [ "getrandom", "serde", @@ -4322,20 +4327,21 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if", "once_cell", + "rustversion", "wasm-bindgen-macro", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", "log", @@ -4347,9 +4353,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.49" +version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ "cfg-if", "js-sys", @@ -4360,9 +4366,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4370,9 +4376,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.99" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", @@ -4383,9 +4389,12 @@ 
dependencies = [
 
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.99"
+version = "0.2.100"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6"
+checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
+dependencies = [
+ "unicode-ident",
+]
 
 [[package]]
 name = "wasm-streams"
@@ -4402,9 +4411,9 @@ dependencies = [
 
 [[package]]
 name = "web-sys"
-version = "0.3.76"
+version = "0.3.77"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc"
+checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2"
 dependencies = [
  "js-sys",
  "wasm-bindgen",

From da25978cbe3c3dbef148c8a98066be4ff980e243 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 15 Jan 2025 11:49:23 -0500
Subject: [PATCH 4/4] fix cargo doc

---
 datafusion/physical-optimizer/src/test_utils.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/physical-optimizer/src/test_utils.rs b/datafusion/physical-optimizer/src/test_utils.rs
index b8c920d866e0..dc68f1dc9764 100644
--- a/datafusion/physical-optimizer/src/test_utils.rs
+++ b/datafusion/physical-optimizer/src/test_utils.rs
@@ -251,7 +251,7 @@ impl RequirementsTestExec {
         self
     }
 
-    /// returns this ExecutionPlan as an Arc<dyn ExecutionPlan>
+    /// returns this ExecutionPlan as an `Arc<dyn ExecutionPlan>`
     pub fn into_arc(self) -> Arc<dyn ExecutionPlan> {
         Arc::new(self)
     }
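
A note for reviewers, separate from the patches themselves: the sketch below shows how the relocated helpers compose from their new home. It assumes the module is exported from the crate as `datafusion_physical_optimizer::test_utils`; the schema, the `c9` column, and the plan shape are made up purely for illustration and only mirror the kinds of plans the sanity-checker tests build.

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema, SortOptions};
    // Assumed export path once the module lands in the physical-optimizer crate.
    use datafusion_physical_optimizer::test_utils::{
        global_limit_exec, memory_exec, repartition_exec, sort_exec, sort_expr_options,
    };
    use datafusion_physical_plan::displayable;

    fn main() {
        // Hypothetical single-column schema, only for illustration.
        let schema = Arc::new(Schema::new(vec![Field::new("c9", DataType::Int64, true)]));
        let source = memory_exec(&schema);

        // Sort the empty in-memory source, then repartition it and apply a limit,
        // mirroring the plan shapes exercised by the sanity-checker tests.
        let sort = sort_exec(
            vec![sort_expr_options(
                "c9",
                &source.schema(),
                SortOptions { descending: false, nulls_first: false },
            )],
            source,
        );
        let plan = global_limit_exec(repartition_exec(sort));

        // Render the plan tree as an indented string for inspection.
        println!("{}", displayable(plan.as_ref()).indent(true));
    }

Rendering with `displayable(..).indent(true)` produces the same kind of indented plan tree that the `assert_plan` checks above compare against, which makes it easy to eyeball a plan while porting a test.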