
Remove PartiallyOrdered handling from BoundedWindowAggExec #11

Open · wants to merge 24 commits into apache_main
Conversation

mustafasrepo (Collaborator)

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?


mustafasrepo commented Mar 15, 2024

According to my comparison, this behaviour harms performance. I compared the following plans:

```
"AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, d@3 as d], aggr=[sum1], ordering_mode=Sorted",
"  PartialSortExec: expr=[a@0 ASC,b@1 ASC,d@3 ASC], common_prefix_length=[2]",
"    MemoryExec: partitions=1, partition_sizes=[61], output_ordering=a@0 ASC,b@1 ASC,c@2 ASC",
```

and

```
"AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b, d@3 as d], aggr=[sum1], ordering_mode=PartiallySorted([0, 1])",
"  MemoryExec: partitions=1, partition_sizes=[61], output_ordering=a@0 ASC,b@1 ASC,c@2 ASC",
```

According to the tests, the second version performs better. Hence, this change is not beneficial for AggregateExec in terms of performance.
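For background, PartialSortExec exploits an existing ordering on a prefix of the requested sort keys: rows sharing the same prefix values arrive as contiguous runs, so each run only needs to be sorted on the remaining keys. A minimal Python sketch of that idea (the `partial_sort` helper and its signature are illustrative, not a DataFusion API):

```python
from itertools import groupby

def partial_sort(rows, sort_keys, prefix_len):
    """Sort `rows` on `sort_keys`, assuming they already arrive sorted
    on the first `prefix_len` of those keys (the "common prefix").

    Rows sharing the same prefix form a contiguous run, so each run is
    sorted independently on the remaining keys; a streaming operator
    can emit a run as soon as the prefix value advances."""
    prefix = lambda r: tuple(r[k] for k in sort_keys[:prefix_len])
    suffix = lambda r: tuple(r[k] for k in sort_keys[prefix_len:])
    out = []
    for _, run in groupby(rows, key=prefix):
        out.extend(sorted(run, key=suffix))
    return out

# Input is sorted on (a, b); the target ordering is (a, b, d), so only
# `d` needs sorting inside each (a, b) run.
rows = [
    {"a": 0, "b": 0, "d": 2},
    {"a": 0, "b": 0, "d": 1},
    {"a": 0, "b": 1, "d": 5},
    {"a": 1, "b": 0, "d": 3},
    {"a": 1, "b": 0, "d": 0},
]
result = partial_sort(rows, ["a", "b", "d"], prefix_len=2)
```

The per-run sort is still real work, which is consistent with the measurement above: when the aggregate can consume the PartiallySorted input directly, skipping the extra sort wins.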

I ran another set of tests with window functions. It seems that this change is beneficial there: the following plan

```
"BoundedWindowAggExec: wdw=[count(x) PB([\"a\", \"c\"]), OB:[\"b\"]: Ok(Field { name: \"count(x) PB([\\\"a\\\", \\\"c\\\"]), OB:[\\\"b\\\"]\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[Sorted]",
"  PartialSortExec: expr=[a@0 ASC,c@2 ASC,b@1 ASC], common_prefix_length=[1]",
"    MemoryExec: partitions=1, partition_sizes=[80], output_ordering=a@0 ASC,b@1 ASC,c@2 ASC",
```

executes faster than

```
"BoundedWindowAggExec: wdw=[count(x) PB([\"a\", \"c\"]), OB:[\"b\"]: Ok(Field { name: \"count(x) PB([\\\"a\\\", \\\"c\\\"]), OB:[\\\"b\\\"]\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: CurrentRow, is_causal: true }], mode=[PartiallySorted([0])]",
"  MemoryExec: partitions=1, partition_sizes=[80], output_ordering=a@0 ASC,b@1 ASC,c@2 ASC",
```
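As a rough mental model of the two modes (an illustrative sketch, not the actual BoundedWindowAggExec code): in Sorted mode each partition arrives contiguously, so only one partition's state is live at a time; in PartiallySorted([0]) mode only `a` out of the PARTITION BY (`a`, `c`) keys is ordered, so every partition sharing the current `a` value stays live until `a` advances. For the causal running-count frame above, the two strategies look roughly like this in Python:

```python
def count_sorted(rows, part_keys):
    """Running COUNT per partition when input is fully sorted on
    part_keys: at most one partition is live at a time."""
    out, current, count = [], None, 0
    for r in rows:
        key = tuple(r[k] for k in part_keys)
        if key != current:
            current, count = key, 0  # previous partition is finished
        count += 1
        out.append(count)
    return out

def count_partially_sorted(rows, part_keys, prefix_len):
    """Same computation when only the first prefix_len partition keys
    are ordered: all partitions under the current prefix stay live,
    and their state is dropped only when the prefix advances."""
    out, prefix_val, live = [], None, {}
    for r in rows:
        pv = tuple(r[k] for k in part_keys[:prefix_len])
        if pv != prefix_val:
            prefix_val, live = pv, {}  # prefix advanced: drop all state
        key = tuple(r[k] for k in part_keys)
        live[key] = live.get(key, 0) + 1
        out.append(live[key])
    return out

# Fully sorted on (a, c) vs. sorted on `a` only with `c` interleaved.
rows_sorted = [{"a": 0, "c": 0}, {"a": 0, "c": 0}, {"a": 0, "c": 1}, {"a": 1, "c": 0}]
rows_partial = [{"a": 0, "c": 1}, {"a": 0, "c": 0}, {"a": 0, "c": 1}, {"a": 1, "c": 0}]
```

The partially sorted variant must keep a hash map of live partitions, which is why it was worth measuring whether a PartialSortExec plus the simpler Sorted path comes out ahead.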

@mustafasrepo mustafasrepo changed the title Remove PartiallyOrdered handling from BoundedWindowAggExec and AggregateExec Remove PartiallyOrdered handling from BoundedWindowAggExec Mar 18, 2024
```diff
 }
 
 fn is_mode_linear(&self) -> bool {
-    self.ordered_partition_by_indices.is_empty()
+    true
 }
```


Is this necessary?

```
@@ -2933,6 +2933,27 @@ LOCATION '../core/tests/data/window_2.csv';

# test_infinite_source_partition_by

query TT
EXPLAIN SELECT a, b, c,
```


We may want to check the performance of this test.

mustafasrepo (Collaborator, Author)

I compared the performance of this test with the following plan1:

```
"ProjectionExec: expr=[a@0 as a, b@1 as b, x@3 as x, SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@4 as sum1]",
"  BoundedWindowAggExec: wdw=[SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: \"SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[PartiallySorted([0])]",
"    StreamingTableExec: partition_sizes=1, projection=[a, b, c, x], infinite_source=true, output_ordering=[a@0 ASC, b@1 ASC, c@2 ASC]",
```

and plan2:

```
"PartialSortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], common_prefix_length=[1]",
"  ProjectionExec: expr=[a@0 as a, b@1 as b, x@3 as x, SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING@4 as sum1]",
"    BoundedWindowAggExec: wdw=[SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: \"SUM(source.c) PARTITION BY [source.a, source.x] ORDER BY [source.b ASC NULLS LAST, source.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)), is_causal: false }], mode=[Sorted]",
"      PartialSortExec: expr=[a@0 ASC,x@3 ASC,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST], common_prefix_length=[1]",
"        StreamingTableExec: partition_sizes=1, projection=[a, b, c, x], infinite_source=true, output_ordering=[a@0 ASC, b@1 ASC, c@2 ASC]",
```

The results are:

|       | single chunk | multi chunk |
|-------|--------------|-------------|
| plan1 | 41.968927ms  | 79.660012ms |
| plan2 | 39.103565ms  | 42.766507ms |

It seems that we benefit from the coalescing effect of the PartialSortExec in plan2. However, when the batch size is large, the benefit is marginal.
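The coalescing effect can be pictured with a toy cost model (the numbers and the `run_cost` helper are made up for illustration): every batch pays a fixed per-batch overhead on top of per-row work, so an operator like PartialSortExec that buffers rows and emits fewer, larger batches reduces the fixed overhead paid downstream.

```python
def run_cost(batches, per_batch_overhead=100, per_row_cost=1):
    """Toy model: each batch costs a fixed overhead plus per-row work.
    `batches` is a list of batch sizes (row counts)."""
    return sum(per_batch_overhead + per_row_cost * n for n in batches)

many_small = [8] * 125   # 1000 rows arriving in 125 small batches
coalesced = [500, 500]   # the same 1000 rows in 2 large batches

# Identical total row work, but the small-batch plan pays the fixed
# per-batch overhead 125 times instead of twice.
```

This also matches the observation that the benefit shrinks as the configured batch size grows: with large batches, the input is effectively coalesced already.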

@ozankabak (Collaborator)

Since the benefit is marginal, let's keep this one on the backburner for a while and revisit it later.

mustafasrepo pushed a commit that referenced this pull request Jul 17, 2024
… `interval` (apache#11466)

* Unparser rule for datatime cast (#10)

* use timestamp as the identifier for date64

* rename

* implement CustomDialectBuilder

* fix

* dialect with interval style (#11)

---------

Co-authored-by: Phillip LeBlanc <[email protected]>

* fmt

* clippy

* doc

* Update datafusion/sql/src/unparser/expr.rs

Co-authored-by: Andrew Lamb <[email protected]>

* update the doc for CustomDialectBuilder

* fix doc test

---------

Co-authored-by: Phillip LeBlanc <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>