Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop copying LogicalPlan and Exprs in TypeCoercion (10% faster planning) #10356

Merged
merged 6 commits into from
May 15, 2024
Merged
125 changes: 88 additions & 37 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use arrow::datatypes::{DataType, IntervalUnit};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_common::{
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema,
DataFusionError, Result, ScalarValue,
Expand All @@ -31,8 +31,8 @@ use datafusion_expr::expr::{
self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList,
InSubquery, Like, ScalarFunction, WindowFunction,
};
use datafusion_expr::expr_rewriter::rewrite_preserving_name;
use datafusion_expr::expr_schema::cast_subquery;
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::logical_plan::Subquery;
use datafusion_expr::type_coercion::binary::{
comparison_coercion, get_input_types, like_coercion,
Expand All @@ -52,6 +52,7 @@ use datafusion_expr::{
};

use crate::analyzer::AnalyzerRule;
use crate::utils::NamePreserver;

#[derive(Default)]
pub struct TypeCoercion {}
Expand All @@ -68,26 +69,28 @@ impl AnalyzerRule for TypeCoercion {
}

fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
analyze_internal(&DFSchema::empty(), &plan)
let empty_schema = DFSchema::empty();

let transformed_plan = plan
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))?
.data;

Ok(transformed_plan)
}
}

