diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 3959223e68c1..b55b1a7f8f2d 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -38,25 +38,71 @@ use datafusion_physical_expr::execution_props::ExecutionProps; /// 'Filter'. It adds the inner reference columns to the 'Projection' or /// 'Aggregate' of the subquery if they are missing, so that they can be /// evaluated by the parent operator as the join condition. +#[derive(Debug)] pub struct PullUpCorrelatedExpr { pub join_filters: Vec, - // mapping from the plan to its holding correlated columns + /// mapping from the plan to its holding correlated columns pub correlated_subquery_cols_map: HashMap>, pub in_predicate_opt: Option, - // indicate whether it is Exists(Not Exists) SubQuery + /// Is this an Exists(Not Exists) SubQuery. Defaults to **FALSE** pub exists_sub_query: bool, - // indicate whether the correlated expressions can pull up or not + /// Can the correlated expressions be pulled up. Defaults to **TRUE** pub can_pull_up: bool, - // indicate whether need to handle the Count bug during the pull up process + /// Do we need to handle [the Count bug] during the pull up process + /// + /// [the Count bug]: https://github.com/apache/datafusion/pull/10500 pub need_handle_count_bug: bool, - // mapping from the plan to its expressions' evaluation result on empty batch + /// mapping from the plan to its expressions' evaluation result on empty batch pub collected_count_expr_map: HashMap, - // pull up having expr, which must be evaluated after the Join + /// pull up having expr, which must be evaluated after the Join pub pull_up_having_expr: Option, } +impl Default for PullUpCorrelatedExpr { + fn default() -> Self { + Self::new() + } +} + +impl PullUpCorrelatedExpr { + pub fn new() -> Self { + Self { + join_filters: vec![], + correlated_subquery_cols_map: HashMap::new(), + in_predicate_opt: None, + exists_sub_query: false, + can_pull_up: true, + need_handle_count_bug: false, + collected_count_expr_map: HashMap::new(), + pull_up_having_expr: None, + } + } + + /// Set if we need to handle [the Count bug] during the pull up process + /// + /// [the Count bug]: https://github.com/apache/datafusion/pull/10500 + pub fn with_need_handle_count_bug(mut self, need_handle_count_bug: bool) -> Self { + self.need_handle_count_bug = need_handle_count_bug; + self + } + + /// Set the in_predicate_opt + pub fn with_in_predicate_opt(mut self, in_predicate_opt: Option) -> Self { + self.in_predicate_opt = in_predicate_opt; + self + } + + /// Set if this is an Exists(Not Exists) SubQuery + pub fn with_exists_sub_query(mut self, exists_sub_query: bool) -> Self { + self.exists_sub_query = exists_sub_query; + self + } +} + /// Used to indicate the unmatched rows from the inner(subquery) table after the left out Join -/// This is used to handle the Count bug +/// This is used to handle [the Count bug] +/// +/// [the Count bug]: https://github.com/apache/datafusion/pull/10500 pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true"; /// Mapping from expr display name to its evaluation result on empty record diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs index 58fd8557194f..88ce300e5c9a 100644 --- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs @@ -248,16 +248,10 @@ fn build_join( let subquery = query_info.query.subquery.as_ref(); let subquery_alias = alias.next("__correlated_sq"); - let mut pull_up = PullUpCorrelatedExpr { - join_filters: vec![], - correlated_subquery_cols_map: Default::default(), - in_predicate_opt: in_predicate_opt.clone(), - exists_sub_query: in_predicate_opt.is_none(), - can_pull_up: true, - need_handle_count_bug: false, - collected_count_expr_map: Default::default(), - pull_up_having_expr: None, - }; + let mut pull_up = PullUpCorrelatedExpr::new() + .with_in_predicate_opt(in_predicate_opt.clone()) + .with_exists_sub_query(in_predicate_opt.is_none()); + let new_plan = subquery.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(None); diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 71692b934543..cb28961497f4 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -305,16 +305,7 @@ fn build_join( subquery_alias: &str, ) -> Result)>> { let subquery_plan = subquery.subquery.as_ref(); - let mut pull_up = PullUpCorrelatedExpr { - join_filters: vec![], - correlated_subquery_cols_map: Default::default(), - in_predicate_opt: None, - exists_sub_query: false, - can_pull_up: true, - need_handle_count_bug: true, - collected_count_expr_map: Default::default(), - pull_up_having_expr: None, - }; + let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true); let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?; if !pull_up.can_pull_up { return Ok(None);