diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 9516dc570d6d..d3cd66b2c9bc 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -35,6 +35,7 @@ use datafusion::physical_plan::{
 };
 use datafusion::prelude::*;
 use datafusion_expr::{Expr, LogicalPlanBuilder};
+use datafusion_physical_expr::EquivalenceProperties;

 use async_trait::async_trait;
 use tokio::time::timeout;
@@ -199,22 +200,21 @@ impl CustomExec {
         db: CustomDataSource,
     ) -> Self {
         let projected_schema = project_schema(&schema, projections).unwrap();
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let cache = Self::create_cache(projected_schema.clone());
         Self {
             db,
             projected_schema,
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 1a27f9315b34..24e825a6920b 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -61,7 +61,11 @@ impl ArrowExec {
     pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let cache = Self::create_cache(
+            projected_schema.clone(),
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             base_config,
             projected_schema,
@@ -70,36 +74,36 @@ impl ArrowExec {
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Ref to the base configs
     pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
+        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        projected_output_ordering: &[LexOrdering],
+        file_scan_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties =
+            EquivalenceProperties::new_with_orderings(schema, projected_output_ordering);

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
-            self.output_partitioning_helper(), // Output Partitioning
-            ExecutionMode::Bounded,            // Execution Mode
-        );
-        self
+            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
+            ExecutionMode::Bounded,                             // Execution Mode
+        )
     }

     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index fb2cd627a1da..6b6e7bce90c1 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -50,7 +50,11 @@ impl AvroExec {
     pub fn new(base_config: FileScanConfig) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let cache = Self::create_cache(
+            projected_schema.clone(),
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             base_config,
             projected_schema,
@@ -59,27 +63,26 @@ impl AvroExec {
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Ref to the base configs
     pub fn base_config(&self) -> &FileScanConfig {
         &self.base_config
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        file_scan_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
-        let n_partitions = self.base_config.file_groups.len();
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+        let n_partitions = file_scan_config.file_groups.len();

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
             Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
             ExecutionMode::Bounded,                          // Execution Mode
-        );
-        self
+        )
     }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 55cf62507788..19281bc3c189 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -53,7 +53,6 @@ use tokio::task::JoinSet;
 pub struct CsvExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
-    projected_output_ordering: Vec<LexOrdering>,
     has_header: bool,
     delimiter: u8,
     quote: u8,
@@ -77,11 +76,14 @@ impl CsvExec {
     ) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
-        let cache = PlanPropertiesCache::new_default(projected_schema);
+        let cache = Self::create_cache(
+            projected_schema,
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             base_config,
             projected_statistics,
-            projected_output_ordering,
             has_header,
             delimiter,
             quote,
@@ -90,7 +92,6 @@ impl CsvExec {
             file_compression_type,
             cache,
         }
-        .with_cache()
     }

     /// Ref to the base configs
@@ -116,29 +117,29 @@ impl CsvExec {
         self.escape
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
+        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        file_scan_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties =
+            EquivalenceProperties::new_with_orderings(schema, orderings);

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
-            self.output_partitioning_helper(), // Output Partitioning
-            ExecutionMode::Bounded,            // Execution Mode
-        );
-        self
+            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
+            ExecutionMode::Bounded,                             // Execution Mode
+        )
     }

     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 231b48f28d96..6e17e58d8444 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -52,7 +52,6 @@ use tokio::task::JoinSet;
 pub struct NdJsonExec {
     base_config: FileScanConfig,
     projected_statistics: Statistics,
-    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     file_compression_type: FileCompressionType,
@@ -67,16 +66,18 @@ impl NdJsonExec {
     ) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();
-        let cache = PlanPropertiesCache::new_default(projected_schema);
+        let cache = Self::create_cache(
+            projected_schema,
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             base_config,
             projected_statistics,
-            projected_output_ordering,
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
             cache,
         }
-        .with_cache()
     }

     /// Ref to the base configs
@@ -84,29 +85,29 @@ impl NdJsonExec {
         &self.base_config
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
+        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        file_scan_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
-            self.output_partitioning_helper(), // Output Partitioning
-            ExecutionMode::Bounded,            // Execution Mode
-        );
-        self
+            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
+            ExecutionMode::Bounded,                             // Execution Mode
+        )
     }

     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index e47c8e516385..810a84646c86 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -88,7 +88,6 @@ pub struct ParquetExec {
     /// Base configuration for this scan
     base_config: FileScanConfig,
     projected_statistics: Statistics,
-    projected_output_ordering: Vec<LexOrdering>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
@@ -149,7 +148,11 @@ impl ParquetExec {
         let (projected_schema, projected_statistics, projected_output_ordering) =
             base_config.project();

-        let cache = PlanPropertiesCache::new_default(projected_schema);
+        let cache = Self::create_cache(
+            projected_schema,
+            &projected_output_ordering,
+            &base_config,
+        );
         Self {
             pushdown_filters: None,
             reorder_filters: None,
@@ -157,7 +160,6 @@ impl ParquetExec {
             enable_bloom_filter: None,
             base_config,
             projected_statistics,
-            projected_output_ordering,
             metrics,
             predicate,
             pruning_predicate,
@@ -166,7 +168,6 @@ impl ParquetExec {
             parquet_file_reader_factory: None,
             cache,
         }
-        .with_cache()
     }

     /// Ref to the base configs
@@ -261,29 +262,29 @@ impl ParquetExec {
             .unwrap_or(config_options.execution.parquet.bloom_filter_enabled)
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {
+        Partitioning::UnknownPartitioning(file_config.file_groups.len())
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        file_config: &FileScanConfig,
+    ) -> PlanPropertiesCache {
         // Equivalence Properties
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,
-            self.output_partitioning_helper(), // Output Partitioning
-            ExecutionMode::Bounded,            // Execution Mode
-        );
-        self
+            Self::output_partitioning_helper(file_config), // Output Partitioning
+            ExecutionMode::Bounded,                        // Execution Mode
+        )
     }

     fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self {
         self.base_config.file_groups = file_groups;
         // Changing file groups may invalidate output partitioning. Update it also
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(&self.base_config);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index c7bfe4742bdf..a5ad2d546d41 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1352,22 +1352,20 @@ pub(crate) mod tests {
            input: Arc<dyn ExecutionPlan>,
            requirement: Vec<PhysicalSortExpr>,
        ) -> Self {
-            let cache = PlanPropertiesCache::new_default(input.schema());
+            let cache = Self::create_cache(&input);
            Self {
                input,
                expr: requirement,
                cache,
            }
-            .with_cache()
        }

-        fn with_cache(mut self) -> Self {
-            self.cache = PlanPropertiesCache::new(
-                self.input.equivalence_properties().clone(), // Equivalence Properties
-                self.input.output_partitioning().clone(),    // Output Partitioning
-                self.input.execution_mode(),                 // Execution Mode
-            );
-            self
+        fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
+            PlanPropertiesCache::new(
+                input.equivalence_properties().clone(), // Equivalence Properties
+                input.output_partitioning().clone(),    // Output Partitioning
+                input.execution_mode(),                 // Execution Mode
+            )
        }
    }
diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs
index 5ddba79515ee..a806580ce716 100644
--- a/datafusion/core/src/physical_optimizer/output_requirements.rs
+++ b/datafusion/core/src/physical_optimizer/output_requirements.rs
@@ -99,27 +99,25 @@ impl OutputRequirementExec {
         requirements: Option<LexRequirement>,
         dist_requirement: Distribution,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         Self {
             input,
             order_requirement: requirements,
             dist_requirement,
             cache,
         }
-        .with_cache()
     }

     pub(crate) fn input(&self) -> Arc<dyn ExecutionPlan> {
         self.input.clone()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = PlanPropertiesCache::new(
-            self.input.equivalence_properties().clone(), // Equivalence Properties
-            self.input.output_partitioning().clone(),    // Output Partitioning
-            self.input.execution_mode(),                 // Execution Mode
-        );
-        self
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
+        PlanPropertiesCache::new(
+            input.equivalence_properties().clone(), // Equivalence Properties
+            input.output_partitioning().clone(),    // Output Partitioning
+            input.execution_mode(),                 // Execution Mode
+        )
     }
 }
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 1a334678b6d7..e571bc76f4d5 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2013,6 +2013,7 @@ mod tests {
         col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder,
         UserDefinedLogicalNodeCore,
     };
+    use datafusion_physical_expr::EquivalenceProperties;

     fn make_session_state() -> SessionState {
         let runtime = Arc::new(RuntimeEnv::default());
@@ -2579,19 +2580,19 @@ mod tests {
     impl NoOpExecutionPlan {
         fn new(schema: SchemaRef) -> Self {
-            let cache = PlanPropertiesCache::new_default(schema.clone());
-            Self { cache }.with_cache()
+            let cache = Self::create_cache(schema.clone());
+            Self { cache }
         }

-        fn with_cache(mut self) -> Self {
-            self.cache = self
-                .cache
+        fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+            let eq_properties = EquivalenceProperties::new(schema);
+            PlanPropertiesCache::new(
+                eq_properties,
                 // Output Partitioning
-                .with_partitioning(Partitioning::UnknownPartitioning(1))
+                Partitioning::UnknownPartitioning(1),
                 // Execution Mode
-                .with_exec_mode(ExecutionMode::Bounded);
-
-            self
+                ExecutionMode::Bounded,
+            )
         }
     }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 277901ff9915..f8eb67cfdaf5 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -42,7 +42,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{DataFusionError, FileType, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
 use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, PlanPropertiesCache,
@@ -376,24 +376,23 @@ impl StatisticsExec {
             stats.column_statistics.len(),
             schema.fields().len(),
             "if defined, the column statistics vector length should be the number of fields"
         );
-        let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone()));
+        let cache = Self::create_cache(Arc::new(schema.clone()));
         Self {
             stats,
             schema: Arc::new(schema),
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
             // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(2))
+            Partitioning::UnknownPartitioning(2),
             // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index bb016f93c351..dda6d730ce84 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -55,6 +55,7 @@ use tempfile::TempDir;
 #[cfg(feature = "parquet")]
 pub use datafusion_common::test_util::parquet_test_data;
 pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
+use datafusion_physical_expr::EquivalenceProperties;

 /// Scan an empty data source, mainly used in tests
 pub fn scan_empty(
@@ -226,7 +227,6 @@ impl TableProvider for TestTableProvider {
 pub struct UnboundedExec {
     batch_produce: Option<usize>,
     batch: RecordBatch,
-    partitions: usize,
     cache: PlanPropertiesCache,
 }
 impl UnboundedExec {
@@ -238,29 +238,30 @@ impl UnboundedExec {
         batch: RecordBatch,
         partitions: usize,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(batch.schema());
+        let cache = Self::create_cache(batch.schema(), batch_produce, partitions);
         Self {
             batch_produce,
             batch,
-            partitions,
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        let mode = if self.batch_produce.is_none() {
+    fn create_cache(
+        schema: SchemaRef,
+        batch_produce: Option<usize>,
+        n_partitions: usize,
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        let mode = if batch_produce.is_none() {
             ExecutionMode::Unbounded
         } else {
             ExecutionMode::Bounded
         };
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(self.partitions))
-            // Execution Mode
-            .with_exec_mode(mode);
-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(n_partitions),
+            mode,
+        )
     }
 }
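The remaining hunks in this diff all apply the same shape of change as the ones above: the unsafe `PlanPropertiesCache::new_default` + `with_cache` two-step is replaced by a static `create_cache` helper that computes the finished properties before the operator is constructed. A minimal, self-contained sketch of the new pattern (the `FooExec` operator is hypothetical; the API calls mirror the ones used throughout this diff):

// Sketch only: `FooExec` is a made-up operator, but the constructor calls
// below are the same ones this diff uses in the real operators.
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};

struct FooExec {
    cache: PlanPropertiesCache,
}

impl FooExec {
    fn new(schema: SchemaRef) -> Self {
        // Properties are computed up front, so the cache is never observable
        // in a half-initialized "default" state that must be patched later.
        let cache = Self::create_cache(schema);
        Self { cache }
    }

    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
        let eq_properties = EquivalenceProperties::new(schema);
        PlanPropertiesCache::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1), // single output partition
            ExecutionMode::Bounded,               // finite input
        )
    }
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::empty());
    let _exec = FooExec::new(schema);
}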
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index a167258ee1d5..11f29192276c 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -38,6 +38,7 @@ use datafusion::scalar::ScalarValue;
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::project_schema;
 use datafusion_common::stats::Precision;
+use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache};

@@ -81,19 +82,18 @@ impl CustomExecutionPlan {
         let schema = TEST_CUSTOM_SCHEMA_REF!();
         let schema =
             project_schema(&schema, projection.as_ref()).expect("projected schema");
-        let cache = PlanPropertiesCache::new_default(schema);
-        Self { projection, cache }.with_cache()
+        let cache = Self::create_cache(schema);
+        Self { projection, cache }
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
             // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
index 9423f0170c7e..da00effa00a8 100644
--- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs
@@ -35,6 +35,7 @@ use datafusion::scalar::ScalarValue;
 use datafusion_common::cast::as_primitive_array;
 use datafusion_common::{internal_err, not_impl_err, DataFusionError};
 use datafusion_expr::expr::{BinaryExpr, Cast};
+use datafusion_physical_expr::EquivalenceProperties;

 use async_trait::async_trait;

@@ -62,19 +63,17 @@ struct CustomPlan {

 impl CustomPlan {
     fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema);
-        Self { batches, cache }.with_cache()
+        let cache = Self::create_cache(schema);
+        Self { batches, cache }
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs
index 315c7cb6dd26..37854908f021 100644
--- a/datafusion/core/tests/custom_sources_cases/statistics.rs
+++ b/datafusion/core/tests/custom_sources_cases/statistics.rs
@@ -33,6 +33,7 @@ use datafusion::{
     scalar::ScalarValue,
 };
 use datafusion_common::{project_schema, stats::Precision};
+use datafusion_physical_expr::EquivalenceProperties;

 use async_trait::async_trait;

@@ -52,24 +53,22 @@ impl StatisticsValidation {
             schema.fields().len(),
             "the column statistics vector length should be the number of fields"
         );
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone());
         Self {
             stats,
             schema,
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(2))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(2),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 947376dfb6a0..f2b6f6c93615 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -91,6 +91,7 @@ use datafusion::{
 };

 use async_trait::async_trait;
+use datafusion_physical_expr::EquivalenceProperties;
 use futures::{Stream, StreamExt};

 /// Execute the specified sql and return the resulting record batches
@@ -416,19 +417,18 @@ struct TopKExec {

 impl TopKExec {
     fn new(input: Arc<dyn ExecutionPlan>, k: usize) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
-        Self { input, k, cache }.with_cache()
+        let cache = Self::create_cache(input.schema());
+        Self { input, k, cache }
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index ea3fc3a737b2..fa5b65e40123 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -260,9 +260,6 @@ pub struct AggregateExec {
     /// We need the input schema of partial aggregate to be able to deserialize aggregate
     /// expressions from protobuf for final aggregate.
     pub input_schema: SchemaRef,
-    /// The mapping used to normalize expressions like Partitioning and
-    /// PhysicalSortExpr that maps input to output
-    projection_mapping: ProjectionMapping,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     required_input_ordering: Option<LexRequirement>,
@@ -365,8 +362,14 @@ impl AggregateExec {
         let required_input_ordering =
             (!new_requirement.is_empty()).then_some(new_requirement);

-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let aggregate = AggregateExec {
+        let cache = Self::create_cache(
+            &input,
+            schema.clone(),
+            &projection_mapping,
+            &mode,
+            &input_order_mode,
+        );
+        Ok(AggregateExec {
             mode,
             group_by,
             aggr_expr,
@@ -374,14 +377,12 @@ impl AggregateExec {
             input,
             schema,
             input_schema,
-            projection_mapping,
             metrics: ExecutionPlanMetricsSet::new(),
             required_input_ordering,
             limit: None,
             input_order_mode,
             cache,
-        };
-        Ok(aggregate.with_cache())
+        })
     }

     /// Aggregation mode (full, partial)
@@ -505,26 +506,31 @@ impl AggregateExec {
         true
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        projection_mapping: &ProjectionMapping,
+        mode: &AggregateMode,
+        input_order_mode: &InputOrderMode,
+    ) -> PlanPropertiesCache {
         // Construct equivalence properties:
-        let eq_properties = self
-            .input
+        let eq_properties = input
             .equivalence_properties()
-            .project(&self.projection_mapping, self.schema());
+            .project(projection_mapping, schema);

         // Get output partitioning:
-        let mut output_partitioning = self.input.output_partitioning().clone();
-        if self.mode.is_first_stage() {
+        let mut output_partitioning = input.output_partitioning().clone();
+        if mode.is_first_stage() {
             // First stage aggregation will not change the output partitioning,
             // but needs to respect aliases (e.g. mapping in the GROUP BY
             // expression).
-            let input_eq_properties = self.input.equivalence_properties();
+            let input_eq_properties = input.equivalence_properties();
             if let Partitioning::Hash(exprs, part) = output_partitioning {
                 let normalized_exprs = exprs
                     .iter()
                     .map(|expr| {
                         input_eq_properties
-                            .project_expr(expr, &self.projection_mapping)
+                            .project_expr(expr, projection_mapping)
                             .unwrap_or_else(|| {
                                 Arc::new(UnKnownColumn::new(&expr.to_string()))
                             })
@@ -535,18 +541,15 @@ impl AggregateExec {
         }

         // Determine execution mode:
-        let mut exec_mode = self.input.execution_mode();
+        let mut exec_mode = input.execution_mode();
         if exec_mode == ExecutionMode::Unbounded
-            && self.input_order_mode == InputOrderMode::Linear
+            && *input_order_mode == InputOrderMode::Linear
         {
             // Cannot run without breaking the pipeline
             exec_mode = ExecutionMode::PipelineBreaking;
         }

-        self.cache =
-            PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode);
-
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode)
     }

     pub fn input_order_mode(&self) -> &InputOrderMode {
@@ -1622,19 +1625,19 @@ mod tests {
     impl TestYieldingExec {
         fn new(yield_first: bool) -> Self {
             let schema = some_data().0;
-            let cache = PlanPropertiesCache::new_default(schema);
-            Self { yield_first, cache }.with_cache()
+            let cache = Self::create_cache(schema);
+            Self { yield_first, cache }
         }

-        fn with_cache(mut self) -> Self {
-            self.cache = self
-                .cache
+        fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+            let eq_properties = EquivalenceProperties::new(schema);
+            PlanPropertiesCache::new(
+                eq_properties,
                 // Output Partitioning
-                .with_partitioning(Partitioning::UnknownPartitioning(1))
+                Partitioning::UnknownPartitioning(1),
                 // Execution Mode
-                .with_exec_mode(ExecutionMode::Bounded);
-
-            self
+                ExecutionMode::Bounded,
+            )
         }
     }
diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs
index 99b3add2acd6..731f3e3c7ebf 100644
--- a/datafusion/physical-plan/src/analyze.rs
+++ b/datafusion/physical-plan/src/analyze.rs
@@ -29,6 +29,7 @@ use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use futures::StreamExt;

@@ -55,7 +56,7 @@ impl AnalyzeExec {
         input: Arc<dyn ExecutionPlan>,
         schema: SchemaRef,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(&input, schema.clone());
         AnalyzeExec {
             verbose,
             show_statistics,
@@ -63,7 +64,6 @@ impl AnalyzeExec {
             schema,
             cache,
         }
-        .with_cache()
     }

     /// access to verbose
@@ -81,15 +81,14 @@ impl AnalyzeExec {
         &self.input
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(self.input.execution_mode());
-
-        self
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        let output_partitioning = Partitioning::UnknownPartitioning(1);
+        let exec_mode = input.execution_mode();
+        PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode)
     }
 }
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index e01060f3784d..e83bce0664a3 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -54,14 +54,13 @@ pub struct CoalesceBatchesExec {
 impl CoalesceBatchesExec {
     /// Create a new CoalesceBatchesExec
     pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         Self {
             input,
             target_batch_size,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// The input plan
@@ -74,16 +73,14 @@ impl CoalesceBatchesExec {
         self.target_batch_size
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
         // The coalesce batches operator does not make any changes to the
         // partitioning of its input.
-        self.cache = PlanPropertiesCache::new(
-            self.input.equivalence_properties().clone(), // Equivalence Properties
-            self.input.output_partitioning().clone(),    // Output Partitioning
-            self.input.execution_mode(),                 // Execution Mode
-        );
-
-        self
+        PlanPropertiesCache::new(
+            input.equivalence_properties().clone(), // Equivalence Properties
+            input.output_partitioning().clone(),    // Output Partitioning
+            input.execution_mode(),                 // Execution Mode
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 255e996bd122..27f58c9bfd85 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -44,13 +44,12 @@ pub struct CoalescePartitionsExec {
 impl CoalescePartitionsExec {
     /// Create a new CoalescePartitionsExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         CoalescePartitionsExec {
             input,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Input execution plan
@@ -58,18 +57,16 @@ impl CoalescePartitionsExec {
         &self.input
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
         // Coalescing partitions loses existing orderings:
-        let mut eq_properties = self.input.equivalence_properties().clone();
+        let mut eq_properties = input.equivalence_properties().clone();
         eq_properties.clear_orderings();

-        self.cache = PlanPropertiesCache::new(
+        PlanPropertiesCache::new(
             eq_properties,                        // Equivalence Properties
             Partitioning::UnknownPartitioning(1), // Output Partitioning
-            self.input.execution_mode(),          // Execution Mode
-        );
-
-        self
+            input.execution_mode(),               // Execution Mode
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs
index d91395825135..942bee81f472 100644
--- a/datafusion/physical-plan/src/empty.rs
+++ b/datafusion/physical-plan/src/empty.rs
@@ -30,6 +30,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use log::trace;

@@ -46,20 +47,19 @@ pub struct EmptyExec {
 impl EmptyExec {
     /// Create a new EmptyExec
     pub fn new(schema: SchemaRef) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone(), 1);
         EmptyExec {
             schema,
             partitions: 1,
             cache,
         }
-        .with_cache()
     }

     /// Create a new EmptyExec with specified partition number
     pub fn with_partitions(mut self, partitions: usize) -> Self {
         self.partitions = partitions;
         // Changing partitions may invalidate output partitioning, so update it:
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(self.partitions);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
@@ -68,21 +68,20 @@ impl EmptyExec {
         Ok(vec![])
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partitions)
+    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
+        Partitioning::UnknownPartitioning(n_partitions)
     }

-    fn with_cache(mut self) -> Self {
-        let output_partitioning = self.output_partitioning_helper();
-
-        self.cache = self
-            .cache
+    fn create_cache(schema: SchemaRef, n_partitions: usize) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        let output_partitioning = Self::output_partitioning_helper(n_partitions);
+        PlanPropertiesCache::new(
+            eq_properties,
             // Output Partitioning
-            .with_partitioning(output_partitioning)
+            output_partitioning,
             // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs
index 935b37c168da..689ef32aa1a9 100644
--- a/datafusion/physical-plan/src/explain.rs
+++ b/datafusion/physical-plan/src/explain.rs
@@ -28,6 +28,7 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion_common::display::StringifiedPlan;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use log::trace;

@@ -52,14 +53,13 @@ impl ExplainExec {
         stringified_plans: Vec<StringifiedPlan>,
         verbose: bool,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone());
         ExplainExec {
             schema,
             stringified_plans,
             verbose,
             cache,
         }
-        .with_cache()
     }

     /// The strings to be printed
@@ -72,15 +72,13 @@ impl ExplainExec {
         self.verbose
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index cc8fdcbcd0cd..d6942f0d5678 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -73,15 +73,15 @@ impl FilterExec {
     ) -> Result<Self> {
         match predicate.data_type(input.schema().as_ref())? {
             DataType::Boolean => {
-                let cache = PlanPropertiesCache::new_default(input.schema());
+                let default_selectivity = 20;
+                let cache = Self::create_cache(&input, &predicate, default_selectivity)?;
                 Ok(Self {
                     predicate,
                     input: input.clone(),
                     metrics: ExecutionPlanMetricsSet::new(),
-                    default_selectivity: 20,
+                    default_selectivity,
                     cache,
-                }
-                .with_cache())
+                })
             }
             other => {
                 plan_err!("Filter predicate must return boolean values, not {other:?}")
@@ -115,12 +115,58 @@ impl FilterExec {
         self.default_selectivity
     }

-    fn with_cache(mut self) -> Self {
+    fn statistics_helper(
+        input: &Arc<dyn ExecutionPlan>,
+        predicate: &Arc<dyn PhysicalExpr>,
+        default_selectivity: u8,
+    ) -> Result<Statistics> {
+        let input_stats = input.statistics()?;
+        let schema = input.schema();
+        if !check_support(predicate, &schema) {
+            let selectivity = default_selectivity as f64 / 100.0;
+            let mut stats = input_stats.into_inexact();
+            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
+            stats.total_byte_size = stats
+                .total_byte_size
+                .with_estimated_selectivity(selectivity);
+            return Ok(stats);
+        }
+
+        let num_rows = input_stats.num_rows;
+        let total_byte_size = input_stats.total_byte_size;
+        let input_analysis_ctx = AnalysisContext::try_from_statistics(
+            &input.schema(),
+            &input_stats.column_statistics,
+        )?;
+
+        let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
+
+        // Estimate (inexact) selectivity of predicate
+        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+        let num_rows = num_rows.with_estimated_selectivity(selectivity);
+        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
+
+        let column_statistics = collect_new_statistics(
+            &input_stats.column_statistics,
+            analysis_ctx.boundaries,
+        );
+        Ok(Statistics {
+            num_rows,
+            total_byte_size,
+            column_statistics,
+        })
+    }
+
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        predicate: &Arc<dyn PhysicalExpr>,
+        default_selectivity: u8,
+    ) -> Result<PlanPropertiesCache> {
         // Combine the equal predicates with the input equivalence properties
         // to construct the equivalence properties:
-        let stats = self.statistics().unwrap();
-        let mut eq_properties = self.input.equivalence_properties().clone();
-        let (equal_pairs, _) = collect_columns_from_predicate(&self.predicate);
+        let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
+        let mut eq_properties = input.equivalence_properties().clone();
+        let (equal_pairs, _) = collect_columns_from_predicate(predicate);
         for (lhs, rhs) in equal_pairs {
             let lhs_expr = Arc::new(lhs.clone()) as _;
             let rhs_expr = Arc::new(rhs.clone()) as _;
@@ -128,19 +174,17 @@ impl FilterExec {
         }
         // Add the columns that have only one viable value (singleton) after
         // filtering to constants.
-        let constants = collect_columns(self.predicate())
+        let constants = collect_columns(predicate)
             .into_iter()
             .filter(|column| stats.column_statistics[column.index()].is_singleton())
             .map(|column| Arc::new(column) as _);
         eq_properties = eq_properties.add_constants(constants);

-        self.cache = PlanPropertiesCache::new(
+        Ok(PlanPropertiesCache::new(
             eq_properties,
-            self.input.output_partitioning().clone(), // Output Partitioning
-            self.input.execution_mode(),              // Execution Mode
-        );
-
-        self
+            input.output_partitioning().clone(), // Output Partitioning
+            input.execution_mode(),              // Execution Mode
+        ))
     }
 }

@@ -211,43 +255,7 @@ impl ExecutionPlan for FilterExec {
     /// The output statistics of a filtering operation can be estimated if the
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
-        let predicate = self.predicate();
-
-        let input_stats = self.input.statistics()?;
-        let schema = self.schema();
-        if !check_support(predicate, &schema) {
-            let selectivity = self.default_selectivity as f64 / 100.0;
-            let mut stats = input_stats.into_inexact();
-            stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
-            stats.total_byte_size = stats
-                .total_byte_size
-                .with_estimated_selectivity(selectivity);
-            return Ok(stats);
-        }
-
-        let num_rows = input_stats.num_rows;
-        let total_byte_size = input_stats.total_byte_size;
-        let input_analysis_ctx = AnalysisContext::try_from_statistics(
-            &self.input.schema(),
-            &input_stats.column_statistics,
-        )?;
-
-        let analysis_ctx = analyze(predicate, input_analysis_ctx, &self.schema())?;
-
-        // Estimate (inexact) selectivity of predicate
-        let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
-        let num_rows = num_rows.with_estimated_selectivity(selectivity);
-        let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
-
-        let column_statistics = collect_new_statistics(
-            &input_stats.column_statistics,
-            analysis_ctx.boundaries,
-        );
-        Ok(Statistics {
-            num_rows,
-            total_byte_size,
-            column_statistics,
-        })
+        Self::statistics_helper(&self.input, self.predicate(), self.default_selectivity)
     }
 }
diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs
index b20e8cac7926..472c65f25b30 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/physical-plan/src/insert.rs
@@ -35,7 +35,9 @@ use arrow_array::{ArrayRef, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion_common::{exec_err, internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{Distribution, PhysicalSortRequirement};
+use datafusion_physical_expr::{
+    Distribution, EquivalenceProperties, PhysicalSortRequirement,
+};

 use async_trait::async_trait;
 use futures::StreamExt;
@@ -104,7 +106,7 @@ impl FileSinkExec {
         sort_order: Option<Vec<PhysicalSortRequirement>>,
     ) -> Self {
         let count_schema = make_count_schema();
-        let cache = PlanPropertiesCache::new_default(count_schema);
+        let cache = Self::create_schema(&input, count_schema);
         Self {
             input,
             sink,
@@ -113,7 +115,6 @@ impl FileSinkExec {
             sort_order,
             cache,
         }
-        .with_cache()
     }

     fn execute_input_stream(
@@ -176,15 +177,16 @@ impl FileSinkExec {
         self.sink.metrics()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(self.input.execution_mode());
-
-        self
+    fn create_schema(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            input.execution_mode(),
+        )
     }
 }
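The join changes that follow all derive their execution mode the same way: combine the children's modes with `exec_mode_flatten`, then demote the result to `ExecutionMode::PipelineBreaking` when it is unbounded and the operator must buffer one side. A standalone sketch of that decision, with a local stand-in enum and an `exec_mode_flatten` analogue written under the assumption that any unbounded child makes the combined mode unbounded:

// Local stand-ins; the real `ExecutionMode` and `exec_mode_flatten` live in
// datafusion_physical_plan, this just illustrates the combination logic.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ExecutionMode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

impl ExecutionMode {
    fn is_unbounded(&self) -> bool {
        matches!(self, ExecutionMode::Unbounded)
    }
}

// A plan is bounded only if all of its children are bounded; pipeline
// breaking anywhere poisons the whole result.
fn exec_mode_flatten(children: impl IntoIterator<Item = ExecutionMode>) -> ExecutionMode {
    let mut result = ExecutionMode::Bounded;
    for mode in children {
        result = match (result, mode) {
            (ExecutionMode::PipelineBreaking, _) | (_, ExecutionMode::PipelineBreaking) => {
                ExecutionMode::PipelineBreaking
            }
            (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => {
                ExecutionMode::Unbounded
            }
            _ => ExecutionMode::Bounded,
        };
    }
    result
}

fn main() {
    // Cross join must materialize one side, so an unbounded input breaks the pipeline.
    let mut mode = exec_mode_flatten([ExecutionMode::Bounded, ExecutionMode::Unbounded]);
    if mode.is_unbounded() {
        mode = ExecutionMode::PipelineBreaking;
    }
    assert_eq!(mode, ExecutionMode::PipelineBreaking);
}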
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8b12b02b4667..42758e635060 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -27,9 +27,9 @@ use crate::coalesce_batches::concat_batches;
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::{
-    ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode,
-    ExecutionPlan, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream,
-    Statistics,
+    exec_mode_flatten, ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
+    ExecutionMode, ExecutionPlan, PlanPropertiesCache, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };

 use arrow::datatypes::{Fields, Schema, SchemaRef};
@@ -77,7 +77,7 @@ impl CrossJoinExec {
         };

         let schema = Arc::new(Schema::new(all_columns));
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(&left, &right, schema.clone());
         CrossJoinExec {
             left,
             right,
@@ -86,7 +86,6 @@ impl CrossJoinExec {
             metrics: ExecutionPlanMetricsSet::default(),
             cache,
         }
-        .with_cache()
     }

     /// left (build) side which gets loaded in memory
@@ -99,15 +98,19 @@ impl CrossJoinExec {
         &self.right
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties
         // TODO: Check equivalence properties of cross join, it may preserve
         // ordering in some cases.
         let eq_properties = join_equivalence_properties(
-            self.left.equivalence_properties().clone(),
-            self.right.equivalence_properties().clone(),
+            left.equivalence_properties().clone(),
+            right.equivalence_properties().clone(),
             &JoinType::Full,
-            self.schema(),
+            schema,
             &[false, false],
             None,
             &[],
@@ -117,19 +120,18 @@ impl CrossJoinExec {
         // TODO: Optimize the cross join implementation to generate M * N
         // partitions.
         let output_partitioning = adjust_right_output_partitioning(
-            self.right.output_partitioning(),
-            self.left.schema().fields.len(),
+            right.output_partitioning(),
+            left.schema().fields.len(),
         );

         // Determine the execution mode:
-        let mode = match (self.left.execution_mode(), self.right.execution_mode()) {
-            (ExecutionMode::Bounded, ExecutionMode::Bounded) => ExecutionMode::Bounded,
+        let mut mode = exec_mode_flatten([left, right]);
+        if mode.is_unbounded() {
             // If any of the inputs is unbounded, cross join breaks the pipeline.
-            (_, _) => ExecutionMode::PipelineBreaking,
-        };
+            mode = ExecutionMode::PipelineBreaking;
+        }

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 3aae053151cd..2b88ec449a04 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -65,6 +65,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 use datafusion_physical_expr::PhysicalExprRef;

+use crate::joins::utils::JoinOnRef;
 use ahash::RandomState;
 use futures::{ready, Stream, StreamExt, TryStreamExt};

@@ -327,7 +328,14 @@ impl HashJoinExec {

         let random_state = RandomState::with_seeds(0, 0, 0, 0);

-        let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone()));
+        let cache = Self::create_cache(
+            &left,
+            &right,
+            Arc::new(schema.clone()),
+            *join_type,
+            &on,
+            partition_mode,
+        );

         Ok(HashJoinExec {
             left,
@@ -343,8 +351,7 @@ impl HashJoinExec {
             column_indices,
             null_equals_null,
             cache,
-        }
-        .with_cache())
+        })
     }

     /// left (build) side which gets hashed
@@ -399,25 +406,29 @@ impl HashJoinExec {
         JoinSide::Right
     }

-    fn with_cache(mut self) -> Self {
-        let left = &self.left;
-        let right = &self.right;
-        let schema = self.schema();
+    fn create_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+        on: JoinOnRef,
+        mode: PartitionMode,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         let eq_properties = join_equivalence_properties(
             left.equivalence_properties().clone(),
             right.equivalence_properties().clone(),
-            &self.join_type,
+            &join_type,
             schema,
-            &Self::maintains_input_order(self.join_type),
+            &Self::maintains_input_order(join_type),
             Some(Self::probe_side()),
-            &self.on,
+            on,
         );

         // Get output partitioning:
         let left_columns_len = left.schema().fields.len();
-        let output_partitioning = match self.mode {
-            PartitionMode::CollectLeft => match self.join_type {
+        let output_partitioning = match mode {
+            PartitionMode::CollectLeft => match join_type {
                 JoinType::Inner | JoinType::Right => adjust_right_output_partitioning(
                     right.output_partitioning(),
                     left_columns_len,
@@ -433,7 +444,7 @@ impl HashJoinExec {
                 ),
             },
             PartitionMode::Partitioned => partitioned_join_output_partitioning(
-                self.join_type,
+                join_type,
                 left.output_partitioning(),
                 right.output_partitioning(),
                 left_columns_len,
@@ -449,7 +460,7 @@ impl HashJoinExec {
         let pipeline_breaking = left.execution_mode().is_unbounded()
             || (right.execution_mode().is_unbounded()
                 && matches!(
-                    self.join_type,
+                    join_type,
                     JoinType::Left
                         | JoinType::Full
                         | JoinType::LeftAnti
@@ -462,8 +473,7 @@ impl HashJoinExec {
             exec_mode_flatten([left, right])
         };

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
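`adjust_right_output_partitioning`, used by both the cross join above and the hash join's `CollectLeft` inner/right cases, keeps a right-side `Partitioning::Hash` valid in the joined schema by shifting each column index past the left side's columns. A simplified illustration of that index arithmetic (local `Column`/`Partitioning` stand-ins, not the real types):

// Simplified stand-ins for DataFusion's Column / Partitioning types.
#[derive(Clone, Debug, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

#[derive(Clone, Debug, PartialEq)]
enum Partitioning {
    Hash(Vec<Column>, usize),
    UnknownPartitioning(usize),
}

// Analogue of `adjust_right_output_partitioning`: the joined schema emits the
// left columns first, so right-side column indices shift by `left_columns_len`.
fn adjust_right_output_partitioning(
    right: &Partitioning,
    left_columns_len: usize,
) -> Partitioning {
    match right {
        Partitioning::Hash(exprs, n) => Partitioning::Hash(
            exprs
                .iter()
                .map(|c| Column {
                    name: c.name.clone(),
                    index: c.index + left_columns_len,
                })
                .collect(),
            *n,
        ),
        other => other.clone(),
    }
}

fn main() {
    let right = Partitioning::Hash(vec![Column { name: "b".into(), index: 0 }], 4);
    // With 3 left columns, right column 0 becomes joined-schema column 3.
    let adjusted = adjust_right_output_partitioning(&right, 3);
    assert_eq!(
        adjusted,
        Partitioning::Hash(vec![Column { name: "b".into(), index: 3 }], 4)
    );
}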
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 1978338d2b6a..89beac14816d 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -109,19 +109,19 @@ impl NestedLoopJoinExec {
         check_join_is_valid(&left_schema, &right_schema, &[])?;
         let (schema, column_indices) =
             build_join_schema(&left_schema, &right_schema, join_type);
-        let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone()));
+        let schema = Arc::new(schema);
+        let cache = Self::create_cache(&left, &right, schema.clone(), *join_type);
         Ok(NestedLoopJoinExec {
             left,
             right,
             filter,
             join_type: *join_type,
-            schema: Arc::new(schema),
+            schema,
             inner_table: Default::default(),
             column_indices,
             metrics: Default::default(),
             cache,
-        }
-        .with_cache())
+        })
     }

     /// left side
@@ -144,39 +144,43 @@ impl NestedLoopJoinExec {
         &self.join_type
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         let eq_properties = join_equivalence_properties(
-            self.left.equivalence_properties().clone(),
-            self.right.equivalence_properties().clone(),
-            &self.join_type,
-            self.schema(),
-            &self.maintains_input_order(),
+            left.equivalence_properties().clone(),
+            right.equivalence_properties().clone(),
+            &join_type,
+            schema,
+            &[false, false],
             None, // No on columns in nested loop join
             &[],
         );

         // Get output partitioning,
-        let output_partitioning = if self.join_type == JoinType::Full {
-            self.left.output_partitioning().clone()
+        let output_partitioning = if join_type == JoinType::Full {
+            left.output_partitioning().clone()
         } else {
             partitioned_join_output_partitioning(
-                self.join_type,
-                self.left.output_partitioning(),
-                self.right.output_partitioning(),
-                self.left.schema().fields.len(),
+                join_type,
+                left.output_partitioning(),
+                right.output_partitioning(),
+                left.schema().fields.len(),
             )
         };

         // Determine execution mode:
-        let mut mode = exec_mode_flatten([&self.left, &self.right]);
+        let mut mode = exec_mode_flatten([left, right]);
         if mode.is_unbounded() {
             mode = ExecutionMode::PipelineBreaking;
         }

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 0d7cd995a5a2..f7d754a99e0e 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -33,7 +33,7 @@ use std::task::{Context, Poll};
 use crate::expressions::PhysicalSortExpr;
 use crate::joins::utils::{
     build_join_schema, check_join_is_valid, estimate_join_statistics,
-    partitioned_join_output_partitioning, JoinFilter, JoinOn,
+    partitioned_join_output_partitioning, JoinFilter, JoinOn, JoinOnRef,
 };
 use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::{
@@ -137,7 +137,7 @@ impl SortMergeJoinExec {
         let schema =
             Arc::new(build_join_schema(&left_schema, &right_schema, &join_type).0);
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(&left, &right, schema.clone(), join_type, &on);
         Ok(Self {
             left,
             right,
@@ -151,8 +151,7 @@ impl SortMergeJoinExec {
             sort_options,
             null_equals_null,
             cache,
-        }
-        .with_cache())
+        })
     }

     /// Get probe side (e.g. streaming side) information for this sort merge join.
@@ -201,32 +200,37 @@ impl SortMergeJoinExec {
         self.left.as_ref()
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+        join_on: JoinOnRef,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         let eq_properties = join_equivalence_properties(
-            self.left.equivalence_properties().clone(),
-            self.right.equivalence_properties().clone(),
-            &self.join_type,
-            self.schema(),
-            &self.maintains_input_order(),
-            Some(Self::probe_side(&self.join_type)),
-            self.on(),
+            left.equivalence_properties().clone(),
+            right.equivalence_properties().clone(),
+            &join_type,
+            schema,
+            &Self::maintains_input_order(join_type),
+            Some(Self::probe_side(&join_type)),
+            join_on,
         );

         // Get output partitioning:
-        let left_columns_len = self.left.schema().fields.len();
+        let left_columns_len = left.schema().fields.len();
         let output_partitioning = partitioned_join_output_partitioning(
-            self.join_type,
-            self.left.output_partitioning(),
-            self.right.output_partitioning(),
+            join_type,
+            left.output_partitioning(),
+            right.output_partitioning(),
             left_columns_len,
         );

         // Determine execution mode:
-        let mode = exec_mode_flatten([&self.left, &self.right]);
+        let mode = exec_mode_flatten([left, right]);

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 5fd89e98a58f..4e07b10dd517 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -43,7 +43,7 @@ use crate::joins::stream_join_utils::{
 use crate::joins::utils::{
     apply_join_filter_to_indices, build_batch_from_indices, build_join_schema,
     check_join_is_valid, partitioned_join_output_partitioning, ColumnIndex, JoinFilter,
-    JoinHashMapType, JoinOn, StatefulStreamResult,
+    JoinHashMapType, JoinOn, JoinOnRef, StatefulStreamResult,
 };
 use crate::{
     exec_mode_flatten,
@@ -233,8 +233,8 @@ impl SymmetricHashJoinExec {
         // Initialize the random state for the join operation:
         let random_state = RandomState::with_seeds(0, 0, 0, 0);
-
-        let cache = PlanPropertiesCache::new_default(Arc::new(schema));
+        let schema = Arc::new(schema);
+        let cache = Self::create_cache(&left, &right, schema.clone(), *join_type, &on);
         Ok(SymmetricHashJoinExec {
             left,
             right,
@@ -249,37 +249,41 @@ impl SymmetricHashJoinExec {
             right_sort_exprs,
             mode,
             cache,
-        }
-        .with_cache())
+        })
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        left: &Arc<dyn ExecutionPlan>,
+        right: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        join_type: JoinType,
+        join_on: JoinOnRef,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         let eq_properties = join_equivalence_properties(
-            self.left.equivalence_properties().clone(),
-            self.right.equivalence_properties().clone(),
-            &self.join_type,
-            self.schema(),
-            &self.maintains_input_order(),
+            left.equivalence_properties().clone(),
+            right.equivalence_properties().clone(),
+            &join_type,
+            schema,
+            &[false, false], // Has alternating probe side
             None,
-            self.on(),
+            join_on,
         );

         // Get output partitioning:
-        let left_columns_len = self.left.schema().fields.len();
+        let left_columns_len = left.schema().fields.len();
         let output_partitioning = partitioned_join_output_partitioning(
-            self.join_type,
-            self.left.output_partitioning(),
-            self.right.output_partitioning(),
+            join_type,
+            left.output_partitioning(),
+            right.output_partitioning(),
             left_columns_len,
         );

         // Determine execution mode:
-        let mode = exec_mode_flatten([&self.left, &self.right]);
+        let mode = exec_mode_flatten([left, right]);

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }

     /// left stream
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 52bfb0063d40..f90bbf061d38 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -540,22 +540,6 @@ impl PlanPropertiesCache {
         }
     }

-    /// Construct a default `PlanPropertiesCache`, for a given schema.
-    pub fn new_default(schema: SchemaRef) -> PlanPropertiesCache {
-        // Default values are the most restrictive possible values.
-        let eq_properties = EquivalenceProperties::new(schema);
-        // Please note that this default is not safe, and should be overwritten.
-        let partitioning = Partitioning::UnknownPartitioning(0);
-        let exec_mode = ExecutionMode::PipelineBreaking;
-        let output_ordering = None;
-        Self {
-            eq_properties,
-            partitioning,
-            exec_mode,
-            output_ordering,
-        }
-    }
-
     /// Overwrite output partitioning with its new value.
     pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
         self.partitioning = partitioning;
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 59559e84fd75..a4b924d71066 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -57,7 +57,7 @@ pub struct GlobalLimitExec {
 impl GlobalLimitExec {
     /// Create a new GlobalLimitExec
     pub fn new(input: Arc<dyn ExecutionPlan>, skip: usize, fetch: Option<usize>) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         GlobalLimitExec {
             input,
             skip,
@@ -65,7 +65,6 @@ impl GlobalLimitExec {
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Input execution plan
@@ -83,14 +82,12 @@ impl GlobalLimitExec {
         self.fetch
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = PlanPropertiesCache::new(
-            self.input.equivalence_properties().clone(), // Equivalence Properties
-            Partitioning::UnknownPartitioning(1),        // Output Partitioning
-            ExecutionMode::Bounded,                      // Execution Mode
-        );
-
-        self
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
+        PlanPropertiesCache::new(
+            input.equivalence_properties().clone(), // Equivalence Properties
+            Partitioning::UnknownPartitioning(1),   // Output Partitioning
+            ExecutionMode::Bounded,                 // Execution Mode
+        )
     }
 }

@@ -276,14 +273,13 @@ pub struct LocalLimitExec {
 impl LocalLimitExec {
     /// Create a new LocalLimitExec partition
     pub fn new(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         Self {
             input,
             fetch,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Input execution plan
@@ -296,14 +292,12 @@ impl LocalLimitExec {
         self.fetch
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = PlanPropertiesCache::new(
-            self.input.equivalence_properties().clone(), // Equivalence Properties
-            self.input.output_partitioning().clone(),    // Output Partitioning
-            ExecutionMode::Bounded,                      // Execution Mode
-        );
-
-        self
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
+        PlanPropertiesCache::new(
+            input.equivalence_properties().clone(), // Equivalence Properties
+            input.output_partitioning().clone(),    // Output Partitioning
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 206d22e72226..f6039ee8b3ed 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -153,7 +153,7 @@ impl MemoryExec {
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
         let projected_schema = project_schema(&schema, projection.as_ref())?;
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let cache = Self::create_cache(projected_schema.clone(), &[], partitions);
         Ok(Self {
             partitions: partitions.to_vec(),
             schema,
@@ -161,8 +161,7 @@ impl MemoryExec {
             projection,
             sort_information: vec![],
             cache,
-        }
-        .with_cache())
+        })
     }

     pub fn partitions(&self) -> &[Vec<RecordBatch>] {
@@ -193,27 +192,29 @@ impl MemoryExec {
         self.sort_information = sort_information;

         // We need to update equivalence properties when updating sort information.
-        let eq_properties = self.equivalent_properties_helper();
+        let eq_properties = EquivalenceProperties::new_with_orderings(
+            self.schema(),
+            &self.sort_information,
+        );
         self.cache = self.cache.with_eq_properties(eq_properties);
-        self.with_cache()
+        self
     }

     pub fn original_schema(&self) -> SchemaRef {
         self.schema.clone()
     }

-    fn equivalent_properties_helper(&self) -> EquivalenceProperties {
-        EquivalenceProperties::new_with_orderings(self.schema(), &self.sort_information)
-    }
-
-    fn with_cache(mut self) -> Self {
-        self.cache = PlanPropertiesCache::new(
-            self.equivalent_properties_helper(), // Equivalence Properties
-            Partitioning::UnknownPartitioning(self.partitions.len()), // Output Partitioning
-            ExecutionMode::Bounded, // Execution Mode
-        );
-
-        self
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        partitions: &[Vec<RecordBatch>],
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);
+        PlanPropertiesCache::new(
+            eq_properties,                                        // Equivalence Properties
+            Partitioning::UnknownPartitioning(partitions.len()),  // Output Partitioning
+            ExecutionMode::Bounded,                               // Execution Mode
+        )
     }
 }
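`MemoryExec` is one of the few operators that still patches its cache after construction, because sort information arrives via a setter. A hypothetical usage sketch (`partitions`, `schema`, and `ordering: LexOrdering` assumed in scope; the setter's exact signature is an assumption based on the hunk above):

    // try_new builds a cache with no orderings; with_sort_information then
    // rebuilds only the equivalence properties via with_eq_properties.
    let exec = MemoryExec::try_new(&partitions, schema, None)?
        .with_sort_information(vec![ordering]);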
diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs
index a9b27cb13fe1..9d4204ddb589 100644
--- a/datafusion/physical-plan/src/placeholder_row.rs
+++ b/datafusion/physical-plan/src/placeholder_row.rs
@@ -32,6 +32,7 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::RecordBatchOptions;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use log::trace;
@@ -48,20 +49,20 @@ impl PlaceholderRowExec {
     /// Create a new PlaceholderRowExec
     pub fn new(schema: SchemaRef) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let partitions = 1;
+        let cache = Self::create_cache(schema.clone(), partitions);
         PlaceholderRowExec {
             schema,
-            partitions: 1,
+            partitions,
             cache,
         }
-        .with_cache()
     }

     /// Create a new PlaceholderRowExec with specified partition number
     pub fn with_partitions(mut self, partitions: usize) -> Self {
         self.partitions = partitions;
         // Update output partitioning when updating partitions:
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(self.partitions);
         self.cache = self.cache.with_partitioning(output_partitioning);
         self
     }
@@ -89,20 +90,20 @@ impl PlaceholderRowExec {
         })
     }

-    fn output_partitioning_helper(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partitions)
+    fn output_partitioning_helper(n_partitions: usize) -> Partitioning {
+        Partitioning::UnknownPartitioning(n_partitions)
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(schema: SchemaRef, n_partitions: usize) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
         // Get output partitioning:
-        let output_partitioning = self.output_partitioning_helper();
+        let output_partitioning = Self::output_partitioning_helper(n_partitions);

-        self.cache = self
-            .cache
-            .with_partitioning(output_partitioning)
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            output_partitioning,
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 4c17aa3d834a..7420cf58b5ce 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -56,9 +56,6 @@ pub struct ProjectionExec {
     schema: SchemaRef,
     /// The input plan
     input: Arc<dyn ExecutionPlan>,
-    /// The mapping used to normalize expressions like Partitioning and
-    /// PhysicalSortExpr that maps input to output
-    projection_mapping: ProjectionMapping,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Cache holding plan properties like equivalences, output partitioning etc.
@@ -96,16 +93,14 @@ impl ProjectionExec {
         // construct a map from the input expressions to the output expression of the Projection
         let projection_mapping = ProjectionMapping::try_new(&expr, &input_schema)?;

-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let projection = Self {
+        let cache = Self::create_cache(&input, &projection_mapping, schema.clone())?;
+        Ok(Self {
             expr,
             schema,
             input,
-            projection_mapping,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
-        };
-        projection.with_cache()
+        })
     }

     /// The projection expressions stored as tuples of (expression, output column name)
@@ -118,13 +113,15 @@ impl ProjectionExec {
         &self.input
     }

-    fn with_cache(mut self) -> Result<Self> {
-        let input = &self.input;
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        projection_mapping: &ProjectionMapping,
+        schema: SchemaRef,
+    ) -> Result<PlanPropertiesCache> {
         // Calculate equivalence properties:
         let mut input_eq_properties = input.equivalence_properties().clone();
-        input_eq_properties.substitute_oeq_class(&self.projection_mapping)?;
-        let eq_properties =
-            input_eq_properties.project(&self.projection_mapping, self.schema.clone());
+        input_eq_properties.substitute_oeq_class(projection_mapping)?;
+        let eq_properties = input_eq_properties.project(projection_mapping, schema);

         // Calculate output partitioning, which needs to respect aliases:
         let input_partition = input.output_partitioning();
@@ -134,7 +131,7 @@ impl ProjectionExec {
                 .iter()
                 .map(|expr| {
                     input_eq_properties
-                        .project_expr(expr, &self.projection_mapping)
+                        .project_expr(expr, projection_mapping)
                         .unwrap_or_else(|| {
                             Arc::new(UnKnownColumn::new(&expr.to_string()))
                         })
@@ -145,13 +142,11 @@ impl ProjectionExec {
             input_partition.clone()
         };

-        self.cache = PlanPropertiesCache::new(
+        Ok(PlanPropertiesCache::new(
             eq_properties,
             output_partitioning,
             input.execution_mode(),
-        );
-
-        Ok(self)
+        ))
     }
 }
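The "respect aliases" step above deserves a concrete reading. A sketch, assuming a hypothetical projection `SELECT a AS b` over an input hash-partitioned on column `a`:

    // input partitioning:     Hash([col("a")], 8)
    // projected partitioning: Hash([col("b")], 8)  -- rewritten via the mapping
    //
    // project_expr() performs that rewrite; any expression it cannot map
    // degrades to UnKnownColumn, which effectively drops the hash property
    // rather than advertising a partitioning the output no longer satisfies.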
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 97a626c25116..adc675ba2730 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -33,7 +33,8 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+
 use futures::{ready, Stream, StreamExt};

 /// Recursive query execution plan.
@@ -81,7 +82,7 @@ impl RecursiveQueryExec {
         let work_table = Arc::new(WorkTable::new());
         // Use the same work table for both the WorkTableExec and the recursive term
         let recursive_term = assign_work_table(recursive_term, work_table.clone())?;
-        let cache = PlanPropertiesCache::new_default(static_term.schema());
+        let cache = Self::create_cache(static_term.schema());
         Ok(RecursiveQueryExec {
             name,
             static_term,
@@ -90,17 +91,17 @@ impl RecursiveQueryExec {
             work_table,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
-        }
-        .with_cache())
+        })
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 6d2835df05f0..dc1e88f52e56 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -44,7 +44,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};

 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt};
@@ -436,12 +436,7 @@ impl ExecutionPlan for RepartitionExec {
     }

     fn maintains_input_order(&self) -> Vec<bool> {
-        if self.preserve_order {
-            vec![true]
-        } else {
-            // We preserve ordering when input partitioning is 1
-            vec![self.input().output_partitioning().partition_count() <= 1]
-        }
+        Self::maintains_input_order_helper(self.input(), self.preserve_order)
     }

     fn execute(
@@ -602,7 +597,8 @@ impl RepartitionExec {
         input: Arc<dyn ExecutionPlan>,
         partitioning: Partitioning,
     ) -> Result<Self> {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let preserve_order = false;
+        let cache = Self::create_cache(&input, partitioning.clone(), preserve_order);
         Ok(RepartitionExec {
             input,
             partitioning,
@@ -611,27 +607,49 @@ impl RepartitionExec {
                 abort_helper: Arc::new(AbortOnDropMany::<()>(vec![])),
             })),
             metrics: ExecutionPlanMetricsSet::new(),
-            preserve_order: false,
+            preserve_order,
             cache,
+        })
+    }
+
+    fn maintains_input_order_helper(
+        input: &Arc<dyn ExecutionPlan>,
+        preserve_order: bool,
+    ) -> Vec<bool> {
+        if preserve_order {
+            vec![true]
+        } else {
+            // We preserve ordering when input partitioning is 1
+            vec![input.output_partitioning().partition_count() <= 1]
         }
-        .with_cache())
     }

-    fn with_cache(mut self) -> Self {
+    fn eq_properties_helper(
+        input: &Arc<dyn ExecutionPlan>,
+        preserve_order: bool,
+    ) -> EquivalenceProperties {
         // Equivalence Properties
-        let mut eq_properties = self.input.equivalence_properties().clone();
+        let mut eq_properties = input.equivalence_properties().clone();
         // If the ordering is lost, reset the ordering equivalence class:
-        if !self.maintains_input_order()[0] {
+        if !Self::maintains_input_order_helper(input, preserve_order)[0] {
             eq_properties.clear_orderings();
         }
+        eq_properties
+    }

-        self.cache = PlanPropertiesCache::new(
-            eq_properties,               // Equivalence Properties
-            self.partitioning.clone(),   // Output Partitioning
-            self.input.execution_mode(), // Execution Mode
-        );
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+        preserve_order: bool,
+    ) -> PlanPropertiesCache {
+        // Equivalence Properties
+        let eq_properties = Self::eq_properties_helper(input, preserve_order);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,          // Equivalence Properties
+            partitioning,           // Output Partitioning
+            input.execution_mode(), // Execution Mode
+        )
     }

     /// Specify if this repartitioning operation should preserve the order of
@@ -648,7 +666,9 @@ impl RepartitionExec {
             // if there is only one input partition, merging is not required
             // to maintain order
             self.input.output_partitioning().partition_count() > 1;
-        self.with_cache()
+        let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order);
+        self.cache = self.cache.with_eq_properties(eq_properties);
+        self
     }

     /// Return the sort expressions that are used to merge
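Note how `with_preserve_order` above now patches only the equivalence properties rather than recomputing the whole cache. The rule that `maintains_input_order_helper` encodes, restated as a standalone sketch (a hypothetical free function, not in the diff):

    fn maintains_order(preserve_order: bool, input_partitions: usize) -> bool {
        // A 1 -> N split of a single partition cannot interleave rows from
        // different streams, so order survives even without order-preserving
        // merging; with more than one input partition, rows may interleave.
        preserve_order || input_partitions <= 1
    }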
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 99ead9f904a1..16c4bc8601b6 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -71,6 +71,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::utils::evaluate_partition_ranges;
 use datafusion_common::Result;
 use datafusion_execution::{RecordBatchStream, TaskContext};
+use datafusion_physical_expr::LexOrdering;

 use futures::{ready, Stream, StreamExt};
 use log::trace;
@@ -104,17 +105,17 @@ impl PartialSortExec {
         common_prefix_length: usize,
     ) -> Self {
         assert!(common_prefix_length > 0);
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let preserve_partitioning = false;
+        let cache = Self::create_cache(&input, expr.clone(), preserve_partitioning);
         Self {
             input,
             expr,
             common_prefix_length,
             metrics_set: ExecutionPlanMetricsSet::new(),
-            preserve_partitioning: false,
+            preserve_partitioning,
             fetch: None,
             cache,
         }
-        .with_cache()
     }

     /// Whether this `PartialSortExec` preserves partitioning of the children
@@ -131,6 +132,12 @@ impl PartialSortExec {
     /// input partitions producing a single, sorted partition.
     pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
         self.preserve_partitioning = preserve_partitioning;
+        self.cache = self
+            .cache
+            .with_partitioning(Self::output_partitioning_helper(
+                &self.input,
+                self.preserve_partitioning,
+            ));
         self
     }

@@ -161,27 +168,38 @@ impl PartialSortExec {
         self.fetch
     }

-    fn with_cache(mut self) -> Self {
+    fn output_partitioning_helper(
+        input: &Arc<dyn ExecutionPlan>,
+        preserve_partitioning: bool,
+    ) -> Partitioning {
+        // Get output partitioning:
+        if preserve_partitioning {
+            input.output_partitioning().clone()
+        } else {
+            Partitioning::UnknownPartitioning(1)
+        }
+    }
+
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        sort_exprs: LexOrdering,
+        preserve_partitioning: bool,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties; i.e. reset the ordering equivalence
         // class with the new ordering:
-        let eq_properties = self
-            .input
+        let eq_properties = input
             .equivalence_properties()
             .clone()
-            .with_reorder(self.expr.to_vec());
+            .with_reorder(sort_exprs);

         // Get output partitioning:
-        let output_partitioning = if self.preserve_partitioning {
-            self.input.output_partitioning().clone()
-        } else {
-            Partitioning::UnknownPartitioning(1)
-        };
+        let output_partitioning =
+            Self::output_partitioning_helper(input, preserve_partitioning);

         // Determine execution mode:
-        let mode = self.input.execution_mode();
+        let mode = input.execution_mode();

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 55a3c9f068f1..a74705dd32ab 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -52,6 +52,7 @@ use datafusion_execution::memory_pool::{
 };
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::LexOrdering;

 use futures::{StreamExt, TryStreamExt};
 use log::{debug, error, trace};
@@ -694,16 +695,16 @@ impl SortExec {
     /// Create a new sort execution plan that produces a single,
     /// sorted output partition.
     pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let preserve_partitioning = false;
+        let cache = Self::create_cache(&input, expr.clone(), preserve_partitioning);
         Self {
             expr,
             input,
             metrics_set: ExecutionPlanMetricsSet::new(),
-            preserve_partitioning: false,
+            preserve_partitioning,
             fetch: None,
             cache,
         }
-        .with_cache()
     }

     /// Create a new sort execution plan with the option to preserve
@@ -737,7 +738,13 @@ impl SortExec {
     /// input partitions producing a single, sorted partition.
     pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self {
         self.preserve_partitioning = preserve_partitioning;
-        self.with_cache()
+        self.cache = self
+            .cache
+            .with_partitioning(Self::output_partitioning_helper(
+                &self.input,
+                self.preserve_partitioning,
+            ));
+        self
     }

     /// Modify how many rows to include in the result
@@ -767,33 +774,43 @@ impl SortExec {
         self.fetch
     }

-    fn with_cache(mut self) -> Self {
+    fn output_partitioning_helper(
+        input: &Arc<dyn ExecutionPlan>,
+        preserve_partitioning: bool,
+    ) -> Partitioning {
+        // Get output partitioning:
+        if preserve_partitioning {
+            input.output_partitioning().clone()
+        } else {
+            Partitioning::UnknownPartitioning(1)
+        }
+    }
+
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        sort_exprs: LexOrdering,
+        preserve_partitioning: bool,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties; i.e. reset the ordering equivalence
        // class with the new ordering:
-        let eq_properties = self
-            .input
+        let eq_properties = input
             .equivalence_properties()
             .clone()
-            .with_reorder(self.expr.to_vec());
+            .with_reorder(sort_exprs);

         // Get output partitioning:
-        let output_partitioning = if self.preserve_partitioning {
-            self.input.output_partitioning().clone()
-        } else {
-            Partitioning::UnknownPartitioning(1)
-        };
+        let output_partitioning =
+            Self::output_partitioning_helper(input, preserve_partitioning);

         // Determine execution mode:
-        let mode = match self.input.execution_mode() {
+        let mode = match input.execution_mode() {
             ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
                 ExecutionMode::PipelineBreaking
             }
             ExecutionMode::Bounded => ExecutionMode::Bounded,
         };

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index eadd2d0711fe..c07ae72d5492 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -80,7 +80,7 @@ pub struct SortPreservingMergeExec {
 impl SortPreservingMergeExec {
     /// Create a new sort execution plan
     pub fn new(expr: Vec<PhysicalSortExpr>, input: Arc<dyn ExecutionPlan>) -> Self {
-        let cache = PlanPropertiesCache::new_default(input.schema());
+        let cache = Self::create_cache(&input);
         Self {
             input,
             expr,
@@ -88,7 +88,6 @@ impl SortPreservingMergeExec {
             fetch: None,
             cache,
         }
-        .with_cache()
     }
     /// Sets the number of rows to fetch
     pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
@@ -111,14 +110,12 @@ impl SortPreservingMergeExec {
         self.fetch
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = PlanPropertiesCache::new(
-            self.input.equivalence_properties().clone(), // Equivalence Properties
-            Partitioning::UnknownPartitioning(1),        // Output Partitioning
-            self.input.execution_mode(),                 // Execution Mode
-        );
-
-        self
+    fn create_cache(input: &Arc<dyn ExecutionPlan>) -> PlanPropertiesCache {
+        PlanPropertiesCache::new(
+            input.equivalence_properties().clone(), // Equivalence Properties
+            Partitioning::UnknownPartitioning(1),   // Output Partitioning
+            input.execution_mode(),                 // Execution Mode
+        )
     }
 }
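The two sort operators above make opposite mode decisions, which is worth spelling out. A sketch contrasting them (`input` hypothetical), restating the arms from the hunks:

    // SortExec buffers its entire input before emitting anything, so an
    // unbounded input makes the operator itself pipeline-breaking:
    let sort_mode = match input.execution_mode() {
        ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
            ExecutionMode::PipelineBreaking
        }
        ExecutionMode::Bounded => ExecutionMode::Bounded,
    };

    // SortPreservingMergeExec only merges already-sorted streams and emits
    // incrementally, so it simply inherits the input's mode:
    let spm_mode = input.execution_mode();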
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 60b372446805..e95fd37ab1b2 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -85,16 +85,22 @@ impl StreamingTableExec {
             Some(p) => Arc::new(schema.project(p)?),
             None => schema,
         };
-        let cache = PlanPropertiesCache::new_default(projected_schema.clone());
+        let projected_output_ordering =
+            projected_output_ordering.into_iter().collect::<Vec<_>>();
+        let cache = Self::create_cache(
+            projected_schema.clone(),
+            &projected_output_ordering,
+            &partitions,
+            infinite,
+        );
         Ok(Self {
             partitions,
             projected_schema,
             projection: projection.cloned().map(Into::into),
-            projected_output_ordering: projected_output_ordering.into_iter().collect(),
+            projected_output_ordering,
             infinite,
             cache,
-        }
-        .with_cache())
+        })
     }

     pub fn partitions(&self) -> &Vec<Arc<dyn PartitionStream>> {
@@ -121,26 +127,26 @@ impl StreamingTableExec {
         self.infinite
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        partitions: &[Arc<dyn PartitionStream>],
+        is_infinite: bool,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
-        let eq_properties = EquivalenceProperties::new_with_orderings(
-            self.schema(),
-            &self.projected_output_ordering,
-        );
+        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

         // Get output partitioning:
-        let output_partitioning =
-            Partitioning::UnknownPartitioning(self.partitions.len());
+        let output_partitioning = Partitioning::UnknownPartitioning(partitions.len());

         // Determine execution mode:
-        let mode = if self.infinite {
+        let mode = if is_infinite {
             ExecutionMode::Unbounded
         } else {
             ExecutionMode::Bounded
         };

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
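`StreamingTableExec` is a source, so it is where unboundedness enters a plan rather than being propagated. The flag-to-mode translation above, isolated as a sketch:

    // An infinite stream never finishes; every operator downstream derives
    // its own mode from this via execution_mode() / exec_mode_flatten.
    let mode = if is_infinite {
        ExecutionMode::Unbounded
    } else {
        ExecutionMode::Bounded
    };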
diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs
index 77ff8d27157a..a677907295a7 100644
--- a/datafusion/physical-plan/src/test/exec.rs
+++ b/datafusion/physical-plan/src/test/exec.rs
@@ -34,6 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use futures::Stream;
 use tokio::sync::Barrier;
@@ -132,14 +133,13 @@ impl MockExec {
     /// ensure any poll loops are correct. This behavior can be
     /// changed with `with_use_task`
     pub fn new(data: Vec<Result<RecordBatch>>, schema: SchemaRef) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone());
         Self {
             data,
             schema,
             use_task: true,
             cache,
         }
-        .with_cache()
     }

     /// If `use_task` is true (the default) then the batches are sent
@@ -150,15 +150,14 @@ impl MockExec {
         self
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }

@@ -290,14 +289,13 @@ impl BarrierExec {
     pub fn new(data: Vec<Vec<RecordBatch>>, schema: SchemaRef) -> Self {
         // wait for all streams and the input
         let barrier = Arc::new(Barrier::new(data.len() + 1));
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone(), &data);
         Self {
             data,
             schema,
             barrier,
             cache,
         }
-        .with_cache()
     }

     /// wait until all the input streams and this function is ready
@@ -307,15 +305,13 @@ impl BarrierExec {
         println!("BarrierExec::wait done waiting");
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(self.data.len()))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+    fn create_cache(schema: SchemaRef, data: &[Vec<RecordBatch>]) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(data.len()),
+            ExecutionMode::Bounded,
+        )
     }
 }

@@ -412,19 +408,18 @@ impl ErrorExec {
             DataType::Int64,
             true,
         )]));
-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        Self { cache }.with_cache()
+        let cache = Self::create_cache(schema.clone());
+        Self { cache }
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }

@@ -486,24 +481,22 @@ impl StatisticsExec {
                 .column_statistics.len(), schema.fields().len(),
             "if defined, the column statistics vector length should be the number of fields"
         );
-        let cache = PlanPropertiesCache::new_default(Arc::new(schema.clone()));
+        let cache = Self::create_cache(Arc::new(schema.clone()));
         Self {
             stats,
             schema: Arc::new(schema),
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(2))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(2),
+            ExecutionMode::Bounded,
+        )
     }
 }

@@ -567,9 +560,6 @@ pub struct BlockingExec {
     /// Schema that is mocked by this plan.
     schema: SchemaRef,

-    /// Number of output partitions.
-    n_partitions: usize,
-
     /// Ref-counting helper to check if the plan and the produced stream are still in memory.
     refs: Arc<()>,
     cache: PlanPropertiesCache,
@@ -578,14 +568,12 @@ impl BlockingExec {
     /// Create new [`BlockingExec`] with a given schema and number of partitions.
     pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone(), n_partitions);
         Self {
             schema,
-            n_partitions,
             refs: Default::default(),
             cache,
         }
-        .with_cache()
     }

     /// Weak pointer that can be used for ref-counting this execution plan and its streams.
@@ -597,15 +585,14 @@ impl BlockingExec {
         Arc::downgrade(&self.refs)
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(self.n_partitions))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef, n_partitions: usize) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(n_partitions),
+            ExecutionMode::Bounded,
+        )
     }
 }

@@ -719,13 +706,13 @@ impl PanicExec {
     /// Create new [`PanicExec`] with a given schema and number of
     /// partitions, which will each panic immediately.
     pub fn new(schema: SchemaRef, n_partitions: usize) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let batches_until_panics = vec![0; n_partitions];
+        let cache = Self::create_cache(schema.clone(), &batches_until_panics);
         Self {
             schema,
-            batches_until_panics: vec![0; n_partitions],
+            batches_until_panics,
             cache,
         }
-        .with_cache()
     }

     /// Set the number of batches prior to panic for a partition
@@ -734,17 +721,18 @@ impl PanicExec {
         self
     }

-    fn with_cache(mut self) -> Self {
-        let num_partitions = self.batches_until_panics.len();
+    fn create_cache(
+        schema: SchemaRef,
+        batches_until_panics: &[usize],
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+        let num_partitions = batches_until_panics.len();

-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(Partitioning::UnknownPartitioning(num_partitions))
-            // Execution Mode
-            .with_exec_mode(ExecutionMode::Bounded);
-
-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(num_partitions),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 42e5ce58edb0..06a870123255 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -98,13 +98,12 @@ impl UnionExec {
     /// Create a new UnionExec
     pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
         let schema = union_schema(&inputs);
-        let cache = PlanPropertiesCache::new_default(schema);
+        let cache = Self::create_cache(&inputs, schema);
         UnionExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
         }
-        .with_cache()
     }

     /// Get inputs of the execution plan
@@ -112,16 +111,18 @@ impl UnionExec {
         &self.inputs
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        inputs: &[Arc<dyn ExecutionPlan>],
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
         // TODO: In some cases, we should be able to preserve some equivalence
         //       classes and constants. Add support for such cases.
-        let children_eqs = self
-            .inputs
+        let children_eqs = inputs
             .iter()
             .map(|child| child.equivalence_properties())
             .collect::<Vec<_>>();
-        let mut eq_properties = EquivalenceProperties::new(self.schema());
+        let mut eq_properties = EquivalenceProperties::new(schema);
         // Use the ordering equivalence class of the first child as the seed:
         let mut meets = children_eqs[0]
             .oeq_class()
@@ -152,18 +153,16 @@ impl UnionExec {
         eq_properties.add_new_orderings(meets);

         // Calculate output partitioning; i.e. sum output partitions of the inputs.
-        let num_partitions = self
-            .inputs
+        let num_partitions = inputs
             .iter()
             .map(|plan| plan.output_partitioning().partition_count())
             .sum();
         let output_partitioning = Partitioning::UnknownPartitioning(num_partitions);

         // Determine execution mode:
-        let mode = exec_mode_flatten(self.inputs.iter());
+        let mode = exec_mode_flatten(inputs.iter());

-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }

@@ -323,20 +322,17 @@ pub struct InterleaveExec {
 impl InterleaveExec {
     /// Create a new InterleaveExec
     pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
-        let schema = union_schema(&inputs);
-
         if !can_interleave(inputs.iter()) {
             return internal_err!(
                 "Not all InterleaveExec children have a consistent hash partitioning"
             );
         }
-        let cache = PlanPropertiesCache::new_default(schema);
+        let cache = Self::create_cache(&inputs);
         Ok(InterleaveExec {
             inputs,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
-        }
-        .with_cache())
+        })
     }

     /// Get inputs of the execution plan
@@ -344,18 +340,15 @@ impl InterleaveExec {
         &self.inputs
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(inputs: &[Arc<dyn ExecutionPlan>]) -> PlanPropertiesCache {
+        let schema = union_schema(inputs);
+        let eq_properties = EquivalenceProperties::new(schema);
         // Get output partitioning:
-        let output_partitioning = self.inputs[0].output_partitioning().clone();
+        let output_partitioning = inputs[0].output_partitioning().clone();

         // Determine execution mode:
-        let mode = exec_mode_flatten(self.inputs.iter());
+        let mode = exec_mode_flatten(inputs.iter());

-        self.cache = self
-            .cache
-            .with_partitioning(output_partitioning)
-            .with_exec_mode(mode);
-
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
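`UnionExec` and `InterleaveExec` differ in exactly one property, which the two `create_cache` bodies above make explicit. A worked example with hypothetical children `a` (3 partitions) and `b` (5 partitions):

    // UnionExec concatenates child partitions side by side:
    //   output = UnknownPartitioning(3 + 5) = UnknownPartitioning(8)
    //
    // InterleaveExec requires all children to share one consistent hash
    // partitioning (checked by can_interleave) and simply keeps it:
    //   output = inputs[0].output_partitioning().clone()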
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index b9df57d84f81..ba90e8b4f1fc 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -37,6 +37,7 @@ use arrow::datatypes::{
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{exec_err, DataFusionError, Result, UnnestOptions};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 use async_trait::async_trait;
 use futures::{Stream, StreamExt};
@@ -70,7 +71,7 @@ impl UnnestExec {
         schema: SchemaRef,
         options: UnnestOptions,
     ) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(&input, schema.clone());
         UnnestExec {
             input,
             schema,
@@ -79,18 +80,19 @@ impl UnnestExec {
             metrics: Default::default(),
             cache,
         }
-        .with_cache()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            // Output Partitioning
-            .with_partitioning(self.input.output_partitioning().clone())
-            // Execution Mode
-            .with_exec_mode(self.input.execution_mode());
-
-        self
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+    ) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);
+
+        PlanPropertiesCache::new(
+            eq_properties,
+            input.output_partitioning().clone(),
+            input.execution_mode(),
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
index 7fc242099379..20c8eddce6bd 100644
--- a/datafusion/physical-plan/src/values.rs
+++ b/datafusion/physical-plan/src/values.rs
@@ -33,6 +33,7 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use datafusion_common::{internal_err, plan_err, DataFusionError, Result, ScalarValue};
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::EquivalenceProperties;

 /// Execution plan for values list based relation (produces constant rows)
 #[derive(Debug)]
@@ -113,13 +114,12 @@ impl ValuesExec {
             }
         }

-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone());
         Ok(ValuesExec {
             schema,
             data: batches,
             cache,
-        }
-        .with_cache())
+        })
     }

     /// provides the data
@@ -127,13 +127,14 @@ impl ValuesExec {
         self.data.clone()
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index cb512302cb6f..a9dfc9bfeedd 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -121,8 +121,8 @@ impl BoundedWindowAggExec {
                 vec![]
             }
         };
-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let window = Self {
+        let cache = Self::create_cache(&input, &schema, &window_expr);
+        Ok(Self {
             input,
             window_expr,
             schema,
@@ -131,8 +131,7 @@ impl BoundedWindowAggExec {
             input_order_mode,
             ordered_partition_by_indices,
             cache,
-        };
-        Ok(window.with_cache())
+        })
     }

     /// Window expressions
@@ -183,23 +182,25 @@ impl BoundedWindowAggExec {
         })
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        input: &Arc<dyn ExecutionPlan>,
+        schema: &SchemaRef,
+        window_expr: &[Arc<dyn WindowExpr>],
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
-        let eq_properties =
-            window_equivalence_properties(&self.schema, &self.input, &self.window_expr);
+        let eq_properties = window_equivalence_properties(schema, input, window_expr);

         // As we can have repartitioning using the partition keys, this can
         // be either one or more than one, depending on the presence of
         // repartitioning.
-        let output_partitioning = self.input.output_partitioning().clone();
+        let output_partitioning = input.output_partitioning().clone();

         // Construct properties cache
-        self.cache = PlanPropertiesCache::new(
-            eq_properties,               // Equivalence Properties
-            output_partitioning,         // Output Partitioning
-            self.input.execution_mode(), // Execution Mode
-        );
-        self
+        PlanPropertiesCache::new(
+            eq_properties,          // Equivalence Properties
+            output_partitioning,    // Output Partitioning
+            input.execution_mode(), // Execution Mode
+        )
     }
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index 247588c971a2..852698bafe3a 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -80,8 +80,8 @@ impl WindowAggExec {
         let ordered_partition_by_indices =
             get_ordered_partition_by_indices(window_expr[0].partition_by(), &input);
-        let cache = PlanPropertiesCache::new_default(schema.clone());
-        let window = Self {
+        let cache = Self::create_cache(schema.clone(), &input, &window_expr);
+        Ok(Self {
             input,
             window_expr,
             schema,
@@ -89,8 +89,7 @@ impl WindowAggExec {
             metrics: ExecutionPlanMetricsSet::new(),
             ordered_partition_by_indices,
             cache,
-        };
-        Ok(window.with_cache())
+        })
     }

     /// Window expressions
@@ -117,18 +116,21 @@ impl WindowAggExec {
         )
     }

-    fn with_cache(mut self) -> Self {
+    fn create_cache(
+        schema: SchemaRef,
+        input: &Arc<dyn ExecutionPlan>,
+        window_expr: &[Arc<dyn WindowExpr>],
+    ) -> PlanPropertiesCache {
         // Calculate equivalence properties:
-        let eq_properties =
-            window_equivalence_properties(&self.schema, &self.input, &self.window_expr);
+        let eq_properties = window_equivalence_properties(&schema, input, window_expr);

         // Get output partitioning:
         // Because we can have repartitioning using the partition keys this
         // would be either 1 or more than 1 depending on the presence of repartitioning.
-        let output_partitioning = self.input.output_partitioning().clone();
+        let output_partitioning = input.output_partitioning().clone();

         // Determine execution mode:
-        let mode = match self.input.execution_mode() {
+        let mode = match input.execution_mode() {
             ExecutionMode::Bounded => ExecutionMode::Bounded,
             ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
                 ExecutionMode::PipelineBreaking
@@ -136,8 +138,7 @@ impl WindowAggExec {
         };

         // Construct properties cache:
-        self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode);
-        self
+        PlanPropertiesCache::new(eq_properties, output_partitioning, mode)
     }
 }
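The two window operators above split on the same question as the sort operators: can they stream? A sketch of the contrast (`input` hypothetical), restating the two mode decisions:

    // BoundedWindowAggExec evaluates windows incrementally within ordered
    // partitions, so the input's mode passes straight through:
    let bounded_window_mode = input.execution_mode();

    // WindowAggExec buffers entire partitions before emitting, so an
    // unbounded input breaks the pipeline:
    let window_agg_mode = match input.execution_mode() {
        ExecutionMode::Bounded => ExecutionMode::Bounded,
        ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
            ExecutionMode::PipelineBreaking
        }
    };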
diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs
index 33c611dd30d8..9a0b5daf27e4 100644
--- a/datafusion/physical-plan/src/work_table.rs
+++ b/datafusion/physical-plan/src/work_table.rs
@@ -33,7 +33,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

 /// The name is from PostgreSQL's terminology.
 /// See
@@ -91,7 +91,7 @@ pub struct WorkTableExec {
 impl WorkTableExec {
     /// Create a new execution plan for a worktable exec.
     pub fn new(name: String, schema: SchemaRef) -> Self {
-        let cache = PlanPropertiesCache::new_default(schema.clone());
+        let cache = Self::create_cache(schema.clone());
         Self {
             name,
             schema,
@@ -99,7 +99,6 @@ impl WorkTableExec {
             work_table: Arc::new(WorkTable::new()),
             cache,
         }
-        .with_cache()
     }

     pub(super) fn with_work_table(&self, work_table: Arc<WorkTable>) -> Self {
@@ -112,13 +111,14 @@ impl WorkTableExec {
         }
     }

-    fn with_cache(mut self) -> Self {
-        self.cache = self
-            .cache
-            .with_partitioning(Partitioning::UnknownPartitioning(1))
-            .with_exec_mode(ExecutionMode::Bounded);
+    fn create_cache(schema: SchemaRef) -> PlanPropertiesCache {
+        let eq_properties = EquivalenceProperties::new(schema);

-        self
+        PlanPropertiesCache::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
     }
 }
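Taken together, the diff leaves one construction discipline across all operators; a closing sketch of the invariant (names generic, not tied to any one file above):

    // 1) Construction computes a complete, valid cache once, from the
    //    constructor's own arguments, via an associated function:
    let cache = Self::create_cache(/* constructor args */);

    // 2) The few post-construction setters (preserve order, preserve
    //    partitioning, sort information) patch only the field they
    //    invalidate, instead of recomputing everything:
    self.cache = self.cache.with_partitioning(new_partitioning);
    self.cache = self.cache.with_eq_properties(new_eq_properties);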