Skip to content

Commit

Permalink
- Add TreeNode.transform_with_payload(), `TreeNode.transform_down_w…
Browse files Browse the repository at this point in the history
…ith_payload()` and `TreeNode.transform_up_with_payload()`

- Refactor `SortPushDown` and `PlanWithKeyRequirements` using `TreeNode.transform_down_with_payload()`
- Refactor `ExprOrdering`, `ExprTreeNode` and `PipelineStatePropagator` using `TreeNode.transform_up_with_payload()`
- Refactor `OrderPreservationContext` and `PlanWithCorrespondingCoalescePartitions`  using `TreeNode.transform_with_payload()`
  • Loading branch information
peter-toth committed Jan 30, 2024
1 parent 488cfe1 commit 0007fe1
Show file tree
Hide file tree
Showing 13 changed files with 726 additions and 760 deletions.
88 changes: 88 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,52 @@ pub trait TreeNode: Sized {
self.transform_up(op)
}

/// Transforms the tree using `f_down` and `f_up` closures. `f_down` is applied on a
/// node while traversing the tree top-down (pre-order, before the node's children are
/// visited) while `f_up` is applied on a node while traversing the tree bottom-up
/// (post-order, after the node's children are visited).
///
/// The `f_down` closure takes
/// - the node and
/// - a `PD` type payload from its parent
/// and returns a tuple made of (in this order):
/// - a possibly modified node,
/// - a `Vec<PD>` type payload to propagate down to the node's children
///   (one `PD` element is propagated down to each child, in order; surplus
///   elements are ignored),
/// - a `PC` type payload to pass to `f_up`.
///
/// The `f_up` closure takes
/// - the node, after its children have been transformed,
/// - a `PC` type payload from `f_down` and
/// - a `Vec<PU>` type payload collected from the node's children
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `PU` type payload to propagate up to the node's parent.
///
/// # Errors
///
/// Returns the first error produced by `f_down`, `f_up` or `map_children`.
///
/// # Panics
///
/// Panics if `f_down` returns fewer `PD` elements than there are child nodes
/// handed to the closure by `map_children`.
fn transform_with_payload<FD, PD, PC, FU, PU>(
    self,
    f_down: &mut FD,
    payload_down: PD,
    f_up: &mut FU,
) -> Result<(Self, PU)>
where
    FD: FnMut(Self, PD) -> Result<(Transformed<Self>, Vec<PD>, PC)>,
    FU: FnMut(Self, PC, Vec<PU>) -> Result<(Transformed<Self>, PU)>,
{
    let (new_node, new_payload_down, payload_current) = f_down(self, payload_down)?;
    let mut new_payload_down_iter = new_payload_down.into_iter();
    let mut payload_up = vec![];
    let node_with_new_children = new_node.into().map_children(|node| {
        // Each child consumes the next downward payload; running out means the
        // caller's `f_down` broke its contract, which is a bug rather than a
        // recoverable condition.
        let child_payload_down = new_payload_down_iter
            .next()
            .expect("`f_down` must return one payload element per child");
        let (new_node, p) =
            node.transform_with_payload(f_down, child_payload_down, f_up)?;
        payload_up.push(p);
        Ok(new_node)
    })?;
    let (new_node, new_payload_up) =
        f_up(node_with_new_children, payload_current, payload_up)?;
    Ok((new_node.into(), new_payload_up))
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' to the node and all of its
/// children(Preorder Traversal).
/// When the `op` does not apply to a given node, it is left unchanged.
Expand All @@ -135,6 +181,26 @@ pub trait TreeNode: Sized {
after_op.map_children(|node| node.transform_down_mut(op))
}

/// Transforms the tree using `f` closure. `f` is applied on a node while traversing
/// the tree top-down (pre-order, before the node's children are visited).
///
/// The `f` closure takes
/// - the node and
/// - a `P` type payload from its parent
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `Vec<P>` type payload to propagate down to the node's children
///   (one `P` element is propagated down to each child, in order; surplus
///   elements are ignored).
///
/// # Errors
///
/// Returns the first error produced by `f` or `map_children`.
///
/// # Panics
///
/// Panics if `f` returns fewer `P` elements than there are child nodes handed
/// to the closure by `map_children`.
fn transform_down_with_payload<F, P>(self, f: &mut F, payload: P) -> Result<Self>
where
    F: FnMut(Self, P) -> Result<(Transformed<Self>, Vec<P>)>,
{
    let (new_node, new_payload) = f(self, payload)?;
    let mut new_payload_iter = new_payload.into_iter();
    new_node.into().map_children(|node| {
        // Each child consumes the next payload; running out means the caller's
        // `f` broke its contract, which is a bug rather than a recoverable
        // condition.
        let child_payload = new_payload_iter
            .next()
            .expect("`f` must return one payload element per child");
        node.transform_down_with_payload(f, child_payload)
    })
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
/// children and then itself(Postorder Traversal).
/// When the `op` does not apply to a given node, it is left unchanged.
Expand All @@ -159,6 +225,28 @@ pub trait TreeNode: Sized {
Ok(new_node)
}

/// Rewrites the tree bottom-up (post-order) with the `f` closure: a node is handed
/// to `f` only after all of its children have been transformed.
///
/// The `f` closure receives
/// - the node, with its children already transformed, and
/// - a `Vec<P>` holding the payloads produced for the node's children
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `P` type payload to propagate up to the node's parent.
///
/// The method returns the transformed node together with the payload `f`
/// produced for it.
fn transform_up_with_payload<F, P>(self, f: &mut F) -> Result<(Self, P)>
where
    F: FnMut(Self, Vec<P>) -> Result<(Transformed<Self>, P)>,
{
    let mut child_payloads = Vec::new();
    let rebuilt = self.map_children(|child| {
        child.transform_up_with_payload(f).map(|(new_child, up)| {
            child_payloads.push(up);
            new_child
        })
    })?;
    f(rebuilt, child_payloads).map(|(node, up)| (node.into(), up))
}

/// Transform the tree node using the given [TreeNodeRewriter]
/// It performs a depth first walk of an node and its children.
///
Expand Down
150 changes: 72 additions & 78 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {

let adjusted = if top_down_join_key_reordering {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new_default(plan);
let adjusted =
plan_requirements.transform_down(&adjust_input_keys_ordering)?;
adjusted.plan
plan.transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])?
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
Expand Down Expand Up @@ -267,12 +264,17 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
/// 5) For other types of operators, by default, pushdown the parent requirements to children.
///
fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
let plan = requirements.plan.clone();
type RequiredKeyOrdering = Vec<Arc<dyn PhysicalExpr>>;

if let Some(HashJoinExec {
#[allow(clippy::type_complexity)]
fn adjust_input_keys_ordering(
plan: Arc<dyn ExecutionPlan>,
parent_required: RequiredKeyOrdering,
) -> Result<(
Transformed<Arc<dyn ExecutionPlan>>,
Vec<RequiredKeyOrdering>,
)> {
let request_key_ordering = if let Some(HashJoinExec {
left,
right,
on,
Expand Down Expand Up @@ -301,43 +303,47 @@ fn adjust_input_keys_ordering(
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
plan.clone(),
&parent_required,
on,
vec![],
&join_constructor,
)
.map(Transformed::Yes);
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
}
PartitionMode::CollectLeft => {
// Push down requirements to the right side
requirements.children[1].data = match join_type {
let new_right_request = match join_type {
JoinType::Inner | JoinType::Right => shift_right_required(
&requirements.data,
&parent_required,
left.schema().fields().len(),
)
.unwrap_or_default(),
),
JoinType::RightSemi | JoinType::RightAnti => {
requirements.data.clone()
Some(parent_required.clone())
}
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::Full => vec![],
| JoinType::Full => None,
};
vec![vec![], new_right_request.unwrap_or_default()]
}
PartitionMode::Auto => {
// Can not satisfy, clear the current requirements and generate new empty requirements
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
}
} else if let Some(CrossJoinExec { left, .. }) =
plan.as_any().downcast_ref::<CrossJoinExec>()
{
let left_columns_len = left.schema().fields().len();
// Push down requirements to the right side
requirements.children[1].data =
shift_right_required(&requirements.data, left_columns_len)
.unwrap_or_default();
vec![
vec![],
shift_right_required(&parent_required, left_columns_len).unwrap_or_default(),
]
} else if let Some(SortMergeJoinExec {
left,
right,
Expand All @@ -363,35 +369,45 @@ fn adjust_input_keys_ordering(
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
plan.clone(),
&parent_required,
on,
sort_options.clone(),
&join_constructor,
)
.map(Transformed::Yes);
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
} else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
if !requirements.data.is_empty() {
if !parent_required.is_empty() {
if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
return reorder_aggregate_keys(requirements, aggregate_exec)
.map(Transformed::Yes);
return reorder_aggregate_keys(
plan.clone(),
&parent_required,
aggregate_exec,
)
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
} else {
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
} else {
// Keep everything unchanged
return Ok(Transformed::No(requirements));
let request_key_ordering = vec![vec![]; plan.children().len()];
return Ok((Transformed::No(plan), request_key_ordering));
}
} else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
// For Projection, we need to transform the requirements to the columns before the Projection
// And then to push down the requirements
// Construct a mapping from new name to the original Column
let new_required = map_columns_before_projection(&requirements.data, expr);
if new_required.len() == requirements.data.len() {
requirements.children[0].data = new_required;
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
vec![new_required]
} else {
// Can not satisfy, clear the current requirements and generate new empty requirements
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
} else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan
Expand All @@ -400,30 +416,28 @@ fn adjust_input_keys_ordering(
.is_some()
|| plan.as_any().downcast_ref::<WindowAggExec>().is_some()
{
requirements.data.clear();
vec![vec![]; plan.children().len()]
} else {
// By default, push down the parent requirements to children
for child in requirements.children.iter_mut() {
child.data = requirements.data.clone();
}
}
Ok(Transformed::Yes(requirements))
vec![parent_required.clone(); plan.children().len()]
};
Ok((Transformed::Yes(plan), request_key_ordering))
}

fn reorder_partitioned_join_keys<F>(
mut join_plan: PlanWithKeyRequirements,
mut join_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
on: &[(PhysicalExprRef, PhysicalExprRef)],
sort_options: Vec<SortOptions>,
join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)>
where
F: Fn(
(Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
) -> Result<Arc<dyn ExecutionPlan>>,
{
let parent_required = &join_plan.data;
let join_key_pairs = extract_join_keys(on);
let eq_properties = join_plan.plan.equivalence_properties();
let eq_properties = join_plan.equivalence_properties();

if let Some((
JoinKeyPairs {
Expand All @@ -438,25 +452,22 @@ where
let new_sort_options = (0..sort_options.len())
.map(|idx| sort_options[new_positions[idx]])
.collect();
join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
join_plan = join_constructor((new_join_on, new_sort_options))?;
}
let mut requirements = join_plan;
requirements.children[0].data = left_keys;
requirements.children[1].data = right_keys;
Ok(requirements)
Ok((join_plan, vec![left_keys, right_keys]))
} else {
let mut requirements = join_plan;
requirements.children[0].data = join_key_pairs.left_keys;
requirements.children[1].data = join_key_pairs.right_keys;
Ok(requirements)
Ok((
join_plan,
vec![join_key_pairs.left_keys, join_key_pairs.right_keys],
))
}
}

fn reorder_aggregate_keys(
mut agg_node: PlanWithKeyRequirements,
agg_node: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
let parent_required = &agg_node.data;
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)> {
let output_columns = agg_exec
.group_by()
.expr()
Expand Down Expand Up @@ -508,18 +519,10 @@ fn reorder_aggregate_keys(
new_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
partial_agg.clone(),
partial_agg,
agg_exec.input_schema(),
)?);

agg_node.plan = new_final_agg.clone();
agg_node.data.clear();
agg_node.children = vec![PlanWithKeyRequirements::new(
partial_agg as _,
vec![],
agg_node.children.swap_remove(0).children,
)];

// Need to create a new projection to change the expr ordering back
let agg_schema = new_final_agg.schema();
let mut proj_exprs = output_columns
Expand All @@ -538,14 +541,14 @@ fn reorder_aggregate_keys(
let plan = Arc::new(Column::new(name, idx)) as _;
proj_exprs.push((plan, name.clone()))
}
return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
});
return ProjectionExec::try_new(proj_exprs, new_final_agg)
.map(|p| (Arc::new(p) as _, vec![vec![]; 1]));
}
}
}
}
Ok(agg_node)
};
let request_key_ordering = vec![vec![]; agg_node.children().len()];
Ok((agg_node, request_key_ordering))
}

fn shift_right_required(
Expand Down Expand Up @@ -1295,9 +1298,6 @@ struct JoinKeyPairs {
right_keys: Vec<Arc<dyn PhysicalExpr>>,
}

/// Keeps track of parent required key orderings.
type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;

/// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on
#[cfg(feature = "parquet")]
#[cfg(test)]
Expand Down Expand Up @@ -1783,13 +1783,7 @@ pub(crate) mod tests {
{
let adjusted = if config.optimizer.top_down_join_key_reordering {
// Run adjust_input_keys_ordering rule
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering)
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
adjusted.plan
$PLAN.clone().transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])?
} else {
// Run reorder_join_keys_to_inputs rule
$PLAN.clone().transform_up(&|plan| {
Expand Down
Loading

0 comments on commit 0007fe1

Please sign in to comment.