Do not add redundant subquery ordering into plan #30

Closed · wants to merge 47 commits

Commits (47)
d5d6cda
do not add redundant subquery ordering into plan
mertak-synnada Aug 6, 2024
eeabaf1
format code
mertak-synnada Aug 6, 2024
85e23e7
add license
mertak-synnada Aug 6, 2024
bd63098
fix test cases with sort plan removing
mertak-synnada Aug 6, 2024
930c204
fix comment
mertak-synnada Aug 6, 2024
9d5f875
keep sorting on ordering mode test cases
mertak-synnada Aug 6, 2024
2d40a8d
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
b6bc6d4
protect test intentions with order + limit
mertak-synnada Aug 6, 2024
510b16c
Tmp
mustafasrepo Aug 7, 2024
6ef4369
Minor changes
mustafasrepo Aug 7, 2024
c3efafc
Minor changes
mustafasrepo Aug 7, 2024
2d1b48f
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
2bf220d
Minor changes
mustafasrepo Aug 7, 2024
eb83917
Implement top down recursion with delete check
mustafasrepo Aug 7, 2024
0b66b15
Minor changes
mustafasrepo Aug 7, 2024
9d3a972
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
c769f9f
Minor changes
mustafasrepo Aug 7, 2024
07dca3a
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 7, 2024
9192ca9
initialize fetch() api for execution plan
mertak-synnada Aug 7, 2024
0ad7063
Address reviews
mustafasrepo Aug 7, 2024
3661f06
Update comments
mustafasrepo Aug 7, 2024
60967c1
Minor changes
mustafasrepo Aug 7, 2024
6b87c4c
Make test deterministic
mustafasrepo Aug 7, 2024
a029d6f
add supports limit push down to union exec
mertak-synnada Aug 8, 2024
74041e7
support limit push down with multi children cases
mertak-synnada Aug 8, 2024
1d73ddb
fix typos
mertak-synnada Aug 8, 2024
8dd7e0a
Add fetch info to the statistics
mustafasrepo Aug 8, 2024
23a33df
optimize tpch test plans
mertak-synnada Aug 8, 2024
15423ae
Enforce distribution use inexact count estimate also.
mustafasrepo Aug 8, 2024
94fb83d
Minor changes
mustafasrepo Aug 8, 2024
9053b9f
Minor changes
mustafasrepo Aug 8, 2024
54fc4b2
Merge remote-tracking branch 'refs/remotes/origin/bug_fix/enforce_sor…
mertak-synnada Aug 8, 2024
501f403
Merge branch 'refs/heads/bug_fix/enforce_sorting' into feature/Sort-F…
mertak-synnada Aug 8, 2024
ebda00a
Merge remote-tracking branch 'refs/remotes/origin/apache_main' into f…
mertak-synnada Aug 14, 2024
27342ff
merge with apache main
mertak-synnada Aug 14, 2024
1d04db8
format code
mertak-synnada Aug 14, 2024
ec67b36
fix doc paths
mertak-synnada Aug 14, 2024
4564f4b
fix doc paths
mertak-synnada Aug 14, 2024
8e7d1df
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 14, 2024
b139b02
remove redundant code block
mertak-synnada Aug 14, 2024
4472e15
if partition count is 1 put GlobalLimitExec
mertak-synnada Aug 14, 2024
eb96912
fix test cases
mertak-synnada Aug 14, 2024
128676e
Apply suggestions from code review
ozankabak Aug 14, 2024
782487c
fix syntax errors
mertak-synnada Aug 14, 2024
ff227a2
Simplify branches
ozankabak Aug 15, 2024
07820cc
Merge branch 'refs/heads/apache_main' into feature/Sort-For-Subqueries
mertak-synnada Aug 19, 2024
c6a9abc
remove redundant limit plans from merge
mertak-synnada Aug 19, 2024
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
@@ -19,7 +19,6 @@

mod column;
mod dfschema;
mod error;
mod functional_dependencies;
mod join_type;
mod param_value;
@@ -33,6 +32,7 @@ pub mod alias;
pub mod cast;
pub mod config;
pub mod display;
pub mod error;
pub mod file_options;
pub mod format;
pub mod hash_utils;
26 changes: 12 additions & 14 deletions datafusion/core/src/dataframe/mod.rs
@@ -3046,13 +3046,12 @@ mod tests {
assert_eq!(
"\
Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1]",
\n Sort: t1.c1 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1]",
format!("{}", df_with_column.clone().into_optimized_plan()?)
);

@@ -3240,13 +3239,12 @@ mod tests {

assert_eq!("\
Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3\
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n SubqueryAlias: t1\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n SubqueryAlias: t2\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{}", df_renamed.clone().into_optimized_plan()?)
);

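Both updated assertions capture the user-visible effect of this change: the optimizer now folds the Limit into the Sort node's own fetch. A minimal way to reproduce it against DataFusion's public API (a sketch; the table name and CSV path are placeholders):

use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT c1 FROM t ORDER BY c1 LIMIT 1").await?;
    // With this PR, the optimized plan carries the limit on the Sort node
    // itself (`Sort: t.c1 ASC NULLS FIRST, fetch=1`) instead of wrapping it
    // in a separate `Limit: skip=0, fetch=1` node.
    println!("{}", df.into_optimized_plan()?.display_indent());
    Ok(())
}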
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -197,6 +197,10 @@ impl ExecutionPlan for ArrowExec {
Ok(self.projected_statistics.clone())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -165,6 +165,10 @@ impl ExecutionPlan for AvroExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -427,6 +427,10 @@ impl ExecutionPlan for CsvExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -207,6 +207,10 @@ impl ExecutionPlan for NdJsonExec {
Some(self.metrics.clone_inner())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -745,6 +745,10 @@ impl ExecutionPlan for ParquetExec {
Ok(self.projected_statistics.clone())
}

fn fetch(&self) -> Option<usize> {
self.base_config.limit
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
let new_config = self.base_config.clone().with_limit(limit);

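With these five hunks, every file-source operator (Arrow, Avro, CSV, JSON, Parquet) answers the new `fetch()` accessor with the limit stored in its base `FileScanConfig`, mirroring the existing `with_fetch()` setter. A sketch of how an optimizer pass can use the pair (hypothetical helper, not code from this PR):

use std::sync::Arc;
use datafusion_physical_plan::ExecutionPlan;

/// Push `limit` into `plan` if it can absorb a fetch; keep the smaller of an
/// existing fetch and the new limit. Returns None when the node cannot take
/// a limit, in which case the caller must keep its own limit operator.
fn try_absorb_limit(
    plan: Arc<dyn ExecutionPlan>,
    limit: usize,
) -> Option<Arc<dyn ExecutionPlan>> {
    let fetch = plan.fetch().map_or(limit, |f| f.min(limit));
    plan.with_fetch(Some(fetch))
}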
15 changes: 12 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::limit::LocalLimitExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
use datafusion_physical_plan::ExecutionPlanProperties;
@@ -405,7 +405,16 @@ fn analyze_immediate_sort_removal(
node.children = node.children.swap_remove(0).children;
if let Some(fetch) = sort_exec.fetch() {
// If the sort has a fetch, we need to add a limit:
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
if sort_exec
.properties()
.output_partitioning()
.partition_count()
== 1
{
Arc::new(GlobalLimitExec::new(sort_input.clone(), 0, Some(fetch)))
} else {
Arc::new(LocalLimitExec::new(sort_input.clone(), fetch))
}
} else {
sort_input.clone()
}
@@ -1124,7 +1133,7 @@ mod tests {
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"LocalLimitExec: fetch=2",
"GlobalLimitExec: skip=0, fetch=2",
" SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
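The updated expectation reflects the new branch above: when the removed sort's input has a single partition, its fetch becomes a GlobalLimitExec; only multi-partition inputs fall back to LocalLimitExec. Distilled into a standalone helper (a sketch, not the PR's exact code):

use std::sync::Arc;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Wrap `input` in the limit operator that matches its partitioning.
fn limit_for(input: Arc<dyn ExecutionPlan>, fetch: usize) -> Arc<dyn ExecutionPlan> {
    if input.output_partitioning().partition_count() == 1 {
        // One partition: a global limit (skip=0) returns exactly `fetch` rows.
        Arc::new(GlobalLimitExec::new(input, 0, Some(fetch)))
    } else {
        // Several partitions: cap each one locally; a merge plus global limit
        // upstream still decides which rows survive.
        Arc::new(LocalLimitExec::new(input, fetch))
    }
}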
3 changes: 0 additions & 3 deletions datafusion/core/src/physical_planner.rs
@@ -2177,9 +2177,6 @@ mod tests {
assert!(format!("{plan:?}").contains("GlobalLimitExec"));
assert!(format!("{plan:?}").contains("skip: 3, fetch: Some(5)"));

// LocalLimitExec adjusts the `fetch`
assert!(format!("{plan:?}").contains("LocalLimitExec"));
assert!(format!("{plan:?}").contains("fetch: 8"));
Ok(())
}

21 changes: 9 additions & 12 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -238,18 +238,15 @@ async fn sort_preserving_merge() {
// SortPreservingMergeExec (not a Sort which would compete
// with the SortPreservingMergeExec for memory)
&[
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"| logical_plan | Limit: skip=0, fetch=10 |",
"| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | LocalLimitExec: fetch=10 |",
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+---------------------------------------------------------------------------------------------------------------+",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
"| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
"| | TableScan: t projection=[a, b] |",
"| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
"| | |",
"+---------------+-----------------------------------------------------------------------------------------------------------+",
]
)
.run()
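The expected plan shrinks because a SortPreservingMergeExec can carry the fetch itself, so neither the GlobalLimitExec above it nor the LocalLimitExec below it is needed. A one-line sketch of the builder involved (`sort_exprs` and `input` assumed in scope; `with_fetch` is the merge operator's existing builder method):

// The merge stops after emitting 10 rows; no separate limit operators needed.
let merge = SortPreservingMergeExec::new(sort_exprs, input).with_fetch(Some(10));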
80 changes: 72 additions & 8 deletions datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -15,14 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::physical_optimizer::limit_pushdown::LimitPushdown;
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::config::ConfigOptions;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::expressions::{col, lit};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -31,8 +32,10 @@ use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan};
use datafusion_physical_plan::{get_plan_string, ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

struct DummyStreamPartition {
@@ -201,6 +204,52 @@ fn pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batc
Ok(())
}

#[test]
fn pushes_global_limit_into_multiple_fetch_plans() -> datafusion_common::Result<()> {
let schema = create_schema();
let streaming_table = streaming_table_exec(schema.clone()).unwrap();
let coalesce_batches = coalesce_batches_exec(streaming_table);
let projection = projection_exec(schema.clone(), coalesce_batches)?;
let repartition = repartition_exec(projection)?;
let sort = sort_exec(
vec![PhysicalSortExpr {
expr: col("c1", &schema)?,
options: SortOptions::default(),
}],
repartition,
);
let spm = sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort);
let global_limit = global_limit_exec(spm, 0, Some(5));

let initial = get_plan_string(&global_limit);
let expected_initial = [
"GlobalLimitExec: skip=0, fetch=5",
" SortPreservingMergeExec: [c1@0 ASC]",
" SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];

assert_eq!(initial, expected_initial);

let after_optimize =
LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;

let expected = [
"SortPreservingMergeExec: [c1@0 ASC], fetch=5",
" SortExec: TopK(fetch=5), expr=[c1@0 ASC], preserve_partitioning=[false]",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
" CoalesceBatchesExec: target_batch_size=8192",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

Ok(())
}

#[test]
fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
) -> datafusion_common::Result<()> {
@@ -227,10 +276,9 @@
let expected = [
"GlobalLimitExec: skip=0, fetch=5",
" CoalescePartitionsExec",
" LocalLimitExec: fetch=5",
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
" FilterExec: c3@2 > 0",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], infinite_source=true"
];
assert_eq!(get_plan_string(&after_optimize), expected);

@@ -256,7 +304,7 @@ fn merges_local_limit_with_local_limit() -> datafusion_common::Result<()> {
let after_optimize =
LimitPushdown::new().optimize(parent_local_limit, &ConfigOptions::new())?;

let expected = ["LocalLimitExec: fetch=10", " EmptyExec"];
let expected = ["GlobalLimitExec: skip=0, fetch=10", " EmptyExec"];
assert_eq!(get_plan_string(&after_optimize), expected);

Ok(())
Expand Down Expand Up @@ -375,6 +423,22 @@ fn local_limit_exec(
Arc::new(LocalLimitExec::new(input, fetch))
}

fn sort_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortExec::new(sort_exprs, input))
}

fn sort_preserving_merge_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}

fn projection_exec(
schema: SchemaRef,
input: Arc<dyn ExecutionPlan>,
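Worth noting for the expectations in this test file: a SortExec that is handed a fetch renders as `SortExec: TopK(fetch=N), ...`, meaning it switches to a top-k implementation instead of fully sorting and trimming afterwards. A sketch of the builder form (`sort_exprs` and `input` assumed in scope):

// A fetch turns the sort into a top-k sort, matching the
// `pushes_global_limit_into_multiple_fetch_plans` expectation above.
let topk = SortExec::new(sort_exprs, input).with_fetch(Some(5));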
28 changes: 11 additions & 17 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -72,11 +72,6 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
"GlobalLimitExec: skip=0, fetch=3, ",
"metrics=[output_rows=1, elapsed_compute="
);
assert_metrics!(
&formatted,
"LocalLimitExec: fetch=3",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
@@ -612,18 +607,17 @@
let dataframe = ctx.sql(sql).await.unwrap();
let physical_plan = dataframe.create_physical_plan().await.unwrap();
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
" SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
" SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
" ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
"SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
" SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
" ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
" AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
" FilterExec: c12@1 < 10",
" RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
" CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
];

let normalizer = ExplainNormalizer::new();
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/push_down_limit.rs
@@ -129,7 +129,11 @@ impl OptimizerRule for PushDownLimit {
Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
};
if new_fetch == sort.fetch {
original_limit(skip, fetch, LogicalPlan::Sort(sort))
if skip > 0 {
original_limit(skip, fetch, LogicalPlan::Sort(sort))
} else {
Ok(Transformed::yes(LogicalPlan::Sort(sort)))
}
} else {
sort.fetch = new_fetch;
limit.input = Arc::new(LogicalPlan::Sort(sort));
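The effect of this new branch, sketched as logical plan text (not code): when the limit's fetch already equals the sort's fetch and there is no skip, the Limit node is now dropped outright; a nonzero skip still needs its own Limit, because a Sort's fetch cannot express an offset.

// Before:                                 After:
//   Limit: skip=0, fetch=10                 Sort: t.a ASC NULLS FIRST, fetch=10
//     Sort: t.a ASC NULLS FIRST, fetch=10     TableScan: t
//       TableScan: t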