From f3b1141d0f417e9d9e6c0ada03592c9d9ec60cd4 Mon Sep 17 00:00:00 2001
From: Jay Zhan
Date: Fri, 20 Dec 2024 17:51:06 +0800
Subject: [PATCH] Replace `execution_mode` with `emission_type` and `boundedness` (#13823)

* feat: update execution modes and add bitflags dependency

- Introduced `Incremental` execution mode alongside existing modes in the DataFusion execution plan.
- Updated various execution plans to utilize the new `Incremental` mode where applicable, enhancing streaming capabilities.
- Added `bitflags` dependency to `Cargo.toml` for better management of execution modes.
- Adjusted execution mode handling in multiple files to ensure compatibility with the new structure.

* add exec API

Signed-off-by: Jay Zhan

* replace done but has stackoverflow

Signed-off-by: Jay Zhan

* exec API done

Signed-off-by: Jay Zhan

* Refactor execution plan properties to remove execution mode

- Removed the `ExecutionMode` parameter from `PlanProperties` across multiple physical plan implementations.
- Updated related functions to utilize the new structure, ensuring compatibility with the changes.
- Adjusted comments and cleaned up imports to reflect the removal of execution mode handling.

This refactor simplifies the execution plan properties and enhances maintainability.

* Refactor execution plan to remove `ExecutionMode` and introduce `EmissionType`

- Removed the `ExecutionMode` parameter from `PlanProperties` and related implementations across multiple files.
- Introduced `EmissionType` to better represent the output characteristics of execution plans.
- Updated functions and tests to reflect the new structure, ensuring compatibility and enhancing maintainability.
- Cleaned up imports and adjusted comments accordingly.

This refactor simplifies the execution plan properties and improves the clarity of memory handling in execution plans.

* fix test

Signed-off-by: Jay Zhan

* Refactor join handling and emission type logic

- Updated test cases in `sanity_checker.rs` to reflect changes in expected outcomes for bounded and unbounded joins, ensuring accurate test coverage.
- Simplified the `is_pipeline_breaking` method in `execution_plan.rs` to clarify the conditions under which a plan is considered pipeline-breaking.
- Enhanced the emission type determination logic in `execution_plan.rs` to prioritize `Final` over `Both` and `Incremental`, improving clarity in execution plan behavior.
- Adjusted join type handling in `hash_join.rs` to classify `Right` joins as `Incremental`, allowing for immediate row emission.

These changes improve the accuracy of tests and the clarity of execution plan properties.

* Implement emission type for execution plans

- Updated multiple execution plan implementations to replace `unimplemented!()` with `EmissionType::Incremental`, ensuring that the emission type is correctly defined for various plans.
- This change enhances the clarity and functionality of the execution plans by explicitly specifying their emission behavior.

These updates contribute to a more robust execution plan framework within the DataFusion project.

* Enhance join type documentation and refine emission type logic

- Updated the `JoinType` enum in `join_type.rs` to include detailed descriptions for each join type, improving clarity on their behavior and expected results.
- Modified the emission type logic in `hash_join.rs` to ensure that `Right` and `RightAnti` joins are classified as `Incremental`, allowing for immediate row emission when applicable.

These changes improve the documentation and functionality of join operations within the DataFusion project.

* Refactor emission type logic in join and sort execution plans

- Updated the emission type determination in `SortMergeJoinExec` and `SymmetricHashJoinExec` to utilize the `emission_type_from_children` function, enhancing the accuracy of emission behavior based on input characteristics.
- Clarified comments in `sort.rs` regarding the conditions under which results are emitted, emphasizing the relationship between input sorting and emission type.
- These changes improve the clarity and functionality of the execution plans within the DataFusion project, ensuring more robust handling of emission types.

* Refactor emission type handling in execution plans

- Updated the `emission_type_from_children` function to accept an iterator instead of a slice, enhancing flexibility in how child execution plans are passed.
- Modified the `SymmetricHashJoinExec` implementation to utilize the new function signature, improving code clarity and maintainability.

These changes streamline the emission type determination process within the DataFusion project, contributing to a more robust execution plan framework.

* Enhance execution plan properties with boundedness and emission type

- Introduced `boundedness` and `pipeline_behavior` methods to the `ExecutionPlanProperties` trait, improving the handling of execution plan characteristics.
- Updated the `CsvExec`, `SortExec`, and related implementations to utilize the new methods for determining boundedness and emission behavior.
- Refactored the `ensure_distribution` function to use the new boundedness logic, enhancing clarity in distribution decisions.
- These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project.

* Refactor execution plans to enhance boundedness and emission type handling

- Updated multiple execution plan implementations to incorporate `Boundedness` and `EmissionType`, improving the clarity and functionality of execution plans.
- Replaced instances of `unimplemented!()` with appropriate emission types, ensuring that plans correctly define their output behavior.
- Refactored the `PlanProperties` structure to utilize the new boundedness logic, enhancing decision-making in execution plans.
- These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project.

* Refactor memory handling in execution plans

- Updated the condition for checking memory requirements in execution plans from `has_finite_memory()` to `boundedness().requires_finite_memory()`, improving clarity in memory management.
- This change enhances the robustness of execution plans within the DataFusion project by ensuring more accurate assessments of memory constraints.

* Refactor boundedness checks in execution plans

- Updated conditions for checking boundedness in various execution plans to use `is_unbounded()` instead of `requires_finite_memory()`, enhancing clarity in memory management.
- Adjusted the `PlanProperties` structure to reflect these changes, ensuring more accurate assessments of memory constraints across the DataFusion project.
- These modifications contribute to a more robust and maintainable execution plan framework, improving the handling of boundedness in execution strategies.

* Remove TODO comment regarding unbounded execution plans in `UnboundedExec` implementation

- Eliminated the outdated comment suggesting a switch to unbounded execution with finite memory, streamlining the code and improving clarity.
- This change contributes to a cleaner and more maintainable codebase within the DataFusion project.

* Refactor execution plan boundedness and emission type handling

- Updated the `is_pipeline_breaking` method to use `requires_finite_memory()` for improved clarity in determining pipeline behavior.
- Enhanced the `Boundedness` enum to include detailed documentation on memory requirements for unbounded streams.
- Refactored `compute_properties` methods in `GlobalLimitExec` and `LocalLimitExec` to directly use the input's boundedness, simplifying the logic.
- Adjusted emission type determination in `NestedLoopJoinExec` to utilize the `emission_type_from_children` function, ensuring accurate output behavior based on input characteristics.

These changes contribute to a more robust and maintainable execution plan framework within the DataFusion project, improving clarity and functionality in handling boundedness and emission types.

* Refactor emission type and boundedness handling in execution plans

- Removed the `OptionalEmissionType` struct from `plan_properties.rs`, simplifying the codebase.
- Updated the `is_pipeline_breaking` function in `execution_plan.rs` for improved readability by formatting the condition across multiple lines.
- Adjusted the `GlobalLimitExec` implementation in `limit.rs` to directly use the input's boundedness, enhancing clarity in memory management.

These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, improving the handling of emission types and boundedness.

* Refactor GlobalLimitExec and LocalLimitExec to enhance boundedness handling

- Updated the `compute_properties` methods in both `GlobalLimitExec` and `LocalLimitExec` to replace `EmissionType::Final` with `Boundedness::Bounded`, reflecting that limit operations always produce a finite number of rows.
- Changed the input's boundedness reference to `pipeline_behavior()` for improved clarity in execution plan properties.

These changes contribute to a more streamlined and maintainable execution plan framework within the DataFusion project, enhancing the handling of boundedness in limit operations.

* Review Part1

* Update sanity_checker.rs

* addressing reviews

* Review Part 1

* Update datafusion/physical-plan/src/execution_plan.rs

* Update datafusion/physical-plan/src/execution_plan.rs

* Shorten imports

* Enhance documentation for JoinType and Boundedness enums

- Improved descriptions for the Inner and Full join types in join_type.rs to clarify their behavior and examples.
- Added explanations regarding the boundedness of output streams and memory requirements in execution_plan.rs, including specific examples for operators like Median and Min/Max.

---------

Signed-off-by: Jay Zhan
Co-authored-by: berkaysynnada
Co-authored-by: Mehmet Ozan Kabak
---
 datafusion-cli/src/exec.rs | 14 +-
 .../examples/custom_datasource.rs | 8 +-
 datafusion/common/src/join_type.rs | 25 +-
 .../datasource/physical_plan/arrow_file.rs | 6 +-
 .../core/src/datasource/physical_plan/avro.rs | 8 +-
 .../core/src/datasource/physical_plan/csv.rs | 8 +-
 .../core/src/datasource/physical_plan/json.rs | 8 +-
 .../datasource/physical_plan/parquet/mod.rs | 11 +-
 .../enforce_distribution.rs | 15 +-
 .../src/physical_optimizer/enforce_sorting.rs | 2 +-
 .../src/physical_optimizer/join_selection.rs | 28 +-
 .../replace_with_order_preserving_variants.rs | 6 +-
 .../src/physical_optimizer/sanity_checker.rs | 19 +-
 datafusion/core/src/physical_planner.rs | 11 +-
 datafusion/core/src/test/mod.rs | 13 +-
 datafusion/core/src/test_util/mod.rs | 23 +-
 .../core/tests/custom_sources_cases/mod.rs | 12 +-
 .../provider_filter_pushdown.rs | 13 +-
 .../tests/custom_sources_cases/statistics.rs | 14 +-
 .../tests/user_defined/insert_operation.rs | 18 +-
 .../tests/user_defined/user_defined_plan.rs | 19 +-
 datafusion/ffi/src/execution_plan.rs | 11 +-
 datafusion/ffi/src/plan_properties.rs | 107 ++++++--
 .../src/output_requirements.rs | 3 +-
 .../physical-plan/src/aggregates/mod.rs | 36 +--
 datafusion/physical-plan/src/analyze.rs | 10 +-
 .../physical-plan/src/coalesce_batches.rs | 3 +-
 .../physical-plan/src/coalesce_partitions.rs | 3 +-
 datafusion/physical-plan/src/empty.rs | 20 +-
 .../physical-plan/src/execution_plan.rs | 253 ++++++++++++------
 datafusion/physical-plan/src/explain.rs | 9 +-
 datafusion/physical-plan/src/filter.rs | 4 +-
 datafusion/physical-plan/src/insert.rs | 8 +-
 .../physical-plan/src/joins/cross_join.rs | 20 +-
 .../physical-plan/src/joins/hash_join.rs | 48 ++--
 .../src/joins/nested_loop_join.rs | 37 ++-
 .../src/joins/sort_merge_join.rs | 21 +-
 .../src/joins/symmetric_hash_join.rs | 12 +-
 datafusion/physical-plan/src/lib.rs | 3 +-
 datafusion/physical-plan/src/limit.rs | 12 +-
 datafusion/physical-plan/src/memory.rs | 16 +-
 .../physical-plan/src/placeholder_row.rs | 17 +-
 datafusion/physical-plan/src/projection.rs | 3 +-
 .../physical-plan/src/recursive_query.rs | 6 +-
 .../physical-plan/src/repartition/mod.rs | 10 +-
 .../physical-plan/src/sorts/partial_sort.rs | 10 +-
 datafusion/physical-plan/src/sorts/sort.rs | 68 +++--
 .../src/sorts/sort_preserving_merge.rs | 16 +-
 datafusion/physical-plan/src/streaming.rs | 23 +-
 datafusion/physical-plan/src/test/exec.rs | 51 ++--
 datafusion/physical-plan/src/union.rs | 19 +-
 datafusion/physical-plan/src/unnest.rs | 9 +-
 datafusion/physical-plan/src/values.rs | 13 +-
 .../src/windows/bounded_window_agg_exec.rs | 8 +-
 datafusion/physical-plan/src/windows/mod.rs | 2 +-
 .../src/windows/window_agg_exec.rs | 23 +-
 datafusion/physical-plan/src/work_table.rs | 10 +-
 57 files changed, 748 insertions(+), 457 deletions(-)

diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 18906536691e..a4f154b2de92 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -33,11 +33,12 @@ use crate::{
 };
 use datafusion::common::instant::Instant;
-use datafusion::common::plan_datafusion_err;
+use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
+use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
-        if physical_plan.execution_mode().is_unbounded() {
+        if physical_plan.boundedness().is_unbounded() {
+            if physical_plan.pipeline_behavior() == EmissionType::Final {
+                return plan_err!(
+                    "The given query can generate a valid result only once \
+                    the source finishes, but the source is unbounded"
+                );
+            }
+            // As the input stream comes, we can generate results.
+            // However, memory safety is not guaranteed.
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {
+            // Bounded stream; collected results are printed after all input consumed.
             let schema = physical_plan.schema();
             let results = collect(physical_plan, task_ctx.clone()).await?;
             adjusted.into_inner().print_batches(schema, &results, now)?;
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 90e9d2c7a632..bc865fac5a33 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -30,10 +30,11 @@ use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlanBuilder;
 use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-    Partitioning, PlanProperties, SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;
@@ -214,7 +215,8 @@ impl CustomExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
index e98f34199b27..bdca253c5f64 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
 /// Join type
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
 pub enum JoinType {
-    /// Inner Join
+    /// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
+    /// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
+    /// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
     Inner,
-    /// Left Join
+    /// Left Join - Returns all rows from the left table and matching rows from the right table.
+    /// If no match, NULL values are returned for columns from the right table.
     Left,
-    /// Right Join
+    /// Right Join - Returns all rows from the right table and matching rows from the left table.
+    /// If no match, NULL values are returned for columns from the left table.
     Right,
-    /// Full Join
+    /// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
+    /// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
+    /// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
+    /// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
     Full,
-    /// Left Semi Join
+    /// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
+    /// Only columns from the left table are returned.
     LeftSemi,
-    /// Right Semi Join
+    /// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
+    /// Only columns from the right table are returned.
     RightSemi,
-    /// Left Anti Join
+    /// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
     LeftAnti,
-    /// Right Anti Join
+    /// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
     RightAnti,
     /// Left Mark join
     ///
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 8df5ef82cd0c..4e76b087abb1 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::Statistics;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_plan::{ExecutionMode, PlanProperties};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::PlanProperties;
 use futures::StreamExt;
 use itertools::Itertools;
@@ -97,7 +98,8 @@ impl ArrowExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index def6504189bb..fb36179c3cf6 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -24,13 +24,14 @@ use super::FileScanConfig;
 use crate::error::Result;
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
-    PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 use arrow::datatypes::SchemaRef;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 /// Execution plan for scanning Avro data source
 #[derive(Debug, Clone)]
@@ -81,7 +82,8 @@ impl AvroExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index c54c663dca7d..a00e74cf4fcd 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
-    Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use arrow::csv;
@@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -327,7 +328,8 @@ impl CsvExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 5c70968fbb42..879c9817a382 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
-    Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
@@ -107,7 +108,8 @@ impl NdJsonExec {
         PlanProperties::new(
             eq_properties,
             Self::output_partitioning_helper(file_scan_config), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 446d5f531185..cb79055ce301 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -34,13 +34,14 @@ use crate::{
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
-        DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
+        DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
         SendableRecordBatchStream, Statistics,
     },
 };
 use arrow::datatypes::SchemaRef;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use itertools::Itertools;
 use log::debug;
@@ -654,13 +655,11 @@ impl ParquetExec {
         orderings: &[LexOrdering],
         file_config: &FileScanConfig,
     ) -> PlanProperties {
-        // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
-
         PlanProperties::new(
-            eq_properties,
+            EquivalenceProperties::new_with_orderings(schema, orderings),
             Self::output_partitioning_helper(file_config), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 27323eaedccc..76c4d668d797 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
 };
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
 use datafusion_physical_plan::ExecutionPlanProperties;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::izip;
 /// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -1161,12 +1162,17 @@ fn ensure_distribution(
     let should_use_estimates = config
         .execution
         .use_row_number_estimates_to_optimize_partitioning;
-    let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
+    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
+        && matches!(
+            dist_context.plan.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
     // Use order preserving variants either of the conditions true
     // - it is desired according to config
     // - when plan is unbounded
+    // - when it is pipeline friendly (can incrementally produce results)
     let order_preserving_variants_desirable =
-        is_unbounded || config.optimizer.prefer_existing_sort;
+        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
@@ -1459,7 +1465,8 @@ pub(crate) mod tests {
             PlanProperties::new(
                 input.equivalence_properties().clone(), // Equivalence Properties
                 input.output_partitioning().clone(), // Output Partitioning
-                input.execution_mode(), // Execution Mode
+                input.pipeline_behavior(), // Pipeline Behavior
+                input.boundedness(), // Boundedness
             )
         }
     }
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index cfc08562f7d7..85fe9ecfcdb0 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -214,7 +214,7 @@ fn replace_with_partial_sort(
     let plan_any = plan.as_any();
     if let Some(sort_plan) = plan_any.downcast_ref::() {
         let child = Arc::clone(sort_plan.children()[0]);
-        if !child.execution_mode().is_unbounded() {
+        if !child.boundedness().is_unbounded() {
             return Ok(plan);
         }
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 9d65c6ded423..009757f3a938 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::execution_plan::EmissionType;
 /// The [`JoinSelection`] rule tries to modify a given plan so that it can
 /// accommodate infinite sources and optimize joins in the plan according to
@@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
 pub type PipelineFixerSubrule = dyn Fn(Arc, &ConfigOptions) -> Result>;
-/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
+/// Converts a hash join to a symmetric hash join if both its inputs are
+/// unbounded and incremental.
 ///
 /// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
 /// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
@@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
 ) -> Result> {
     // Check if the current plan node is a HashJoinExec.
     if let Some(hash_join) = input.as_any().downcast_ref::() {
-        let left_unbounded = hash_join.left.execution_mode().is_unbounded();
-        let right_unbounded = hash_join.right.execution_mode().is_unbounded();
-        // Process only if both left and right sides are unbounded.
-        if left_unbounded && right_unbounded {
+        let left_unbounded = hash_join.left.boundedness().is_unbounded();
+        let left_incremental = matches!(
+            hash_join.left.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
+        let right_unbounded = hash_join.right.boundedness().is_unbounded();
+        let right_incremental = matches!(
+            hash_join.right.pipeline_behavior(),
+            EmissionType::Incremental | EmissionType::Both
+        );
+        // Process only if both left and right sides are unbounded and incrementally emit.
+        if left_unbounded && right_unbounded & left_incremental & right_incremental {
             // Determine the partition mode based on configuration.
             let mode = if config_options.optimizer.repartition_joins {
                 StreamJoinPartitionMode::Partitioned
@@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
     _config_options: &ConfigOptions,
 ) -> Result> {
     if let Some(hash_join) = input.as_any().downcast_ref::() {
-        if hash_join.left.execution_mode().is_unbounded()
-            && !hash_join.right.execution_mode().is_unbounded()
+        if hash_join.left.boundedness().is_unbounded()
+            && !hash_join.right.boundedness().is_unbounded()
             && matches!(
                 *hash_join.join_type(),
                 JoinType::Inner
@@ -2025,12 +2035,12 @@ mod hash_join_tests {
             assert_eq!(
                 (
                     t.case.as_str(),
-                    if left.execution_mode().is_unbounded() {
+                    if left.boundedness().is_unbounded() {
                         SourceType::Unbounded
                     } else {
                         SourceType::Bounded
                     },
-                    if right.execution_mode().is_unbounded() {
+                    if right.boundedness().is_unbounded() {
                         SourceType::Unbounded
                     } else {
                         SourceType::Bounded
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 2f6b7a51ee75..96b2454fa330 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -29,11 +29,12 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::Transformed;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::tree_node::PlanContext;
 use datafusion_physical_plan::ExecutionPlanProperties;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::izip;
 /// For a given `plan`, this object carries the information one needs from its
@@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
     // For unbounded cases, we replace with the order-preserving variant in any
     // case, as doing so helps fix the pipeline. Also replace if config allows.
     let use_order_preserving_variant = config.optimizer.prefer_existing_sort
-        || !requirements.plan.execution_mode().pipeline_friendly();
+        || (requirements.plan.boundedness().is_unbounded()
+            && requirements.plan.pipeline_behavior() == EmissionType::Final);
     // Create an alternate plan with order-preserving variants:
     let mut alternate_plan = plan_with_order_preserving_variants(
diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs
index 99bd1cab3ed4..b6d22320d086 100644
--- a/datafusion/core/src/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs
@@ -30,6 +30,7 @@ use datafusion_common::config::{ConfigOptions, OptimizerOptions};
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
@@ -85,7 +86,15 @@ pub fn check_finiteness_requirements(
                 the 'allow_symmetric_joins_without_pruning' configuration flag");
         }
     }
-    if !input.execution_mode().pipeline_friendly() {
+
+    if matches!(
+        input.boundedness(),
+        Boundedness::Unbounded {
+            requires_infinite_memory: true
+        }
+    ) || (input.boundedness().is_unbounded()
+        && input.pipeline_behavior() == EmissionType::Final)
+    {
         plan_err!(
             "Cannot execute pipeline breaking queries, operator: {:?}",
             input
@@ -215,7 +224,9 @@ mod tests {
         let test2 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
-            expect_fail: true,
+            // Left join for bounded build side and unbounded probe side can generate
+            // both incremental matched rows and final non-matched rows.
+            expect_fail: false,
         };
         let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
@@ -290,7 +301,9 @@ mod tests {
         };
         let test2 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Unbounded),
-            expect_fail: true,
+            // Full join for bounded build side and unbounded probe side can generate
+            // both incremental matched rows and final non-matched rows.
+            expect_fail: false,
         };
         let test3 = BinaryTestCase {
             source_types: (SourceType::Bounded, SourceType::Bounded),
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 44537c951f94..47b31d2f4e2d 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2017,7 +2017,7 @@ mod tests {
     use crate::datasource::file_format::options::CsvReadOptions;
     use crate::datasource::MemTable;
     use crate::physical_plan::{
-        expressions, DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties,
+        expressions, DisplayAs, DisplayFormatType, PlanProperties,
         SendableRecordBatchStream,
     };
     use crate::prelude::{SessionConfig, SessionContext};
@@ -2032,6 +2032,7 @@ mod tests {
     use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
     use datafusion_functions_aggregate::expr_fn::sum;
     use datafusion_physical_expr::EquivalenceProperties;
+    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
     fn make_session_state() -> SessionState {
         let runtime = Arc::new(RuntimeEnv::default());
@@ -2619,13 +2620,11 @@ mod tests {
         /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, - // Output Partitioning + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - // Execution Mode - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index d8304c2f0a86..e5ce28e73806 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -45,10 +45,9 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; -use datafusion_physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties, -}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, PlanProperties}; #[cfg(feature = "compression")] use bzip2::write::BzEncoder; @@ -386,13 +385,11 @@ impl StatisticsExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, - // Output Partitioning + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(2), - // Execution Mode - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 09608887c0f1..aa134f28fe5b 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -39,22 +39,23 @@ use crate::error::Result; use crate::execution::context::TaskContext; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, }; use crate::prelude::{CsvReadOptions, SessionContext}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; +use datafusion_catalog::Session; use datafusion_common::TableReference; use datafusion_expr::utils::COUNT_STAR_EXPANSION; use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType}; use datafusion_functions_aggregate::count::count_udaf; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::{expressions, EquivalenceProperties, PhysicalExpr}; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; -use datafusion_catalog::Session; -use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use futures::Stream; use tempfile::TempDir; // backwards compatibility @@ -259,16 +260,18 @@ impl UnboundedExec { batch_produce: Option, n_partitions: usize, ) -> 
PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - let mode = if batch_produce.is_none() { - ExecutionMode::Unbounded + let boundedness = if batch_produce.is_none() { + Boundedness::Unbounded { + requires_infinite_memory: false, + } } else { - ExecutionMode::Bounded + Boundedness::Bounded }; PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(n_partitions), - mode, + EmissionType::Incremental, + boundedness, ) } } diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index e1bd14105e23..aafefac04e32 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -35,15 +35,16 @@ use datafusion::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datafusion::scalar::ScalarValue; +use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; use datafusion_common::project_schema; use datafusion_common::stats::Precision; use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; -use datafusion_physical_plan::{ExecutionMode, PlanProperties}; +use datafusion_physical_plan::PlanProperties; use async_trait::async_trait; -use datafusion_catalog::Session; use futures::stream::Stream; mod provider_filter_pushdown; @@ -91,12 +92,11 @@ impl CustomExecutionPlan { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, - // Output Partitioning + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 09f7265d639a..af0506a50558 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -28,19 +28,20 @@ use datafusion::execution::context::TaskContext; use datafusion::logical_expr::TableProviderFilterPushDown; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, Statistics, }; use datafusion::prelude::*; use datafusion::scalar::ScalarValue; +use datafusion_catalog::Session; use datafusion_common::cast::as_primitive_array; use datafusion_common::{internal_err, not_impl_err}; use datafusion_expr::expr::{BinaryExpr, Cast}; use datafusion_functions_aggregate::expr_fn::count; use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; -use datafusion_catalog::Session; fn create_batch(value: i32, num_rows: usize) -> Result { let mut builder = Int32Builder::with_capacity(num_rows); @@ -72,11 +73,11 @@ impl CustomPlan { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 41d182a3767b..9d3bd594a929 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -26,17 +26,18 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, - Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, Statistics, }, prelude::SessionContext, scalar::ScalarValue, }; +use datafusion_catalog::Session; use datafusion_common::{project_schema, stats::Precision}; use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use async_trait::async_trait; -use datafusion_catalog::Session; /// This is a testing structure for statistics /// It will act both as a table provider and execution plan @@ -64,12 +65,11 @@ impl StatisticsValidation { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(2), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index ff14fa0be3fb..aa531632c60b 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -26,7 +26,10 @@ use datafusion::{ use datafusion_catalog::{Session, TableProvider}; use datafusion_expr::{dml::InsertOp, Expr, TableType}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; -use datafusion_physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties}; +use datafusion_physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + DisplayAs, ExecutionPlan, PlanProperties, +}; #[tokio::test] async fn insert_operation_is_passed_correctly_to_table_provider() { @@ -122,15 +125,14 @@ struct TestInsertExec { impl TestInsertExec { fn new(op: InsertOp) -> Self { - let eq_properties = EquivalenceProperties::new(make_count_schema()); - let plan_properties = PlanProperties::new( - eq_properties, - Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, - ); Self { op, - plan_properties, + plan_properties: PlanProperties::new( + EquivalenceProperties::new(make_count_schema()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ), } } } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 520a91aeb4d6..77753290c37e 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -68,9 +68,6 @@ use arrow::{ record_batch::RecordBatch, 
util::pretty::pretty_format_batches, }; -use async_trait::async_trait; -use futures::{Stream, StreamExt}; - use datafusion::execution::session_state::SessionStateBuilder; use datafusion::{ common::cast::{as_int64_array, as_string_array}, @@ -87,9 +84,8 @@ use datafusion::{ optimizer::{OptimizerConfig, OptimizerRule}, physical_expr::EquivalenceProperties, physical_plan::{ - DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, - Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }, physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, prelude::{SessionConfig, SessionContext}, @@ -100,6 +96,10 @@ use datafusion_common::ScalarValue; use datafusion_expr::{FetchType, Projection, SortExpr}; use datafusion_optimizer::optimizer::ApplyOrder; use datafusion_optimizer::AnalyzerRule; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; + +use async_trait::async_trait; +use futures::{Stream, StreamExt}; /// Execute the specified sql and return the resulting record batches /// pretty printed as a String. @@ -495,12 +495,11 @@ impl TopKExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index d10eda8990b8..5ab321cc0114 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -272,7 +272,13 @@ impl ExecutionPlan for ForeignExecutionPlan { #[cfg(test)] mod tests { - use datafusion::{physical_plan::Partitioning, prelude::SessionContext}; + use datafusion::{ + physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + Partitioning, + }, + prelude::SessionContext, + }; use super::*; @@ -287,7 +293,8 @@ mod tests { props: PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), - datafusion::physical_plan::ExecutionMode::Unbounded, + EmissionType::Incremental, + Boundedness::Bounded, ), } } diff --git a/datafusion/ffi/src/plan_properties.rs b/datafusion/ffi/src/plan_properties.rs index 722681ae4a1d..3c7bc886aede 100644 --- a/datafusion/ffi/src/plan_properties.rs +++ b/datafusion/ffi/src/plan_properties.rs @@ -28,7 +28,10 @@ use arrow::datatypes::SchemaRef; use datafusion::{ error::{DataFusionError, Result}, physical_expr::EquivalenceProperties, - physical_plan::{ExecutionMode, PlanProperties}, + physical_plan::{ + execution_plan::{Boundedness, EmissionType}, + PlanProperties, + }, prelude::SessionContext, }; use datafusion_proto::{ @@ -53,8 +56,11 @@ pub struct FFI_PlanProperties { pub output_partitioning: unsafe extern "C" fn(plan: &Self) -> RResult, RStr<'static>>, - /// Return the execution mode of the plan. - pub execution_mode: unsafe extern "C" fn(plan: &Self) -> FFI_ExecutionMode, + /// Return the emission type of the plan. 
+ pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType, + + /// Indicate boundedness of the plan and its memory requirements. + pub boundedness: unsafe extern "C" fn(plan: &Self) -> FFI_Boundedness, /// The output ordering is a [`PhysicalSortExprNodeCollection`] protobuf message /// serialized into bytes to pass across the FFI boundary. @@ -98,12 +104,20 @@ unsafe extern "C" fn output_partitioning_fn_wrapper( ROk(output_partitioning.into()) } -unsafe extern "C" fn execution_mode_fn_wrapper( +unsafe extern "C" fn emission_type_fn_wrapper( properties: &FFI_PlanProperties, -) -> FFI_ExecutionMode { +) -> FFI_EmissionType { let private_data = properties.private_data as *const PlanPropertiesPrivateData; let props = &(*private_data).props; - props.execution_mode().into() + props.emission_type.into() +} + +unsafe extern "C" fn boundedness_fn_wrapper( + properties: &FFI_PlanProperties, +) -> FFI_Boundedness { + let private_data = properties.private_data as *const PlanPropertiesPrivateData; + let props = &(*private_data).props; + props.boundedness.into() } unsafe extern "C" fn output_ordering_fn_wrapper( @@ -164,7 +178,8 @@ impl From<&PlanProperties> for FFI_PlanProperties { FFI_PlanProperties { output_partitioning: output_partitioning_fn_wrapper, - execution_mode: execution_mode_fn_wrapper, + emission_type: emission_type_fn_wrapper, + boundedness: boundedness_fn_wrapper, output_ordering: output_ordering_fn_wrapper, schema: schema_fn_wrapper, release: release_fn_wrapper, @@ -220,9 +235,6 @@ impl TryFrom for PlanProperties { RErr(e) => Err(DataFusionError::Plan(e.to_string())), }?; - let execution_mode: ExecutionMode = - unsafe { (ffi_props.execution_mode)(&ffi_props).into() }; - let eq_properties = match orderings { Some(ordering) => { EquivalenceProperties::new_with_orderings(Arc::new(schema), &[ordering]) @@ -230,40 +242,82 @@ impl TryFrom for PlanProperties { None => EquivalenceProperties::new(Arc::new(schema)), }; + let emission_type: EmissionType = 
+ unsafe { (ffi_props.emission_type)(&ffi_props).into() }; + + let boundedness: Boundedness = + unsafe { (ffi_props.boundedness)(&ffi_props).into() }; + Ok(PlanProperties::new( eq_properties, partitioning, - execution_mode, + emission_type, + boundedness, )) } } -/// FFI safe version of [`ExecutionMode`]. +/// FFI safe version of [`Boundedness`]. #[repr(C)] #[allow(non_camel_case_types)] #[derive(Clone, StableAbi)] -pub enum FFI_ExecutionMode { +pub enum FFI_Boundedness { Bounded, - Unbounded, - PipelineBreaking, + Unbounded { requires_infinite_memory: bool }, +} + +impl From for FFI_Boundedness { + fn from(value: Boundedness) -> Self { + match value { + Boundedness::Bounded => FFI_Boundedness::Bounded, + Boundedness::Unbounded { + requires_infinite_memory, + } => FFI_Boundedness::Unbounded { + requires_infinite_memory, + }, + } + } +} + +impl From for Boundedness { + fn from(value: FFI_Boundedness) -> Self { + match value { + FFI_Boundedness::Bounded => Boundedness::Bounded, + FFI_Boundedness::Unbounded { + requires_infinite_memory, + } => Boundedness::Unbounded { + requires_infinite_memory, + }, + } + } +} + +/// FFI safe version of [`EmissionType`]. 
+#[repr(C)] +#[allow(non_camel_case_types)] +#[derive(Clone, StableAbi)] +pub enum FFI_EmissionType { + Incremental, + Final, + Both, } -impl From for FFI_ExecutionMode { - fn from(value: ExecutionMode) -> Self { +impl From for FFI_EmissionType { + fn from(value: EmissionType) -> Self { match value { - ExecutionMode::Bounded => FFI_ExecutionMode::Bounded, - ExecutionMode::Unbounded => FFI_ExecutionMode::Unbounded, - ExecutionMode::PipelineBreaking => FFI_ExecutionMode::PipelineBreaking, + EmissionType::Incremental => FFI_EmissionType::Incremental, + EmissionType::Final => FFI_EmissionType::Final, + EmissionType::Both => FFI_EmissionType::Both, } } } -impl From for ExecutionMode { - fn from(value: FFI_ExecutionMode) -> Self { +impl From for EmissionType { + fn from(value: FFI_EmissionType) -> Self { match value { - FFI_ExecutionMode::Bounded => ExecutionMode::Bounded, - FFI_ExecutionMode::Unbounded => ExecutionMode::Unbounded, - FFI_ExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking, + FFI_EmissionType::Incremental => EmissionType::Incremental, + FFI_EmissionType::Final => EmissionType::Final, + FFI_EmissionType::Both => EmissionType::Both, } } } @@ -283,7 +337,8 @@ mod tests { let original_props = PlanProperties::new( EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), - ExecutionMode::Unbounded, + EmissionType::Incremental, + Boundedness::Bounded, ); let local_props_ptr = FFI_PlanProperties::from(&original_props); diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 6c8e76bff82b..d5ffaad6d872 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -121,7 +121,8 @@ impl OutputRequirementExec { PlanProperties::new( input.equivalence_properties().clone(), // Equivalence Properties input.output_partitioning().clone(), // Output Partitioning - input.execution_mode(), // 
Execution Mode + input.pipeline_behavior(), // Pipeline Behavior + input.boundedness(), // Boundedness ) } } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index feca5eb1db38..2e0103defd9f 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -20,11 +20,12 @@ use std::any::Any; use std::sync::Arc; -use super::{DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties}; +use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; use crate::aggregates::{ no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; +use crate::execution_plan::{CardinalityEffect, EmissionType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::get_field_metadata; use crate::windows::get_ordered_partition_by_indices; @@ -41,6 +42,7 @@ use datafusion_common::stats::Precision; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_execution::TaskContext; use datafusion_expr::{Accumulator, Aggregate}; +use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::Column, @@ -48,8 +50,6 @@ use datafusion_physical_expr::{ PhysicalExpr, PhysicalSortRequirement, }; -use crate::execution_plan::CardinalityEffect; -use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use itertools::Itertools; pub(crate) mod group_values; @@ -663,16 +663,19 @@ impl AggregateExec { input_partitioning.clone() }; - // Determine execution mode: - let mut exec_mode = input.execution_mode(); - if exec_mode == ExecutionMode::Unbounded - && *input_order_mode == InputOrderMode::Linear - { - // Cannot run without breaking the pipeline - exec_mode = ExecutionMode::PipelineBreaking; - } + // TODO: Emission type and boundedness information can be enhanced here + let emission_type = 
if *input_order_mode == InputOrderMode::Linear { + EmissionType::Final + } else { + input.pipeline_behavior() + }; - PlanProperties::new(eq_properties, output_partitioning, exec_mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type, + input.boundedness(), + ) } pub fn input_order_mode(&self) -> &InputOrderMode { @@ -1298,6 +1301,7 @@ mod tests { use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common; + use crate::execution_plan::Boundedness; use crate::expressions::col; use crate::memory::MemoryExec; use crate::test::assert_is_pending; @@ -1730,13 +1734,11 @@ mod tests { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, - // Output Partitioning + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - // Execution Mode - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c8b329fabdaa..1fc3280ceb16 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -89,10 +89,12 @@ impl AnalyzeExec { input: &Arc, schema: SchemaRef, ) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - let output_partitioning = Partitioning::UnknownPartitioning(1); - let exec_mode = input.execution_mode(); - PlanProperties::new(eq_properties, output_partitioning, exec_mode) + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + input.pipeline_behavior(), + input.boundedness(), + ) } } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs 
b/datafusion/physical-plan/src/coalesce_batches.rs index 11678e7a4696..fa8d125d62d1 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -97,7 +97,8 @@ impl CoalesceBatchesExec { PlanProperties::new( input.equivalence_properties().clone(), // Equivalence Properties input.output_partitioning().clone(), // Output Partitioning - input.execution_mode(), // Execution Mode + input.pipeline_behavior(), + input.boundedness(), ) } } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 3da101d6092f..7c1bdba2f339 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -70,7 +70,8 @@ impl CoalescePartitionsExec { PlanProperties::new( eq_properties, // Equivalence Properties Partitioning::UnknownPartitioning(1), // Output Partitioning - input.execution_mode(), // Execution Mode + input.pipeline_behavior(), + input.boundedness(), ) } } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 192619f69f6a..5168c3cc101f 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -20,11 +20,12 @@ use std::any::Any; use std::sync::Arc; -use super::{ - common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream, - Statistics, +use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; +use crate::{ + execution_plan::{Boundedness, EmissionType}, + memory::MemoryStream, + DisplayFormatType, ExecutionPlan, Partitioning, }; -use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -74,14 +75,11 @@ impl EmptyExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - let output_partitioning = Self::output_partitioning_helper(n_partitions); PlanProperties::new( - eq_properties, - // Output Partitioning - output_partitioning, - // Execution Mode - ExecutionMode::Bounded, + EquivalenceProperties::new(schema), + Self::output_partitioning_helper(n_partitions), + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index ba9e4b0697c1..09bb80734401 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -431,11 +431,6 @@ pub trait ExecutionPlanProperties { /// partitions. fn output_partitioning(&self) -> &Partitioning; - /// Specifies whether this plan generates an infinite stream of records. - /// If the plan does not support pipelining, but its input(s) are - /// infinite, returns [`ExecutionMode::PipelineBreaking`] to indicate this. - fn execution_mode(&self) -> ExecutionMode; - /// If the output of this `ExecutionPlan` within each partition is sorted, /// returns `Some(keys)` describing the ordering. A `None` return value /// indicates no assumptions should be made on the output ordering. @@ -445,6 +440,14 @@ pub trait ExecutionPlanProperties { /// output if its input is sorted as it does not reorder the input rows. fn output_ordering(&self) -> Option<&LexOrdering>; + /// Boundedness information of the stream corresponding to this `ExecutionPlan`. + /// For more details, see [`Boundedness`]. + fn boundedness(&self) -> Boundedness; + + /// Indicates how the stream of this `ExecutionPlan` emits its results. + /// For more details, see [`EmissionType`]. + fn pipeline_behavior(&self) -> EmissionType; + /// Get the [`EquivalenceProperties`] within the plan. 
/// /// Equivalence properties tell DataFusion what columns are known to be @@ -470,14 +473,18 @@ impl ExecutionPlanProperties for Arc { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } @@ -488,95 +495,159 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { self.properties().output_partitioning() } - fn execution_mode(&self) -> ExecutionMode { - self.properties().execution_mode() - } - fn output_ordering(&self) -> Option<&LexOrdering> { self.properties().output_ordering() } + fn boundedness(&self) -> Boundedness { + self.properties().boundedness + } + + fn pipeline_behavior(&self) -> EmissionType { + self.properties().emission_type + } + fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } } -/// Describes the execution mode of the result of calling -/// [`ExecutionPlan::execute`] with respect to its size and behavior. +/// Represents whether a stream of data **generated** by an operator is bounded (finite) +/// or unbounded (infinite). /// -/// The mode of the execution plan is determined by the mode of its input -/// execution plans and the details of the operator itself. For example, a -/// `FilterExec` operator will have the same execution mode as its input, but a -/// `SortExec` operator may have a different execution mode than its input, -/// depending on how the input stream is sorted. +/// This is used to determine whether an execution plan will eventually complete +/// processing all its data (bounded) or could potentially run forever (unbounded). 
/// -/// There are three possible execution modes: `Bounded`, `Unbounded` and -/// `PipelineBreaking`. -#[derive(Clone, Copy, PartialEq, Debug)] -pub enum ExecutionMode { - /// The stream is bounded / finite. - /// - /// In this case the stream will eventually return `None` to indicate that - /// there are no more records to process. +/// For unbounded streams, it also tracks whether the operator requires finite memory +/// to process the stream or if memory usage could grow unbounded. +/// +/// Boundedness of the output stream is based on the boundedness of the input stream and the nature of +/// the operator. For example, limit or topk with fetch operator can convert an unbounded stream to a bounded stream. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Boundedness { + /// The data stream is bounded (finite) and will eventually complete Bounded, - /// The stream is unbounded / infinite. - /// - /// In this case, the stream will never be done (never return `None`), - /// except in case of error. - /// - /// This mode is often used in "Steaming" use cases where data is - /// incrementally processed as it arrives. - /// - /// Note that even though the operator generates an unbounded stream of - /// results, it can execute with bounded memory and incrementally produces - /// output. - Unbounded, - /// Some of the operator's input stream(s) are unbounded, but the operator - /// cannot generate streaming results from these streaming inputs. - /// - /// In this case, the execution mode will be pipeline breaking, e.g. the - /// operator requires unbounded memory to generate results. This - /// information is used by the planner when performing sanity checks - /// on plans processings unbounded data sources. - PipelineBreaking, + /// The data stream is unbounded (infinite) and could run forever + Unbounded { + /// Whether this operator requires infinite memory to process the unbounded stream. 
+ /// If false, the operator can process an infinite stream with bounded memory. + /// If true, memory usage may grow unbounded while processing the stream. + /// + /// For example, `Median` requires infinite memory to compute the median of an unbounded stream. + /// `Min/Max` requires infinite memory if the stream is unordered, but can be computed with bounded memory if the stream is ordered. + requires_infinite_memory: bool, + }, } -impl ExecutionMode { - /// Check whether the execution mode is unbounded or not. +impl Boundedness { pub fn is_unbounded(&self) -> bool { - matches!(self, ExecutionMode::Unbounded) + matches!(self, Boundedness::Unbounded { .. }) } +} - /// Check whether the execution is pipeline friendly. If so, operator can - /// execute safely. - pub fn pipeline_friendly(&self) -> bool { - matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded) - } +/// Represents how an operator emits its output records. +/// +/// This is used to determine whether an operator emits records incrementally as they arrive, +/// only emits a final result at the end, or can do both. Note that it generates the output -- record batch with `batch_size` rows +/// but it may still buffer data internally until it has enough data to emit a record batch or the source is exhausted. 
+/// +/// For example, in the following plan: +/// ```text +/// SortExec [EmissionType::Final] +/// |_ on: [col1 ASC] +/// FilterExec [EmissionType::Incremental] +/// |_ pred: col2 > 100 +/// CsvExec [EmissionType::Incremental] +/// |_ file: "data.csv" +/// ``` +/// - CsvExec emits records incrementally as it reads from the file +/// - FilterExec processes and emits filtered records incrementally as they arrive +/// - SortExec must wait for all input records before it can emit the sorted result, +/// since it needs to see all values to determine their final order +/// +/// Left joins can emit both incrementally and finally: +/// - Incrementally emit matches as they are found +/// - Finally emit non-matches after all input is processed +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EmissionType { + /// Records are emitted incrementally as they arrive and are processed + Incremental, + /// Records are only emitted once all input has been processed + Final, + /// Records can be emitted both incrementally and as a final result + Both, } -/// Conservatively "combines" execution modes of a given collection of operators. -pub(crate) fn execution_mode_from_children<'a>( +/// Utility to determine an operator's boundedness based on its children's boundedness. +/// +/// Assumes boundedness can be inferred from child operators: +/// - Unbounded (requires_infinite_memory: true) takes precedence. +/// - Unbounded (requires_infinite_memory: false) is considered next. +/// - Otherwise, the operator is bounded. +/// +/// **Note:** This is a general-purpose utility and may not apply to +/// all multi-child operators. Ensure your operator's behavior aligns +/// with these assumptions before using. 
+pub(crate) fn boundedness_from_children<'a>( children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>, -) -> ExecutionMode { - let mut result = ExecutionMode::Bounded; - for mode in children.into_iter().map(|child| child.execution_mode()) { - match (mode, result) { - (ExecutionMode::PipelineBreaking, _) - | (_, ExecutionMode::PipelineBreaking) => { - // If any of the modes is `PipelineBreaking`, so is the result: - return ExecutionMode::PipelineBreaking; - } - (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => { - // Unbounded mode eats up bounded mode: - result = ExecutionMode::Unbounded; +) -> Boundedness { + let mut unbounded_with_finite_mem = false; + + for child in children { + match child.boundedness() { + Boundedness::Unbounded { + requires_infinite_memory: true, + } => { + return Boundedness::Unbounded { + requires_infinite_memory: true, + } } - (ExecutionMode::Bounded, ExecutionMode::Bounded) => { - // When both modes are bounded, so is the result: - result = ExecutionMode::Bounded; + Boundedness::Unbounded { + requires_infinite_memory: false, + } => { + unbounded_with_finite_mem = true; } + Boundedness::Bounded => {} } } - result + + if unbounded_with_finite_mem { + Boundedness::Unbounded { + requires_infinite_memory: false, + } + } else { + Boundedness::Bounded + } +} + +/// Determines the emission type of an operator based on its children's pipeline behavior. +/// +/// The precedence of emission types is: +/// - `Final` has the highest precedence. +/// - `Both` is next: if any child emits both incremental and final results, the parent inherits this behavior unless a `Final` is present. +/// - `Incremental` is the default if all children emit incremental results. +/// +/// **Note:** This is a general-purpose utility and may not apply to +/// all multi-child operators. Verify your operator's behavior aligns +/// with these assumptions. 
+pub(crate) fn emission_type_from_children<'a>( + children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>, +) -> EmissionType { + let mut inc_and_final = false; + + for child in children { + match child.pipeline_behavior() { + EmissionType::Final => return EmissionType::Final, + EmissionType::Both => inc_and_final = true, + EmissionType::Incremental => continue, + } + } + + if inc_and_final { + EmissionType::Both + } else { + EmissionType::Incremental + } } /// Stores certain, often expensive to compute, plan properties used in query @@ -591,8 +662,10 @@ pub struct PlanProperties { pub eq_properties: EquivalenceProperties, /// See [ExecutionPlanProperties::output_partitioning] pub partitioning: Partitioning, - /// See [ExecutionPlanProperties::execution_mode] - pub execution_mode: ExecutionMode, + /// See [ExecutionPlanProperties::pipeline_behavior] + pub emission_type: EmissionType, + /// See [ExecutionPlanProperties::boundedness] + pub boundedness: Boundedness, /// See [ExecutionPlanProperties::output_ordering] output_ordering: Option<LexOrdering>, } @@ -602,14 +675,16 @@ impl PlanProperties { pub fn new( eq_properties: EquivalenceProperties, partitioning: Partitioning, - execution_mode: ExecutionMode, + emission_type: EmissionType, + boundedness: Boundedness, ) -> Self { // Output ordering can be derived from `eq_properties`. let output_ordering = eq_properties.output_ordering(); Self { eq_properties, partitioning, - execution_mode, + emission_type, + boundedness, output_ordering, } } @@ -620,12 +695,6 @@ impl PlanProperties { self } - /// Overwrite the execution Mode with its new value. - pub fn with_execution_mode(mut self, execution_mode: ExecutionMode) -> Self { - self.execution_mode = execution_mode; - self - } - /// Overwrite equivalence properties with its new value. 
pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { // Changing equivalence properties also changes output ordering, so @@ -635,6 +704,18 @@ impl PlanProperties { self } + /// Overwrite boundedness with its new value. + pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self { + self.boundedness = boundedness; + self + } + + /// Overwrite emission type with its new value. + pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self { + self.emission_type = emission_type; + self + } + pub fn equivalence_properties(&self) -> &EquivalenceProperties { &self.eq_properties } @@ -647,10 +728,6 @@ impl PlanProperties { self.output_ordering.as_ref() } - pub fn execution_mode(&self) -> ExecutionMode { - self.execution_mode - } - /// Get schema of the node. fn schema(&self) -> &SchemaRef { self.eq_properties.schema() diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index cc42e0587151..cb00958cec4c 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -20,7 +20,8 @@ use std::any::Any; use std::sync::Arc; -use super::{DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream}; +use super::{DisplayAs, PlanProperties, SendableRecordBatchStream}; +use crate::execution_plan::{Boundedness, EmissionType}; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; @@ -74,11 +75,11 @@ impl ExplainExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Final, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 07898e8d22d8..901907cf38fa 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -272,10 +272,12 @@ impl FilterExec { output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); } + Ok(PlanProperties::new( eq_properties, output_partitioning, - input.execution_mode(), + input.pipeline_behavior(), + input.boundedness(), )) } } diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index ae8a2acce696..e8486403868b 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -23,11 +23,12 @@ use std::fmt::Debug; use std::sync::Arc; use super::{ - execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, - ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, + execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, }; use crate::metrics::MetricsSet; use crate::stream::RecordBatchStreamAdapter; +use crate::ExecutionPlanProperties; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -140,7 +141,8 @@ impl DataSinkExec { PlanProperties::new( eq_properties, Partitioning::UnknownPartitioning(1), - input.execution_mode(), + input.pipeline_behavior(), + input.boundedness(), ) } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 8bf675e87362..b70eeb313b2a 100644 --- 
a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -24,11 +24,11 @@ use super::utils::{ StatefulStreamResult, }; use crate::coalesce_partitions::CoalescePartitionsExec; +use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - execution_mode_from_children, handle_state, ColumnStatistics, DisplayAs, - DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, - ExecutionPlanProperties, PlanProperties, RecordBatchStream, + handle_state, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use arrow::compute::concat_batches; @@ -161,14 +161,12 @@ impl CrossJoinExec { left.schema().fields.len(), ); - // Determine the execution mode: - let mut mode = execution_mode_from_children([left, right]); - if mode.is_unbounded() { - // If any of the inputs is unbounded, cross join breaks the pipeline. 
- mode = ExecutionMode::PipelineBreaking; - } - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + EmissionType::Final, + boundedness_from_children([left, right]), + ) } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 532a91da75b7..ef70392a01b7 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -29,11 +29,12 @@ use super::{ utils::{OnceAsync, OnceFut}, PartitionMode, }; +use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::ExecutionPlanProperties; use crate::{ coalesce_partitions::CoalescePartitionsExec, common::can_project, - execution_mode_from_children, handle_state, + handle_state, hash_utils::create_hashes, joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, @@ -44,9 +45,8 @@ use crate::{ JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult, }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, - DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, - Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use arrow::array::{ @@ -526,24 +526,26 @@ impl HashJoinExec { } }; - // Determine execution mode by checking whether this join is pipeline - // breaking. This happens when the left side is unbounded, or the right - // side is unbounded with `Left`, `Full`, `LeftAnti` or `LeftSemi` join types. 
- let pipeline_breaking = left.execution_mode().is_unbounded() - || (right.execution_mode().is_unbounded() - && matches!( - join_type, - JoinType::Left - | JoinType::Full - | JoinType::LeftAnti - | JoinType::LeftSemi - | JoinType::LeftMark - )); - - let mode = if pipeline_breaking { - ExecutionMode::PipelineBreaking + let emission_type = if left.boundedness().is_unbounded() { + EmissionType::Final + } else if right.pipeline_behavior() == EmissionType::Incremental { + match join_type { + // If we only need to generate matched rows from the probe side, + // we can emit rows incrementally. + JoinType::Inner + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::Right + | JoinType::RightAnti => EmissionType::Incremental, + // If we need to generate unmatched rows from the *build side*, + // we need to emit them at the end. + JoinType::Left + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::Full => EmissionType::Both, + } } else { - execution_mode_from_children([left, right]) + right.pipeline_behavior() }; // If contains projection, update the PlanProperties. 
@@ -556,10 +558,12 @@ impl HashJoinExec { output_partitioning.project(&projection_mapping, &eq_properties); eq_properties = eq_properties.project(&projection_mapping, out_schema); } + Ok(PlanProperties::new( eq_properties, output_partitioning, - mode, + emission_type, + boundedness_from_children([left, right]), )) } } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index d174564178df..8caf5d9b5de1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -28,6 +28,7 @@ use super::utils::{ BatchTransformer, NoopBatchTransformer, StatefulStreamResult, }; use crate::coalesce_partitions::CoalescePartitionsExec; +use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::joins::utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, check_join_is_valid, estimate_join_statistics, @@ -36,9 +37,9 @@ use crate::joins::utils::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - execution_mode_from_children, handle_state, DisplayAs, DisplayFormatType, - Distribution, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, + handle_state, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, + ExecutionPlanProperties, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, }; use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array}; @@ -241,14 +242,34 @@ impl NestedLoopJoinExec { let output_partitioning = asymmetric_join_output_partitioning(left, right, &join_type); - // Determine execution mode: - let mode = if left.execution_mode().is_unbounded() { - ExecutionMode::PipelineBreaking + let emission_type = if left.boundedness().is_unbounded() { + EmissionType::Final + } else if right.pipeline_behavior() == EmissionType::Incremental { + match 
join_type { + // If we only need to generate matched rows from the probe side, + // we can emit rows incrementally. + JoinType::Inner + | JoinType::LeftSemi + | JoinType::RightSemi + | JoinType::Right + | JoinType::RightAnti => EmissionType::Incremental, + // If we need to generate unmatched rows from the *build side*, + // we need to emit them at the end. + JoinType::Left + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::Full => EmissionType::Both, + } } else { - execution_mode_from_children([left, right]) + right.pipeline_behavior() }; - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type, + boundedness_from_children([left, right]), + ) } /// Returns a vector indicating whether the left and right inputs maintain their order. diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 5e387409da16..f17b99d81d7b 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -53,8 +53,8 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; -use futures::{Stream, StreamExt}; +use crate::execution_plan::{boundedness_from_children, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::joins::utils::{ build_join_schema, check_join_is_valid, estimate_join_statistics, @@ -63,11 +63,13 @@ use crate::joins::utils::{ use crate::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use crate::spill::spill_record_batches; use crate::{ - execution_mode_from_children, metrics, DisplayAs, DisplayFormatType, Distribution, - ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties, - RecordBatchStream, 
SendableRecordBatchStream, Statistics, + metrics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, + ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, }; +use futures::{Stream, StreamExt}; + /// Join execution plan that executes equi-join predicates on multiple partitions using Sort-Merge /// join algorithm and applies an optional filter post join. Can be used to join arbitrarily large /// inputs where one or both of the inputs don't fit in the available memory. @@ -302,10 +304,13 @@ impl SortMergeJoinExec { let output_partitioning = symmetric_join_output_partitioning(left, right, &join_type); - // Determine execution mode: - let mode = execution_mode_from_children([left, right]); - - PlanProperties::new(eq_properties, output_partitioning, mode) + // TODO: Emission type may be incremental if the input is sorted + PlanProperties::new( + eq_properties, + output_partitioning, + EmissionType::Final, + boundedness_from_children([left, right]), + ) } } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 94ef4d5bc34c..72fd5a0feb1a 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -33,6 +33,7 @@ use std::task::{Context, Poll}; use std::vec; use crate::common::SharedMemoryReservation; +use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; use crate::joins::hash_join::{equal_rows_arr, update_hash}; use crate::joins::stream_join_utils::{ calculate_filter_expr_intervals, combine_two_batches, @@ -47,7 +48,6 @@ use crate::joins::utils::{ NoopBatchTransformer, StatefulStreamResult, }; use crate::{ - execution_mode_from_children, joins::StreamJoinPartitionMode, metrics::{ExecutionPlanMetricsSet, MetricsSet}, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, @@ -275,10 
+275,12 @@ impl SymmetricHashJoinExec { let output_partitioning = symmetric_join_output_partitioning(left, right, &join_type); - // Determine execution mode: - let mode = execution_mode_from_children([left, right]); - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type_from_children([left, right]), + boundedness_from_children([left, right]), + ) } /// left stream diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 845a74eaea48..5ad37f0b1ac0 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -35,11 +35,10 @@ pub use datafusion_physical_expr::{ }; pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay}; -pub(crate) use crate::execution_plan::execution_mode_from_children; pub use crate::execution_plan::{ collect, collect_partitioned, displayable, execute_input_stream, execute_stream, execute_stream_partitioned, get_plan_string, with_new_children_if_necessary, - ExecutionMode, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + ExecutionPlan, ExecutionPlanProperties, PlanProperties, }; pub use crate::metrics::Metric; pub use crate::ordering::InputOrderMode; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index ab1e6cb37bc8..9665a09e42c9 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -24,9 +24,10 @@ use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{ - DisplayAs, ExecutionMode, ExecutionPlanProperties, PlanProperties, RecordBatchStream, + DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::execution_plan::{Boundedness, CardinalityEffect}; use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning}; use 
arrow::datatypes::SchemaRef; @@ -34,7 +35,6 @@ use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; -use crate::execution_plan::CardinalityEffect; use futures::stream::{Stream, StreamExt}; use log::trace; @@ -86,7 +86,9 @@ impl GlobalLimitExec { PlanProperties::new( input.equivalence_properties().clone(), // Equivalence Properties Partitioning::UnknownPartitioning(1), // Output Partitioning - ExecutionMode::Bounded, // Execution Mode + input.pipeline_behavior(), + // Limit operations are always bounded since they output a finite number of rows + Boundedness::Bounded, ) } } @@ -242,7 +244,9 @@ impl LocalLimitExec { PlanProperties::new( input.equivalence_properties().clone(), // Equivalence Properties input.output_partitioning().clone(), // Output Partitioning - ExecutionMode::Bounded, // Execution Mode + input.pipeline_behavior(), + // Limit operations are always bounded since they output a finite number of rows + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index bf6294f5a55b..521008ce9b02 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -24,9 +24,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{ - common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + common, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::execution_plan::{Boundedness, EmissionType}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -285,11 +286,11 @@ impl MemoryExec { orderings: &[LexOrdering], partitions: &[Vec<RecordBatch>], ) -> PlanProperties { - let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings); PlanProperties::new( - eq_properties, 
// Equivalence Properties - Partitioning::UnknownPartitioning(partitions.len()), // Output Partitioning - ExecutionMode::Bounded, // Execution Mode + EquivalenceProperties::new_with_orderings(schema, orderings), + Partitioning::UnknownPartitioning(partitions.len()), + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -393,7 +394,8 @@ impl LazyMemoryExec { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&schema)), Partitioning::RoundRobinBatch(generators.len()), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ); Ok(Self { schema, diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index f9437f46f8a6..355e51070f1f 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -20,10 +20,8 @@ use std::any::Any; use std::sync::Arc; -use super::{ - common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream, - Statistics, -}; +use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; +use crate::execution_plan::{Boundedness, EmissionType}; use crate::{memory::MemoryStream, DisplayFormatType, ExecutionPlan, Partitioning}; use arrow::array::{ArrayRef, NullArray}; @@ -96,11 +94,12 @@ impl PlaceholderRowExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - // Get output partitioning: - let output_partitioning = Self::output_partitioning_helper(n_partitions); - - PlanProperties::new(eq_properties, output_partitioning, ExecutionMode::Bounded) + PlanProperties::new( + EquivalenceProperties::new(schema), + Self::output_partitioning_helper(n_partitions), + EmissionType::Incremental, + Boundedness::Bounded, + ) } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index c1d3f368366f..e37a6b0dfb85 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -132,7 +132,8 @@ impl ProjectionExec { Ok(PlanProperties::new( eq_properties, output_partitioning, - input.execution_mode(), + input.pipeline_behavior(), + input.boundedness(), )) } } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 0137e5d52fea..0e49a791cbae 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -26,7 +26,8 @@ use super::{ work_table::{ReservedBatches, WorkTable, WorkTableExec}, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan}; +use crate::execution_plan::{Boundedness, EmissionType}; +use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -122,7 +123,8 @@ impl RecursiveQueryExec { PlanProperties::new( eq_properties, Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 0a80dcd34e05..963ccc6fd809 100644 --- 
a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -726,13 +726,11 @@ impl RepartitionExec { partitioning: Partitioning, preserve_order: bool, ) -> PlanProperties { - // Equivalence Properties - let eq_properties = Self::eq_properties_helper(input, preserve_order); - PlanProperties::new( - eq_properties, // Equivalence Properties - partitioning, // Output Partitioning - input.execution_mode(), // Execution Mode + Self::eq_properties_helper(input, preserve_order), + partitioning, + input.pipeline_behavior(), + input.boundedness(), ) } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 77636f9e4991..f14ba6606e89 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -201,10 +201,12 @@ impl PartialSortExec { let output_partitioning = Self::output_partitioning_helper(input, preserve_partitioning); - // Determine execution mode: - let mode = input.execution_mode(); - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + input.pipeline_behavior(), + input.boundedness(), + ) } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c2d8b093a923..8d8a5c5f7055 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -25,6 +25,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use crate::common::spawn_buffered; +use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{ @@ -37,9 +38,9 @@ use crate::spill::{ use crate::stream::RecordBatchStreamAdapter; use crate::topk::TopK; use crate::{ - DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionMode, - ExecutionPlan, 
ExecutionPlanProperties, Partitioning, PlanProperties, - SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, + ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, + Statistics, }; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn}; @@ -56,7 +57,6 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexRequirement; -use crate::execution_plan::CardinalityEffect; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; @@ -763,11 +763,15 @@ impl SortExec { /// can be dropped. pub fn with_fetch(&self, fetch: Option<usize>) -> Self { let mut cache = self.cache.clone(); - if fetch.is_some() && self.cache.execution_mode == ExecutionMode::Unbounded { - // When a theoretically unnecessary sort becomes a top-K (which - // sometimes arises as an intermediate state before full removal), - // its execution mode should become `Bounded`. - cache.execution_mode = ExecutionMode::Bounded; + // If the SortExec can emit incrementally (that means the sort requirements + // and properties of the input match), the SortExec can generate its result + // without scanning the entire input when a fetch value exists. 
+ let is_pipeline_friendly = matches!( + self.cache.emission_type, + EmissionType::Incremental | EmissionType::Both + ); + if fetch.is_some() && is_pipeline_friendly { + cache = cache.with_boundedness(Boundedness::Bounded); } SortExec { input: Arc::clone(&self.input), @@ -817,10 +821,30 @@ impl SortExec { let sort_satisfied = input .equivalence_properties() .ordering_satisfy_requirement(&requirement); - let mode = match input.execution_mode() { - ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded, - ExecutionMode::Bounded => ExecutionMode::Bounded, - _ => ExecutionMode::PipelineBreaking, + + // The emission type depends on whether the input is already sorted: + // - If already sorted, we can emit results in the same way as the input + // - If not sorted, we must wait until all data is processed to emit results (Final) + let emission_type = if sort_satisfied { + input.pipeline_behavior() + } else { + EmissionType::Final + }; + + // The boundedness depends on whether the input is already sorted: + // - If already sorted, we have the same property as the input + // - If not sorted and input is unbounded, we require infinite memory and generate + // unbounded data (not practical). + // - If not sorted and input is bounded, then the SortExec is bounded, too. + let boundedness = if sort_satisfied { + input.boundedness() + } else { + match input.boundedness() { + Boundedness::Unbounded { .. } => Boundedness::Unbounded { + requires_infinite_memory: true, + }, + bounded => bounded, + } }; // Calculate equivalence properties; i.e. 
reset the ordering equivalence @@ -835,7 +859,12 @@ impl SortExec { let output_partitioning = Self::output_partitioning_helper(input, preserve_partitioning); - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type, + boundedness, + ) } } @@ -1006,6 +1035,7 @@ mod tests { use super::*; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::collect; + use crate::execution_plan::Boundedness; use crate::expressions::col; use crate::memory::MemoryExec; use crate::test; @@ -1049,8 +1079,14 @@ mod tests { eq_properties.add_new_orderings(vec![LexOrdering::new(vec![ PhysicalSortExpr::new_default(Arc::new(Column::new("c1", 0))), ])]); - let mode = ExecutionMode::Unbounded; - PlanProperties::new(eq_properties, Partitioning::UnknownPartitioning(1), mode) + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) } } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 906164f21b8c..21597fb85662 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -139,7 +139,8 @@ impl SortPreservingMergeExec { PlanProperties::new( eq_properties, // Equivalence Properties Partitioning::UnknownPartitioning(1), // Output Partitioning - input.execution_mode(), // Execution Mode + input.pipeline_behavior(), // Pipeline Behavior + input.boundedness(), // Boundedness ) } } @@ -323,6 +324,7 @@ mod tests { use super::*; use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; + use crate::execution_plan::{Boundedness, EmissionType}; use crate::expressions::col; use crate::memory::MemoryExec; use crate::metrics::{MetricValue, Timestamp}; @@ -331,7 +333,7 @@ mod tests { 
use crate::stream::RecordBatchReceiverStream; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; use crate::test::{self, assert_is_pending, make_partition}; - use crate::{collect, common, ExecutionMode}; + use crate::{collect, common}; use arrow::array::{ArrayRef, Int32Array, StringArray, TimestampNanosecondArray}; use arrow::compute::SortOptions; @@ -1268,8 +1270,14 @@ mod tests { .iter() .map(|expr| PhysicalSortExpr::new_default(Arc::clone(expr))) .collect::()]); - let mode = ExecutionMode::Unbounded; - PlanProperties::new(eq_properties, Partitioning::Hash(columns, 3), mode) + PlanProperties::new( + eq_properties, + Partitioning::Hash(columns, 3), + EmissionType::Incremental, + Boundedness::Unbounded { + requires_infinite_memory: false, + }, + ) } } diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 7ccef3248069..da8b0e877dcc 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -21,8 +21,9 @@ use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use super::{DisplayAs, DisplayFormatType, ExecutionMode, PlanProperties}; +use super::{DisplayAs, DisplayFormatType, PlanProperties}; use crate::display::{display_orderings, ProjectSchemaDisplay}; +use crate::execution_plan::{Boundedness, EmissionType}; use crate::stream::RecordBatchStreamAdapter; use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; @@ -145,22 +146,26 @@ impl StreamingTableExec { schema: SchemaRef, orderings: &[LexOrdering], partitions: &[Arc], - is_infinite: bool, + infinite: bool, ) -> PlanProperties { // Calculate equivalence properties: let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings); // Get output partitioning: let output_partitioning = Partitioning::UnknownPartitioning(partitions.len()); - - // Determine execution mode: - let mode = if is_infinite { - ExecutionMode::Unbounded + let boundedness = if infinite { + 
Boundedness::Unbounded { + requires_infinite_memory: false, + } } else { - ExecutionMode::Bounded + Boundedness::Bounded }; - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + EmissionType::Incremental, + boundedness, + ) } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index cc0a7cbd9b52..b31a53e55e88 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -24,10 +24,14 @@ use std::{ task::{Context, Poll}, }; -use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; use crate::{ - common, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, - PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, + common, execution_plan::Boundedness, DisplayAs, DisplayFormatType, ExecutionPlan, + Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + Statistics, +}; +use crate::{ + execution_plan::EmissionType, + stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}, }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -152,12 +156,11 @@ impl MockExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -315,11 +318,11 @@ impl BarrierExec { schema: SchemaRef, data: &[Vec], ) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(data.len()), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -427,12 +430,11 @@ impl ErrorExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -509,12 +511,11 @@ impl StatisticsExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(2), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -610,12 +611,11 @@ impl BlockingExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef, n_partitions: usize) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(n_partitions), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } @@ -752,13 +752,12 @@ impl PanicExec { schema: SchemaRef, batches_until_panics: &[usize], ) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); let num_partitions = batches_until_panics.len(); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(num_partitions), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index bd36753880eb..6e768a3d87bc 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -27,12 +27,12 @@ use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; use super::{ - execution_mode_from_children, metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; use crate::metrics::BaselineMetrics; use crate::stream::ObservedStream; @@ -135,14 +135,11 @@ impl UnionExec { .map(|plan| plan.output_partitioning().partition_count()) .sum(); let output_partitioning = Partitioning::UnknownPartitioning(num_partitions); - - // Determine execution mode: - let mode = execution_mode_from_children(inputs.iter()); - Ok(PlanProperties::new( eq_properties, output_partitioning, - mode, + emission_type_from_children(inputs), + boundedness_from_children(inputs), )) } } @@ -335,10 +332,12 @@ impl InterleaveExec { let 
eq_properties = EquivalenceProperties::new(schema); // Get output partitioning: let output_partitioning = inputs[0].output_partitioning().clone(); - // Determine execution mode: - let mode = execution_mode_from_children(inputs.iter()); - - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + emission_type_from_children(inputs), + boundedness_from_children(inputs), + ) } } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 19b1b46953d8..19a090ca284f 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -100,12 +100,11 @@ impl UnnestExec { input: &Arc, schema: SchemaRef, ) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, - input.output_partitioning().clone(), - input.execution_mode(), + EquivalenceProperties::new(schema), + input.output_partitioning().to_owned(), + input.pipeline_behavior(), + input.boundedness(), ) } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index edadf98cb10c..5089b1e626d4 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -20,10 +20,8 @@ use std::any::Any; use std::sync::Arc; -use super::{ - common, DisplayAs, ExecutionMode, PlanProperties, SendableRecordBatchStream, - Statistics, -}; +use super::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; +use crate::execution_plan::{Boundedness, EmissionType}; use crate::{ memory::MemoryStream, ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, @@ -133,12 +131,11 @@ impl ValuesExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index c6003fe0a82f..b66147bf7439 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -202,9 +202,11 @@ impl BoundedWindowAggExec { // Construct properties cache PlanProperties::new( - eq_properties, // Equivalence Properties - output_partitioning, // Output Partitioning - input.execution_mode(), // Execution Mode + eq_properties, + output_partitioning, + // TODO: Emission type and boundedness information can be enhanced here + input.pipeline_behavior(), + input.boundedness(), ) } } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 222a8bb71a02..36c4b9f18da9 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -383,7 +383,7 @@ pub fn get_best_fitting_window( } else { return Ok(None); }; - let is_unbounded = input.execution_mode().is_unbounded(); + let is_unbounded = input.boundedness().is_unbounded(); if !is_unbounded && input_order_mode != InputOrderMode::Sorted { // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted` // in this case removing the sort is not helpful, return: diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index c0ac96d22ef7..b132c3247072 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -23,15 
+23,16 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; +use crate::execution_plan::EmissionType; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, }; use crate::{ - ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode, - ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties, - RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr, + ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, + ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, Statistics, WindowExpr, }; use arrow::array::ArrayRef; use arrow::compute::{concat, concat_batches}; @@ -127,16 +128,14 @@ impl WindowAggExec { // would be either 1 or more than 1 depending on the presence of repartitioning. let output_partitioning = input.output_partitioning().clone(); - // Determine execution mode: - let mode = match input.execution_mode() { - ExecutionMode::Bounded => ExecutionMode::Bounded, - ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { - ExecutionMode::PipelineBreaking - } - }; - // Construct properties cache: - PlanProperties::new(eq_properties, output_partitioning, mode) + PlanProperties::new( + eq_properties, + output_partitioning, + // TODO: Emission type and boundedness information can be enhanced here + EmissionType::Final, + input.boundedness(), + ) } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index add386319253..b1dd4d9308f4 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -24,8 +24,9 @@ use super::{ metrics::{ExecutionPlanMetricsSet, MetricsSet}, SendableRecordBatchStream, Statistics, }; +use crate::execution_plan::{Boundedness, EmissionType}; 
use crate::memory::MemoryStream; -use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties}; +use crate::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -142,12 +143,11 @@ impl WorkTableExec { /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties(schema: SchemaRef) -> PlanProperties { - let eq_properties = EquivalenceProperties::new(schema); - PlanProperties::new( - eq_properties, + EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(1), - ExecutionMode::Bounded, + EmissionType::Incremental, + Boundedness::Bounded, ) } }
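Reviewer note: the `SortExec` hunks above carry most of the new reasoning in this part of the patch, so here is a minimal standalone sketch of the two rules they introduce. The enums are simplified local stand-ins for `EmissionType` and `Boundedness`, not the DataFusion definitions. The helpers `sort_properties` and `boundedness_with_fetch` are hypothetical names chosen for illustration; in the patch the logic lives inside `SortExec::compute_properties` and `SortExec::with_fetch`.

```rust
// Simplified local stand-ins for the types introduced by the patch.
// Assumption: only the variants visible in the diff are modelled here.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EmissionType {
    Incremental,
    Final,
    Both,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Boundedness {
    Bounded,
    Unbounded { requires_infinite_memory: bool },
}

/// Hypothetical helper mirroring the `compute_properties` hunk:
/// derive the sort's emission type and boundedness from its input.
fn sort_properties(
    sort_satisfied: bool,
    input_emission: EmissionType,
    input_boundedness: Boundedness,
) -> (EmissionType, Boundedness) {
    if sort_satisfied {
        // The input is already ordered, so the sort inherits both properties.
        return (input_emission, input_boundedness);
    }
    // An unsatisfied sort must buffer all input before emitting anything.
    let boundedness = match input_boundedness {
        // Buffering an unbounded input would need unbounded memory.
        Boundedness::Unbounded { .. } => Boundedness::Unbounded {
            requires_infinite_memory: true,
        },
        bounded => bounded,
    };
    (EmissionType::Final, boundedness)
}

/// Hypothetical helper mirroring the `with_fetch` hunk: a fetch limit
/// makes the plan bounded only if it can emit rows before the input ends.
fn boundedness_with_fetch(
    fetch: Option<usize>,
    emission: EmissionType,
    current: Boundedness,
) -> Boundedness {
    let is_pipeline_friendly = matches!(
        emission,
        EmissionType::Incremental | EmissionType::Both
    );
    if fetch.is_some() && is_pipeline_friendly {
        Boundedness::Bounded
    } else {
        current
    }
}
```

For example, `sort_properties(false, EmissionType::Incremental, Boundedness::Unbounded { requires_infinite_memory: false })` yields `Final` emission with `requires_infinite_memory: true`, matching the "not practical" case described in the patch comment. Calling `boundedness_with_fetch(Some(10), EmissionType::Final, ..)` leaves the boundedness unchanged, because a sort that only emits at the end cannot stop early on an unbounded input.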