Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve ordering equivalencies on with_reorder #13770

Merged
12 changes: 12 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,18 @@ impl EquivalenceGroup {
JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(),
}
}

/// Checks if two expressions are equal either directly or through equivalence classes.
pub fn exprs_equal(
gokselk marked this conversation as resolved.
Show resolved Hide resolved
&self,
left: &Arc<dyn PhysicalExpr>,
right: &Arc<dyn PhysicalExpr>,
) -> bool {
left.eq(right)
|| self
.iter()
.any(|cls| cls.contains(left) && cls.contains(right))
}
}

impl Display for EquivalenceGroup {
Expand Down
287 changes: 284 additions & 3 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::iter::Peekable;
use std::slice::Iter;
use std::sync::Arc;
use std::{fmt, mem};

use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::const_exprs_contains;
Expand Down Expand Up @@ -391,12 +391,51 @@ impl EquivalenceProperties {
/// Updates the ordering equivalence class within, assuming that the table
/// is re-sorted according to the argument `sort_exprs`. Note that constants
/// and equivalence classes are unchanged as they are unaffected by a re-sort.
///
/// Sort expressions over constant expressions are dropped from `sort_exprs`
/// first. If nothing remains after that, `self` is returned unchanged.
/// Otherwise the ordering equivalence class is rebuilt from:
///
/// - the filtered ordering itself, and
/// - for every existing ordering that the filtered ordering is a prefix of
///   (see `is_prefix_of`), the filtered ordering extended with the remaining
///   suffix of that existing ordering.
pub fn with_reorder(mut self, sort_exprs: LexOrdering) -> Self {
    // Filter out constant expressions as they don't affect ordering.
    let filtered_exprs = LexOrdering::new(
        sort_exprs
            .into_iter()
            .filter(|expr| !self.is_expr_constant(&expr.expr))
            .collect(),
    );

    if filtered_exprs.is_empty() {
        return self;
    }

    let mut new_orderings = vec![filtered_exprs.clone()];

    // Move the existing orderings out of `self` so they can be consumed
    // while `self` is still borrowed for the prefix check. The class is
    // reassigned below, so the emptied field is never observed.
    let orderings = mem::take(&mut self.oeq_class.orderings);
    for existing in orderings {
        if self.is_prefix_of(&filtered_exprs, &existing) {
            // Keep the new prefix as given and append only the part of the
            // existing ordering that lies beyond it.
            let mut extended = filtered_exprs.clone();
            extended.extend(existing.into_iter().skip(filtered_exprs.len()));
            new_orderings.push(extended);
        }
    }

    self.oeq_class = OrderingEquivalenceClass::new(new_orderings);
    self
}

/// Tells whether `new_order` is a prefix of `existing`, taking expression
/// equivalences into account.
///
/// The result is `true` only when `new_order` is no longer than `existing`
/// and every sort expression in `new_order` matches the one at the same
/// position in `existing`: the sort options must be identical and the
/// expressions must be equal according to `eq_group.exprs_equal`.
fn is_prefix_of(&self, new_order: &LexOrdering, existing: &LexOrdering) -> bool {
    // A candidate longer than the existing ordering can never be its prefix;
    // otherwise compare the two position by position.
    new_order.len() <= existing.len()
        && new_order.iter().zip(existing).all(|(candidate, current)| {
            candidate.options == current.options
                && self
                    .eq_group
                    .exprs_equal(&candidate.expr, &current.expr)
        })
}

/// Normalizes the given sort expressions (i.e. `sort_exprs`) using the
/// equivalence group and the ordering equivalence class within.
///
Expand Down Expand Up @@ -3651,4 +3690,246 @@ mod tests {

sort_expr
}

/// Sort expressions over constant columns must be dropped by `with_reorder`.
#[test]
fn test_with_reorder_constant_filtering() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;

    // Column `a` is declared constant.
    let eq_properties = EquivalenceProperties::new(Arc::clone(&schema))
        .with_constants([ConstExpr::from(&col_a)]);

    // Requested ordering: [a, b], both with default sort options.
    let requested = vec![&col_a, &col_b]
        .into_iter()
        .map(|expr| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options: SortOptions::default(),
        })
        .collect::<Vec<_>>();

    let result = eq_properties.with_reorder(LexOrdering::new(requested));

    // Only `b` may remain, because `a` is constant.
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 1);
    assert!(ordering[0].expr.eq(&col_b));

    Ok(())
}

