diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index d2b79d57d1be..458c1c29c0cf 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -445,9 +445,14 @@ impl SortPreservingMergeStream { self.update_winner(cmp_node, winner, challenger); } } else if challenger < *winner { - // If the winner doesn't survive in the final match, it means the value has changed. - // The polls count are outdated (because the value advanced) but not yet cleaned-up at this point. - // Given the value is equal, we choose the smaller index if the value is the same. + // If the winner doesn’t survive in the final match, it indicates that the original winner + // has moved up in value, so the challenger now becomes the new winner. + // This also means that we’re in a new round of the tie breaker, + // and the polls count is outdated (though not yet cleaned up). + // + // By the time we reach this code, both the new winner and the current challenger + // have the same value, and neither has an updated polls count. + // Therefore, we simply select the one with the smaller index. 
self.update_winner(cmp_node, winner, challenger); } } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 019f3b2d59f2..caae5c5598b6 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -357,6 +357,8 @@ mod tests { use futures::{FutureExt, Stream, StreamExt}; use tokio::time::timeout; + // The numbers in this function are closely tied to the memory limit under test, + // so any change to these constants must take that limit into account. fn generate_task_ctx_for_round_robin_tie_breaker() -> Result> { let runtime = RuntimeEnvBuilder::new() .with_memory_limit(20_000_000, 1.0) @@ -367,6 +369,8 @@ mod tests { .with_session_config(config); Ok(Arc::new(task_ctx)) } + // The numbers in this function are closely tied to the memory limit under test, + // so any change to these constants must take that limit into account. fn generate_spm_for_round_robin_tie_breaker( enable_round_robin_repartition: bool, ) -> Result> {