/// use the external schema to handle the correlated subqueries case
///
/// Assumes that children have already been optimized
fn analyze_internal(
// use the external schema to handle the correlated subqueries case
external_schema: &DFSchema,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
// optimize child plans first
let new_inputs = plan
.inputs()
.iter()
.map(|p| analyze_internal(external_schema, p))
.collect::<Result<Vec<_>>>()?;
plan: LogicalPlan,
) -> Result<Transformed<LogicalPlan>> {
// get schema representing all available input fields. This is used for data type
// resolution only, so order does not matter here
let mut schema = merge_schema(new_inputs.iter().collect());
let mut schema = merge_schema(plan.inputs());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 q


if let LogicalPlan::TableScan(ts) = plan {
if let LogicalPlan::TableScan(ts) = &plan {
let source_schema = DFSchema::try_from_qualified_schema(
ts.table_name.clone(),
&ts.source.schema(),
Expand All @@ -100,25 +103,75 @@ fn analyze_internal(
// select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3)
schema.merge(external_schema);

let mut expr_rewrite = TypeCoercionRewriter { schema: &schema };

let new_expr = plan
.expressions()
.into_iter()
.map(|expr| {
// ensure aggregate names don't change:
// https://github.com/apache/datafusion/issues/3555
rewrite_preserving_name(expr, &mut expr_rewrite)
})
.collect::<Result<Vec<_>>>()?;

plan.with_new_exprs(new_expr, new_inputs)
let mut expr_rewrite = TypeCoercionRewriter::new(&schema);

let name_preserver = NamePreserver::new(&plan);
// apply coercion rewrite all expressions in the plan individually
plan.map_expressions(|expr| {
let original_name = name_preserver.save(&expr)?;
expr.rewrite(&mut expr_rewrite)?
.map_data(|expr| original_name.restore(expr))
})?
// coerce join expressions specially
.map_data(|plan| expr_rewrite.coerce_joins(plan))?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since expr_rewrite.coerce_joins(plan) can change the plan, shouldn't its result be Result<Transformed<LogicalPlan>>? And then here we should probably use map_transformed() instead of the current map_data().

Copy link
Contributor Author

@alamb alamb May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for anyone following along, the response is https://github.com/apache/datafusion/pull/10356/files#r1588998665 (tldr should do as a follow on PR)

// recompute the schema after the expressions have been rewritten as the types may have changed
.map_data(|plan| plan.recompute_schema())
Copy link
Contributor

@peter-toth peter-toth May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we always need to run plan.recompute_schema()? If the Transformed<LogicalPlan>'s .transformed is false then probably we don't need to.

Copy link
Contributor Author

@alamb alamb May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent point. At the moment, I think we do need to always run recompute_schema because the TypeCoercionRewriter doesn't return Transformed (and thus we don't know if any actual expression coercion was done, so we have to assume it was).

I filed #10365 to track improving this

Copy link
Contributor

@peter-toth peter-toth May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think you use TypeCoercionRewriter in expr.rewrite(&mut expr_rewrite)? and that rewrite() returns Transformed<Expr> and then that Transformed<Expr> is propagated up into plan.map_expressions(), that returns Transformed<LogicalPlan>. So you have the necessary Transformed to decide if recompute_schema() is needed. Or not? 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct (of course!) thank you for pointing it out. Now that analyze_internal returns Transformed would work. However, there is still code like this:

                let new_plan =
                    analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
                Ok(Transformed::yes(Expr::Exists(Exists {
                    subquery: Subquery {
                        subquery: Arc::new(new_plan),
                        outer_ref_columns: subquery.outer_ref_columns,
                    },
                    negated,
                })))
            }

Which discards the transformed information (and in this case always returns Transformed::true).

In order to keep the PRs small and easier to review I would like to not change this PR (it is no worse than main in regards to recomputing schema) and I will make a follow on PR to avoid recomputing schema when unecessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, it seems there are many unnecessary Transformed::yess in the current code. But false positive transformeds doesn't cause any issue...

Sure, a follow-up PR sounds good, I agree that this PR already looks really nice!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is my draft followup: #10369

It is quite large (it requires updating the entire expression rewriter) so I am glad we left it in a separate PR

}

pub(crate) struct TypeCoercionRewriter<'a> {
pub(crate) schema: &'a DFSchema,
}

impl<'a> TypeCoercionRewriter<'a> {
fn new(schema: &'a DFSchema) -> Self {
Self { schema }
}

/// Coerce join equality expressions
///
/// Joins must be treated specially as their equality expressions are stored
/// as a parallel list of left and right expressions, rather than a single
/// equality expression
///
/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)`
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
let LogicalPlan::Join(mut join) = plan else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thats an interesting syntax

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it checks the plan can be deconstructed into LogicalPlan::Join(...) and if its not the else branch is triggered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting

https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html

return Ok(plan);
};

join.on = join
.on
.into_iter()
.map(|(lhs, rhs)| {
// coerce the arguments as though they were a single binary equality
// expression
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this method needed, as it looks like we just cast lhs, rhs? it feels it can be simplified?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think coerce_binary_op is different than just casting lhs and rhs as it first calls get_input_types:

        let (left_type, right_type) = get_input_types(
            &left.get_type(self.schema)?,
            &op,
            &right.get_type(self.schema)?,
        )?;

And get_input_types usese the comparison coercion rules to figure out a common set if types to coerce lhs and rhs to.

Ok((lhs, rhs))
})
.collect::<Result<Vec<_>>>()?;

Ok(LogicalPlan::Join(join))
}

fn coerce_binary_op(
&self,
left: Expr,
op: Operator,
right: Expr,
) -> Result<(Expr, Expr)> {
let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&op,
&right.get_type(self.schema)?,
)?;
Ok((
left.cast_to(&left_type, self.schema)?,
right.cast_to(&right_type, self.schema)?,
))
}
}

impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
type Node = Expr;

Expand All @@ -131,14 +184,15 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
outer_ref_columns,
}) => {
let new_plan = analyze_internal(self.schema, &subquery)?;
let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data;
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns,
})))
}
Expr::Exists(Exists { subquery, negated }) => {
let new_plan = analyze_internal(self.schema, &subquery.subquery)?;
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(new_plan),
Expand All @@ -152,7 +206,8 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
subquery,
negated,
}) => {
let new_plan = analyze_internal(self.schema, &subquery.subquery)?;
let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
let expr_type = expr.get_type(self.schema)?;
let subquery_type = new_plan.schema().field(0).data_type();
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!(
Expand Down Expand Up @@ -221,15 +276,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> {
))))
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&op,
&right.get_type(self.schema)?,
)?;
let (left, right) = self.coerce_binary_op(*left, op, *right)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Box::new(left.cast_to(&left_type, self.schema)?),
Box::new(left),
op,
Box::new(right.cast_to(&right_type, self.schema)?),
Box::new(right),
))))
}
Expr::Between(Between {
Expand Down