/// Re-sorting on a prefix of an existing ordering must keep that ordering's
/// suffix.
#[test]
fn test_with_reorder_preserve_suffix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;

    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering, which is also the expected result:
    // [a ASC, b DESC, c ASC]
    let expected = [(&col_a, asc), (&col_b, desc), (&col_c, asc)];
    let initial = expected
        .iter()
        .map(|&(expr, options)| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options,
        })
        .collect::<Vec<_>>();

    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    eq_properties.add_new_orderings([LexOrdering::new(initial)]);

    // Re-sort on [a ASC].
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: asc,
    }]);
    let result = eq_properties.with_reorder(new_order);

    // Exactly one ordering is expected: [a ASC, b DESC, c ASC].
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 3);
    for (idx, &(expr, options)) in expected.iter().enumerate() {
        assert!(ordering[idx].expr.eq(expr));
        assert!(ordering[idx].options.eq(&options));
    }

    Ok(())
}

/// The prefix check must see through expression equivalences: with `a = b`,
/// re-sorting on `b` keeps the suffix of an ordering that starts with `a`.
#[test]
fn test_with_reorder_equivalent_expressions() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let asc = SortOptions::default();

    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));

    // Declare a = b.
    eq_properties.add_equal_conditions(&col_a, &col_b)?;

    // Existing ordering: [a ASC, c ASC]
    let initial = vec![&col_a, &col_c]
        .into_iter()
        .map(|expr| PhysicalSortExpr {
            expr: Arc::clone(expr),
            options: asc,
        })
        .collect::<Vec<_>>();
    eq_properties.add_new_orderings([LexOrdering::new(initial)]);

    // Re-sort on [b ASC].
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_b),
        options: asc,
    }]);
    let result = eq_properties.with_reorder(new_order);

    // Exactly one ordering is expected: [b ASC, c ASC].
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 2);
    for (idx, expr) in vec![&col_b, &col_c].into_iter().enumerate() {
        assert!(ordering[idx].expr.eq(expr));
        assert!(ordering[idx].options.eq(&asc));
    }

    Ok(())
}

/// An existing ordering whose leading sort options differ from the new
/// ordering must not contribute a suffix.
#[test]
fn test_with_reorder_incompatible_prefix() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;

    let asc = SortOptions::default();
    let desc = SortOptions {
        descending: true,
        nulls_first: true,
    };

    // Existing ordering: [a ASC, b DESC]
    let initial = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: desc,
        },
    ]);
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
    eq_properties.add_new_orderings([initial]);

    // Re-sort on [a DESC]: same expression, different sort options.
    let new_order = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_a),
        options: desc,
    }]);
    let result = eq_properties.with_reorder(new_order.clone());

    // Only the new ordering may remain, since the options do not match.
    let oeq_class = result.oeq_class();
    assert_eq!(oeq_class.len(), 1);
    assert_eq!(oeq_class.orderings[0], new_order);

    Ok(())
}

/// Exercises constant filtering, expression equivalence and suffix
/// preservation together in a single `with_reorder` call.
#[test]
fn test_with_reorder_comprehensive() -> Result<()> {
    let schema = create_test_schema()?;
    let col_a = col("a", &schema)?;
    let col_b = col("b", &schema)?;
    let col_c = col("c", &schema)?;
    let col_d = col("d", &schema)?;
    let col_e = col("e", &schema)?;
    let asc = SortOptions::default();

    // Constants: c is constant.
    let mut eq_properties = EquivalenceProperties::new(Arc::clone(&schema))
        .with_constants([ConstExpr::from(&col_c)]);

    // Equality: b = d.
    eq_properties.add_equal_conditions(&col_b, &col_d)?;

    // Existing orderings: [d ASC, a ASC] and [e ASC].
    let ordering_d_a = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_d),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_a),
            options: asc,
        },
    ]);
    let ordering_e = LexOrdering::new(vec![PhysicalSortExpr {
        expr: Arc::clone(&col_e),
        options: asc,
    }]);
    eq_properties.add_new_orderings([ordering_d_a, ordering_e]);

    // New ordering to re-sort on: [b ASC, c ASC].
    let new_order = LexOrdering::new(vec![
        PhysicalSortExpr {
            expr: Arc::clone(&col_b),
            options: asc,
        },
        PhysicalSortExpr {
            expr: Arc::clone(&col_c),
            options: asc,
        },
    ]);

    let result = eq_properties.with_reorder(new_order);

    // A single two-element ordering is expected, carrying over the suffix
    // of the original [d ASC, a ASC].
    assert_eq!(result.oeq_class().len(), 1);
    let ordering = &result.oeq_class().orderings[0];
    assert_eq!(ordering.len(), 2);

    // The leading expression may be either b or d, as they are equivalent.
    let first = &ordering[0];
    assert!(
        first.expr.eq(&col_b) || first.expr.eq(&col_d),
        "Expected b or d as first expression, got {:?}",
        first.expr
    );
    assert!(first.options.eq(&asc));

    // The trailing expression must be a.
    let second = &ordering[1];
    assert!(second.expr.eq(&col_a));
    assert!(second.options.eq(&asc));

    Ok(())
}
}
Loading