Minor: Add PullUpCorrelatedExpr::new and improve documentation #10500

Merged: 5 commits, May 17, 2024
Changes from 2 commits
52 changes: 46 additions & 6 deletions datafusion/optimizer/src/decorrelate.rs
@@ -38,23 +38,63 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
 /// 'Filter'. It adds the inner reference columns to the 'Projection' or
 /// 'Aggregate' of the subquery if they are missing, so that they can be
 /// evaluated by the parent operator as the join condition.
+#[derive(Debug)]
 pub struct PullUpCorrelatedExpr {
     pub join_filters: Vec<Expr>,
-    // mapping from the plan to its holding correlated columns
+    /// mapping from the plan to its holding correlated columns
     pub correlated_subquery_cols_map: HashMap<LogicalPlan, BTreeSet<Column>>,
     pub in_predicate_opt: Option<Expr>,
-    // indicate whether it is Exists(Not Exists) SubQuery
+    /// Is this an Exists(Not Exists) SubQuery. Defaults to false
     pub exists_sub_query: bool,
-    // indicate whether the correlated expressions can pull up or not
+    /// Can the correlated expressions be pulled up. Defaults to **TRUE**
     pub can_pull_up: bool,
-    // indicate whether need to handle the Count bug during the pull up process
+    /// Do we need to handle the Count bug during the pull up process
Review comment (Contributor):
what/who is the "count bug"? Is this a hotfix for an actual issue?

Reply (Contributor Author):
I did a quick look and couldn't figure it out

It looks like it was introduced in #6457. @mingmwang or @jackwener, do you remember what the "count bug" is and whether it has a ticket?

Reply (Contributor):
I checked the param: if we don't use it, the correlated subquery test fails.

Running "subquery.slt"
External error: query result mismatch:
[SQL] SELECT t1_id, (SELECT count(*) FROM t2 WHERE t2.t2_int = t1.t1_int) from t1
[Diff] (-expected|+actual)
    11 1
-   22 0
+   22 NULL
    33 3
-   44 0
+   44 NULL
at test_files/subquery.slt:763

Looks like it's related to this case: if there is no match in the correlated subquery, the count should be 0 instead of NULL.

Reply (Member):
It should have been introduced by @mingmwang.

Reply (Contributor Author):
I filed #10500 to track this issue (thanks @comphead and everyone for calling it out and the context) and added a reference to the ticket in the code

     pub need_handle_count_bug: bool,
-    // mapping from the plan to its expressions' evaluation result on empty batch
+    /// mapping from the plan to its expressions' evaluation result on empty batch
     pub collected_count_expr_map: HashMap<LogicalPlan, ExprResultMap>,
-    // pull up having expr, which must be evaluated after the Join
+    /// pull up having expr, which must be evaluated after the Join
     pub pull_up_having_expr: Option<Expr>,
 }
+
+impl Default for PullUpCorrelatedExpr {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl PullUpCorrelatedExpr {
+    pub fn new() -> Self {
+        Self {
+            join_filters: vec![],
+            correlated_subquery_cols_map: HashMap::new(),
+            in_predicate_opt: None,
+            exists_sub_query: false,
+            can_pull_up: true,
+            need_handle_count_bug: false,
+            collected_count_expr_map: HashMap::new(),
+            pull_up_having_expr: None,
+        }
+    }
+
+    /// Set if we need to handle the Count bug during the pull up process
+    pub fn with_need_handle_count_bug(mut self, need_handle_count_bug: bool) -> Self {
+        self.need_handle_count_bug = need_handle_count_bug;
+        self
+    }
+
+    /// Set the in_predicate_opt
+    pub fn with_in_predicate_opt(mut self, in_predicate_opt: Option<Expr>) -> Self {
+        self.in_predicate_opt = in_predicate_opt;
+        self
+    }
+
+    /// Set if this is an Exists(Not Exists) SubQuery
+    pub fn with_exists_sub_query(mut self, exists_sub_query: bool) -> Self {
+        self.exists_sub_query = exists_sub_query;
+        self
+    }
+}

 /// Used to indicate the unmatched rows from the inner(subquery) table after the left out Join
 /// This is used to handle the Count bug
 pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true";
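
The review thread above describes the "count bug" only through a failing test: after decorrelation, outer rows with no match get NULL where `count(*)` should give 0. The following is a minimal, self-contained sketch of that effect and of one way an always-true indicator column can repair it. It does not use DataFusion; the tables, the function names, and the `(count, indicator)` tuple are all hypothetical stand-ins, and the optimizer's real rewrite may differ in detail.

```rust
use std::collections::HashMap;

/// (outer key, count); `None` models SQL NULL.
type Row = (i64, Option<i64>);

/// Reference semantics: evaluate `count(*)` once per outer row.
/// An outer row with no match yields 0, never NULL.
fn correlated_count(t1: &[i64], t2: &[i64]) -> Vec<Row> {
    t1.iter()
        .map(|k| (*k, Some(t2.iter().filter(|v| *v == k).count() as i64)))
        .collect()
}

/// `SELECT key, count(*) FROM t2 GROUP BY key`: keys absent from t2 get no row.
fn group_counts(t2: &[i64]) -> HashMap<i64, i64> {
    let mut counts = HashMap::new();
    for v in t2 {
        *counts.entry(*v).or_insert(0) += 1;
    }
    counts
}

/// Naive decorrelation: aggregate, then LEFT JOIN onto t1.
/// Unmatched outer rows come back as NULL. This is the count bug.
fn naive_left_join(t1: &[i64], t2: &[i64]) -> Vec<Row> {
    let counts = group_counts(t2);
    t1.iter().map(|k| (*k, counts.get(k).copied())).collect()
}

/// Hypothetical fix: the aggregated side carries an indicator column that is
/// always true. After the left join, a missing indicator marks an unmatched
/// row, which is replaced by the value `count(*)` has on empty input: 0.
fn left_join_with_indicator(t1: &[i64], t2: &[i64]) -> Vec<Row> {
    let inner: HashMap<i64, (i64, bool)> = group_counts(t2)
        .into_iter()
        .map(|(k, c)| (k, (c, true)))
        .collect();
    t1.iter()
        .map(|k| match inner.get(k).copied() {
            Some((count, _always_true)) => (*k, Some(count)),
            None => (*k, Some(0)),
        })
        .collect()
}

fn main() {
    // Made-up data, chosen only to mirror the shape of the test diff above.
    let t1: [i64; 4] = [1, 2, 3, 4];
    let t2: [i64; 4] = [1, 3, 3, 3];
    println!("expected:  {:?}", correlated_count(&t1, &t2));
    println!("naive:     {:?}", naive_left_join(&t1, &t2));
    println!("indicator: {:?}", left_join_with_indicator(&t1, &t2));
}
```

In this sketch the naive join returns `None` for keys 2 and 4, matching the `NULL` rows in the failing test, while the indicator variant agrees with the per-row evaluation.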
14 changes: 4 additions & 10 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -248,16 +248,10 @@ fn build_join(
     let subquery = query_info.query.subquery.as_ref();
     let subquery_alias = alias.next("__correlated_sq");

-    let mut pull_up = PullUpCorrelatedExpr {
-        join_filters: vec![],
-        correlated_subquery_cols_map: Default::default(),
-        in_predicate_opt: in_predicate_opt.clone(),
-        exists_sub_query: in_predicate_opt.is_none(),
-        can_pull_up: true,
-        need_handle_count_bug: false,
-        collected_count_expr_map: Default::default(),
-        pull_up_having_expr: None,
-    };
+    let mut pull_up = PullUpCorrelatedExpr::new()
+        .with_in_predicate_opt(in_predicate_opt.clone())

Review comment (Contributor Author):
These were the two fields that are set differently, which I think is much clearer now.

+        .with_exists_sub_query(in_predicate_opt.is_none());
+
     let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
     if !pull_up.can_pull_up {
         return Ok(None);
11 changes: 1 addition & 10 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -280,16 +280,7 @@ fn build_join(
     subquery_alias: &str,
 ) -> Result<Option<(LogicalPlan, HashMap<String, Expr>)>> {
     let subquery_plan = subquery.subquery.as_ref();
-    let mut pull_up = PullUpCorrelatedExpr {
-        join_filters: vec![],
-        correlated_subquery_cols_map: Default::default(),
-        in_predicate_opt: None,
-        exists_sub_query: false,
-        can_pull_up: true,
-        need_handle_count_bug: true,
-        collected_count_expr_map: Default::default(),
-        pull_up_having_expr: None,
-    };
+    let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
     let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?;
     if !pull_up.can_pull_up {
         return Ok(None);
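
Both call sites now state only the fields that differ from the defaults. As a rough check that this is equivalent to the old struct literals, here is a standalone sketch of the same constructor-plus-`with_*` pattern. `PullUpSketch` is a hypothetical, trimmed-down stand-in: it uses `String` in place of `Expr` and omits the map and filter fields, so it illustrates the pattern rather than the real type.

```rust
/// Hypothetical, simplified stand-in for `PullUpCorrelatedExpr`.
#[derive(Debug, PartialEq)]
struct PullUpSketch {
    in_predicate_opt: Option<String>,
    exists_sub_query: bool,
    can_pull_up: bool,
    need_handle_count_bug: bool,
}

impl Default for PullUpSketch {
    fn default() -> Self {
        Self::new()
    }
}

impl PullUpSketch {
    /// Defaults mirror the ones in the diff: only `can_pull_up` starts as true.
    fn new() -> Self {
        Self {
            in_predicate_opt: None,
            exists_sub_query: false,
            can_pull_up: true,
            need_handle_count_bug: false,
        }
    }

    fn with_need_handle_count_bug(mut self, need_handle_count_bug: bool) -> Self {
        self.need_handle_count_bug = need_handle_count_bug;
        self
    }

    fn with_in_predicate_opt(mut self, in_predicate_opt: Option<String>) -> Self {
        self.in_predicate_opt = in_predicate_opt;
        self
    }

    fn with_exists_sub_query(mut self, exists_sub_query: bool) -> Self {
        self.exists_sub_query = exists_sub_query;
        self
    }
}

fn main() {
    // Scalar-subquery style call site: only the count-bug flag changes.
    let scalar = PullUpSketch::new().with_need_handle_count_bug(true);
    assert_eq!(
        scalar,
        PullUpSketch {
            in_predicate_opt: None,
            exists_sub_query: false,
            can_pull_up: true,
            need_handle_count_bug: true,
        }
    );

    // Predicate-subquery style call site: the two fields derived from the
    // optional IN predicate (a placeholder string here).
    let in_predicate_opt = Some("t1.a = t2.a".to_string());
    let predicate = PullUpSketch::new()
        .with_in_predicate_opt(in_predicate_opt.clone())
        .with_exists_sub_query(in_predicate_opt.is_none());
    assert!(!predicate.exists_sub_query);
    println!("{scalar:?}\n{predicate:?}");
}
```

Because each `with_*` method takes `self` by value and returns it, the chain needs no mutable binding until the final `let mut`, and a field added to the struct later only has to be given a default in `new()` rather than in every call site.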