diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index 06d1ea4d46b9..da572b6d591f 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -34,13 +34,13 @@ use crate::{ use datafusion::common::plan_datafusion_err; use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; +use datafusion::logical_expr::dml::CopyTo; use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan}; use datafusion::physical_plan::{collect, execute_stream}; use datafusion::prelude::SessionContext; -use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str}; +use datafusion::sql::parser::{DFParser, Statement}; +use datafusion::sql::sqlparser::dialect::dialect_from_str; -use datafusion::logical_expr::dml::CopyTo; -use datafusion::sql::parser::Statement; use rustyline::error::ReadlineError; use rustyline::Editor; use tokio::signal; @@ -231,7 +231,7 @@ async fn exec_and_print( let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; - if physical_plan.unbounded_output().is_unbounded() { + if physical_plan.execution_mode().is_unbounded() { let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { @@ -305,10 +305,9 @@ mod tests { use std::str::FromStr; use super::*; - use datafusion::common::plan_err; - use datafusion_common::{ - file_options::StatementOptions, FileType, FileTypeWriterOptions, - }; + + use datafusion::common::{plan_err, FileType, FileTypeWriterOptions}; + use datafusion_common::file_options::StatementOptions; async fn create_external_table_test(location: &str, sql: &str) -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index 5ed1dfe05a14..9516dc570d6d 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -31,7 +31,7 @@ use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, - PlanPropertiesCache, SendableRecordBatchStream, + Partitioning, PlanPropertiesCache, SendableRecordBatchStream, }; use datafusion::prelude::*; use datafusion_expr::{Expr, LogicalPlanBuilder}; @@ -209,17 +209,11 @@ impl CustomExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; + self.cache = self + .cache + .with_partitioning(Partitioning::UnknownPartitioning(1)) + .with_exec_mode(ExecutionMode::Bounded); - // Output Partitioning - let output_partitioning = - datafusion::physical_plan::Partitioning::UnknownPartitioning(1); - new_cache = new_cache.with_partitioning(output_partitioning); - - // Execution Mode - new_cache = new_cache.with_exec_mode(ExecutionMode::Bounded); - - self.cache = new_cache; self } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 784b76fc3cae..cf7abc7e2314 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -28,10 +28,8 @@ use crate::arrow::record_batch::RecordBatch; use crate::arrow::util::pretty; use crate::datasource::{provider_as_source, MemTable, TableProvider}; use crate::error::Result; -use crate::execution::{ - context::{SessionState, TaskContext}, - FunctionRegistry, -}; +use 
crate::execution::context::{SessionState, TaskContext}; +use crate::execution::FunctionRegistry; use crate::logical_expr::utils::find_window_exprs; use crate::logical_expr::{ col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType, @@ -40,6 +38,7 @@ use crate::physical_plan::{ collect, collect_partitioned, execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; +use crate::prelude::SessionContext; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; @@ -58,7 +57,6 @@ use datafusion_expr::{ TableProviderFilterPushDown, UNNAMED_TABLE, }; -use crate::prelude::SessionContext; use async_trait::async_trait; /// Contains options that control how data is diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 1548d8160829..1a27f9315b34 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -20,6 +20,7 @@ use std::any::Any; use std::sync::Arc; +use super::FileGroupPartitioner; use crate::datasource::listing::PartitionedFile; use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, FileScanConfig, @@ -42,8 +43,6 @@ use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; -use super::FileGroupPartitioner; - /// Execution plan for scanning Arrow data source #[derive(Debug, Clone)] #[allow(dead_code)] @@ -89,14 +88,11 @@ impl ArrowExec { &self.projected_output_ordering, ); - // Output Partitioning - let output_partitioning = self.output_partitioning_helper(); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + eq_properties, + self.output_partitioning_helper(), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); self } diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4b2521c81609..fb2cd627a1da 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -24,14 +24,13 @@ use super::FileScanConfig; use crate::error::Result; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, Statistics, }; use arrow::datatypes::SchemaRef; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; /// Execution plan for scanning Avro data source #[derive(Debug, Clone)] @@ -73,16 +72,13 @@ impl AvroExec { self.schema(), &self.projected_output_ordering, ); + let n_partitions = self.base_config.file_groups.len(); - // Output Partitioning - let output_partitioning = - Partitioning::UnknownPartitioning(self.base_config.file_groups.len()); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + eq_properties, + 
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); self } } @@ -168,6 +164,7 @@ mod private { use crate::datasource::avro_to_arrow::Reader as AvroReader; use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; use crate::datasource::physical_plan::FileMeta; + use bytes::Buf; use futures::StreamExt; use object_store::{GetResultPayload, ObjectStore}; diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 831b2f40e93e..55cf62507788 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -32,18 +32,17 @@ use crate::datasource::physical_plan::FileMeta; use crate::error::{DataFusionError, Result}; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, Statistics, }; use arrow::csv; use arrow::datatypes::SchemaRef; +use datafusion_common::config::ConfigOptions; use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use bytes::{Buf, Bytes}; -use datafusion_common::config::ConfigOptions; use futures::{ready, StreamExt, TryStreamExt}; use object_store::{GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; @@ -128,14 +127,11 @@ impl CsvExec { &self.projected_output_ordering, ); - // Output Partitioning - let output_partitioning = self.output_partitioning_helper(); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + eq_properties, + self.output_partitioning_helper(), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); self } diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 9cb58e7032db..d21a1cd9fb21 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -28,9 +28,8 @@ use std::task::{Context, Poll}; use std::time::Instant; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::{ - FileMeta, FileScanConfig, PartitionColumnProjector, -}; +use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; +use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; use crate::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 5f3724cf2192..231b48f28d96 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -32,8 +32,8 @@ use crate::datasource::physical_plan::FileMeta; use crate::error::{DataFusionError, Result}; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - 
Statistics, + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, Statistics, }; use arrow::json::ReaderBuilder; @@ -42,10 +42,8 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; use bytes::{Buf, Bytes}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use futures::{ready, StreamExt, TryStreamExt}; -use object_store::{self, GetOptions}; -use object_store::{GetResultPayload, ObjectStore}; +use object_store::{self, GetOptions, GetResultPayload, ObjectStore}; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; @@ -97,14 +95,11 @@ impl NdJsonExec { &self.projected_output_ordering, ); - // Output Partitioning - let output_partitioning = self.output_partitioning_helper(); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + eq_properties, + self.output_partitioning_helper(), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); self } @@ -378,11 +373,10 @@ pub async fn plan_to_json( #[cfg(test)] mod tests { - use arrow::array::Array; - use arrow::datatypes::{Field, SchemaBuilder}; - use futures::StreamExt; - use object_store::local::LocalFileSystem; + use std::fs; + use std::path::Path; + use super::*; use crate::assert_batches_eq; use crate::dataframe::DataFrameWriteOptions; use crate::datasource::file_format::file_compression_type::FileTypeExt; @@ -390,20 +384,23 @@ mod tests { use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::context::SessionState; - use crate::prelude::NdJsonReadOptions; - use crate::prelude::*; + use crate::prelude::{ + CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext, + }; use crate::test::partitioned_file_groups; + + use arrow::array::Array; + use arrow::datatypes::{Field, SchemaBuilder}; use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array}; use datafusion_common::FileType; + + use futures::StreamExt; use object_store::chunked::ChunkedStore; + use object_store::local::LocalFileSystem; use rstest::*; - use std::fs; - use std::path::Path; use tempfile::TempDir; use url::Url; - use super::*; - const TEST_DATA_BASE: &str = "tests/data"; async fn prepare_store( diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 31a45674b6bf..500fe4cb97f4 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -27,7 +27,6 @@ mod json; #[cfg(feature = "parquet")] pub mod parquet; pub use file_groups::FileGroupPartitioner; -use futures::StreamExt; pub(crate) use self::csv::plan_to_csv; pub use self::csv::{CsvConfig, CsvExec, CsvOpener}; @@ -37,7 +36,6 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor pub use arrow_file::ArrowExec; pub use avro::AvroExec; -use file_scan_config::PartitionColumnProjector; pub use file_scan_config::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, }; @@ -72,9 +70,9 @@ use datafusion_common::{file_options::FileTypeWriterOptions, plan_err}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use futures::StreamExt; use log::debug; -use object_store::ObjectMeta; -use object_store::{path::Path, 
GetOptions, GetRange, ObjectStore}; +use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore}; /// The base configurations to provide when creating a physical plan for /// writing to any given file format. @@ -589,6 +587,9 @@ async fn find_first_newline( #[cfg(test)] mod tests { + use super::*; + use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; + use arrow_array::cast::AsArray; use arrow_array::types::{Float32Type, Float64Type, UInt32Type}; use arrow_array::{ @@ -596,11 +597,8 @@ mod tests { UInt64Array, }; use arrow_schema::Field; - use chrono::Utc; - - use crate::physical_plan::{DefaultDisplay, VerboseDisplay}; - use super::*; + use chrono::Utc; #[test] fn schema_mapping_map_batch() { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index d12fb5fafbf7..6499aa085219 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -38,15 +38,14 @@ use crate::{ physical_optimizer::pruning::PruningPredicate, physical_plan::{ metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, Statistics, }, }; use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::ArrowError; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use bytes::Bytes; use futures::future::BoxFuture; @@ -64,7 +63,7 @@ use parquet::schema::types::ColumnDescriptor; use tokio::task::JoinSet; mod metrics; -pub mod page_filter; +mod page_filter; mod row_filter; mod row_groups; mod statistics; @@ -273,14 +272,11 @@ impl ParquetExec { &self.projected_output_ordering, ); - // Output Partitioning - let output_partitioning = self.output_partitioning_helper(); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + eq_properties, + self.output_partitioning_helper(), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); self } @@ -776,6 +772,8 @@ pub(crate) fn parquet_to_arrow_decimal_type( #[cfg(test)] mod tests { // See also `parquet_exec` integration test + use std::fs::{self, File}; + use std::io::Write; use super::*; use crate::dataframe::DataFrameWriteOptions; @@ -793,28 +791,25 @@ mod tests { datasource::file_format::{parquet::ParquetFormat, FileFormat}, physical_plan::collect, }; - use arrow::array::{ArrayRef, Int32Array}; - use arrow::datatypes::Schema; - use arrow::record_batch::RecordBatch; - use arrow::{ - array::{Int64Array, Int8Array, StringArray}, - datatypes::{DataType, Field, SchemaBuilder}, + + use arrow::array::{ + ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, + StructArray, }; - use arrow_array::{Date64Array, StructArray}; + use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder}; + use arrow::record_batch::RecordBatch; use arrow_schema::Fields; - use chrono::{TimeZone, Utc}; - use datafusion_common::{assert_contains, ToDFSchema}; - use datafusion_common::{FileType, GetExt, ScalarValue}; + use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema}; use datafusion_expr::{col, lit, when, Expr}; use 
datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::execution_props::ExecutionProps; + + use chrono::{TimeZone, Utc}; use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; - use std::fs::{self, File}; - use std::io::Write; use tempfile::TempDir; use url::Url; diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index de77f53e49b0..c7bfe4742bdf 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -426,31 +426,27 @@ where let join_key_pairs = extract_join_keys(on); let eq_properties = join_plan.plan.equivalence_properties(); - if let Some(( + let ( JoinKeyPairs { left_keys, right_keys, }, - new_positions, - )) = try_reorder(join_key_pairs.clone(), parent_required, eq_properties) - { - if !new_positions.is_empty() { + positions, + ) = try_reorder(join_key_pairs, parent_required, eq_properties); + + if let Some(positions) = positions { + if !positions.is_empty() { let new_join_on = new_join_conditions(&left_keys, &right_keys); let new_sort_options = (0..sort_options.len()) - .map(|idx| sort_options[new_positions[idx]]) + .map(|idx| sort_options[positions[idx]]) .collect(); join_plan.plan = join_constructor((new_join_on, new_sort_options))?; } - let mut requirements = join_plan; - requirements.children[0].data = left_keys; - requirements.children[1].data = right_keys; - Ok(requirements) - } else { - let mut requirements = join_plan; - requirements.children[0].data = join_key_pairs.left_keys; - requirements.children[1].data = join_key_pairs.right_keys; - Ok(requirements) } + let mut requirements = join_plan; + requirements.children[0].data = left_keys; + requirements.children[1].data = right_keys; + Ok(requirements) } fn reorder_aggregate_keys( @@ -604,32 +600,28 @@ pub(crate) fn reorder_join_keys_to_inputs( }) = plan_any.downcast_ref::<HashJoinExec>() { if matches!(mode, PartitionMode::Partitioned) { - let join_key_pairs = extract_join_keys(on); - if let Some(( - JoinKeyPairs { - left_keys, - right_keys, - }, - new_positions, - )) = reorder_current_join_keys( - join_key_pairs, + let (join_keys, positions) = reorder_current_join_keys( + extract_join_keys(on), Some(left.output_partitioning()), Some(right.output_partitioning()), left.equivalence_properties(), right.equivalence_properties(), - ) { - if !new_positions.is_empty() { - let new_join_on = new_join_conditions(&left_keys, &right_keys); - return Ok(Arc::new(HashJoinExec::try_new( - left.clone(), - right.clone(), - new_join_on, - filter.clone(), - join_type, - PartitionMode::Partitioned, - *null_equals_null, - )?)); - } + ); + if positions.map_or(false, |idxs| !idxs.is_empty()) { + let JoinKeyPairs { + left_keys, + right_keys, + } = join_keys; + let new_join_on = new_join_conditions(&left_keys, &right_keys); + return Ok(Arc::new(HashJoinExec::try_new( + left.clone(), + right.clone(), + new_join_on, + filter.clone(), + join_type, + PartitionMode::Partitioned, + *null_equals_null, + )?)); } } } else if let Some(SortMergeJoinExec { @@ -643,24 +635,22 @@ pub(crate) fn reorder_join_keys_to_inputs( ..
}) = plan_any.downcast_ref::<SortMergeJoinExec>() { - let join_key_pairs = extract_join_keys(on); - if let Some(( - JoinKeyPairs { - left_keys, - right_keys, - }, - new_positions, - )) = reorder_current_join_keys( - join_key_pairs, + let (join_keys, positions) = reorder_current_join_keys( + extract_join_keys(on), Some(left.output_partitioning()), Some(right.output_partitioning()), left.equivalence_properties(), right.equivalence_properties(), - ) { - if !new_positions.is_empty() { + ); + if let Some(positions) = positions { + if !positions.is_empty() { + let JoinKeyPairs { + left_keys, + right_keys, + } = join_keys; let new_join_on = new_join_conditions(&left_keys, &right_keys); let new_sort_options = (0..sort_options.len()) - .map(|idx| sort_options[new_positions[idx]]) + .map(|idx| sort_options[positions[idx]]) .collect(); return SortMergeJoinExec::try_new( left.clone(), @@ -685,24 +675,24 @@ fn reorder_current_join_keys( right_partition: Option<&Partitioning>, left_equivalence_properties: &EquivalenceProperties, right_equivalence_properties: &EquivalenceProperties, -) -> Option<(JoinKeyPairs, Vec<usize>)> +) -> (JoinKeyPairs, Option<Vec<usize>>) { match (left_partition, right_partition) { (Some(Partitioning::Hash(left_exprs, _)), _) => { - try_reorder(join_keys.clone(), left_exprs, left_equivalence_properties) - .or_else(|| { - reorder_current_join_keys( - join_keys, - None, - right_partition, - left_equivalence_properties, - right_equivalence_properties, - ) - }) + match try_reorder(join_keys, left_exprs, left_equivalence_properties) { + (join_keys, None) => reorder_current_join_keys( + join_keys, + None, + right_partition, + left_equivalence_properties, + right_equivalence_properties, + ), + result => result, + } } (_, Some(Partitioning::Hash(right_exprs, _))) => { try_reorder(join_keys, right_exprs, right_equivalence_properties) } - _ => None, + _ => (join_keys, None), } } @@ -710,66 +700,65 @@ fn try_reorder( join_keys: JoinKeyPairs, expected: &[Arc<dyn PhysicalExpr>], equivalence_properties: &EquivalenceProperties, -) -> Option<(JoinKeyPairs, Vec<usize>)> +) -> (JoinKeyPairs, Option<Vec<usize>>) { let eq_groups = equivalence_properties.eq_group(); let mut normalized_expected = vec![]; let mut normalized_left_keys = vec![]; let mut normalized_right_keys = vec![]; if join_keys.left_keys.len() != expected.len() { - return None; + return (join_keys, None); } if physical_exprs_equal(expected, &join_keys.left_keys) || physical_exprs_equal(expected, &join_keys.right_keys) { - return Some((join_keys, vec![])); + return (join_keys, Some(vec![])); } else if !equivalence_properties.eq_group().is_empty() { normalized_expected = expected .iter() .map(|e| eq_groups.normalize_expr(e.clone())) .collect(); - assert_eq!(normalized_expected.len(), expected.len()); normalized_left_keys = join_keys .left_keys .iter() .map(|e| eq_groups.normalize_expr(e.clone())) .collect(); - assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len()); normalized_right_keys = join_keys .right_keys .iter() .map(|e| eq_groups.normalize_expr(e.clone())) .collect(); - assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len()); if physical_exprs_equal(&normalized_expected, &normalized_left_keys) || physical_exprs_equal(&normalized_expected, &normalized_right_keys) { - return Some((join_keys, vec![])); + return (join_keys, Some(vec![])); } } - let new_positions = expected_expr_positions(&join_keys.left_keys, expected) + let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected) .or_else(|| expected_expr_positions(&join_keys.right_keys, expected)) .or_else(||
expected_expr_positions(&normalized_left_keys, &normalized_expected)) .or_else(|| { expected_expr_positions(&normalized_right_keys, &normalized_expected) - }); - - new_positions.map(|positions| { - let mut new_left_keys = vec![]; - let mut new_right_keys = vec![]; - for pos in positions.iter() { - new_left_keys.push(join_keys.left_keys[*pos].clone()); - new_right_keys.push(join_keys.right_keys[*pos].clone()); - } - let pairs = JoinKeyPairs { - left_keys: new_left_keys, - right_keys: new_right_keys, - }; - (pairs, positions) - }) + }) + else { + return (join_keys, None); + }; + + let mut new_left_keys = vec![]; + let mut new_right_keys = vec![]; + for pos in positions.iter() { + new_left_keys.push(join_keys.left_keys[*pos].clone()); + new_right_keys.push(join_keys.right_keys[*pos].clone()); + } + let pairs = JoinKeyPairs { + left_keys: new_left_keys, + right_keys: new_right_keys, + }; + + (pairs, Some(positions)) } /// Return the expected expressions positions. @@ -882,12 +871,11 @@ fn add_hash_on_top( return Ok(input); } + let dist = Distribution::HashPartitioned(hash_exprs); let satisfied = input .plan .output_partitioning() - .satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || { - input.plan.equivalence_properties().clone() - }); + .satisfy(&dist, input.plan.equivalence_properties()); // Add hash repartitioning when: // - The hash distribution requirement is not satisfied, or @@ -900,7 +888,7 @@ fn add_hash_on_top( // requirements. // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.prefer_existing_sort`). - let partitioning = Partitioning::Hash(hash_exprs, n_target); + let partitioning = dist.create_partitioning(n_target); let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)? 
.with_preserve_order(); let plan = Arc::new(repartition) as _; @@ -1076,7 +1064,7 @@ fn ensure_distribution( let enable_round_robin = config.optimizer.enable_round_robin_repartition; let repartition_file_scans = config.optimizer.repartition_file_scans; let batch_size = config.execution.batch_size; - let is_unbounded = dist_context.plan.unbounded_output().is_unbounded(); + let is_unbounded = dist_context.plan.execution_mode().is_unbounded(); // Use order preserving variants either of the conditions true // - it is desired according to config // - when plan is unbounded @@ -1374,17 +1362,11 @@ pub(crate) mod tests { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); - - // Output Partitioning - let output_partitioning = self.input.output_partitioning().clone(); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + self.input.output_partitioning().clone(), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); self } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index ea83e2b6b38f..b459c86518b6 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -207,7 +207,7 @@ fn replace_with_partial_sort( let plan_any = plan.as_any(); if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() { let child = sort_plan.children()[0].clone(); - if !child.unbounded_output().is_unbounded() { + if !child.execution_mode().is_unbounded() { return Ok(plan); } diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 8e3d14543b3f..f74732305372 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -38,8 +38,7 @@ use crate::physical_plan::ExecutionPlan; use arrow_schema::Schema; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{internal_err, JoinSide}; -use datafusion_common::{DataFusionError, JoinType}; +use datafusion_common::{internal_err, DataFusionError, JoinSide, JoinType}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::sort_properties::SortProperties; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; @@ -465,8 +464,8 @@ fn hash_join_convert_symmetric_subrule( ) -> Result<Arc<dyn ExecutionPlan>> { // Check if the current plan node is a HashJoinExec. if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() { - let left_unbounded = hash_join.left.unbounded_output().is_unbounded(); - let right_unbounded = hash_join.right.unbounded_output().is_unbounded(); + let left_unbounded = hash_join.left.execution_mode().is_unbounded(); + let right_unbounded = hash_join.right.execution_mode().is_unbounded(); // Process only if both left and right sides are unbounded. if left_unbounded && right_unbounded { // Determine the partition mode based on configuration.
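Note: the hunks above repeatedly swap `unbounded_output()` for `execution_mode()` and then branch on `is_unbounded()` or `pipeline_friendly()`. As a reading aid, here is a minimal self-contained sketch of the tri-state mode these calls imply; the method and variant names come from this diff, but the exact enum definition in `datafusion-physical-plan` may differ:

```rust
/// Sketch of the tri-state execution mode threaded through the planner.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExecutionMode {
    /// Finite input, finite output.
    Bounded,
    /// Infinite stream, but processable incrementally.
    Unbounded,
    /// Would require unbounded intermediate state; not runnable as a pipeline.
    PipelineBreaking,
}

impl ExecutionMode {
    /// Pipeline-breaking plans are also unbounded: they never finish.
    fn is_unbounded(&self) -> bool {
        matches!(self, Self::Unbounded | Self::PipelineBreaking)
    }

    /// Only `PipelineBreaking` is rejected by the `PipelineChecker` rule below.
    fn pipeline_friendly(&self) -> bool {
        matches!(self, Self::Bounded | Self::Unbounded)
    }
}

fn main() {
    assert!(ExecutionMode::PipelineBreaking.is_unbounded());
    assert!(!ExecutionMode::PipelineBreaking.pipeline_friendly());
}
```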
@@ -595,10 +594,8 @@ fn hash_join_swap_subrule( _config_options: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() { - let left_unbounded = hash_join.left.unbounded_output().is_unbounded(); - let right_unbounded = hash_join.right.unbounded_output().is_unbounded(); - if left_unbounded - && !right_unbounded + if hash_join.left.execution_mode().is_unbounded() + && !hash_join.right.execution_mode().is_unbounded() && matches!( *hash_join.join_type(), JoinType::Inner @@ -1779,12 +1776,12 @@ mod hash_join_tests { assert_eq!( ( t.case.as_str(), - if left.unbounded_output().is_unbounded() { + if left.execution_mode().is_unbounded() { SourceType::Unbounded } else { SourceType::Bounded }, - if right.unbounded_output().is_unbounded() { + if right.execution_mode().is_unbounded() { SourceType::Unbounded } else { SourceType::Bounded diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index 52a1afdbd1ba..5ddba79515ee 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -114,17 +114,11 @@ impl OutputRequirementExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); - - // Output Partitioning - let output_partitioning = self.input.output_partitioning().clone(); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + self.input.output_partitioning().clone(), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); self } } diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 9f694a5c7695..9a7afcb6409d 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -28,7 +28,7 @@ use crate::physical_plan::ExecutionPlan; use datafusion_common::config::OptimizerOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError}; +use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; @@ -50,9 +50,7 @@ impl PhysicalOptimizerRule for PipelineChecker { plan: Arc<dyn ExecutionPlan>, config: &ConfigOptions, ) -> Result<Arc<dyn ExecutionPlan>> { - let plan = - plan.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer))?; - Ok(plan) + plan.transform_up(&|p| check_finiteness_requirements(p, &config.optimizer)) } fn name(&self) -> &str { @@ -78,11 +76,11 @@ pub fn check_finiteness_requirements( the 'allow_symmetric_joins_without_pruning' configuration flag"); } } - if !input.unbounded_output().pipeline_friendly() { - Err(plan_datafusion_err!( + if !input.execution_mode().pipeline_friendly() { + plan_err!( "Cannot execute pipeline breaking queries, operator: {:?}", input - )) + ) } else { Ok(Transformed::No(input)) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 5fc24205f5da..8825feb45e98 100644 ---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -241,7 +241,7 @@ pub(crate) fn replace_with_order_preserving_variants( // For unbounded cases, we replace with the order-preserving variant in any // case, as doing so helps fix the pipeline. Also replace if config allows. let use_order_preserving_variant = config.optimizer.prefer_existing_sort - || !requirements.plan.unbounded_output().pipeline_friendly(); + || !requirements.plan.execution_mode().pipeline_friendly(); // Create an alternate plan with order-preserving variants: let mut alternate_plan = plan_with_order_preserving_variants( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 165c8818f403..03e601eacf95 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1976,31 +1976,35 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> { #[cfg(test)] mod tests { + use std::any::Any; + use std::collections::HashMap; + use std::convert::TryFrom; + use std::fmt::{self, Debug}; + use std::ops::{BitAnd, Not}; + use super::*; use crate::datasource::file_format::options::CsvReadOptions; use crate::datasource::MemTable; - use crate::physical_plan::{expressions, DisplayFormatType, Partitioning}; - use crate::physical_plan::{DisplayAs, SendableRecordBatchStream}; + use crate::physical_plan::{ + expressions, DisplayAs, DisplayFormatType, ExecutionMode, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, + }; use crate::physical_planner::PhysicalPlanner; use crate::prelude::{SessionConfig, SessionContext}; use crate::test_util::{scan_empty, scan_empty_with_partitions}; + use arrow::array::{ArrayRef, DictionaryArray, Int32Array}; use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef}; use arrow::record_batch::RecordBatch; - use datafusion_common::{assert_contains, TableReference}; - use datafusion_common::{DFField, DFSchema, DFSchemaRef}; + use datafusion_common::{ + assert_contains, DFField, DFSchema, DFSchemaRef, TableReference, + }; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_expr::{ col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder, UserDefinedLogicalNodeCore, }; - use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; - use fmt::Debug; - use std::collections::HashMap; - use std::convert::TryFrom; - use std::ops::{BitAnd, Not}; - use std::{any::Any, fmt}; fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); @@ -2572,16 +2576,13 @@ mod tests { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Execution Mode - new_cache = new_cache.with_exec_mode(ExecutionMode::Bounded); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 80560fd8d107..277901ff9915 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -369,6 +369,7 @@ pub struct StatisticsExec { schema: Arc<Schema>, cache: PlanPropertiesCache, } + impl StatisticsExec {
pub fn new(stats: Statistics, schema: Schema) -> Self { assert_eq!( @@ -385,15 +386,13 @@ impl StatisticsExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(2)); - - // Execution Mode - new_cache = new_cache.with_exec_mode(ExecutionMode::Bounded); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(2)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 8f5d7164e136..bb016f93c351 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -29,17 +29,16 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use tempfile::TempDir; - use crate::dataframe::DataFrame; use crate::datasource::provider::TableProviderFactory; +use crate::datasource::stream::{StreamConfig, StreamTable}; use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider}; use crate::error::Result; use crate::execution::context::{SessionState, TaskContext}; use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE}; use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, - SendableRecordBatchStream, + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, }; use crate::prelude::{CsvReadOptions, SessionContext}; @@ -50,16 +49,13 @@ use datafusion_expr::{CreateExternalTable, Expr, TableType}; use async_trait::async_trait; use futures::Stream; +use tempfile::TempDir; // backwards compatibility #[cfg(feature = "parquet")] pub use datafusion_common::test_util::parquet_test_data; pub use datafusion_common::test_util::{arrow_test_data, get_data_dir}; -use crate::datasource::stream::{StreamConfig, StreamTable}; -pub use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; - /// Scan an empty data source, mainly used in tests pub fn scan_empty( name: Option<&str>, @@ -253,21 +249,17 @@ impl UnboundedExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - - // Output Partitioning - new_cache = new_cache - .with_partitioning(Partitioning::UnknownPartitioning(self.partitions)); - - // Execution Mode - let exec_mode = if self.batch_produce.is_none() { + let mode = if self.batch_produce.is_none() { ExecutionMode::Unbounded } else { ExecutionMode::Bounded }; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(self.partitions)) + // Execution Mode + .with_exec_mode(mode); self } } diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index 005b02a14054..a167258ee1d5 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -86,15 +86,13 @@ impl CustomExecutionPlan { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + 
.with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index 522c18a4d654..9423f0170c7e 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -27,15 +27,14 @@ use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, + PlanPropertiesCache, SendableRecordBatchStream, Statistics, }; use datafusion::prelude::*; use datafusion::scalar::ScalarValue; use datafusion_common::cast::as_primitive_array; use datafusion_common::{internal_err, not_impl_err, DataFusionError}; use datafusion_expr::expr::{BinaryExpr, Cast}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use async_trait::async_trait; @@ -68,15 +67,13 @@ impl CustomPlan { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 41915c1fd49e..315c7cb6dd26 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -26,14 +26,13 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, - SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, + Partitioning, PlanPropertiesCache, SendableRecordBatchStream, Statistics, }, prelude::SessionContext, scalar::ScalarValue, }; use datafusion_common::{project_schema, stats::Precision}; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use async_trait::async_trait; @@ -63,15 +62,13 @@ impl StatisticsValidation { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(2)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(2)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index f6488711b9f7..947376dfb6a0 100644 --- 
a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -70,7 +70,7 @@ use arrow::{ }; use datafusion::{ common::cast::{as_int64_array, as_string_array}, - common::{internal_err, DFSchemaRef}, + common::{arrow_datafusion_err, internal_err, DFSchemaRef}, error::{DataFusionError, Result}, execution::{ context::{QueryPlanner, SessionState, TaskContext}, @@ -82,14 +82,13 @@ use datafusion::{ }, optimizer::{optimize_children, OptimizerConfig, OptimizerRule}, physical_plan::{ - DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, - RecordBatchStream, SendableRecordBatchStream, Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, + Partitioning, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, + Statistics, }, physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}, prelude::{SessionConfig, SessionContext}, }; -use datafusion_common::arrow_datafusion_err; -use datafusion_physical_plan::{ExecutionMode, PlanPropertiesCache}; use async_trait::async_trait; use futures::{Stream, StreamExt}; @@ -422,15 +421,13 @@ impl TopKExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 301f12e9aa2e..a4a246daf676 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -142,12 +142,12 @@ impl Partitioning { } } - /// Returns true when the guarantees made by this [[Partitioning]] are sufficient to - /// satisfy the partitioning scheme mandated by the `required` [[Distribution]] - pub fn satisfy<F: FnOnce() -> EquivalenceProperties>( + /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to + /// satisfy the partitioning scheme mandated by the `required` [`Distribution`]. + pub fn satisfy( &self, - required: Distribution, - eq_properties: F, + required: &Distribution, + eq_properties: &EquivalenceProperties, ) -> bool { match required { Distribution::UnspecifiedDistribution => true, @@ -159,11 +159,10 @@ impl Partitioning { // then we need to have the partition count and hash functions validation. Partitioning::Hash(partition_exprs, _) => { let fast_match = - physical_exprs_equal(&required_exprs, partition_exprs); + physical_exprs_equal(required_exprs, partition_exprs); // If the required exprs do not match, need to leverage the eq_properties provided by the child // and normalize both exprs based on the equivalent groups.
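Note on the `satisfy` signature change above: the caller now builds the `EquivalenceProperties` once and lends it out by reference, instead of passing a closure that reconstructs (and clones the schema for) it on every call. A usage sketch, assuming the crate layout this PR touches (`Partitioning` and `Distribution` in `datafusion_physical_expr`):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_expr::{Distribution, EquivalenceProperties, Partitioning};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // Built once and borrowed by every `satisfy` call; the old signature took
    // an `F: FnOnce() -> EquivalenceProperties` and forced a rebuild per call.
    let eq_properties = EquivalenceProperties::new(schema);

    // A single-partition plan trivially satisfies `SinglePartition`.
    let partitioning = Partitioning::UnknownPartitioning(1);
    assert!(partitioning.satisfy(&Distribution::SinglePartition, &eq_properties));
}
```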
if !fast_match { - let eq_properties = eq_properties(); let eq_groups = eq_properties.eq_group(); if !eq_groups.is_empty() { let normalized_required_exprs = required_exprs @@ -222,14 +221,14 @@ pub enum Distribution { impl Distribution { /// Creates a `Partitioning` that satisfies this `Distribution` - pub fn create_partitioning(&self, partition_count: usize) -> Partitioning { + pub fn create_partitioning(self, partition_count: usize) -> Partitioning { match self { Distribution::UnspecifiedDistribution => { Partitioning::UnknownPartitioning(partition_count) } Distribution::SinglePartition => Partitioning::UnknownPartitioning(1), Distribution::HashPartitioned(expr) => { - Partitioning::Hash(expr.clone(), partition_count) + Partitioning::Hash(expr, partition_count) } } } @@ -273,24 +272,15 @@ mod tests { let round_robin_partition = Partitioning::RoundRobinBatch(10); let hash_partition1 = Partitioning::Hash(partition_exprs1, 10); let hash_partition2 = Partitioning::Hash(partition_exprs2, 10); + let eq_properties = EquivalenceProperties::new(schema); for distribution in distribution_types { let result = ( - single_partition.satisfy(distribution.clone(), || { - EquivalenceProperties::new(schema.clone()) - }), - unspecified_partition.satisfy(distribution.clone(), || { - EquivalenceProperties::new(schema.clone()) - }), - round_robin_partition.satisfy(distribution.clone(), || { - EquivalenceProperties::new(schema.clone()) - }), - hash_partition1.satisfy(distribution.clone(), || { - EquivalenceProperties::new(schema.clone()) - }), - hash_partition2.satisfy(distribution.clone(), || { - EquivalenceProperties::new(schema.clone()) - }), + single_partition.satisfy(&distribution, &eq_properties), + unspecified_partition.satisfy(&distribution, &eq_properties), + round_robin_partition.satisfy(&distribution, &eq_properties), + hash_partition1.satisfy(&distribution, &eq_properties), + hash_partition2.satisfy(&distribution, &eq_properties), ); match distribution { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index a686f8f462d6..ea3fc3a737b2 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -506,22 +506,20 @@ impl AggregateExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Construct equivalence properties: let eq_properties = self .input .equivalence_properties() .project(&self.projection_mapping, self.schema()); - // Output partitioning + // Get output partitioning: let mut output_partitioning = self.input.output_partitioning().clone(); if self.mode.is_first_stage() { // First stage aggregation will not change the output partitioning, // but needs to respect aliases (e.g. mapping in the GROUP BY // expression). 
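Stepping back to the `Distribution::create_partitioning` change in the hunk above: taking `self` by value lets the hash expressions move into the resulting `Partitioning` rather than being cloned. A sketch under the same crate-path assumptions as the previous example:

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, Partitioning, PhysicalExpr};

fn main() {
    let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a", 0))];
    // The distribution is consumed, so `exprs` moves into the resulting
    // `Partitioning::Hash` without the `expr.clone()` the old `&self`
    // signature required.
    let partitioning = Distribution::HashPartitioned(exprs).create_partitioning(8);
    assert!(matches!(partitioning, Partitioning::Hash(_, 8)));
}
```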
let input_eq_properties = self.input.equivalence_properties(); - // First stage Aggregation will not change the output partitioning but need to respect the Alias - let input_partition = self.input.output_partitioning(); - if let Partitioning::Hash(exprs, part) = input_partition { + if let Partitioning::Hash(exprs, part) = output_partitioning { let normalized_exprs = exprs .iter() .map(|expr| { @@ -532,24 +530,22 @@ impl AggregateExec { }) }) .collect(); - output_partitioning = Partitioning::Hash(normalized_exprs, *part); + output_partitioning = Partitioning::Hash(normalized_exprs, part); } } - // Unbounded Output - let mut unbounded_output = self.input.unbounded_output(); - if self.input.unbounded_output() == ExecutionMode::Unbounded + // Determine execution mode: + let mut exec_mode = self.input.execution_mode(); + if exec_mode == ExecutionMode::Unbounded && self.input_order_mode == InputOrderMode::Linear { - // Cannot run without breaking pipeline. - unbounded_output = ExecutionMode::PipelineBreaking; + // Cannot run without breaking the pipeline + exec_mode = ExecutionMode::PipelineBreaking; } - self.cache = PlanPropertiesCache::new( - eq_properties, - output_partitioning, - unbounded_output, - ); + self.cache = + PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self } @@ -1631,15 +1627,13 @@ mod tests { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 7d4bdebfe8a0..99b3add2acd6 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -82,15 +82,13 @@ impl AnalyzeExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(self.input.execution_mode()); - // Execution Mode - let exec_mode = self.input.unbounded_output(); - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 8d25f7df1d8f..e01060f3784d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -75,18 +75,14 @@ impl CoalesceBatchesExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); + // The coalesce batches operator does not make any changes to the + // partitioning of its input. 
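As a reading aid for the many `with_cache` rewrites in this patch: operators that compute all three properties use the three-argument `PlanPropertiesCache::new(...)`, while operators that only override defaults chain the `with_*` builders. A self-contained model with simplified stand-in types (the real cache also carries equivalence properties):

```rust
#[derive(Clone, Debug)]
enum Partitioning {
    UnknownPartitioning(usize),
}

#[derive(Clone, Copy, Debug)]
enum ExecutionMode {
    Bounded,
    Unbounded,
}

/// Simplified stand-in for the real `PlanPropertiesCache`.
#[derive(Clone, Debug)]
struct PlanPropertiesCache {
    partitioning: Partitioning,
    exec_mode: ExecutionMode,
}

impl PlanPropertiesCache {
    fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.partitioning = partitioning;
        self
    }

    fn with_exec_mode(mut self, exec_mode: ExecutionMode) -> Self {
        self.exec_mode = exec_mode;
        self
    }
}

fn main() {
    let cache = PlanPropertiesCache {
        partitioning: Partitioning::UnknownPartitioning(1),
        exec_mode: ExecutionMode::Unbounded,
    };
    // The chained form that replaces the repeated
    // `let mut new_cache = ...; new_cache = new_cache.with_*(...)` blocks:
    let cache = cache
        .with_partitioning(Partitioning::UnknownPartitioning(2))
        .with_exec_mode(ExecutionMode::Bounded);
    println!("{cache:?}");
}
```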
+ self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + self.input.output_partitioning().clone(), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); - // Output Partitioning - // The coalesce batches operator does not make any changes to the partitioning of its input - let output_partitioning = self.input.output_partitioning().clone(); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 4815aaa2d4f0..255e996bd122 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -59,19 +59,16 @@ impl CoalescePartitionsExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Coalescing partitions loses existing orderings: let mut eq_properties = self.input.equivalence_properties().clone(); - // Coalesce partitions loses existing orderings. eq_properties.clear_orderings(); - // Output Partitioning - let output_partitioning = Partitioning::UnknownPartitioning(1); + self.cache = PlanPropertiesCache::new( + eq_properties, // Equivalence Properties + Partitioning::UnknownPartitioning(1), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); - // Execution Mode - let exec_mode = self.input.unbounded_output(); - - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index fe5be52bc001..d4afca6a1acf 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -303,10 +303,7 @@ impl GraphvizVisitor<'_, '_> { impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> { type Error = fmt::Error; - fn pre_visit( - &mut self, - plan: &dyn ExecutionPlan, - ) -> datafusion_common::Result<bool> { + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool> { let id = self.graphviz_builder.next_id(); struct Wrapper<'a>(&'a dyn ExecutionPlan, DisplayFormatType); @@ -468,11 +465,11 @@ mod tests { use std::fmt::Write; use std::sync::Arc; - use datafusion_common::DataFusionError; - + use super::DisplayableExecutionPlan; use crate::{DisplayAs, ExecutionPlan, PlanPropertiesCache}; - use super::DisplayableExecutionPlan; + use datafusion_common::{DataFusionError, Result, Statistics}; + use datafusion_execution::{SendableRecordBatchStream, TaskContext}; #[derive(Debug, Clone, Copy)] enum TestStatsExecPlan { @@ -511,28 +508,25 @@ mod tests { fn with_new_children( self: Arc<Self>, _: Vec<Arc<dyn ExecutionPlan>>, - ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> { + ) -> Result<Arc<dyn ExecutionPlan>> { unimplemented!() } fn execute( &self, _: usize, - _: Arc<datafusion_execution::TaskContext>, - ) -> datafusion_common::Result<datafusion_execution::SendableRecordBatchStream> - { + _: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { todo!() } - fn statistics(&self) -> datafusion_common::Result<Statistics> { + fn statistics(&self) -> Result<Statistics> { match self { Self::Panic => panic!("expected panic"), Self::Error => { Err(DataFusionError::Internal("expected error".to_string())) } - Self::Ok => Ok(datafusion_common::Statistics::new_unknown( - self.schema().as_ref(), - )), + Self::Ok => Ok(Statistics::new_unknown(self.schema().as_ref())), } } } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index dfd1e9d12904..d91395825135 100644 --- a/datafusion/physical-plan/src/empty.rs +++
b/datafusion/physical-plan/src/empty.rs @@ -58,8 +58,7 @@ impl EmptyExec { /// Create a new EmptyExec with specified partition number pub fn with_partitions(mut self, partitions: usize) -> Self { self.partitions = partitions; - // Changing partitions may invalidate output partitioning. - // Update it also. + // Changing partitions may invalidate output partitioning, so update it: let output_partitioning = self.output_partitioning_helper(); self.cache = self.cache.with_partitioning(output_partitioning); self @@ -74,17 +73,15 @@ impl EmptyExec { } fn with_cache(mut self) -> Self { - // Output Partitioning let output_partitioning = self.output_partitioning_helper(); - let mut new_cache = self.cache; - new_cache = new_cache.with_partitioning(output_partitioning); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(output_partitioning) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 1f7beb632501..935b37c168da 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -73,15 +73,13 @@ impl ExplainExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 9e0b644cd4fb..cc8fdcbcd0cd 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -116,9 +116,9 @@ impl FilterExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let stats = self.statistics().unwrap(); // Combine the equal predicates with the input equivalence properties + // to construct the equivalence properties: + let stats = self.statistics().unwrap(); let mut eq_properties = self.input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate(&self.predicate); for (lhs, rhs) in equal_pairs { @@ -126,21 +126,20 @@ impl FilterExec { let rhs_expr = Arc::new(rhs.clone()) as _; eq_properties.add_equal_conditions(&lhs_expr, &rhs_expr) } - // Add the columns that have only one value (singleton) after filtering to constants. + // Add the columns that have only one viable value (singleton) after + // filtering to constants. 
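A sketch of the two `FilterExec` enrichment steps described in the comments above, using the same two calls this hunk uses (`add_equal_conditions` and `add_constants`, with signatures inferred from their use here):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Int32, false),
        Field::new("c2", DataType::Int32, false),
        Field::new("c3", DataType::Int32, false),
    ]));
    let mut eq_properties = EquivalenceProperties::new(schema);

    // A conjunct like `c2 = c3` makes the two columns interchangeable for
    // downstream ordering/partitioning reasoning...
    let c2 = Arc::new(Column::new("c2", 1)) as Arc<dyn PhysicalExpr>;
    let c3 = Arc::new(Column::new("c3", 2)) as Arc<dyn PhysicalExpr>;
    eq_properties.add_equal_conditions(&c2, &c3);

    // ...while a conjunct like `c1 = 5` leaves `c1` with a single viable
    // value (a singleton), so it can be promoted to a constant.
    let c1 = Arc::new(Column::new("c1", 0)) as Arc<dyn PhysicalExpr>;
    eq_properties = eq_properties.add_constants(std::iter::once(c1));
}
```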
let constants = collect_columns(self.predicate()) .into_iter() .filter(|column| stats.column_statistics[column.index()].is_singleton()) .map(|column| Arc::new(column) as _); eq_properties = eq_properties.add_constants(constants); - // Output Partitioning - let output_partitioning = self.input.output_partitioning().clone(); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); + self.cache = PlanPropertiesCache::new( + eq_properties, + self.input.output_partitioning().clone(), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index bdc1c3bc40e3..b20e8cac7926 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -177,15 +177,13 @@ impl FileSinkExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(self.input.execution_mode()); - // Execution Mode - let exec_mode = self.input.unbounded_output(); - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 0f4fea1cab96..8b12b02b4667 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -23,13 +23,14 @@ use std::{any::Any, sync::Arc, task::Poll}; use super::utils::{ adjust_right_output_partitioning, BuildProbeJoinMetrics, OnceAsync, OnceFut, }; +use crate::coalesce_batches::concat_batches; +use crate::coalesce_partitions::CoalescePartitionsExec; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - ColumnStatistics, DisplayFormatType, Distribution, ExecutionPlan, - PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, Statistics, + ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionMode, + ExecutionPlan, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; -use crate::{DisplayAs, ExecutionMode}; use arrow::datatypes::{Fields, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; @@ -99,8 +100,9 @@ impl CrossJoinExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - // TODO: Check equivalence properties of crossjoin, for some cases it may preserve ordering. + // Calculate equivalence properties: + // TODO: Check equivalence properties of cross join; it may preserve + // ordering in some cases. let eq_properties = join_equivalence_properties( self.left.equivalence_properties().clone(), self.right.equivalence_properties().clone(), @@ -111,25 +113,22 @@ &[], ); - // Output Partitioning - // TODO optimize CrossJoin implementation to generate M * N partitions - let left_columns_len = self.left.schema().fields.len(); + // Get output partitioning: + // TODO: Optimize the cross join implementation to generate M * N + // partitions.
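The call coming up next relies on `adjust_right_output_partitioning`, whose rewrite appears further down in joins/utils.rs: when the right input is hash-partitioned, every column index it references must be shifted past the left schema, because the right side's columns sit after the left side's in the join output. A self-contained toy of that shift, in which plain column indices stand in for the real `Arc<dyn PhysicalExpr>` hash expressions:

// Illustrative only: a toy model of the column-index shift performed by
// `adjust_right_output_partitioning` in this patch.
#[derive(Debug, Clone, PartialEq)]
enum ToyPartitioning {
    RoundRobinBatch(usize),
    Hash(Vec<usize>, usize), // (referenced column indices, partition count)
    Unknown(usize),
}

fn adjust_right(right: &ToyPartitioning, left_columns_len: usize) -> ToyPartitioning {
    match right {
        // Only hash partitioning refers to column indices, so only it needs
        // adjusting; other schemes are cloned unchanged.
        ToyPartitioning::Hash(cols, size) => ToyPartitioning::Hash(
            cols.iter().map(|c| c + left_columns_len).collect(),
            *size,
        ),
        other => other.clone(),
    }
}

fn main() {
    // Right input hashed on its column 0; the left schema has 3 columns, so
    // the same column sits at index 3 in the join output.
    let adjusted = adjust_right(&ToyPartitioning::Hash(vec![0], 4), 3);
    assert_eq!(adjusted, ToyPartitioning::Hash(vec![3], 4));
    // Non-hash schemes pass through untouched.
    let rr = adjust_right(&ToyPartitioning::RoundRobinBatch(2), 3);
    assert_eq!(rr, ToyPartitioning::RoundRobinBatch(2));
}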
let output_partitioning = adjust_right_output_partitioning( - self.right.output_partitioning().clone(), - left_columns_len, + self.right.output_partitioning(), + self.left.schema().fields.len(), ); - // Execution Mode - let left_unbounded = self.left.unbounded_output(); - let right_unbounded = self.right.unbounded_output(); - let exec_mode = match (left_unbounded, right_unbounded) { + // Determine the execution mode: + let mode = match (self.left.execution_mode(), self.right.execution_mode()) { (ExecutionMode::Bounded, ExecutionMode::Bounded) => ExecutionMode::Bounded, - // If any of the inputs is unbounded, cross join break pipeline. + // If any of the inputs is unbounded, cross join breaks the pipeline. (_, _) => ExecutionMode::PipelineBreaking, }; - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 8fb36b728ecb..7afe252f21ee 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -23,29 +23,26 @@ use std::sync::Arc; use std::task::Poll; use std::{any::Any, usize, vec}; -use crate::joins::utils::{ - adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, - get_final_indices_from_bit_map, need_produce_result_in_final, JoinHashMap, - JoinHashMapOffset, JoinHashMapType, +use super::{ + utils::{OnceAsync, OnceFut}, + PartitionMode, }; use crate::{ coalesce_partitions::CoalescePartitionsExec, - exec_mode_flatten, + exec_mode_flatten, handle_state, hash_utils::create_hashes, joins::utils::{ - adjust_right_output_partitioning, build_join_schema, check_join_is_valid, - estimate_join_statistics, partitioned_join_output_partitioning, - BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinOn, StatefulStreamResult, + adjust_indices_by_join_type, adjust_right_output_partitioning, + apply_join_filter_to_indices, build_batch_from_indices, build_join_schema, + check_join_is_valid, estimate_join_statistics, get_final_indices_from_bit_map, + need_produce_result_in_final, partitioned_join_output_partitioning, + BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMap, JoinHashMapOffset, + JoinHashMapType, JoinOn, StatefulStreamResult, }, metrics::{ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, Partitioning, - PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, Statistics, -}; -use crate::{handle_state, DisplayAs}; - -use super::{ - utils::{OnceAsync, OnceFut}, - PartitionMode, + DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan, + Partitioning, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, + Statistics, }; use arrow::array::{ @@ -299,6 +296,7 @@ pub struct HashJoinExec { /// Otherwise, rows that have `null`s in the join columns will not be /// matched and thus will not appear in the output. pub null_equals_null: bool, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -405,7 +403,7 @@ impl HashJoinExec { let left = &self.left; let right = &self.right; let schema = self.schema(); - // Equivalence properties + // Calculate equivalence properties: let eq_properties = join_equivalence_properties( left.equivalence_properties().clone(), right.equivalence_properties().clone(), @@ -416,12 +414,12 @@ &self.on, ); - // Output partitioning + // Get output partitioning: let left_columns_len = left.schema().fields.len(); let output_partitioning = match self.mode { PartitionMode::CollectLeft => match self.join_type { JoinType::Inner | JoinType::Right => adjust_right_output_partitioning( - right.output_partitioning().clone(), + right.output_partitioning(), left_columns_len, ), JoinType::RightSemi | JoinType::RightAnti => { @@ -436,8 +434,8 @@ }, PartitionMode::Partitioned => partitioned_join_output_partitioning( self.join_type, - left.output_partitioning().clone(), - right.output_partitioning().clone(), + left.output_partitioning(), + right.output_partitioning(), left_columns_len, ), PartitionMode::Auto => Partitioning::UnknownPartitioning( @@ -445,13 +443,11 @@ ), }; - // Unbounded output - let left_unbounded = left.unbounded_output().is_unbounded(); - let right_unbounded = right.unbounded_output().is_unbounded(); - // If left is unbounded, or right is unbounded with JoinType::Right, - // JoinType::Full, JoinType::RightAnti types. - let breaking = left_unbounded - || (right_unbounded + // Determine execution mode by checking whether this join is pipeline + // breaking. This happens when the left side is unbounded, or the right + // side is unbounded with a `Left`, `Full`, `LeftAnti` or `LeftSemi` + // join type. + let pipeline_breaking = left.execution_mode().is_unbounded() + || (right.execution_mode().is_unbounded() && matches!( self.join_type, JoinType::Left | JoinType::Full | JoinType::LeftAnti | JoinType::LeftSemi )); - let exec_mode = if breaking { + let mode = if pipeline_breaking { ExecutionMode::PipelineBreaking } else { exec_mode_flatten([left, right]) }; - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 724735f28077..d4c3051e551b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -34,8 +34,8 @@ use crate::joins::utils::{ }; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ - exec_mode_safe_flatten, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, - PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, + exec_mode_flatten, DisplayAs, DisplayFormatType, Distribution, ExecutionMode, + ExecutionPlan, PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, }; use arrow::array::{ @@ -92,6 +92,7 @@ pub struct NestedLoopJoinExec { column_indices: Vec<ColumnIndex>, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanPropertiesCache, } @@ -144,7 +145,7 @@ impl NestedLoopJoinExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = join_equivalence_properties( self.left.equivalence_properties().clone(), self.right.equivalence_properties().clone(), @@ -156,24 +157,26 @@ impl NestedLoopJoinExec { &[], ); - // Output Partitioning - // the partition of output is determined by the rule of `required_input_distribution` + // Get output partitioning, which is determined by the rule of + // `required_input_distribution`: let output_partitioning = if self.join_type == JoinType::Full { self.left.output_partitioning().clone() } else { partitioned_join_output_partitioning( self.join_type, - self.left.output_partitioning().clone(), - self.right.output_partitioning().clone(), + self.left.output_partitioning(), + self.right.output_partitioning(), self.left.schema().fields.len(), ) }; - // Execution Mode - let exec_mode = exec_mode_safe_flatten([&self.left, &self.right]); + // Determine execution mode: + let mut mode = exec_mode_flatten([&self.left, &self.right]); + if mode.is_unbounded() { + mode = ExecutionMode::PipelineBreaking; + } - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 2f9729351a31..0d7cd995a5a2 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -43,8 +43,7 @@ use crate::{ }; use arrow::array::*; -use arrow::compute; -use arrow::compute::{concat_batches, take, SortOptions}; +use arrow::compute::{self, concat_batches, take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; @@ -84,6 +83,7 @@ pub struct SortMergeJoinExec { pub sort_options: Vec, /// If null_equals_null is true, null == null else null != null pub null_equals_null: bool, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -202,7 +202,7 @@ impl SortMergeJoinExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = join_equivalence_properties( self.left.equivalence_properties().clone(), self.right.equivalence_properties().clone(), @@ -213,20 +213,19 @@ impl SortMergeJoinExec { self.on(), ); - // Output Partitioning + // Get output partitioning: let left_columns_len = self.left.schema().fields.len(); let output_partitioning = partitioned_join_output_partitioning( self.join_type, - self.left.output_partitioning().clone(), - self.right.output_partitioning().clone(), + self.left.output_partitioning(), + self.right.output_partitioning(), left_columns_len, ); - // Execution Mode - let exec_mode = exec_mode_flatten([&self.left, &self.right]); + // Determine execution mode: + let mode = exec_mode_flatten([&self.left, &self.right]); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index f636fc197168..5fd89e98a58f 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -71,9 +71,9 @@ use datafusion_execution::TaskContext; use datafusion_expr::interval_arithmetic::Interval; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; +use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use ahash::RandomState; -use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use futures::Stream; use hashbrown::HashSet; use parking_lot::Mutex; @@ -191,6 +191,7 @@ pub struct SymmetricHashJoinExec { pub(crate) right_sort_exprs: Option>, /// Partition Mode mode: StreamJoinPartitionMode, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -253,7 +254,7 @@ impl SymmetricHashJoinExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = join_equivalence_properties( self.left.equivalence_properties().clone(), self.right.equivalence_properties().clone(), @@ -265,20 +266,19 @@ impl SymmetricHashJoinExec { self.on(), ); - // Output Partitioning + // Get output partitioning: let left_columns_len = self.left.schema().fields.len(); let output_partitioning = partitioned_join_output_partitioning( self.join_type, - self.left.output_partitioning().clone(), - self.right.output_partitioning().clone(), + self.left.output_partitioning(), + self.right.output_partitioning(), left_columns_len, ); - // Execution Mode - let exec_mode = exec_mode_flatten([&self.left, &self.right]); + // Determine execution mode: + let mode = exec_mode_flatten([&self.left, &self.right]); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } @@ -1328,11 +1328,11 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; + use datafusion_common::ScalarValue; use datafusion_execution::config::SessionConfig; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, col, lit, Column}; - use datafusion_common::ScalarValue; use once_cell::sync::Lazy; use rstest::*; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index e6e3f83fd7e8..3dac0107d3ef 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -432,15 +432,15 @@ fn check_join_set_is_valid( /// Calculate the OutputPartitioning for Partitioned Join pub fn partitioned_join_output_partitioning( join_type: JoinType, - left_partitioning: Partitioning, - right_partitioning: Partitioning, + left_partitioning: &Partitioning, + right_partitioning: &Partitioning, left_columns_len: usize, ) -> Partitioning { match join_type { JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => { - left_partitioning + left_partitioning.clone() } - JoinType::RightSemi | JoinType::RightAnti => right_partitioning, + JoinType::RightSemi | JoinType::RightAnti => right_partitioning.clone(), JoinType::Right => { adjust_right_output_partitioning(right_partitioning, left_columns_len) } @@ -452,21 +452,18 @@ pub fn partitioned_join_output_partitioning( /// Adjust the right out partitioning to new Column Index pub fn adjust_right_output_partitioning( - right_partitioning: Partitioning, + right_partitioning: &Partitioning, left_columns_len: usize, ) -> Partitioning { match right_partitioning { - Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size), - Partitioning::UnknownPartitioning(size) => { - Partitioning::UnknownPartitioning(size) - } Partitioning::Hash(exprs, size) => { let new_exprs = exprs - .into_iter() - .map(|expr| add_offset_to_expr(expr, left_columns_len)) + .iter() + .map(|expr| add_offset_to_expr(expr.clone(), left_columns_len)) .collect(); - Partitioning::Hash(new_exprs, size) + Partitioning::Hash(new_exprs, *size) } + result => result.clone(), } } diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index e7e451586fbc..52bfb0063d40 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -29,6 +29,7 @@ 
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::Transformed; use datafusion_common::utils::DataPtr; use datafusion_common::{DataFusionError, Result}; @@ -41,6 +42,7 @@ use datafusion_physical_expr::{ use futures::stream::TryStreamExt; use tokio::task::JoinSet; +mod ordering; mod topk; mod visitor; @@ -58,7 +60,6 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; -mod ordering; pub mod placeholder_row; pub mod projection; pub mod recursive_query; @@ -80,7 +81,6 @@ pub use crate::ordering::InputOrderMode; pub use crate::topk::TopK; pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; -use datafusion_common::config::ConfigOptions; pub use datafusion_common::hash_utils; pub use datafusion_common::utils::project_schema; pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; @@ -136,7 +136,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// Specifies whether this plan generates an infinite stream of records. /// If the plan does not support pipelining, but its input(s) are /// infinite, returns an error to indicate this. - fn unbounded_output(&self) -> ExecutionMode { + fn execution_mode(&self) -> ExecutionMode { self.cache().exec_mode } @@ -450,33 +450,21 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { } } -/// Describes the execution mode of each operator's resulting stream. -/// -/// This enum defines the possible execution modes: -/// -/// - `Bounded`: Represents a mode where execution is limited or constrained within certain bounds. -/// In this mode, the process operates within defined limits or boundaries. -/// -/// - `Unbounded`: Indicates a mode where execution is not limited by any specific bounds. -/// Processes or tasks in this mode can operate without constraints or limitations. -/// -/// - `PipelineBreaking`: Denotes a mode where execution can potentially break pipeline constraints. -/// This mode may disrupt established pipelines or processes that rely on sequential operation. -/// -/// This enum can be used to specify the behavior or characteristics of a process or task -/// in various execution scenarios. +/// Describes the execution mode of an operator's resulting stream with respect +/// to its size and behavior. There are three possible execution modes: `Bounded`, +/// `Unbounded` and `PipelineBreaking`. #[derive(Clone, Copy, PartialEq, Debug)] pub enum ExecutionMode { /// Represents the mode where generated stream is bounded, e.g. finite. Bounded, /// Represents the mode where generated stream is unbounded, e.g. infinite. - /// Operator can generate streaming results with bounded memory. - /// In this mode, execution can still continue successfully. + /// Even though the operator generates an unbounded stream of results, it + /// works with bounded memory and execution can still continue successfully. Unbounded, - /// Represents the mode, where input stream to the operator is unbounded. However, - /// operator cannot generate streaming results from streaming inputs. In this case, - /// execution mode will be pipeline breaking. e.g. operator requires unbounded memory - /// to generate its result. + /// Represents the mode where some of the operator's input stream(s) are + /// unbounded; however, the operator cannot generate streaming results from + /// these streaming inputs. 
In this case, the execution mode will be pipeline + /// breaking, e.g. the operator requires unbounded memory to generate results. PipelineBreaking, } @@ -486,33 +474,31 @@ impl ExecutionMode { matches!(self, ExecutionMode::Unbounded) } - /// Check whether the execution is pipeline friendly. If so, operator can execute - /// safely. + /// Check whether the execution is pipeline friendly. If so, operator can + /// execute safely. pub fn pipeline_friendly(&self) -> bool { matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded) } } -fn exec_mode_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode { +/// Conservatively "combines" execution modes of a given collection of operators. +fn exec_mode_flatten<'a>( + children: impl IntoIterator>, +) -> ExecutionMode { let mut result = ExecutionMode::Bounded; - for mode in modes { + for mode in children.into_iter().map(|child| child.execution_mode()) { match (mode, result) { (ExecutionMode::PipelineBreaking, _) | (_, ExecutionMode::PipelineBreaking) => { - // If any of the modes is `InExecutable`. result is `PipelineBreaking` - // Return immediately + // If any of the modes is `PipelineBreaking`, so is the result: return ExecutionMode::PipelineBreaking; } - ( - ExecutionMode::Unbounded, - ExecutionMode::Bounded | ExecutionMode::Unbounded, - ) - | (ExecutionMode::Bounded, ExecutionMode::Unbounded) => { - // Unbounded mode eats up bounded mode. + (ExecutionMode::Unbounded, _) | (_, ExecutionMode::Unbounded) => { + // Unbounded mode eats up bounded mode: result = ExecutionMode::Unbounded; } (ExecutionMode::Bounded, ExecutionMode::Bounded) => { - // When both modes are bounded, result is bounded + // When both modes are bounded, so is the result: result = ExecutionMode::Bounded; } } @@ -520,63 +506,20 @@ fn exec_mode_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode { result } -/// Constructs execution mode of the operator using its children. -/// This util assumes operator applied is pipeline friendly. For operators -/// not pipeline friendly, `exec_mode_safe_flatten` should be used. -fn exec_mode_flatten<'a>( - children: impl IntoIterator>, -) -> ExecutionMode { - let modes = children - .into_iter() - .map(|child| child.unbounded_output()) - .collect::>(); - exec_mode_flatten_helper(&modes) -} - -fn exec_mode_safe_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode { - let mut result = ExecutionMode::Bounded; - for mode in modes { - match (mode, result) { - (ExecutionMode::Bounded, ExecutionMode::Bounded) => { - // When both modes are bounded, result is bounded - result = ExecutionMode::Bounded; - } - (_, _) => { - // If any of the modes is `InExecutable` or `Unbounded`. result is `PipelineBreaking` - // Return immediately - return ExecutionMode::PipelineBreaking; - } - } - } - result -} - -/// Constructs execution mode of the operator using its children. -/// This util assumes operator applied is not pipeline friendly. For operators -/// that pipeline friendly, `exec_mode_flatten` should be used. -fn exec_mode_safe_flatten<'a>( - children: impl IntoIterator>, -) -> ExecutionMode { - let modes = children - .into_iter() - .map(|child| child.unbounded_output()) - .collect::>(); - exec_mode_safe_flatten_helper(&modes) -} - /// Represents a cache for plan properties used in query optimization. /// /// This struct holds various properties useful for the query planning, which are used /// during optimization and execution phases. 
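Before moving on to the cache struct below, the combination rule that `exec_mode_flatten` implements above can be restated compactly: `PipelineBreaking` absorbs everything, `Unbounded` absorbs `Bounded`, and `Bounded` survives only when every child is bounded. A self-contained toy of the same fold, with bare modes in place of the real `&Arc<dyn ExecutionPlan>` children:

// Toy restatement of the flattening rule; `Mode` mirrors `ExecutionMode`.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

fn flatten(modes: impl IntoIterator<Item = Mode>) -> Mode {
    modes.into_iter().fold(Mode::Bounded, |acc, m| match (m, acc) {
        // PipelineBreaking is absorbing; once seen, it sticks.
        (Mode::PipelineBreaking, _) | (_, Mode::PipelineBreaking) => Mode::PipelineBreaking,
        // Unbounded eats up Bounded.
        (Mode::Unbounded, _) | (_, Mode::Unbounded) => Mode::Unbounded,
        // Both bounded: still bounded.
        _ => Mode::Bounded,
    })
}

fn main() {
    assert_eq!(flatten([Mode::Bounded, Mode::Bounded]), Mode::Bounded);
    assert_eq!(flatten([Mode::Bounded, Mode::Unbounded]), Mode::Unbounded);
    assert_eq!(
        flatten([Mode::Unbounded, Mode::PipelineBreaking]),
        Mode::PipelineBreaking
    );
}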
#[derive(Debug, Clone)] pub struct PlanPropertiesCache { - /// Stores Equivalence Properties of the [`ExecutionPlan`]. See [`EquivalenceProperties`] + /// Stores the [`EquivalenceProperties`] of the [`ExecutionPlan`]. pub eq_properties: EquivalenceProperties, - /// Stores Output Partitioning of the [`ExecutionPlan`]. See [`Partitioning`] + /// Stores the output [`Partitioning`] of the [`ExecutionPlan`]. pub partitioning: Partitioning, - /// Stores Execution Mode of the [`ExecutionPlan`]. See [`ExecutionMode`] + /// Stores the [`ExecutionMode`] of the [`ExecutionPlan`]. pub exec_mode: ExecutionMode, - /// Stores output ordering of the [`ExecutionPlan`]. `None` represents, no ordering. + /// Stores output ordering of the [`ExecutionPlan`]. A `None` value represents + /// no ordering. output_ordering: Option<LexOrdering>, } @@ -599,7 +542,7 @@ impl PlanPropertiesCache { /// Construct a default `PlanPropertiesCache` for a given schema. pub fn new_default(schema: SchemaRef) -> PlanPropertiesCache { - // Defaults are most restrictive possible values. + // Default values are the most restrictive possible values. let eq_properties = EquivalenceProperties::new(schema); // Please note that this default is not safe, and should be overwritten. let partitioning = Partitioning::UnknownPartitioning(0); @@ -613,21 +556,22 @@ } } - /// Overwrite partitioning with its new value + /// Overwrite output partitioning with its new value. pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self { self.partitioning = partitioning; self } - /// Overwrite Execution Mode with its new value + /// Overwrite the execution mode with its new value. pub fn with_exec_mode(mut self, exec_mode: ExecutionMode) -> Self { self.exec_mode = exec_mode; self } - /// Overwrite Equivalence Properties with its new value + /// Overwrite equivalence properties with its new value. pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { - // Changing equivalence properties, changes output ordering also. Make sure to overwrite it.
+ // Changing equivalence properties also changes output ordering, so + // make sure to overwrite it: self.output_ordering = eq_properties.oeq_class().output_ordering(); self.eq_properties = eq_properties; self diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 8b7313436469..59559e84fd75 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -84,17 +84,12 @@ impl GlobalLimitExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); - - // Output Partitioning - let output_partitioning = Partitioning::UnknownPartitioning(1); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; + self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + Partitioning::UnknownPartitioning(1), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } @@ -302,17 +297,12 @@ impl LocalLimitExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); - - // Output Partitioning - let output_partitioning = self.input.output_partitioning().clone(); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; + self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + self.input.output_partitioning().clone(), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 3a994f41a0be..206d22e72226 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -192,7 +192,7 @@ impl MemoryExec { pub fn with_sort_information(mut self, sort_information: Vec) -> Self { self.sort_information = sort_information; - // With updated sort information, we need to update equivalence properties also. + // We need to update equivalence properties when updating sort information. 
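For orientation, this is the method a caller hits when declaring that in-memory data is pre-sorted. A hedged usage sketch follows, assuming the `try_new` and `with_sort_information` signatures as they appear in this hunk (a `LexOrdering` being a `Vec<PhysicalSortExpr>`); treat the exact calls as illustrative rather than a pinned API:

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion_physical_plan::memory::MemoryExec;

fn sorted_memory_exec() -> Result<MemoryExec> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    // One empty partition; the point here is only the declared ordering.
    let exec = MemoryExec::try_new(&[vec![]], schema.clone(), None)?;
    // Declare that every partition is sorted on "a" ascending; per the hunk
    // above, this refreshes the equivalence properties and, through them,
    // the cached output ordering.
    let ordering = vec![PhysicalSortExpr {
        expr: col("a", &schema)?,
        options: Default::default(),
    }];
    Ok(exec.with_sort_information(vec![ordering]))
}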
let eq_properties = self.equivalent_properties_helper(); self.cache = self.cache.with_eq_properties(eq_properties); self.with_cache() @@ -207,18 +207,12 @@ } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.equivalent_properties_helper(); - - // Output Partitioning - let output_partitioning = - Partitioning::UnknownPartitioning(self.partitions.len()); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; + self.cache = PlanPropertiesCache::new( + self.equivalent_properties_helper(), // Equivalence Properties + Partitioning::UnknownPartitioning(self.partitions.len()), // Output Partitioning + ExecutionMode::Bounded, // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index a35d259f153a..a9b27cb13fe1 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -60,7 +60,7 @@ impl PlaceholderRowExec { /// Create a new PlaceholderRowExec with specified partition number pub fn with_partitions(mut self, partitions: usize) -> Self { self.partitions = partitions; - // When changing partitions, output partitions should change also. + // Update output partitioning when updating partitions: let output_partitioning = self.output_partitioning_helper(); self.cache = self.cache.with_partitioning(output_partitioning); self @@ -94,16 +94,14 @@ } fn with_cache(mut self) -> Self { + // Get output partitioning: let output_partitioning = self.output_partitioning_helper(); - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(output_partitioning); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + .with_partitioning(output_partitioning) + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } @@ -180,8 +178,7 @@ impl ExecutionPlan for PlaceholderRowExec { #[cfg(test)] mod tests { use super::*; - use crate::with_new_children_if_necessary; - use crate::{common, test}; + use crate::{common, test, with_new_children_if_necessary}; #[test] fn with_new_children() -> Result<()> { diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index eb45e701544e..68592972498e 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -61,6 +61,7 @@ pub struct ProjectionExec { projection_mapping: ProjectionMapping, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanPropertiesCache, } @@ -119,13 +120,12 @@ impl ProjectionExec { fn with_cache(mut self) -> Self { let input = &self.input; - // Equivalence properties + // Calculate equivalence properties: let input_eq_properties = input.equivalence_properties(); let eq_properties = input_eq_properties.project(&self.projection_mapping, self.schema.clone()); - // output partitioning - // Output partition need to respect the alias + // Calculate output partitioning, which needs to respect aliases: let input_partition = input.output_partitioning(); let output_partitioning = if let Partitioning::Hash(exprs, part) = input_partition { @@ -144,16 +144,12 @@ impl ProjectionExec { input_partition.clone() }; - // unbounded output - let unbounded_output = input.unbounded_output(); - - // Construct cache - let cache = PlanPropertiesCache::new( + self.cache = PlanPropertiesCache::new( eq_properties, output_partitioning, - unbounded_output, + input.execution_mode(), ); - self.cache = cache; + self } } diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index e49c94d6c646..97a626c25116 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -21,13 +21,12 @@ use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; -use super::metrics::BaselineMetrics; use super::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, + metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, work_table::{WorkTable, WorkTableExec}, - ExecutionMode, SendableRecordBatchStream, Statistics, + PlanPropertiesCache, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use super::{PlanPropertiesCache, RecordBatchStream}; +use crate::{DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -37,8 +36,6 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::Partitioning; use futures::{ready, Stream, StreamExt}; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlan}; - /// Recursive query execution plan. /// /// This plan has two components: a base part (the static term) and @@ -68,6 +65,7 @@ pub struct RecursiveQueryExec { is_distinct: bool, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -97,15 +95,11 @@ impl RecursiveQueryExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + .with_partitioning(Partitioning::UnknownPartitioning(1)) + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } @@ -144,12 +138,13 @@ impl ExecutionPlan for RecursiveQueryExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(RecursiveQueryExec::try_new( + RecursiveQueryExec::try_new( self.name.clone(), children[0].clone(), children[1].clone(), self.is_distinct, - )?)) + ) + .map(|e| Arc::new(e) as _) } fn execute( diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 54b5a5404d24..6d2835df05f0 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -24,36 +24,34 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, vec}; -use arrow::array::{ArrayRef, UInt64Builder}; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; -use futures::stream::Stream; -use futures::{FutureExt, StreamExt}; -use hashbrown::HashMap; -use log::trace; -use parking_lot::Mutex; -use tokio::task::JoinHandle; - -use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; -use datafusion_execution::memory_pool::MemoryConsumer; -use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalExpr; - +use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; +use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream}; use crate::common::transpose; use crate::hash_utils::create_hashes; use crate::metrics::BaselineMetrics; -use crate::repartition::distributor_channels::{channels, partition_aware_channels}; +use crate::repartition::distributor_channels::{ + channels, partition_aware_channels, DistributionReceiver, DistributionSender, +}; use crate::sorts::streaming_merge; use crate::{ DisplayFormatType, ExecutionPlan, Partitioning, PlanPropertiesCache, Statistics, }; -use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; -use super::expressions::PhysicalSortExpr; -use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; -use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream}; +use arrow::array::{ArrayRef, UInt64Builder}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; +use datafusion_execution::memory_pool::MemoryConsumer; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; -use self::distributor_channels::{DistributionReceiver, DistributionSender}; +use futures::stream::Stream; +use futures::{FutureExt, StreamExt}; +use hashbrown::HashMap; +use log::trace; +use parking_lot::Mutex; +use tokio::task::JoinHandle; mod distributor_channels; @@ -297,19 +295,16 @@ impl BatchPartitioner { pub struct RepartitionExec { /// Input execution plan input: Arc, - /// Partitioning scheme to use partitioning: Partitioning, - /// Inner state that is initialized when the first 
output stream is created. state: Arc>, - /// Execution metrics metrics: ExecutionPlanMetricsSet, - /// Boolean flag to decide whether to preserve ordering. If true means /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. preserve_order: bool, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -625,19 +620,17 @@ impl RepartitionExec { fn with_cache(mut self) -> Self { // Equivalence Properties let mut eq_properties = self.input.equivalence_properties().clone(); - // If the ordering is lost, reset the ordering equivalence class. + // If the ordering is lost, reset the ordering equivalence class: if !self.maintains_input_order()[0] { eq_properties.clear_orderings(); } - // Output Partitioning - let output_partitioning = self.partitioning.clone(); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); + self.cache = PlanPropertiesCache::new( + eq_properties, // Equivalence Properties + self.partitioning.clone(), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index d59a3577acc0..99ead9f904a1 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -57,16 +57,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::compute::concat_batches; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; -use futures::{ready, Stream, StreamExt}; -use log::trace; - -use datafusion_common::utils::evaluate_partition_ranges; -use datafusion_common::Result; -use datafusion_execution::{RecordBatchStream, TaskContext}; - use crate::expressions::PhysicalSortExpr; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; @@ -75,6 +65,16 @@ use crate::{ PlanPropertiesCache, SendableRecordBatchStream, Statistics, }; +use arrow::compute::concat_batches; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::utils::evaluate_partition_ranges; +use datafusion_common::Result; +use datafusion_execution::{RecordBatchStream, TaskContext}; + +use futures::{ready, Stream, StreamExt}; +use log::trace; + /// Partial Sort execution plan. #[derive(Debug, Clone)] pub struct PartialSortExec { @@ -92,6 +92,7 @@ pub struct PartialSortExec { preserve_partitioning: bool, /// Fetch highest/lowest n results fetch: Option, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -161,26 +162,25 @@ impl PartialSortExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - // Reset the ordering equivalence class with the new ordering: + // Calculate equivalence properties; i.e. 
reset the ordering equivalence + // class with the new ordering: let eq_properties = self .input .equivalence_properties() .clone() .with_reorder(self.expr.to_vec()); - // Output Partitioning + // Get output partitioning: let output_partitioning = if self.preserve_partitioning { self.input.output_partitioning().clone() } else { Partitioning::UnknownPartitioning(1) }; - // Execution Mode - let exec_mode = self.input.unbounded_output(); + // Determine execution mode: + let mode = self.input.execution_mode(); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e97c550cdcd4..55a3c9f068f1 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -676,6 +676,7 @@ pub struct SortExec { preserve_partitioning: bool, /// Fetch highest/lowest n results fetch: Option, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -767,31 +768,31 @@ impl SortExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - // Reset the ordering equivalence class with the new ordering: + // Calculate equivalence properties; i.e. reset the ordering equivalence + // class with the new ordering: let eq_properties = self .input .equivalence_properties() .clone() .with_reorder(self.expr.to_vec()); - // Output Partitioning + // Get output partitioning: let output_partitioning = if self.preserve_partitioning { self.input.output_partitioning().clone() } else { Partitioning::UnknownPartitioning(1) }; - // Execution Mode - let exec_mode = match self.input.unbounded_output() { + // Determine execution mode: + let mode = match self.input.execution_mode() { ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { ExecutionMode::PipelineBreaking } ExecutionMode::Bounded => ExecutionMode::Bounded, }; - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); + self } } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 6a0ab8cb5748..eadd2d0711fe 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -73,6 +73,7 @@ pub struct SortPreservingMergeExec { metrics: ExecutionPlanMetricsSet, /// Optional number of rows to fetch. Stops producing rows after this fetch fetch: Option, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -111,17 +112,12 @@ impl SortPreservingMergeExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties - let eq_properties = self.input.equivalence_properties().clone(); - - // Output Partitioning - let output_partitioning = Partitioning::UnknownPartitioning(1); - - // Execution Mode - let exec_mode = self.input.unbounded_output(); + self.cache = PlanPropertiesCache::new( + self.input.equivalence_properties().clone(), // Equivalence Properties + Partitioning::UnknownPartitioning(1), // Output Partitioning + self.input.execution_mode(), // Execution Mode + ); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); self } } diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index dbb7805261a7..60b372446805 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -122,25 +122,24 @@ impl StreamingTableExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = EquivalenceProperties::new_with_orderings( self.schema(), &self.projected_output_ordering, ); - // Output Partitioning + // Get output partitioning: let output_partitioning = Partitioning::UnknownPartitioning(self.partitions.len()); - // Execution Mode - let exec_mode = if self.infinite { + // Determine execution mode: + let mode = if self.infinite { ExecutionMode::Unbounded } else { ExecutionMode::Bounded }; - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index cfbbd054bac2..77ff8d27157a 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -151,15 +151,13 @@ impl MockExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } @@ -310,16 +308,13 @@ impl BarrierExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache - .with_partitioning(Partitioning::UnknownPartitioning(self.data.len())); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(self.data.len())) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } @@ -422,15 +417,13 @@ impl ErrorExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(1)) + // Execution Mode + 
.with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } @@ -503,15 +496,13 @@ impl StatisticsExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(2)); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(2)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } @@ -607,16 +598,13 @@ impl BlockingExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache - .with_partitioning(Partitioning::UnknownPartitioning(self.n_partitions)); - - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(self.n_partitions)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } @@ -747,17 +735,15 @@ impl PanicExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning let num_partitions = self.batches_until_panics.len(); - new_cache = new_cache - .with_partitioning(Partitioning::UnknownPartitioning(num_partitions)); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(Partitioning::UnknownPartitioning(num_partitions)) + // Execution Mode + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 974ea630b160..42e5ce58edb0 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -90,6 +90,7 @@ pub struct UnionExec { inputs: Vec>, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -112,7 +113,7 @@ impl UnionExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: // TODO: In some cases, we should be able to preserve some equivalence // classes and constants. Add support for such cases. let children_eqs = self @@ -150,8 +151,7 @@ impl UnionExec { // entries (implicitly) and return: eq_properties.add_new_orderings(meets); - // Output Partitioning - // Output the combination of all output partitions of the inputs if the Union is not partition aware + // Calculate output partitioning; i.e. sum output partitions of the inputs. 
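A quick worked example of the summation rule stated above, with hypothetical partition counts:

// Three union children with 3, 5 and 1 output partitions respectively
// combine into UnknownPartitioning(9) below (toy arithmetic only).
fn main() {
    let input_partition_counts = [3usize, 5, 1];
    let num_partitions: usize = input_partition_counts.iter().sum();
    assert_eq!(num_partitions, 9);
}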
let num_partitions = self .inputs .iter() @@ -159,11 +159,10 @@ impl UnionExec { .sum(); let output_partitioning = Partitioning::UnknownPartitioning(num_partitions); - // Execution Mode - let exec_mode = exec_mode_flatten(self.inputs.iter()); + // Determine execution mode: + let mode = exec_mode_flatten(self.inputs.iter()); - self.cache = - PlanPropertiesCache::new(eq_properties, output_partitioning, exec_mode); + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } @@ -317,6 +316,7 @@ pub struct InterleaveExec { inputs: Vec>, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -345,17 +345,16 @@ impl InterleaveExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - - // Output Partitioning + // Get output partitioning: let output_partitioning = self.inputs[0].output_partitioning().clone(); - new_cache = new_cache.with_partitioning(output_partitioning); + // Determine execution mode: + let mode = exec_mode_flatten(self.inputs.iter()); - // Execution Mode - let exec_mode = exec_mode_flatten(self.inputs.iter()); - new_cache = new_cache.with_exec_mode(exec_mode); + self.cache = self + .cache + .with_partitioning(output_partitioning) + .with_exec_mode(mode); - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 64c575831562..b9df57d84f81 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -19,6 +19,7 @@ //! type, conceptually is like joining each row with all the values in the list column. use std::{any::Any, sync::Arc}; +use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{DisplayAs, PlanPropertiesCache}; use crate::{ expressions::Column, DisplayFormatType, Distribution, ExecutionPlan, PhysicalExpr, @@ -26,15 +27,14 @@ use crate::{ }; use arrow::array::{ - Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, LargeListArray, ListArray, - PrimitiveArray, + Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, GenericListArray, + LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, }; use arrow::compute::kernels; use arrow::datatypes::{ ArrowNativeType, DataType, Int32Type, Int64Type, Schema, SchemaRef, }; use arrow::record_batch::RecordBatch; -use arrow_array::{GenericListArray, OffsetSizeTrait}; use datafusion_common::{exec_err, DataFusionError, Result, UnnestOptions}; use datafusion_execution::TaskContext; @@ -42,8 +42,6 @@ use async_trait::async_trait; use futures::{Stream, StreamExt}; use log::trace; -use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; - /// Unnest the given column by joining the row with each value in the /// nested type. /// @@ -60,6 +58,7 @@ pub struct UnnestExec { options: UnnestOptions, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanPropertiesCache, } @@ -84,14 +83,13 @@ impl UnnestExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(self.input.output_partitioning().clone()); - - // Execution Mode - new_cache = new_cache.with_exec_mode(self.input.unbounded_output()); + self.cache = self + .cache + // Output Partitioning + .with_partitioning(self.input.output_partitioning().clone()) + // Execution Mode + .with_exec_mode(self.input.execution_mode()); - self.cache = new_cache; self } } diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs index 6351ad8f5b10..7fc242099379 100644 --- a/datafusion/physical-plan/src/values.rs +++ b/datafusion/physical-plan/src/values.rs @@ -41,6 +41,7 @@ pub struct ValuesExec { schema: SchemaRef, /// The data data: Vec, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -127,15 +128,11 @@ impl ValuesExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); + self.cache = self + .cache + .with_partitioning(Partitioning::UnknownPartitioning(1)) + .with_exec_mode(ExecutionMode::Bounded); - // Execution Mode - let exec_mode = ExecutionMode::Bounded; - new_cache = new_cache.with_exec_mode(exec_mode); - - self.cache = new_cache; self } } @@ -172,10 +169,8 @@ impl ExecutionPlan for ValuesExec { self: Arc, _: Vec>, ) -> Result> { - Ok(Arc::new(ValuesExec::try_new_from_batches( - self.schema.clone(), - self.data.clone(), - )?)) + ValuesExec::try_new_from_batches(self.schema.clone(), self.data.clone()) + .map(|e| Arc::new(e) as _) } fn execute( diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index d3b9ff74b644..cb512302cb6f 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -89,6 +89,7 @@ pub struct BoundedWindowAggExec { // `ordered_partition_by_indices` would be 0, 1. // See `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -183,7 +184,7 @@ impl BoundedWindowAggExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = window_equivalence_properties(&self.schema, &self.input, &self.window_expr); @@ -192,14 +193,11 @@ impl BoundedWindowAggExec { // repartitioning. 
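The execution-mode treatment here is worth contrasting with `WindowAggExec` further below: the bounded-window operator streams over sorted input and simply inherits its input's mode, while the generic window operator must buffer a whole partition before emitting and therefore degrades any non-bounded input to `PipelineBreaking`. A self-contained toy restatement (the `Mode` enum mirrors `ExecutionMode`; the function names are illustrative):

#[derive(Clone, Copy, PartialEq, Debug)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

// BoundedWindowAggExec streams over sorted input, so it inherits the mode.
fn bounded_window_mode(input: Mode) -> Mode {
    input
}

// WindowAggExec buffers entire partitions, so any non-bounded input can
// never be streamed through (see the window_agg_exec.rs hunk below).
fn window_agg_mode(input: Mode) -> Mode {
    match input {
        Mode::Bounded => Mode::Bounded,
        Mode::Unbounded | Mode::PipelineBreaking => Mode::PipelineBreaking,
    }
}

fn main() {
    assert_eq!(bounded_window_mode(Mode::Unbounded), Mode::Unbounded);
    assert_eq!(window_agg_mode(Mode::Unbounded), Mode::PipelineBreaking);
}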
let output_partitioning = self.input.output_partitioning().clone(); - // unbounded output - let unbounded_output = self.input.unbounded_output(); - // Construct properties cache self.cache = PlanPropertiesCache::new( - eq_properties, - output_partitioning, - unbounded_output, + eq_properties, // Equivalence Properties + output_partitioning, // Output Partitioning + self.input.execution_mode(), // Execution Mode ); self } @@ -1117,21 +1115,23 @@ fn get_aggregate_result_out_column( #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::common::collect; use crate::memory::MemoryExec; use crate::windows::{BoundedWindowAggExec, InputOrderMode}; use crate::{get_plan_string, ExecutionPlan}; + use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{assert_batches_eq, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::TaskContext; use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; - use datafusion_physical_expr::expressions::col; - use datafusion_physical_expr::expressions::NthValue; - use datafusion_physical_expr::window::BuiltInWindowExpr; - use datafusion_physical_expr::window::BuiltInWindowFunctionExpr; - use std::sync::Arc; + use datafusion_physical_expr::expressions::{col, NthValue}; + use datafusion_physical_expr::window::{ + BuiltInWindowExpr, BuiltInWindowFunctionExpr, + }; // Tests NTH_VALUE(negative index) with memoize feature. // To be able to trigger memoize feature for NTH_VALUE we need to diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 4143b5c9f508..14e3d190d77c 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -48,11 +48,10 @@ mod bounded_window_agg_exec; mod window_agg_exec; pub use bounded_window_agg_exec::BoundedWindowAggExec; -pub use window_agg_exec::WindowAggExec; - pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; +pub use window_agg_exec::WindowAggExec; /// Create a physical expression for window function pub fn create_window_expr( @@ -415,7 +414,7 @@ pub fn get_best_fitting_window( } else { return Ok(None); }; - let is_unbounded = input.unbounded_output().is_unbounded(); + let is_unbounded = input.execution_mode().is_unbounded(); if !is_unbounded && input_order_mode != InputOrderMode::Sorted { // Executor has bounded input and `input_order_mode` is not `InputOrderMode::Sorted` // in this case removing the sort is not helpful, return: diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index ddfce03e2026..247588c971a2 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -35,22 +35,18 @@ use crate::{ SendableRecordBatchStream, Statistics, WindowExpr, }; +use arrow::array::ArrayRef; use arrow::compute::{concat, concat_batches}; -use arrow::datatypes::SchemaBuilder; +use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef}; use arrow::error::ArrowError; -use arrow::{ - array::ArrayRef, - datatypes::{Schema, SchemaRef}, - record_batch::RecordBatch, -}; +use arrow::record_batch::RecordBatch; use datafusion_common::stats::Precision; use datafusion_common::utils::evaluate_partition_ranges; use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalSortRequirement; 
-use futures::stream::Stream; -use futures::{ready, StreamExt}; +use futures::{ready, Stream, StreamExt}; /// Window execution plan #[derive(Debug)] @@ -68,6 +64,7 @@ pub struct WindowAggExec { /// Partition by indices that defines preset for existing ordering // see `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec<usize>, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -121,30 +118,25 @@ impl WindowAggExec { } fn with_cache(mut self) -> Self { - // Equivalence Properties + // Calculate equivalence properties: let eq_properties = window_equivalence_properties(&self.schema, &self.input, &self.window_expr); - // output partitioning - // because we can have repartitioning using the partition keys - // this would be either 1 or more than 1 depending on the presense of - // repartitioning + // Get output partitioning: + // Because we can have repartitioning using the partition keys, this + // would be either 1 or more than 1 depending on the presence of repartitioning. let output_partitioning = self.input.output_partitioning().clone(); - // unbounded output - let unbounded_output = match self.input.unbounded_output() { + // Determine execution mode: + let mode = match self.input.execution_mode() { ExecutionMode::Bounded => ExecutionMode::Bounded, ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => { ExecutionMode::PipelineBreaking } }; - // Construct properties cache - self.cache = PlanPropertiesCache::new( - eq_properties, - output_partitioning, - unbounded_output, - ); + // Construct properties cache: + self.cache = PlanPropertiesCache::new(eq_properties, output_partitioning, mode); self } } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 13bc98f1878b..33c611dd30d8 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -20,21 +20,20 @@ use std::any::Any; use std::sync::{Arc, Mutex}; -use arrow::datatypes::SchemaRef; -use arrow::record_batch::RecordBatch; -use datafusion_execution::TaskContext; -use datafusion_physical_expr::Partitioning; - +use super::{ + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + SendableRecordBatchStream, Statistics, +}; use crate::memory::MemoryStream; use crate::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanPropertiesCache, }; -use super::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, - SendableRecordBatchStream, Statistics, -}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; use datafusion_common::{internal_err, DataFusionError, Result}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::Partitioning; /// The name is from PostgreSQL's terminology. /// See @@ -85,6 +84,7 @@ pub struct WorkTableExec { work_table: Arc<WorkTable>, /// Execution metrics metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanPropertiesCache, } @@ -113,14 +113,11 @@ impl WorkTableExec { } fn with_cache(mut self) -> Self { - let mut new_cache = self.cache; - // Output Partitioning - new_cache = new_cache.with_partitioning(Partitioning::UnknownPartitioning(1)); - - // Execution Mode - new_cache = new_cache.with_exec_mode(ExecutionMode::Bounded); + self.cache = self + .cache + .with_partitioning(Partitioning::UnknownPartitioning(1)) + .with_exec_mode(ExecutionMode::Bounded); - self.cache = new_cache; self } }
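Taken together, the patch converges every operator on one recipe: compute the three properties in `with_cache` and store them, either via `PlanPropertiesCache::new(eq_properties, partitioning, mode)` or by starting from `new_default` and overwriting fields with the builder methods. A self-contained toy of that builder flow; the types are stand-ins, and while the real default partitioning is `Partitioning::UnknownPartitioning(0)` as shown above, the toy's default mode is just a conservative placeholder:

// Toy mirror of the builder pattern standardized by this patch.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Mode {
    Bounded,
    Unbounded,
    PipelineBreaking,
}

#[derive(Debug)]
struct ToyCache {
    partitions: usize,
    mode: Mode,
}

impl ToyCache {
    fn new_default() -> Self {
        // Deliberately unusable defaults that force an overwrite, like
        // `Partitioning::UnknownPartitioning(0)` in the real cache; the
        // default mode here is a conservative stand-in, not the library's.
        ToyCache { partitions: 0, mode: Mode::PipelineBreaking }
    }
    fn with_partitioning(mut self, partitions: usize) -> Self {
        self.partitions = partitions;
        self
    }
    fn with_exec_mode(mut self, mode: Mode) -> Self {
        self.mode = mode;
        self
    }
}

fn main() {
    let cache = ToyCache::new_default()
        .with_partitioning(1)
        .with_exec_mode(Mode::Bounded);
    assert_eq!(cache.partitions, 1);
    assert_eq!(cache.mode, Mode::Bounded);
}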