Skip to content

Commit

Permalink
Update comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Feb 15, 2024
1 parent fe0b1fe commit 27b6805
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 21 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub fn check_finiteness_requirements(
the 'allow_symmetric_joins_without_pruning' configuration flag");
}
}
if !input.unbounded_output().is_executable() {
if !input.unbounded_output().pipeline_friendly() {
Err(plan_datafusion_err!(
"Cannot execute pipeline breaking queries, operator: {:?}",
input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ pub(crate) fn replace_with_order_preserving_variants(
// For unbounded cases, we replace with the order-preserving variant in any
// case, as doing so helps fix the pipeline. Also replace if config allows.
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
|| !requirements.plan.unbounded_output().is_executable();
|| !requirements.plan.unbounded_output().pipeline_friendly();

// Create an alternate plan with order-preserving variants:
let mut alternate_plan = plan_with_order_preserving_variants(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ impl AggregateExec {
&& self.input_order_mode == InputOrderMode::Linear
{
// Cannot run without breaking pipeline.
unbounded_output = ExecutionMode::InExecutable;
unbounded_output = ExecutionMode::PipelineBreaking;
}

self.cache = PlanPropertiesCache::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl CrossJoinExec {
let exec_mode = match (left_unbounded, right_unbounded) {
(ExecutionMode::Bounded, ExecutionMode::Bounded) => ExecutionMode::Bounded,
// If any of the inputs is unbounded, the cross join breaks the pipeline.
(_, _) => ExecutionMode::InExecutable,
(_, _) => ExecutionMode::PipelineBreaking,
};

self.cache =
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ impl HashJoinExec {
));

let exec_mode = if breaking {
ExecutionMode::InExecutable
ExecutionMode::PipelineBreaking
} else {
exec_mode_flatten([left, right])
};
Expand Down
72 changes: 60 additions & 12 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,19 +450,45 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
}
}

/// Describes the execution mode of each operator's resulting stream.
///
/// This enum defines the possible execution modes:
///
/// - `Bounded`: The stream generated by the operator is bounded, i.e. finite.
///
/// - `Unbounded`: The stream generated by the operator is unbounded, i.e. infinite.
/// The operator can still produce streaming results with bounded memory, so
/// execution can continue successfully.
///
/// - `PipelineBreaking`: The input stream to the operator is unbounded, but the
/// operator cannot produce streaming results from it (e.g. it would require
/// unbounded memory to generate its result), so the pipeline is broken.
///
/// Only the `Bounded` and `Unbounded` modes are pipeline friendly; see
/// `ExecutionMode::pipeline_friendly`.
#[derive(Clone, Copy, PartialEq, Debug)]
pub enum ExecutionMode {
/// Represents the mode where the generated stream is bounded, i.e. finite.
Bounded,
/// Represents the mode where the generated stream is unbounded, i.e. infinite.
/// The operator can generate streaming results with bounded memory.
/// In this mode, execution can still continue successfully.
Unbounded,
InExecutable,
/// Represents the mode where the input stream to the operator is unbounded, but
/// the operator cannot generate streaming results from streaming inputs. In this
/// case, the execution mode is pipeline breaking, e.g. because the operator
/// requires unbounded memory to generate its result.
PipelineBreaking,
}

impl ExecutionMode {
/// Check whether the execution mode is unbounded or not.
pub fn is_unbounded(&self) -> bool {
matches!(self, ExecutionMode::Unbounded)
}

pub fn is_executable(&self) -> bool {
/// Check whether the execution is pipeline friendly. If so, operator can execute
/// safely.
pub fn pipeline_friendly(&self) -> bool {
matches!(self, ExecutionMode::Bounded | ExecutionMode::Unbounded)
}
}
Expand All @@ -471,10 +497,11 @@ fn exec_mode_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode {
let mut result = ExecutionMode::Bounded;
for mode in modes {
match (mode, result) {
(ExecutionMode::InExecutable, _) | (_, ExecutionMode::InExecutable) => {
// If any of the modes is `InExecutable`. result is `InExecutable`
(ExecutionMode::PipelineBreaking, _)
| (_, ExecutionMode::PipelineBreaking) => {
// If any of the modes is `PipelineBreaking`, the result is `PipelineBreaking`.
// Return immediately
return ExecutionMode::InExecutable;
return ExecutionMode::PipelineBreaking;
}
(
ExecutionMode::Unbounded,
Expand All @@ -493,6 +520,9 @@ fn exec_mode_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode {
result
}

/// Constructs the execution mode of an operator from the modes of its children.
/// This utility assumes the operator itself is pipeline friendly. For operators
/// that are not pipeline friendly, `exec_mode_safe_flatten` should be used.
fn exec_mode_flatten<'a>(
children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> ExecutionMode {
Expand All @@ -507,21 +537,23 @@ fn exec_mode_safe_flatten_helper(modes: &[ExecutionMode]) -> ExecutionMode {
let mut result = ExecutionMode::Bounded;
for mode in modes {
match (mode, result) {
(ExecutionMode::Unbounded | ExecutionMode::InExecutable, _)
| (_, ExecutionMode::Unbounded | ExecutionMode::InExecutable) => {
// If any of the modes is `InExecutable`. result is `InExecutable`
// Return immediately
return ExecutionMode::InExecutable;
}
(ExecutionMode::Bounded, ExecutionMode::Bounded) => {
// When both modes are bounded, result is bounded
result = ExecutionMode::Bounded;
}
(_, _) => {
// If any of the modes is `Unbounded` or `PipelineBreaking`, the result is `PipelineBreaking`.
// Return immediately
return ExecutionMode::PipelineBreaking;
}
}
}
result
}

/// Constructs the execution mode of an operator from the modes of its children.
/// This utility assumes the operator itself is not pipeline friendly. For operators
/// that are pipeline friendly, `exec_mode_flatten` should be used.
fn exec_mode_safe_flatten<'a>(
children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> ExecutionMode {
Expand All @@ -532,20 +564,30 @@ fn exec_mode_safe_flatten<'a>(
exec_mode_safe_flatten_helper(&modes)
}

/// Represents a cache for plan properties used in query optimization.
///
/// This struct bundles the equivalence properties, output partitioning and execution
/// mode of an [`ExecutionPlan`], together with the output ordering derived from the
/// equivalence properties. These are used during optimization and execution phases.
#[derive(Debug, Clone)]
pub struct PlanPropertiesCache {
/// Stores the equivalence properties of the [`ExecutionPlan`]. See [`EquivalenceProperties`].
pub eq_properties: EquivalenceProperties,
/// Stores the output partitioning of the [`ExecutionPlan`]. See [`Partitioning`].
pub partitioning: Partitioning,
/// Stores the execution mode of the [`ExecutionPlan`]. See [`ExecutionMode`].
pub exec_mode: ExecutionMode,
/// Stores the output ordering of the [`ExecutionPlan`]; `None` means there is no ordering.
/// This field is private and is derived from `eq_properties` (see `new` and
/// `with_eq_properties`), so it is not set independently.
output_ordering: Option<LexOrdering>,
}

impl PlanPropertiesCache {
/// Construct a new `PlanPropertiesCache` from the given equivalence properties,
/// partitioning and execution mode.
pub fn new(
eq_properties: EquivalenceProperties,
partitioning: Partitioning,
exec_mode: ExecutionMode,
) -> Self {
// Output ordering can be derived from `eq_properties`.
let output_ordering = eq_properties.oeq_class().output_ordering();
Self {
eq_properties,
Expand All @@ -555,11 +597,13 @@ impl PlanPropertiesCache {
}
}

/// Construct a default `PlanPropertiesCache`, for a given schema.
pub fn new_default(schema: SchemaRef) -> PlanPropertiesCache {
// Defaults are most restrictive possible values.
let eq_properties = EquivalenceProperties::new(schema);
// Please note that this default is not safe, and should be overwritten.
let partitioning = Partitioning::UnknownPartitioning(0);
let exec_mode = ExecutionMode::InExecutable;
let exec_mode = ExecutionMode::PipelineBreaking;
let output_ordering = None;
Self {
eq_properties,
Expand All @@ -569,17 +613,21 @@ impl PlanPropertiesCache {
}
}

/// Overwrite partitioning with its new value
pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
self.partitioning = partitioning;
self
}

/// Overwrite Execution Mode with its new value
pub fn with_exec_mode(mut self, exec_mode: ExecutionMode) -> Self {
self.exec_mode = exec_mode;
self
}

/// Overwrite Equivalence Properties with its new value
pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
// Changing the equivalence properties also changes the output ordering, so recompute it.
self.output_ordering = eq_properties.oeq_class().output_ordering();
self.eq_properties = eq_properties;
self
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,8 @@ impl SortExec {

// Execution Mode
let exec_mode = match self.input.unbounded_output() {
ExecutionMode::Unbounded | ExecutionMode::InExecutable => {
ExecutionMode::InExecutable
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
ExecutionMode::PipelineBreaking
}
ExecutionMode::Bounded => ExecutionMode::Bounded,
};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/windows/window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl WindowAggExec {
// unbounded output
let unbounded_output = match self.input.unbounded_output() {
ExecutionMode::Bounded => ExecutionMode::Bounded,
ExecutionMode::Unbounded | ExecutionMode::InExecutable => {
ExecutionMode::InExecutable
ExecutionMode::Unbounded | ExecutionMode::PipelineBreaking => {
ExecutionMode::PipelineBreaking
}
};

Expand Down

0 comments on commit 27b6805

Please sign in to comment.