Transform with payload #8664

Closed · wants to merge 1 commit
88 changes: 88 additions & 0 deletions datafusion/common/src/tree_node.rs
@@ -113,6 +113,52 @@ pub trait TreeNode: Sized {
self.transform_up(op)
}

/// Transforms the tree using `f_down` and `f_up` closures. `f_down` is applied on a
/// node while traversing the tree top-down (pre-order, before the node's children are
/// visited) while `f_up` is applied on a node while traversing the tree bottom-up
    /// (post-order, after the node's children are visited).
///
/// The `f_down` closure takes
/// - a `PD` type payload from its parent
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `PC` type payload to pass to `f_up`,
/// - a `Vec<PD>` type payload to propagate down to the node's children
/// (one `PD` element is propagated down to each child).
///
/// The `f_up` closure takes
/// - a `PC` type payload from `f_down` and
/// - a `Vec<PU>` type payload collected from the node's children
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `PU` type payload to propagate up to the node's parent.
fn transform_with_payload<FD, PD, PC, FU, PU>(
self,
f_down: &mut FD,
payload_down: PD,
f_up: &mut FU,
) -> Result<(Self, PU)>
where
FD: FnMut(Self, PD) -> Result<(Transformed<Self>, Vec<PD>, PC)>,
FU: FnMut(Self, PC, Vec<PU>) -> Result<(Transformed<Self>, PU)>,
{
let (new_node, new_payload_down, payload_current) = f_down(self, payload_down)?;
let mut new_payload_down_iter = new_payload_down.into_iter();
let mut payload_up = vec![];
let node_with_new_children = new_node.into().map_children(|node| {
let (new_node, p) = node.transform_with_payload(
f_down,
new_payload_down_iter.next().unwrap(),
f_up,
)?;
payload_up.push(p);
Ok(new_node)
})?;
let (new_node, new_payload_up) =
f_up(node_with_new_children, payload_current, payload_up)?;
Ok((new_node.into(), new_payload_up))
}
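
For illustration, a minimal usage sketch of this API (not part of this diff; the helper name is hypothetical and the import paths assume this PR's branch): thread each node's depth down a physical plan and collect subtree sizes back up.

use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;

/// Returns the (unchanged) plan together with its total node count,
/// passing each node's depth down and each subtree's size up.
fn depths_and_sizes(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<(Arc<dyn ExecutionPlan>, usize)> {
    plan.transform_with_payload(
        // f_down: `depth` (PD) arrives from the parent; send `depth + 1`
        // to every child and stash `depth` (PC) for this node's f_up call.
        &mut |node, depth: usize| {
            let per_child = vec![depth + 1; node.children().len()];
            Ok((Transformed::No(node), per_child, depth))
        },
        0, // payload_down for the root
        // f_up: `_depth` is the PC stashed above; `child_sizes` (Vec<PU>)
        // holds one entry per child; report this subtree's size upward.
        &mut |node, _depth, child_sizes: Vec<usize>| {
            let size = 1 + child_sizes.into_iter().sum::<usize>();
            Ok((Transformed::No(node), size))
        },
    )
}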

    /// Convenience utils for writing optimizer rules: recursively apply the given 'op' to the node and all of its
    /// children (pre-order traversal).
/// When the `op` does not apply to a given node, it is left unchanged.
@@ -135,6 +181,26 @@ pub trait TreeNode: Sized {
after_op.map_children(|node| node.transform_down_mut(op))
}

/// Transforms the tree using `f` closure. `f` is applied on a node while traversing
/// the tree top-down (pre-order, before the node's children are visited).
///
/// The `f` closure takes
/// - a `P` type payload from its parent
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `Vec<P>` type payload to propagate down to the node's children
/// (one `P` element is propagated down to each child).
fn transform_down_with_payload<F, P>(self, f: &mut F, payload: P) -> Result<Self>
where
F: FnMut(Self, P) -> Result<(Transformed<Self>, Vec<P>)>,
{
let (new_node, new_payload) = f(self, payload)?;
[Review comment — Contributor] We could potentially reduce redundancy by implementing this function in terms of transform_with_payload with an empty f_up and PU = () -- it probably doesn't matter but I figured I would bring it up.

[Reply — Contributor, Author] You are right, both the down and up versions can be implemented using transform_with_payload. But I would keep them separated in this PR and check later what happens if we use an empty f_up and PU = (). Does the "up logic" get optimized out by the compiler at "down invocations"? I.e., is the payload_up vec not allocated?
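
For reference, a sketch of that suggested rewrite (hypothetical, not part of this diff). Note that a Vec of zero-sized () elements never allocates, so the payload_up vec should cost nothing even if the compiler does not remove it:

fn transform_down_via_general<T, F, P>(node: T, f: &mut F, payload: P) -> Result<T>
where
    T: TreeNode,
    F: FnMut(T, P) -> Result<(Transformed<T>, Vec<P>)>,
{
    // Reuse the general traversal with PC = PU = () and a no-op f_up.
    let (new_node, ()) = node.transform_with_payload(
        &mut |n, p| f(n, p).map(|(t, pd)| (t, pd, ())),
        payload,
        &mut |n, (), _units: Vec<()>| Ok((Transformed::No(n), ())),
    )?;
    Ok(new_node)
}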

let mut new_payload_iter = new_payload.into_iter();
new_node.into().map_children(|node| {
node.transform_down_with_payload(f, new_payload_iter.next().unwrap())
})
}
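
For illustration, a usage sketch (hypothetical helper, not part of this diff): print a physical plan as an indented tree by sending each node's depth down as the payload.

fn print_depths(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_down_with_payload(
        &mut |node, depth: usize| {
            // `depth` is the payload this node received from its parent.
            println!("{:indent$}{node:?}", "", indent = 2 * depth);
            // One payload element per child, as the contract above requires.
            let per_child = vec![depth + 1; node.children().len()];
            Ok((Transformed::No(node), per_child))
        },
        0, // the root's payload
    )
}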

    /// Convenience utils for writing optimizer rules: recursively apply the given 'op' first to all of its
    /// children and then itself (post-order traversal).
/// When the `op` does not apply to a given node, it is left unchanged.
@@ -159,6 +225,28 @@ pub trait TreeNode: Sized {
Ok(new_node)
}

    /// Transforms the tree using `f` closure. `f` is applied on a node while traversing
    /// the tree bottom-up (post-order, after the node's children are visited).
    ///
    /// The `f` closure takes
/// - a `Vec<P>` type payload collected from the node's children
/// and returns a tuple made of:
/// - a possibly modified node,
/// - a `P` type payload to propagate up to the node's parent.
fn transform_up_with_payload<F, P>(self, f: &mut F) -> Result<(Self, P)>
where
F: FnMut(Self, Vec<P>) -> Result<(Transformed<Self>, P)>,
{
let mut payload = vec![];
let node_with_new_children = self.map_children(|node| {
let (new_node, p) = node.transform_up_with_payload(f)?;
payload.push(p);
Ok(new_node)
})?;
let (new_node, new_payload) = f(node_with_new_children, payload)?;
Ok((new_node.into(), new_payload))
}
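
For illustration, a usage sketch (hypothetical helper, not part of this diff): compute a plan's height by folding child heights upward while leaving the plan itself unchanged.

fn plan_height(plan: Arc<dyn ExecutionPlan>) -> Result<usize> {
    let (_plan, height) =
        plan.transform_up_with_payload(&mut |node, child_heights: Vec<usize>| {
            // Leaves receive an empty Vec; inner nodes get one entry per child.
            let height = 1 + child_heights.into_iter().max().unwrap_or(0);
            Ok((Transformed::No(node), height))
        })?;
    Ok(height)
}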

/// Transform the tree node using the given [TreeNodeRewriter]
    /// It performs a depth first walk of a node and its children.
///
150 changes: 72 additions & 78 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -196,10 +196,7 @@ impl PhysicalOptimizerRule for EnforceDistribution {

let adjusted = if top_down_join_key_reordering {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new_default(plan);
let adjusted =
plan_requirements.transform_down(&adjust_input_keys_ordering)?;
adjusted.plan
plan.transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])?
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
@@ -267,12 +264,17 @@ impl PhysicalOptimizerRule for EnforceDistribution {
/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements
/// 5) For other types of operators, by default, push down the parent requirements to children.
///
fn adjust_input_keys_ordering(
mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
let plan = requirements.plan.clone();
type RequiredKeyOrdering = Vec<Arc<dyn PhysicalExpr>>;
[Review comment — Contributor] It might make this code simpler to make this a struct RequiredKeyOrder and give its methods names rather than dealing with Vec<Vec<...>> -- but that can be a refactor for a different PR perhaps.
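
A sketch of that suggested refactor (hypothetical, deliberately deferred per the comment above):

#[derive(Clone, Default)]
struct RequiredKeyOrder(Vec<Arc<dyn PhysicalExpr>>);

impl RequiredKeyOrder {
    /// "No requirement" for every child of `plan`.
    fn none_for_children(plan: &Arc<dyn ExecutionPlan>) -> Vec<Self> {
        vec![Self::default(); plan.children().len()]
    }
    /// Forward this (parent) requirement unchanged to every child.
    fn inherit_for_children(&self, plan: &Arc<dyn ExecutionPlan>) -> Vec<Self> {
        vec![self.clone(); plan.children().len()]
    }
}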


if let Some(HashJoinExec {
#[allow(clippy::type_complexity)]
fn adjust_input_keys_ordering(
plan: Arc<dyn ExecutionPlan>,
parent_required: RequiredKeyOrdering,
) -> Result<(
Transformed<Arc<dyn ExecutionPlan>>,
Vec<RequiredKeyOrdering>,
)> {
let request_key_ordering = if let Some(HashJoinExec {
left,
right,
on,
@@ -301,43 +303,47 @@ fn adjust_input_keys_ordering(
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
plan.clone(),
&parent_required,
on,
vec![],
&join_constructor,
)
.map(Transformed::Yes);
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
}
PartitionMode::CollectLeft => {
// Push down requirements to the right side
requirements.children[1].data = match join_type {
let new_right_request = match join_type {
JoinType::Inner | JoinType::Right => shift_right_required(
&requirements.data,
&parent_required,
left.schema().fields().len(),
)
.unwrap_or_default(),
),
JoinType::RightSemi | JoinType::RightAnti => {
requirements.data.clone()
Some(parent_required.clone())
}
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::Full => vec![],
| JoinType::Full => None,
};
vec![vec![], new_right_request.unwrap_or_default()]
}
PartitionMode::Auto => {
// Can not satisfy, clear the current requirements and generate new empty requirements
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
}
} else if let Some(CrossJoinExec { left, .. }) =
plan.as_any().downcast_ref::<CrossJoinExec>()
{
let left_columns_len = left.schema().fields().len();
// Push down requirements to the right side
requirements.children[1].data =
shift_right_required(&requirements.data, left_columns_len)
.unwrap_or_default();
vec![
vec![],
shift_right_required(&parent_required, left_columns_len).unwrap_or_default(),
]
} else if let Some(SortMergeJoinExec {
left,
right,
@@ -363,35 +369,45 @@ fn adjust_input_keys_ordering(
.map(|e| Arc::new(e) as _)
};
return reorder_partitioned_join_keys(
requirements,
plan.clone(),
&parent_required,
on,
sort_options.clone(),
&join_constructor,
)
.map(Transformed::Yes);
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
} else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
if !requirements.data.is_empty() {
if !parent_required.is_empty() {
if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
return reorder_aggregate_keys(requirements, aggregate_exec)
.map(Transformed::Yes);
return reorder_aggregate_keys(
plan.clone(),
&parent_required,
aggregate_exec,
)
.map(|(plan, request_key_ordering)| {
(Transformed::Yes(plan), request_key_ordering)
});
} else {
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
} else {
// Keep everything unchanged
return Ok(Transformed::No(requirements));
let request_key_ordering = vec![vec![]; plan.children().len()];
return Ok((Transformed::No(plan), request_key_ordering));
}
} else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
let expr = proj.expr();
// For Projection, we need to transform the requirements to the columns before the Projection
// And then to push down the requirements
        // Construct a mapping from new name to the original Column
let new_required = map_columns_before_projection(&requirements.data, expr);
if new_required.len() == requirements.data.len() {
requirements.children[0].data = new_required;
let new_required = map_columns_before_projection(&parent_required, expr);
if new_required.len() == parent_required.len() {
vec![new_required]
} else {
// Can not satisfy, clear the current requirements and generate new empty requirements
requirements.data.clear();
vec![vec![]; plan.children().len()]
}
} else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
|| plan
Expand All @@ -400,30 +416,28 @@ fn adjust_input_keys_ordering(
.is_some()
|| plan.as_any().downcast_ref::<WindowAggExec>().is_some()
{
requirements.data.clear();
vec![vec![]; plan.children().len()]
} else {
// By default, push down the parent requirements to children
for child in requirements.children.iter_mut() {
child.data = requirements.data.clone();
}
}
Ok(Transformed::Yes(requirements))
vec![parent_required.clone(); plan.children().len()]
};
Ok((Transformed::Yes(plan), request_key_ordering))
}

fn reorder_partitioned_join_keys<F>(
mut join_plan: PlanWithKeyRequirements,
mut join_plan: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
on: &[(PhysicalExprRef, PhysicalExprRef)],
sort_options: Vec<SortOptions>,
join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)>
where
F: Fn(
(Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
) -> Result<Arc<dyn ExecutionPlan>>,
{
let parent_required = &join_plan.data;
let join_key_pairs = extract_join_keys(on);
let eq_properties = join_plan.plan.equivalence_properties();
let eq_properties = join_plan.equivalence_properties();

if let Some((
JoinKeyPairs {
@@ -438,25 +452,22 @@ where
let new_sort_options = (0..sort_options.len())
.map(|idx| sort_options[new_positions[idx]])
.collect();
join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
join_plan = join_constructor((new_join_on, new_sort_options))?;
}
let mut requirements = join_plan;
requirements.children[0].data = left_keys;
requirements.children[1].data = right_keys;
Ok(requirements)
Ok((join_plan, vec![left_keys, right_keys]))
} else {
let mut requirements = join_plan;
requirements.children[0].data = join_key_pairs.left_keys;
requirements.children[1].data = join_key_pairs.right_keys;
Ok(requirements)
Ok((
join_plan,
vec![join_key_pairs.left_keys, join_key_pairs.right_keys],
))
}
}

fn reorder_aggregate_keys(
mut agg_node: PlanWithKeyRequirements,
agg_node: Arc<dyn ExecutionPlan>,
parent_required: &[Arc<dyn PhysicalExpr>],
agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
let parent_required = &agg_node.data;
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)> {
let output_columns = agg_exec
.group_by()
.expr()
@@ -508,18 +519,10 @@ fn reorder_aggregate_keys(
new_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
partial_agg.clone(),
partial_agg,
agg_exec.input_schema(),
)?);

agg_node.plan = new_final_agg.clone();
agg_node.data.clear();
agg_node.children = vec![PlanWithKeyRequirements::new(
partial_agg as _,
vec![],
agg_node.children.swap_remove(0).children,
)];

// Need to create a new projection to change the expr ordering back
let agg_schema = new_final_agg.schema();
let mut proj_exprs = output_columns
@@ -538,14 +541,14 @@ fn reorder_aggregate_keys(
let plan = Arc::new(Column::new(name, idx)) as _;
proj_exprs.push((plan, name.clone()))
}
return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
});
return ProjectionExec::try_new(proj_exprs, new_final_agg)
.map(|p| (Arc::new(p) as _, vec![vec![]; 1]));
}
}
}
}
Ok(agg_node)
};
let request_key_ordering = vec![vec![]; agg_node.children().len()];
Ok((agg_node, request_key_ordering))
}

fn shift_right_required(
Expand Down Expand Up @@ -1295,9 +1298,6 @@ struct JoinKeyPairs {
right_keys: Vec<Arc<dyn PhysicalExpr>>,
}

/// Keeps track of parent required key orderings.
type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;

/// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on
#[cfg(feature = "parquet")]
#[cfg(test)]
@@ -1783,13 +1783,7 @@ pub(crate) mod tests {
{
let adjusted = if config.optimizer.top_down_join_key_reordering {
// Run adjust_input_keys_ordering rule
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering)
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
adjusted.plan
$PLAN.clone().transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])?
} else {
// Run reorder_join_keys_to_inputs rule
$PLAN.clone().transform_up(&|plan| {