diff --git a/datafusion/expr-common/src/sort_properties.rs b/datafusion/expr-common/src/sort_properties.rs index 7778be2ecf0d..5d17a34a96fb 100644 --- a/datafusion/expr-common/src/sort_properties.rs +++ b/datafusion/expr-common/src/sort_properties.rs @@ -129,19 +129,30 @@ impl Neg for SortProperties { } } -/// Represents the properties of a `PhysicalExpr`, including its sorting and range attributes. +/// Represents the properties of a `PhysicalExpr`, including its sorting, +/// range, and whether it preserves lexicographical ordering. #[derive(Debug, Clone)] pub struct ExprProperties { + /// Properties that describe the sorting behavior of the expression, + /// such as whether it is ordered, unordered, or a singleton value. pub sort_properties: SortProperties, + /// A closed interval representing the range of possible values for + /// the expression. Used to compute reliable bounds. pub range: Interval, + /// Indicates whether the expression preserves lexicographical ordering + /// of its inputs. For example, string concatenation preserves ordering, + /// while addition does not. + pub preserves_lex_ordering: bool, } impl ExprProperties { - /// Creates a new `ExprProperties` instance with unknown sort properties and unknown range. + /// Creates a new `ExprProperties` instance with unknown sort properties, + /// unknown range, and unknown lexicographical ordering preservation. pub fn new_unknown() -> Self { Self { sort_properties: SortProperties::default(), range: Interval::make_unbounded(&DataType::Null).unwrap(), + preserves_lex_ordering: false, } } @@ -156,4 +167,10 @@ impl ExprProperties { self.range = range; self } + + /// Sets whether the expression maintains lexicographical ordering and returns the modified instance. + pub fn with_preserves_lex_ordering(mut self, preserves_lex_ordering: bool) -> Self { + self.preserves_lex_ordering = preserves_lex_ordering; + self + } } diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 809c78f30eff..83200edfa24c 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -303,6 +303,10 @@ impl ScalarUDF { self.inner.output_ordering(inputs) } + pub fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result { + self.inner.preserves_lex_ordering(inputs) + } + /// See [`ScalarUDFImpl::coerce_types`] for more details. pub fn coerce_types(&self, arg_types: &[DataType]) -> Result> { self.inner.coerce_types(arg_types) @@ -650,10 +654,30 @@ pub trait ScalarUDFImpl: Debug + Send + Sync { Ok(Some(vec![])) } - /// Calculates the [`SortProperties`] of this function based on its - /// children's properties. - fn output_ordering(&self, _inputs: &[ExprProperties]) -> Result { - Ok(SortProperties::Unordered) + /// Calculates the [`SortProperties`] of this function based on its children's properties. + fn output_ordering(&self, inputs: &[ExprProperties]) -> Result { + if !self.preserves_lex_ordering(inputs)? { + return Ok(SortProperties::Unordered); + } + + let Some(first_order) = inputs.first().map(|p| &p.sort_properties) else { + return Ok(SortProperties::Singleton); + }; + + if inputs + .iter() + .skip(1) + .all(|input| &input.sort_properties == first_order) + { + Ok(*first_order) + } else { + Ok(SortProperties::Unordered) + } + } + + /// Whether the function preserves lexicographical ordering based on the input ordering + fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) -> Result { + Ok(false) } /// Coerce arguments of a function call to types that the function can evaluate. @@ -809,6 +833,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl { self.inner.output_ordering(inputs) } + fn preserves_lex_ordering(&self, inputs: &[ExprProperties]) -> Result { + self.inner.preserves_lex_ordering(inputs) + } + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { self.inner.coerce_types(arg_types) } diff --git a/datafusion/functions/src/string/concat.rs b/datafusion/functions/src/string/concat.rs index 895a7cdbf308..f4848c131504 100644 --- a/datafusion/functions/src/string/concat.rs +++ b/datafusion/functions/src/string/concat.rs @@ -17,6 +17,7 @@ use arrow::array::{as_largestring_array, Array}; use arrow::datatypes::DataType; +use datafusion_expr::sort_properties::ExprProperties; use std::any::Any; use std::sync::{Arc, OnceLock}; @@ -265,6 +266,10 @@ impl ScalarUDFImpl for ConcatFunc { fn documentation(&self) -> Option<&Documentation> { Some(get_concat_doc()) } + + fn preserves_lex_ordering(&self, _inputs: &[ExprProperties]) -> Result { + Ok(true) + } } static DOCUMENTATION: OnceLock = OnceLock::new(); diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index db3e0e10d816..d1de63a1e8fc 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -57,6 +57,7 @@ petgraph = "0.6.2" [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } criterion = "0.5" +datafusion-functions = { workspace = true } rand = { workspace = true } rstest = { workspace = true } diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs old mode 100644 new mode 100755 index e37e6b8abdb3..d4814fb4d780 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -346,40 +346,61 @@ impl EquivalenceProperties { .unwrap_or_else(|| vec![Arc::clone(&normalized_expr)]); let mut new_orderings: Vec = vec![]; - for (ordering, next_expr) in self - .normalized_oeq_class() - .iter() - .filter(|ordering| ordering[0].expr.eq(&normalized_expr)) - // First expression after leading ordering - .filter_map(|ordering| Some(ordering).zip(ordering.inner.get(1))) - { - let leading_ordering = ordering[0].options; - // Currently, we only handle expressions with a single child. - // TODO: It should be possible to handle expressions orderings like - // f(a, b, c), a, b, c if f is monotonic in all arguments. + for ordering in self.normalized_oeq_class().iter() { + if !ordering[0].expr.eq(&normalized_expr) { + continue; + } + + let leading_ordering_options = ordering[0].options; + for equivalent_expr in &eq_class { let children = equivalent_expr.children(); - if children.len() == 1 - && children[0].eq(&next_expr.expr) - && SortProperties::Ordered(leading_ordering) - == equivalent_expr - .get_properties(&[ExprProperties { - sort_properties: SortProperties::Ordered( - leading_ordering, - ), - range: Interval::make_unbounded( - &equivalent_expr.data_type(&self.schema)?, - )?, - }])? - .sort_properties - { - // Assume existing ordering is [a ASC, b ASC] - // When equality a = f(b) is given, If we know that given ordering `[b ASC]`, ordering `[f(b) ASC]` is valid, - // then we can deduce that ordering `[b ASC]` is also valid. - // Hence, ordering `[b ASC]` can be added to the state as valid ordering. - // (e.g. existing ordering where leading ordering is removed) - new_orderings.push(LexOrdering::new(ordering[1..].to_vec())); - break; + if children.is_empty() { + continue; + } + + // Check if all children match the next expressions in the ordering + let mut all_children_match = true; + let mut child_properties = vec![]; + + // Build properties for each child based on the next expressions + for (i, child) in children.iter().enumerate() { + if let Some(next) = ordering.get(i + 1) { + if !child.as_ref().eq(next.expr.as_ref()) { + all_children_match = false; + break; + } + child_properties.push(ExprProperties { + sort_properties: SortProperties::Ordered(next.options), + range: Interval::make_unbounded( + &child.data_type(&self.schema)?, + )?, + preserves_lex_ordering: true, + }); + } else { + all_children_match = false; + break; + } + } + + if all_children_match { + // Check if the expression is monotonic in all arguments + if let Ok(expr_properties) = + equivalent_expr.get_properties(&child_properties) + { + if expr_properties.preserves_lex_ordering + && SortProperties::Ordered(leading_ordering_options) + == expr_properties.sort_properties + { + // Assume existing ordering is [c ASC, a ASC, b ASC] + // When equality c = f(a,b) is given, if we know that given ordering `[a ASC, b ASC]`, + // ordering `[f(a,b) ASC]` is valid, then we can deduce that ordering `[a ASC, b ASC]` is also valid. + // Hence, ordering `[a ASC, b ASC]` can be added to the state as a valid ordering. + // (e.g. existing ordering where leading ordering is removed) + new_orderings.push(LexOrdering::new(ordering[1..].to_vec())); + break; + } + } } } } @@ -1428,16 +1449,19 @@ fn get_expr_properties( Ok(ExprProperties { sort_properties: SortProperties::Ordered(column_order.options), range: Interval::make_unbounded(&expr.data_type(schema)?)?, + preserves_lex_ordering: false, }) } else if expr.as_any().downcast_ref::().is_some() { Ok(ExprProperties { sort_properties: SortProperties::Unordered, range: Interval::make_unbounded(&expr.data_type(schema)?)?, + preserves_lex_ordering: false, }) } else if let Some(literal) = expr.as_any().downcast_ref::() { Ok(ExprProperties { sort_properties: SortProperties::Singleton, range: Interval::try_new(literal.value().clone(), literal.value().clone())?, + preserves_lex_ordering: true, }) } else { // Find orderings of its children @@ -2143,11 +2167,14 @@ mod tests { create_test_params, create_test_schema, output_schema, }; use crate::expressions::{col, BinaryExpr, Column}; + use crate::ScalarFunctionExpr; use arrow::datatypes::{DataType, Field, Schema}; use arrow_schema::{Fields, TimeUnit}; use datafusion_expr::Operator; + use datafusion_functions::string::concat; + #[test] fn project_equivalence_properties_test() -> Result<()> { let input_schema = Arc::new(Schema::new(vec![ @@ -3684,4 +3711,145 @@ mod tests { sort_expr } + + #[test] + fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Utf8, false), + ])); + + let col_a = col("a", &schema)?; + let col_b = col("b", &schema)?; + let col_c = col("c", &schema)?; + + let a_concat_b: Arc = Arc::new(ScalarFunctionExpr::new( + "concat", + concat(), + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + DataType::Utf8, + )); + + // Assume existing ordering is [c ASC, a ASC, b ASC] + let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema)); + + eq_properties.add_new_ordering(LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(), + ])); + + // Add equality condition c = concat(a, b) + eq_properties.add_equal_conditions(&col_c, &a_concat_b)?; + + let orderings = eq_properties.oeq_class().orderings.clone(); + + let expected_ordering1 = + LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc() + ]); + let expected_ordering2 = LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(), + ]); + + // The ordering should be [c ASC] and [a ASC, b ASC] + assert_eq!(orderings.len(), 2); + assert!(orderings.contains(&expected_ordering1)); + assert!(orderings.contains(&expected_ordering2)); + + Ok(()) + } + + #[test] + fn test_ordering_equivalence_with_non_lex_monotonic_multiply() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let col_a = col("a", &schema)?; + let col_b = col("b", &schema)?; + let col_c = col("c", &schema)?; + + let a_times_b: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Multiply, + Arc::clone(&col_b), + )); + + // Assume existing ordering is [c ASC, a ASC, b ASC] + let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema)); + + let initial_ordering = LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&col_c)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(), + ]); + + eq_properties.add_new_ordering(initial_ordering.clone()); + + // Add equality condition c = a * b + eq_properties.add_equal_conditions(&col_c, &a_times_b)?; + + let orderings = eq_properties.oeq_class().orderings.clone(); + + // The ordering should remain unchanged since multiplication is not lex-monotonic + assert_eq!(orderings.len(), 1); + assert!(orderings.contains(&initial_ordering)); + + Ok(()) + } + + #[test] + fn test_ordering_equivalence_with_concat_equality() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Utf8, false), + ])); + + let col_a = col("a", &schema)?; + let col_b = col("b", &schema)?; + let col_c = col("c", &schema)?; + + let a_concat_b: Arc = Arc::new(ScalarFunctionExpr::new( + "concat", + concat(), + vec![Arc::clone(&col_a), Arc::clone(&col_b)], + DataType::Utf8, + )); + + // Assume existing ordering is [concat(a, b) ASC, a ASC, b ASC] + let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema)); + + eq_properties.add_new_ordering(LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&a_concat_b)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(), + ])); + + // Add equality condition c = concat(a, b) + eq_properties.add_equal_conditions(&col_c, &a_concat_b)?; + + let orderings = eq_properties.oeq_class().orderings.clone(); + + let expected_ordering1 = LexOrdering::from(vec![PhysicalSortExpr::new_default( + Arc::clone(&a_concat_b), + ) + .asc()]); + let expected_ordering2 = LexOrdering::from(vec![ + PhysicalSortExpr::new_default(Arc::clone(&col_a)).asc(), + PhysicalSortExpr::new_default(Arc::clone(&col_b)).asc(), + ]); + + // The ordering should be [concat(a, b) ASC] and [a ASC, b ASC] + assert_eq!(orderings.len(), 2); + assert!(orderings.contains(&expected_ordering1)); + assert!(orderings.contains(&expected_ordering2)); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index ae2bfe5b0bd4..938d775a2ad1 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -503,34 +503,42 @@ impl PhysicalExpr for BinaryExpr { Operator::Plus => Ok(ExprProperties { sort_properties: l_order.add(&r_order), range: l_range.add(r_range)?, + preserves_lex_ordering: false, }), Operator::Minus => Ok(ExprProperties { sort_properties: l_order.sub(&r_order), range: l_range.sub(r_range)?, + preserves_lex_ordering: false, }), Operator::Gt => Ok(ExprProperties { sort_properties: l_order.gt_or_gteq(&r_order), range: l_range.gt(r_range)?, + preserves_lex_ordering: false, }), Operator::GtEq => Ok(ExprProperties { sort_properties: l_order.gt_or_gteq(&r_order), range: l_range.gt_eq(r_range)?, + preserves_lex_ordering: false, }), Operator::Lt => Ok(ExprProperties { sort_properties: r_order.gt_or_gteq(&l_order), range: l_range.lt(r_range)?, + preserves_lex_ordering: false, }), Operator::LtEq => Ok(ExprProperties { sort_properties: r_order.gt_or_gteq(&l_order), range: l_range.lt_eq(r_range)?, + preserves_lex_ordering: false, }), Operator::And => Ok(ExprProperties { sort_properties: r_order.and_or(&l_order), range: l_range.and(r_range)?, + preserves_lex_ordering: false, }), Operator::Or => Ok(ExprProperties { sort_properties: r_order.and_or(&l_order), range: l_range.or(r_range)?, + preserves_lex_ordering: false, }), _ => Ok(ExprProperties::new_unknown()), } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index f0d02eb605b2..c594f039ff2f 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -90,6 +90,7 @@ impl PhysicalExpr for Literal { Ok(ExprProperties { sort_properties: SortProperties::Singleton, range: Interval::try_new(self.value().clone(), self.value().clone())?, + preserves_lex_ordering: true, }) } } diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 6235845fc028..03f2111aca33 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -145,6 +145,7 @@ impl PhysicalExpr for NegativeExpr { Ok(ExprProperties { sort_properties: -children[0].sort_properties, range: children[0].range.clone().arithmetic_negate()?, + preserves_lex_ordering: false, }) } } diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index e312d5de59fb..82c718cfaca3 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -202,6 +202,7 @@ impl PhysicalExpr for ScalarFunctionExpr { fn get_properties(&self, children: &[ExprProperties]) -> Result { let sort_properties = self.fun.output_ordering(children)?; + let preserves_lex_ordering = self.fun.preserves_lex_ordering(children)?; let children_range = children .iter() .map(|props| &props.range) @@ -211,6 +212,7 @@ impl PhysicalExpr for ScalarFunctionExpr { Ok(ExprProperties { sort_properties, range, + preserves_lex_ordering, }) } }