Improve SortPreservingMerge::enable_round_robin_repartition docs #13826

Merged
merged 7 commits into from
Dec 20, 2024
Changes from 3 commits
28 changes: 17 additions & 11 deletions datafusion/physical-plan/src/sorts/merge.rs
Expand Up @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {

/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
///
/// To address the issue of unbalanced polling between partitions due to tie-breakers being based
/// on partition index, especially in cases of low cardinality, we are making changes to the winner
/// selection mechanism. Previously, partitions with smaller indices were consistently chosen as the winners,
/// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions
/// to grow excessively, as they continued receiving data without consuming it.
/// This option controls the tie-breaker strategy and attempts to avoid the
Review comment (PR author): I found this related description and updated it to describe the current state of the code, rather than the changes that were made previously.


/// issue of unbalanced polling between partitions
///
/// For example, an upstream operator like a repartition execution would keep sending data to certain partitions,
/// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage.
/// If `true`, when multiple partitions have the same value, the partition
/// that has the fewest poll counts is selected. This strategy ensures that
/// multiple partitions with the same value are chosen equally, distributing
/// the polling load in a round-robin fashion. This approach balances the
/// workload more effectively across partitions and avoids excessive buffer
/// growth.
///
/// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index,
/// we now select the partition that has the fewest poll counts for the same value.
/// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion.
/// This approach balances the workload more effectively across partitions and avoids excessive buffer growth.
/// If `false`, partitions with smaller indices are consistently chosen as
/// the winners, which can lead to an uneven distribution of polling and
/// potentially cause upstream operator buffers for the other partitions to
/// grow excessively, as they continue receiving data without consuming it.
///
/// For example, an upstream operator like `RepartitionExec` would
/// keep sending data to certain partitions, but those partitions wouldn't
/// consume the data if they weren't selected as winners. This results in
/// inefficient buffer usage.
enable_round_robin_tie_breaker: bool,

/// Flag indicating whether we are in the mode of round-robin
Expand Down
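
A minimal sketch of the two tie-breaking strategies described in the `enable_round_robin_tie_breaker` doc comment above. `pick_winner` and `simulate` are hypothetical helpers written for illustration only; they are not part of DataFusion, which operates on a loser tree rather than a flat list of tied partitions.

```rust
/// Hypothetical helper (not DataFusion's API): among partitions whose
/// current values are tied, choose the one to emit from next.
fn pick_winner(tied: &[usize], poll_counts: &[usize], round_robin: bool) -> usize {
    if round_robin {
        // Fewest polls wins. `min_by_key` returns the first minimum, so the
        // smaller index still wins when the poll counts are equal.
        *tied
            .iter()
            .min_by_key(|&&p| poll_counts[p])
            .expect("at least one tied partition")
    } else {
        // Index-based tie-breaking: the smallest partition index always wins.
        *tied.iter().min().expect("at least one tied partition")
    }
}

/// Simulates `rounds` consecutive ties across all partitions and returns
/// how many times each partition was selected.
fn simulate(num_partitions: usize, rounds: usize, round_robin: bool) -> Vec<usize> {
    let tied: Vec<usize> = (0..num_partitions).collect();
    let mut poll_counts = vec![0; num_partitions];
    for _ in 0..rounds {
        let winner = pick_winner(&tied, &poll_counts, round_robin);
        poll_counts[winner] += 1;
    }
    poll_counts
}
```

With three partitions tied for six rounds, `simulate(3, 6, true)` spreads the selections evenly, while `simulate(3, 6, false)` selects partition 0 every time. The second case is the low-cardinality situation in which the other partitions' upstream buffers can keep growing.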
45 changes: 34 additions & 11 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Defines the sort preserving merge plan
//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream.

use std::any::Any;
use std::sync::Arc;
Expand All @@ -38,10 +38,22 @@ use log::{debug, trace};

/// Sort preserving merge execution plan
///
/// This takes an input execution plan and a list of sort expressions, and
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
/// # Overview
///
/// This operator implements a K-way merge. It is used to merge multiple sorted
/// streams into a single sorted stream and is highly optimized.
///
/// ## Inputs:
///
/// 1. A list of sort expressions
/// 2. An input plan, where each partition is sorted with respect to
/// these sort expressions.
///
/// ## Output:
///
/// 1. A single partition that is also sorted with respect to the expressions
///
/// ## Diagram
///
/// ```text
/// ┌─────────────────────────┐
Expand All @@ -55,12 +67,12 @@ use log::{debug, trace};
/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗ │ │
/// │ ║ B ║ E ║ ... │──┘ │
/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream
/// └─────────────────────────┘ places equal rows from stream 1
/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`:
/// └─────────────────────────┘ the merged stream places equal rows from stream 1
/// Stream 2
///
///
/// Input Streams Output stream
/// Input Partitions Output Partition
/// (sorted) (sorted)
/// ```
///
Expand All @@ -70,7 +82,7 @@ use log::{debug, trace};
/// the output and inputs are not polled again.
#[derive(Debug, Clone)]
pub struct SortPreservingMergeExec {
/// Input plan
/// Input plan with sorted partitions
input: Arc<dyn ExecutionPlan>,
/// Sort expressions
expr: LexOrdering,
Expand All @@ -80,7 +92,9 @@ pub struct SortPreservingMergeExec {
fetch: Option<usize>,
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Configuration parameter to enable round-robin selection of tied winners of loser tree.
/// Use round-robin selection of tied winners of loser tree
///
/// See [`Self::with_round_robin_repartition`] for more information.
enable_round_robin_repartition: bool,
}
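
The K-way merge described in the overview can be sketched with the standard library alone. This is an illustration under simplifying assumptions: it uses `BinaryHeap` instead of the loser tree mentioned in this file and merges `Vec<i32>` inputs instead of record batch streams. `k_way_merge` is a hypothetical name. Ties are broken by input index, which corresponds to the stable behaviour described for `enable_round_robin_repartition=false`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges already-sorted inputs into one sorted output.
/// Each output element is `(value, index of the input it came from)`.
/// Equal values are emitted in input order, so the merge is stable.
fn k_way_merge(inputs: &[Vec<i32>]) -> Vec<(i32, usize)> {
    // Min-heap of (value, input index, position within that input).
    // `Reverse` turns the std max-heap into a min-heap; the tuple ordering
    // makes the smaller input index win when values are equal.
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter().enumerate() {
        if let Some(&first) = input.first() {
            heap.push(Reverse((first, idx, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        out.push((value, idx));
        // Refill the heap from the input that was just consumed.
        if let Some(&next) = inputs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}
```

Calling `k_way_merge(&[vec![1, 3, 3], vec![1, 2, 3]])` emits both `3`s from input 0 before the `3` from input 1, because equal values are ordered by input index.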

Expand All @@ -105,6 +119,14 @@ impl SortPreservingMergeExec {
}

/// Sets the selection strategy of tied winners of the loser tree algorithm
///
Review comment (PR author): This is the actual public-facing API description change.


/// If true (the default) equal output rows are placed in the merged stream
/// in round robin fashion. This approach consumes input streams at more
/// even rates when there are many rows with the same sort key.
///
/// If false, equal output rows are always placed in the merged stream in
/// the order of the inputs, resulting in potentially slower execution but a
/// stable output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
Expand All @@ -128,7 +150,8 @@ impl SortPreservingMergeExec {
self.fetch
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
/// Creates the cache object that stores the plan properties
/// such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
ordering: LexOrdering,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/sorts/streaming_merge.rs
Expand Up @@ -19,6 +19,7 @@
//! This is an order-preserving merge.

use crate::metrics::BaselineMetrics;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::sorts::{
merge::SortPreservingMergeStream,
stream::{FieldCursorStream, RowCursorStream},
Expand Down Expand Up @@ -120,6 +121,8 @@ impl<'a> StreamingMergeBuilder<'a> {
self
}

/// See [SortPreservingMergeExec::with_round_robin_repartition] for more
/// information.
pub fn with_round_robin_tie_breaker(
mut self,
enable_round_robin_tie_breaker: bool,
Expand Down