From 55cc4d806c01fa1698f726920f3818e320e04a10 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 18 Dec 2024 07:12:04 -0500 Subject: [PATCH 1/5] Clarify SortPreservingMerge::enable_round_robin_repartition docs --- .../src/sorts/sort_preserving_merge.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 906164f21b8c..3f225dfadfb5 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -55,8 +55,8 @@ use log::{debug, trace}; /// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘ /// │ ╔═══╦═══╗ │ │ /// │ ║ B ║ E ║ ... │──┘ │ -/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream -/// └─────────────────────────┘ places equal rows from stream 1 +/// │ ╚═══╩═══╝ │ Stable sort if `enable_round_robin_repartition=false`: +/// └─────────────────────────┘ the merged stream places equal rows from stream 1 /// Stream 2 /// /// @@ -80,7 +80,9 @@ pub struct SortPreservingMergeExec { fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, - /// Configuration parameter to enable round-robin selection of tied winners of loser tree. + /// Use round-robin selection of tied winners of loser tree + /// + /// See [`Self::with_round_robin_repartition`] for more information. enable_round_robin_repartition: bool, } @@ -105,6 +107,14 @@ impl SortPreservingMergeExec { } /// Sets the selection strategy of tied winners of the loser tree algorithm + /// + /// When true (the default) equal output rows are placed in the merged + /// stream when ready, which is faster but not stable (can vary from + /// run to run). 
+ /// + /// If false, equal output rows are placed in the merged stream in the order + /// of the inputs, resulting in potentially slower execution but in a stable + /// output order. pub fn with_round_robin_repartition( mut self, enable_round_robin_repartition: bool, @@ -128,7 +138,8 @@ impl SortPreservingMergeExec { self.fetch } - /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + /// Creates the cache object that stores the plan properties + /// such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, ordering: LexOrdering, From 16e315a3cbde3f448c4d415b8ff55f03ceb68cb3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 18 Dec 2024 07:20:48 -0500 Subject: [PATCH 2/5] tweaks --- .../src/sorts/sort_preserving_merge.rs | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 3f225dfadfb5..a52f39182599 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Defines the sort preserving merge plan +//! [`SortPreservingMergeExec`] merges multiple sorted streams into one sorted stream. use std::any::Any; use std::sync::Arc; @@ -38,10 +38,22 @@ use log::{debug, trace}; /// Sort preserving merge execution plan /// -/// This takes an input execution plan and a list of sort expressions, and -/// provided each partition of the input plan is sorted with respect to -/// these sort expressions, this operator will yield a single partition -/// that is also sorted with respect to them +/// # Overview +/// +/// This operator implements a K-way merge. 
It is used to merge multiple sorted +/// streams into a single sorted stream and is highly optimized. +/// +/// ## Inputs: +/// +/// 1. A list of sort expressions +/// 2. An input plan, where each partition is sorted with respect to +/// these sort expressions. +/// +/// ## Output: +/// +/// 1. A single partition that is also sorted with respect to the expressions +/// +/// ## Diagram /// /// ```text /// ┌─────────────────────────┐ @@ -60,7 +72,7 @@ use log::{debug, trace}; /// Stream 2 /// /// -/// Input Streams Output stream +/// Input Partitions Output Partition /// (sorted) (sorted) /// ``` /// @@ -70,7 +82,7 @@ use log::{debug, trace}; /// the output and inputs are not polled again. #[derive(Debug, Clone)] pub struct SortPreservingMergeExec { - /// Input plan + /// Input plan with sorted partitions input: Arc, /// Sort expressions expr: LexOrdering, From 57fddea3c9b0450ed4b29d347bee8998671dd1dc Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 19 Dec 2024 22:08:44 -0500 Subject: [PATCH 3/5] Improve comments more --- datafusion/physical-plan/src/sorts/merge.rs | 28 +++++++++++-------- .../src/sorts/sort_preserving_merge.rs | 12 ++++---- .../src/sorts/streaming_merge.rs | 3 ++ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 458c1c29c0cf..258e234b35c7 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -99,19 +99,25 @@ pub(crate) struct SortPreservingMergeStream { /// Configuration parameter to enable round-robin selection of tied winners of loser tree. /// - /// To address the issue of unbalanced polling between partitions due to tie-breakers being based - /// on partition index, especially in cases of low cardinality, we are making changes to the winner - /// selection mechanism. 
Previously, partitions with smaller indices were consistently chosen as the winners, - /// leading to an uneven distribution of polling. This caused upstream operator buffers for the other partitions - /// to grow excessively, as they continued receiving data without consuming it. + /// This option controls the tie-breaker strategy and attempts to avoid the + /// issue of unbalanced polling between partitions. /// - /// For example, an upstream operator like a repartition execution would keep sending data to certain partitions, - /// but those partitions wouldn't consume the data if they weren't selected as winners. This resulted in inefficient buffer usage. + /// If `true`, when multiple partitions have the same value, the partition + /// that has the fewest poll counts is selected. This strategy ensures that + /// multiple partitions with the same value are chosen equally, distributing + /// the polling load in a round-robin fashion. This approach balances the + /// workload more effectively across partitions and avoids excessive buffer + /// growth. /// - /// To resolve this, we are modifying the tie-breaking logic. Instead of always choosing the partition with the smallest index, - /// we now select the partition that has the fewest poll counts for the same value. - /// This ensures that multiple partitions with the same value are chosen equally, distributing the polling load in a round-robin fashion. - /// This approach balances the workload more effectively across partitions and avoids excessive buffer growth. + /// If `false`, partitions with smaller indices are consistently chosen as + /// the winners, which can lead to an uneven distribution of polling and can + /// cause upstream operator buffers for the other partitions to grow + /// excessively, as they continue receiving data without consuming it. 
+ /// + /// For example, an upstream operator like `RepartitionExec` would + /// keep sending data to certain partitions, but those partitions wouldn't + /// consume the data if they weren't selected as winners. This results in + /// inefficient buffer usage. enable_round_robin_tie_breaker: bool, /// Flag indicating whether we are in the mode of round-robin diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index a52f39182599..258762235159 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -120,13 +120,13 @@ impl SortPreservingMergeExec { /// Sets the selection strategy of tied winners of the loser tree algorithm /// - /// When true (the default) equal output rows are placed in the merged - /// stream when ready, which is faster but not stable (can vary from - /// run to run). + /// If true (the default), equal output rows are placed in the merged stream + /// in round-robin fashion. This approach consumes input streams at more + /// even rates when there are many rows with the same sort key. /// - /// If false, equal output rows are placed in the merged stream in the order - /// of the inputs, resulting in potentially slower execution but in a stable - /// output order. + /// If false, equal output rows are always placed in the merged stream in + /// the order of the inputs, resulting in potentially slower execution but a + /// stable output order. pub fn with_round_robin_repartition( mut self, enable_round_robin_repartition: bool, diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 2178cc012a10..aa245a0ac2ca 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -19,6 +19,7 @@ //! This is an order-preserving merge. 
use crate::metrics::BaselineMetrics; +use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::sorts::{ merge::SortPreservingMergeStream, stream::{FieldCursorStream, RowCursorStream}, @@ -120,6 +121,8 @@ impl<'a> StreamingMergeBuilder<'a> { self } + /// See [SortPreservingMergeExec::with_round_robin_repartition] for more + /// information. pub fn with_round_robin_tie_breaker( mut self, enable_round_robin_tie_breaker: bool, From de10c08da2db916f3bbad20723d2afe8451e686f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Dec 2024 09:38:29 -0500 Subject: [PATCH 4/5] clippy --- datafusion/physical-plan/src/sorts/streaming_merge.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index aa245a0ac2ca..202134e8636a 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -19,7 +19,6 @@ //! This is an order-preserving merge. use crate::metrics::BaselineMetrics; -use crate::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::sorts::{ merge::SortPreservingMergeStream, stream::{FieldCursorStream, RowCursorStream}, From 19fa5233aafec0c94ced9b876e22e204e86e6bc6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Dec 2024 15:39:06 -0500 Subject: [PATCH 5/5] fix doc link --- datafusion/physical-plan/src/sorts/streaming_merge.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 202134e8636a..448d70760de1 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -122,6 +122,8 @@ impl<'a> StreamingMergeBuilder<'a> { /// See [SortPreservingMergeExec::with_round_robin_repartition] for more /// information. 
+ /// + /// [SortPreservingMergeExec::with_round_robin_repartition]: crate::sorts::sort_preserving_merge::SortPreservingMergeExec::with_round_robin_repartition pub fn with_round_robin_tie_breaker( mut self, enable_round_robin_tie_breaker: bool,