-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Transform with payload #8664
Closed
Closed
Transform with payload #8664
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -196,10 +196,7 @@ impl PhysicalOptimizerRule for EnforceDistribution { | |
|
||
let adjusted = if top_down_join_key_reordering { | ||
// Run a top-down process to adjust input key ordering recursively | ||
let plan_requirements = PlanWithKeyRequirements::new_default(plan); | ||
let adjusted = | ||
plan_requirements.transform_down(&adjust_input_keys_ordering)?; | ||
adjusted.plan | ||
plan.transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])? | ||
} else { | ||
// Run a bottom-up process | ||
plan.transform_up(&|plan| { | ||
|
@@ -267,12 +264,17 @@ impl PhysicalOptimizerRule for EnforceDistribution { | |
/// 4) If the current plan is Projection, transform the requirements to the columns before the Projection and push down requirements | ||
/// 5) For other types of operators, by default, pushdown the parent requirements to children. | ||
/// | ||
fn adjust_input_keys_ordering( | ||
mut requirements: PlanWithKeyRequirements, | ||
) -> Result<Transformed<PlanWithKeyRequirements>> { | ||
let plan = requirements.plan.clone(); | ||
type RequiredKeyOrdering = Vec<Arc<dyn PhysicalExpr>>; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might make this code simpler to make this |
||
|
||
if let Some(HashJoinExec { | ||
#[allow(clippy::type_complexity)] | ||
fn adjust_input_keys_ordering( | ||
plan: Arc<dyn ExecutionPlan>, | ||
parent_required: RequiredKeyOrdering, | ||
) -> Result<( | ||
Transformed<Arc<dyn ExecutionPlan>>, | ||
Vec<RequiredKeyOrdering>, | ||
)> { | ||
let request_key_ordering = if let Some(HashJoinExec { | ||
left, | ||
right, | ||
on, | ||
|
@@ -301,43 +303,47 @@ fn adjust_input_keys_ordering( | |
.map(|e| Arc::new(e) as _) | ||
}; | ||
return reorder_partitioned_join_keys( | ||
requirements, | ||
plan.clone(), | ||
&parent_required, | ||
on, | ||
vec![], | ||
&join_constructor, | ||
) | ||
.map(Transformed::Yes); | ||
.map(|(plan, request_key_ordering)| { | ||
(Transformed::Yes(plan), request_key_ordering) | ||
}); | ||
} | ||
PartitionMode::CollectLeft => { | ||
// Push down requirements to the right side | ||
requirements.children[1].data = match join_type { | ||
let new_right_request = match join_type { | ||
JoinType::Inner | JoinType::Right => shift_right_required( | ||
&requirements.data, | ||
&parent_required, | ||
left.schema().fields().len(), | ||
) | ||
.unwrap_or_default(), | ||
), | ||
JoinType::RightSemi | JoinType::RightAnti => { | ||
requirements.data.clone() | ||
Some(parent_required.clone()) | ||
} | ||
JoinType::Left | ||
| JoinType::LeftSemi | ||
| JoinType::LeftAnti | ||
| JoinType::Full => vec![], | ||
| JoinType::Full => None, | ||
}; | ||
vec![vec![], new_right_request.unwrap_or_default()] | ||
} | ||
PartitionMode::Auto => { | ||
// Can not satisfy, clear the current requirements and generate new empty requirements | ||
requirements.data.clear(); | ||
vec![vec![]; plan.children().len()] | ||
} | ||
} | ||
} else if let Some(CrossJoinExec { left, .. }) = | ||
plan.as_any().downcast_ref::<CrossJoinExec>() | ||
{ | ||
let left_columns_len = left.schema().fields().len(); | ||
// Push down requirements to the right side | ||
requirements.children[1].data = | ||
shift_right_required(&requirements.data, left_columns_len) | ||
.unwrap_or_default(); | ||
vec![ | ||
vec![], | ||
shift_right_required(&parent_required, left_columns_len).unwrap_or_default(), | ||
] | ||
} else if let Some(SortMergeJoinExec { | ||
left, | ||
right, | ||
|
@@ -363,35 +369,45 @@ fn adjust_input_keys_ordering( | |
.map(|e| Arc::new(e) as _) | ||
}; | ||
return reorder_partitioned_join_keys( | ||
requirements, | ||
plan.clone(), | ||
&parent_required, | ||
on, | ||
sort_options.clone(), | ||
&join_constructor, | ||
) | ||
.map(Transformed::Yes); | ||
.map(|(plan, request_key_ordering)| { | ||
(Transformed::Yes(plan), request_key_ordering) | ||
}); | ||
} else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() { | ||
if !requirements.data.is_empty() { | ||
if !parent_required.is_empty() { | ||
if aggregate_exec.mode() == &AggregateMode::FinalPartitioned { | ||
return reorder_aggregate_keys(requirements, aggregate_exec) | ||
.map(Transformed::Yes); | ||
return reorder_aggregate_keys( | ||
plan.clone(), | ||
&parent_required, | ||
aggregate_exec, | ||
) | ||
.map(|(plan, request_key_ordering)| { | ||
(Transformed::Yes(plan), request_key_ordering) | ||
}); | ||
} else { | ||
requirements.data.clear(); | ||
vec![vec![]; plan.children().len()] | ||
} | ||
} else { | ||
// Keep everything unchanged | ||
return Ok(Transformed::No(requirements)); | ||
let request_key_ordering = vec![vec![]; plan.children().len()]; | ||
return Ok((Transformed::No(plan), request_key_ordering)); | ||
} | ||
} else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() { | ||
let expr = proj.expr(); | ||
// For Projection, we need to transform the requirements to the columns before the Projection | ||
// And then to push down the requirements | ||
// Construct a mapping from new name to the original Column | ||
let new_required = map_columns_before_projection(&requirements.data, expr); | ||
if new_required.len() == requirements.data.len() { | ||
requirements.children[0].data = new_required; | ||
let new_required = map_columns_before_projection(&parent_required, expr); | ||
if new_required.len() == parent_required.len() { | ||
vec![new_required] | ||
} else { | ||
// Can not satisfy, clear the current requirements and generate new empty requirements | ||
requirements.data.clear(); | ||
vec![vec![]; plan.children().len()] | ||
} | ||
} else if plan.as_any().downcast_ref::<RepartitionExec>().is_some() | ||
|| plan | ||
|
@@ -400,30 +416,28 @@ fn adjust_input_keys_ordering( | |
.is_some() | ||
|| plan.as_any().downcast_ref::<WindowAggExec>().is_some() | ||
{ | ||
requirements.data.clear(); | ||
vec![vec![]; plan.children().len()] | ||
} else { | ||
// By default, push down the parent requirements to children | ||
for child in requirements.children.iter_mut() { | ||
child.data = requirements.data.clone(); | ||
} | ||
} | ||
Ok(Transformed::Yes(requirements)) | ||
vec![parent_required.clone(); plan.children().len()] | ||
}; | ||
Ok((Transformed::Yes(plan), request_key_ordering)) | ||
} | ||
|
||
fn reorder_partitioned_join_keys<F>( | ||
mut join_plan: PlanWithKeyRequirements, | ||
mut join_plan: Arc<dyn ExecutionPlan>, | ||
parent_required: &[Arc<dyn PhysicalExpr>], | ||
on: &[(PhysicalExprRef, PhysicalExprRef)], | ||
sort_options: Vec<SortOptions>, | ||
join_constructor: &F, | ||
) -> Result<PlanWithKeyRequirements> | ||
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)> | ||
where | ||
F: Fn( | ||
(Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>), | ||
) -> Result<Arc<dyn ExecutionPlan>>, | ||
{ | ||
let parent_required = &join_plan.data; | ||
let join_key_pairs = extract_join_keys(on); | ||
let eq_properties = join_plan.plan.equivalence_properties(); | ||
let eq_properties = join_plan.equivalence_properties(); | ||
|
||
if let Some(( | ||
JoinKeyPairs { | ||
|
@@ -438,25 +452,22 @@ where | |
let new_sort_options = (0..sort_options.len()) | ||
.map(|idx| sort_options[new_positions[idx]]) | ||
.collect(); | ||
join_plan.plan = join_constructor((new_join_on, new_sort_options))?; | ||
join_plan = join_constructor((new_join_on, new_sort_options))?; | ||
} | ||
let mut requirements = join_plan; | ||
requirements.children[0].data = left_keys; | ||
requirements.children[1].data = right_keys; | ||
Ok(requirements) | ||
Ok((join_plan, vec![left_keys, right_keys])) | ||
} else { | ||
let mut requirements = join_plan; | ||
requirements.children[0].data = join_key_pairs.left_keys; | ||
requirements.children[1].data = join_key_pairs.right_keys; | ||
Ok(requirements) | ||
Ok(( | ||
join_plan, | ||
vec![join_key_pairs.left_keys, join_key_pairs.right_keys], | ||
)) | ||
} | ||
} | ||
|
||
fn reorder_aggregate_keys( | ||
mut agg_node: PlanWithKeyRequirements, | ||
agg_node: Arc<dyn ExecutionPlan>, | ||
parent_required: &[Arc<dyn PhysicalExpr>], | ||
agg_exec: &AggregateExec, | ||
) -> Result<PlanWithKeyRequirements> { | ||
let parent_required = &agg_node.data; | ||
) -> Result<(Arc<dyn ExecutionPlan>, Vec<RequiredKeyOrdering>)> { | ||
let output_columns = agg_exec | ||
.group_by() | ||
.expr() | ||
|
@@ -508,18 +519,10 @@ fn reorder_aggregate_keys( | |
new_group_by, | ||
agg_exec.aggr_expr().to_vec(), | ||
agg_exec.filter_expr().to_vec(), | ||
partial_agg.clone(), | ||
partial_agg, | ||
agg_exec.input_schema(), | ||
)?); | ||
|
||
agg_node.plan = new_final_agg.clone(); | ||
agg_node.data.clear(); | ||
agg_node.children = vec![PlanWithKeyRequirements::new( | ||
partial_agg as _, | ||
vec![], | ||
agg_node.children.swap_remove(0).children, | ||
)]; | ||
|
||
// Need to create a new projection to change the expr ordering back | ||
let agg_schema = new_final_agg.schema(); | ||
let mut proj_exprs = output_columns | ||
|
@@ -538,14 +541,14 @@ fn reorder_aggregate_keys( | |
let plan = Arc::new(Column::new(name, idx)) as _; | ||
proj_exprs.push((plan, name.clone())) | ||
} | ||
return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| { | ||
PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node]) | ||
}); | ||
return ProjectionExec::try_new(proj_exprs, new_final_agg) | ||
.map(|p| (Arc::new(p) as _, vec![vec![]; 1])); | ||
} | ||
} | ||
} | ||
} | ||
Ok(agg_node) | ||
}; | ||
let request_key_ordering = vec![vec![]; agg_node.children().len()]; | ||
Ok((agg_node, request_key_ordering)) | ||
} | ||
|
||
fn shift_right_required( | ||
|
@@ -1295,9 +1298,6 @@ struct JoinKeyPairs { | |
right_keys: Vec<Arc<dyn PhysicalExpr>>, | ||
} | ||
|
||
/// Keeps track of parent required key orderings. | ||
type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>; | ||
|
||
/// Since almost all of these tests explicitly use `ParquetExec` they only run with the parquet feature flag on | ||
#[cfg(feature = "parquet")] | ||
#[cfg(test)] | ||
|
@@ -1783,13 +1783,7 @@ pub(crate) mod tests { | |
{ | ||
let adjusted = if config.optimizer.top_down_join_key_reordering { | ||
// Run adjust_input_keys_ordering rule | ||
let plan_requirements = | ||
PlanWithKeyRequirements::new_default($PLAN.clone()); | ||
let adjusted = plan_requirements | ||
.transform_down(&adjust_input_keys_ordering) | ||
.and_then(check_integrity)?; | ||
// TODO: End state payloads will be checked here. | ||
adjusted.plan | ||
$PLAN.clone().transform_down_with_payload(&mut adjust_input_keys_ordering, vec![])? | ||
} else { | ||
// Run reorder_join_keys_to_inputs rule | ||
$PLAN.clone().transform_up(&|plan| { | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could potentially reduce redundancy by implementing this function in terms of
transform_with_payload
and empty f_up
and PU=()
-- it probably doesn't matter but I figured I would bring it up. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, both the down and up versions can be implemented using
transform_with_payload
. But I would keep them separated in this PR and check later what happens if we use empty f_up and PU=()
. Does the "up logic" get optimized out by the compiler at "down invocations"? I.e. is the payload_up
vec not allocated?