Skip to content

Commit

Permalink
Minor: Add PullUpCorrelatedExpr::new and improve documentation (apache#10500)
Browse files Browse the repository at this point in the history

* Minor: add `PullUpCorrelatedExpr::new` and add documentation

* clippy

* Update datafusion/optimizer/src/decorrelate.rs

Co-authored-by: Oleks V <[email protected]>

* Add ticket reference to count bug

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
2 people authored and findepi committed Jul 16, 2024
1 parent cc2bc7c commit ba42abe
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 27 deletions.
60 changes: 53 additions & 7 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,71 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
/// 'Filter'. It adds the inner reference columns to the 'Projection' or
/// 'Aggregate' of the subquery if they are missing, so that they can be
/// evaluated by the parent operator as the join condition.
#[derive(Debug)]
pub struct PullUpCorrelatedExpr {
/// Join filter expressions. Starts out empty (see [`PullUpCorrelatedExpr::new`]).
// NOTE(review): presumably filled in while the subquery plan is rewritten — confirm.
pub join_filters: Vec<Expr>,
// mapping from the plan to its holding correlated columns
/// mapping from the plan to its holding correlated columns
pub correlated_subquery_cols_map: HashMap<LogicalPlan, BTreeSet<Column>>,
/// Optional predicate expression. Defaults to `None`.
// NOTE(review): presumably the IN predicate of an `IN (subquery)` being
// decorrelated — confirm against the callers.
pub in_predicate_opt: Option<Expr>,
// indicate whether it is Exists(Not Exists) SubQuery
/// Is this an Exists(Not Exists) SubQuery. Defaults to **FALSE**
pub exists_sub_query: bool,
// indicate whether the correlated expressions can pull up or not
/// Can the correlated expressions be pulled up. Defaults to **TRUE**
pub can_pull_up: bool,
// indicate whether need to handle the Count bug during the pull up process
/// Do we need to handle [the Count bug] during the pull up process.
/// Defaults to **FALSE**
///
/// [the Count bug]: https://github.com/apache/datafusion/pull/10500
pub need_handle_count_bug: bool,
// mapping from the plan to its expressions' evaluation result on empty batch
/// mapping from the plan to its expressions' evaluation result on empty batch
pub collected_count_expr_map: HashMap<LogicalPlan, ExprResultMap>,
// pull up having expr, which must be evaluated after the Join
/// pull up having expr, which must be evaluated after the Join
pub pull_up_having_expr: Option<Expr>,
}

impl Default for PullUpCorrelatedExpr {
fn default() -> Self {
Self::new()
}
}

impl PullUpCorrelatedExpr {
    /// Creates a `PullUpCorrelatedExpr` with its default settings:
    /// empty collections, no optional expressions, `can_pull_up` set to
    /// `true`, and both `exists_sub_query` and `need_handle_count_bug`
    /// set to `false`.
    pub fn new() -> Self {
        let join_filters = Vec::new();
        let correlated_subquery_cols_map = HashMap::new();
        let collected_count_expr_map = HashMap::new();

        Self {
            join_filters,
            correlated_subquery_cols_map,
            in_predicate_opt: None,
            exists_sub_query: false,
            can_pull_up: true,
            need_handle_count_bug: false,
            collected_count_expr_map,
            pull_up_having_expr: None,
        }
    }

    /// Builder-style setter: choose whether [the Count bug] must be handled
    /// during the pull up process.
    ///
    /// [the Count bug]: https://github.com/apache/datafusion/pull/10500
    pub fn with_need_handle_count_bug(self, need_handle_count_bug: bool) -> Self {
        Self {
            need_handle_count_bug,
            ..self
        }
    }

    /// Builder-style setter: replace `in_predicate_opt` with the given value.
    pub fn with_in_predicate_opt(self, in_predicate_opt: Option<Expr>) -> Self {
        Self {
            in_predicate_opt,
            ..self
        }
    }

    /// Builder-style setter: mark whether this is an Exists(Not Exists) SubQuery.
    pub fn with_exists_sub_query(self, exists_sub_query: bool) -> Self {
        Self {
            exists_sub_query,
            ..self
        }
    }
}

/// Used to indicate the unmatched rows from the inner (subquery) table after the left outer join.
/// This is used to handle [the Count bug]
///
/// [the Count bug]: https://github.com/apache/datafusion/pull/10500
pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true";

/// Mapping from expr display name to its evaluation result on empty record
Expand Down
14 changes: 4 additions & 10 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,10 @@ fn build_join(
let subquery = query_info.query.subquery.as_ref();
let subquery_alias = alias.next("__correlated_sq");

let mut pull_up = PullUpCorrelatedExpr {
join_filters: vec![],
correlated_subquery_cols_map: Default::default(),
in_predicate_opt: in_predicate_opt.clone(),
exists_sub_query: in_predicate_opt.is_none(),
can_pull_up: true,
need_handle_count_bug: false,
collected_count_expr_map: Default::default(),
pull_up_having_expr: None,
};
let mut pull_up = PullUpCorrelatedExpr::new()
.with_in_predicate_opt(in_predicate_opt.clone())
.with_exists_sub_query(in_predicate_opt.is_none());

let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
if !pull_up.can_pull_up {
return Ok(None);
Expand Down
11 changes: 1 addition & 10 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,7 @@ fn build_join(
subquery_alias: &str,
) -> Result<Option<(LogicalPlan, HashMap<String, Expr>)>> {
let subquery_plan = subquery.subquery.as_ref();
let mut pull_up = PullUpCorrelatedExpr {
join_filters: vec![],
correlated_subquery_cols_map: Default::default(),
in_predicate_opt: None,
exists_sub_query: false,
can_pull_up: true,
need_handle_count_bug: true,
collected_count_expr_map: Default::default(),
pull_up_having_expr: None,
};
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?;
if !pull_up.can_pull_up {
return Ok(None);
Expand Down

0 comments on commit ba42abe

Please sign in to comment.