Skip to content

Commit

Permalink
deprecated (#13076)
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksenn authored Oct 24, 2024
1 parent 8adbc23 commit f2da32b
Show file tree
Hide file tree
Showing 16 changed files with 32 additions and 299 deletions.
6 changes: 3 additions & 3 deletions datafusion-examples/examples/sql_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join we update the running count
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down Expand Up @@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know were at the root of the tree
// count this tree and recurse on it's inputs
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
Expand Down Expand Up @@ -151,7 +151,7 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
}

// any join we count
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,10 +1127,6 @@ impl DefaultPhysicalPlanner {
join
}
}
LogicalPlan::CrossJoin(_) => {
let [left, right] = children.two()?;
Arc::new(CrossJoinExec::new(left, right))
}
LogicalPlan::RecursiveQuery(RecursiveQuery {
name, is_distinct, ..
}) => {
Expand Down
5 changes: 0 additions & 5 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Filter": format!("{}", filter_expr)
})
}
LogicalPlan::CrossJoin(_) => {
json!({
"Node Type": "Cross Join"
})
}
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable,
Distinct, DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
67 changes: 2 additions & 65 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,6 @@ pub enum LogicalPlan {
/// Join two logical plans on one or more join columns.
/// This is used to implement SQL `JOIN`
Join(Join),
/// Apply Cross Join to two logical plans.
/// This is used to implement SQL `CROSS JOIN`
/// Deprecated: use [LogicalPlan::Join] instead with empty `on` / no filter
CrossJoin(CrossJoin),
/// Repartitions the input based on a partitioning scheme. This is
/// used to add parallelism and is sometimes referred to as an
/// "exchange" operator in other systems
Expand Down Expand Up @@ -312,7 +308,6 @@ impl LogicalPlan {
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
LogicalPlan::Join(Join { schema, .. }) => schema,
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
LogicalPlan::Statement(statement) => statement.schema(),
Expand Down Expand Up @@ -345,8 +340,7 @@ impl LogicalPlan {
| LogicalPlan::Projection(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_) => self
| LogicalPlan::Join(_) => self
.inputs()
.iter()
.map(|input| input.schema().as_ref())
Expand Down Expand Up @@ -436,7 +430,6 @@ impl LogicalPlan {
LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
LogicalPlan::Sort(Sort { input, .. }) => vec![input],
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
Expand Down Expand Up @@ -542,13 +535,6 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
},
LogicalPlan::CrossJoin(cross) => {
if cross.left.schema().fields().is_empty() {
cross.right.head_output_expr()
} else {
cross.left.head_output_expr()
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
static_term.head_output_expr()
}
Expand Down Expand Up @@ -674,20 +660,6 @@ impl LogicalPlan {
null_equals_null,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: _,
}) => {
let join_schema =
build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;

Ok(LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: join_schema.into(),
}))
}
LogicalPlan::Subquery(_) => Ok(self),
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
Expand Down Expand Up @@ -938,11 +910,6 @@ impl LogicalPlan {
null_equals_null: *null_equals_null,
}))
}
LogicalPlan::CrossJoin(_) => {
self.assert_no_expressions(expr)?;
let (left, right) = self.only_two_inputs(inputs)?;
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
}) => {
Expand Down Expand Up @@ -1327,12 +1294,6 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti => left.max_rows(),
JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
},
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
match (left.max_rows(), right.max_rows()) {
(Some(left_max), Some(right_max)) => Some(left_max * right_max),
_ => None,
}
}
LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
LogicalPlan::Union(Union { inputs, .. }) => inputs
.iter()
Expand Down Expand Up @@ -1893,9 +1854,6 @@ impl LogicalPlan {
}
}
}
LogicalPlan::CrossJoin(_) => {
write!(f, "CrossJoin:")
}
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
Expand Down Expand Up @@ -2601,28 +2559,7 @@ impl TableScan {
}
}

/// Apply Cross Join to two logical plans
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CrossJoin {
/// Left input
pub left: Arc<LogicalPlan>,
/// Right input
pub right: Arc<LogicalPlan>,
/// The output schema, containing fields from the left and right inputs
pub schema: DFSchemaRef,
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for CrossJoin {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.left.partial_cmp(&other.left) {
Some(Ordering::Equal) => self.right.partial_cmp(&other.right),
cmp => cmp,
}
}
}

/// Repartition the plan based on a partitioning scheme.
// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
/// The incoming logical plan
Expand Down
28 changes: 5 additions & 23 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin,
DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter,
Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest,
UserDefinedLogicalNode, Values, Window,
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
Window,
};
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -160,22 +160,6 @@ impl TreeNode for LogicalPlan {
null_equals_null,
})
}),
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
}) => map_until_stop_and_collect!(
rewrite_arc(left, &mut f),
right,
rewrite_arc(right, &mut f)
)?
.update_data(|(left, right)| {
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
})
}),
LogicalPlan::Limit(Limit { skip, fetch, input }) => rewrite_arc(input, f)?
.update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
LogicalPlan::Subquery(Subquery {
Expand Down Expand Up @@ -527,7 +511,6 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Statement(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
Expand Down Expand Up @@ -758,7 +741,6 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Statement(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ fn check_inner_plan(
LogicalPlan::Projection(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Sort(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::EmptyRelation(_)
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ impl OptimizerRule for CommonSubexprEliminate {
LogicalPlan::Window(window) => self.try_optimize_window(window, config)?,
LogicalPlan::Aggregate(agg) => self.try_optimize_aggregate(agg, config)?,
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan(_)
Expand Down
35 changes: 8 additions & 27 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl OptimizerRule for EliminateCrossJoin {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
}) | LogicalPlan::CrossJoin(_)
})
);

if !rewriteable {
Expand Down Expand Up @@ -241,20 +241,6 @@ fn flatten_join_inputs(
all_filters,
)?;
}
LogicalPlan::CrossJoin(join) => {
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
all_filters,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
all_filters,
)?;
}
_ => {
all_inputs.push(plan);
}
Expand All @@ -270,23 +256,18 @@ fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool {
// can only flatten inner / cross joins
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {}
LogicalPlan::CrossJoin(_) => {}
_ => return false,
};

for child in plan.inputs() {
match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
| LogicalPlan::CrossJoin(_) => {
if !can_flatten_join_inputs(child) {
return false;
}
if let LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
}) = child
{
if !can_flatten_join_inputs(child) {
return false;
}
// the child is not a join/cross join
_ => (),
}
}
true
Expand Down
11 changes: 0 additions & 11 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,6 @@ fn optimize_projections(
right_indices.with_projection_beneficial(),
]
}
LogicalPlan::CrossJoin(cross_join) => {
let left_len = cross_join.left.schema().fields().len();
let (left_indices, right_indices) =
split_join_requirements(left_len, indices, &JoinType::Inner);
// Joins benefit from "small" input tables (lower memory usage).
// Therefore, each child benefits from projection:
vec![
left_indices.with_projection_beneficial(),
right_indices.with_projection_beneficial(),
]
}
// these nodes are explicitly rewritten in the match statement above
LogicalPlan::Projection(_)
| LogicalPlan::Aggregate(_)
Expand Down
13 changes: 0 additions & 13 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,6 @@ impl OptimizerRule for PropagateEmptyRelation {
}
Ok(Transformed::no(plan))
}
LogicalPlan::CrossJoin(ref join) => {
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: Arc::clone(plan.schema()),
},
)));
}
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
}

LogicalPlan::Join(ref join) => {
// TODO: For Join, more join type need to be careful:
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
Expand Down
Loading

0 comments on commit f2da32b

Please sign in to comment.