Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Add Monotonic Definition #59

Closed
wants to merge 68 commits into from

Conversation

mertak-synnada
Copy link

Which issue does this PR close?

Closes #.

Rationale for this change

This PR creates a definition of set-monotonicity for Aggregate expressions. Some aggregation functions create ordered results by definition (such as count, min, max). With this PR, we're adding this information to the output ordering and be able to remove some SortExecs while optimizing

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

let idx = schema.index_of(self.name()).unwrap_or(0);
let expr = Arc::new(Column::new(self.name(), idx));

let options = SortOptions::new(!is_ascending, false);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We assumed Nulls Last by definition, I'm not fully sure about this will be as is in the future as well

/// Returns None if the function is not monotonic.
/// If the function is monotonically decreasing returns Some(false) e.g. Min
/// If the function is monotonically increasing returns Some(true) e.g. Max
fn is_monotonic(&self) -> Option<bool> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will need to accept a data_type argument here since Sums monotonicity depends on it.

Copy link

@jayzhan-synnada jayzhan-synnada Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add it first so we don't have to break API again in the future

pub fn natural_sort_expr(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
// If the aggregate expressions are monotonic, the output data is naturally ordered with it.
let is_ascending = self.is_monotonic()?;
let idx = schema.index_of(self.name()).unwrap_or(0);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the way we get the expr here but couldn't find a better way

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Result for this function and not return incorrect 0 index seems better

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to implement so but got too many errors on tests, the expression name does not always match the actual column name. But there should not be any need for us to search with name, I'll try to get the actual index to here and insert

11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
12)----------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], ordering_mode=PartiallySorted([1])
04)------SortExec: expr=[min(aggregate_test_100.c1)@1 ASC NULLS LAST], preserve_partitioning=[true]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This SortExec is being added, because there's another logic on AggregateExec's partition_by values. The parent AggregateExec is adding the min column as an ordering requirement, and since prefer_existing_sort is false by default RepartitionExec does not maintain input order.

@alihan-synnada
Copy link

Small nit: is_monotonic returning Option<bool> is a little confusing. The documentation is clear about what the returned value means so it isn't a huge deal. I don't want to waste too much time bikeshedding but here are a few alternatives:

  • is_monotonically_increasing() sounds more descriptive but it's rather long
  • monotonicity() could return an enum with 3 variants Monotonicity(NonDecreasing, NonIncreasing, NonMonotonic)

LGTM otherwise 👍

];
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AggregateMode::Final,
AggregateMode::Single,

Final is used in 2-step aggregation, should be Single here

let expected_input = [
"SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=Final, gby=[], aggr=[avg]",
" SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess avg is monotonic for integer but not for float?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, it's not monotonic for unsigned integers, for example:

1st data 100 => Avg 100
2nd data 2 => (102/2) Avg 51 (decreased)
3rd data 3 => (105/3) Avg 35 (decreased)
4th data 1003 => (1108/4) Avg 277 (increased)

To be monotonic, it needs to be always increasing/decreasing

pub fn natural_sort_expr(&self, schema: &SchemaRef) -> Option<PhysicalSortExpr> {
// If the aggregate expressions are monotonic, the output data is naturally ordered with it.
let is_ascending = self.is_monotonic()?;
let idx = schema.index_of(self.name()).unwrap_or(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Result for this function and not return incorrect 0 index seems better

];
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AggregateMode::Final,
AggregateMode::Single,

pub fn add_equal_orderings(&self, eq_properties: &mut EquivalenceProperties) {
let Some(expr) = self
.get_aggregate_expr()
.natural_sort_expr(eq_properties.schema())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does anyone know why schema is inside EquivalenceProperties but not PlanProperties 🤔 or even not storing them inside both of them but pass as a parameter instead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️ Have no idea, but I'll remove the schema need anyways

# Conflicts:
#	datafusion/core/src/physical_optimizer/enforce_sorting.rs
#	datafusion/physical-optimizer/src/test_utils.rs
separate stubs and count_udafs
@github-actions github-actions bot removed the core label Jan 23, 2025
change monotonicity to return an Enum rather than Option<bool>
fix indices
re-add monotonicity tests
@github-actions github-actions bot added the core label Jan 24, 2025
Chen-Yuan-Lai and others added 10 commits January 29, 2025 15:48
…gle_group_by (apache#14360)

* refactor: switch BooleanBufferBuilder to NullBufferBuilder in single_group_by

* Update datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

---------

Co-authored-by: Cheng-Yuan-Lai <a186235@g,ail.com>
Co-authored-by: Andrew Lamb <[email protected]>
…pache#14265)

* chore(deps): bump rustyline from 14.0.0 to 15.0.0 in /datafusion-cli

Bumps [rustyline](https://github.com/kkawakam/rustyline) from 14.0.0 to 15.0.0.
- [Release notes](https://github.com/kkawakam/rustyline/releases)
- [Changelog](https://github.com/kkawakam/rustyline/blob/master/History.md)
- [Commits](kkawakam/rustyline@v14.0.0...v15.0.0)

---
updated-dependencies:
- dependency-name: rustyline
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>

* fix build

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: jonahgao <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
apache#14263)

Bumps [aws-sdk-sts](https://github.com/awslabs/aws-sdk-rust) from 1.51.0 to 1.57.0.
- [Release notes](https://github.com/awslabs/aws-sdk-rust/releases)
- [Commits](https://github.com/awslabs/aws-sdk-rust/commits)

---
updated-dependencies:
- dependency-name: aws-sdk-sts
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Andrew Lamb <[email protected]>
apache#14259)

Bumps [aws-sdk-sso](https://github.com/awslabs/aws-sdk-rust) from 1.50.0 to 1.56.0.
- [Release notes](https://github.com/awslabs/aws-sdk-rust/releases)
- [Commits](https://github.com/awslabs/aws-sdk-rust/commits)

---
updated-dependencies:
- dependency-name: aws-sdk-sso
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Andrew Lamb <[email protected]>
Bumps [korandoru/hawkeye](https://github.com/korandoru/hawkeye) from 5 to 6.
- [Release notes](https://github.com/korandoru/hawkeye/releases)
- [Changelog](https://github.com/korandoru/hawkeye/blob/main/CHANGELOG.md)
- [Commits](korandoru/hawkeye@v5...v6)

---
updated-dependencies:
- dependency-name: korandoru/hawkeye
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@ozankabak ozankabak force-pushed the feature/monotonic-sets branch from d7e3135 to 5e9b2db Compare January 30, 2025 12:54
dependabot bot and others added 9 commits January 30, 2025 09:14
Bumps [aws-sdk-ssooidc](https://github.com/awslabs/aws-sdk-rust) from 1.57.1 to 1.58.0.
- [Release notes](https://github.com/awslabs/aws-sdk-rust/releases)
- [Commits](https://github.com/awslabs/aws-sdk-rust/commits)

---
updated-dependencies:
- dependency-name: aws-sdk-ssooidc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…e#14359)

* Minor: include the number of files run in sqllogictest display

* fmt

* Update datafusion/sqllogictest/bin/sqllogictests.rs
* move information_schema to datafusion-catalog

* fix: formatting

* fix: doctests import

* Remove unecessary datafuson-catalog dependency

* remove some more unecessary dependencies

* Update datafusion-cli/Carglo.ock

* fix: doctest dependency

---------

Co-authored-by: Andrew Lamb <[email protected]>
* Fix incorrect searched CASE optimization

There is an optimization for searched CASE where values are of boolean
type. It was converting the expression like

    CASE
        WHEN X THEN A
        WHEN Y THEN B
        ..
        [ ELSE D ]
    END

into

    (X AND A)
        OR (Y AND NOT X AND B)
        [ OR (NOT (X OR Y) AND D) ]

This had the following problems

- does not work for nullable conditions. If X is nullable, we cannot use
  NOT (X) to compliment it. We need to use `X IS DISTINCT FROM true`
- it does not work correctly when some conditions are nullable and other
  values are false. E.g. X=NULL, A=true, Y=NULL, B=true, D=false, the
  CASE should return false, but the boolean expression will simplify to
  `(NULL AND ..) OR (NULL AND ..) OR (false)` which is NULL, not false
  - thus we use `X` for truthness check of `X`, we need to test `X IS
    NOT DISTINCT FROM true`
- it did not work correctly when default D is missing, but conditions
  do not evaluate to NULL. CASE's result should be NULL but was false.

This commit fixes that optimization.

* Fix complexity comment
…apache#13681)

* draft of `MedianGroupAccumulator`.

* impl `state`.

* impl rest methods of `MedianGroupsAccumulator`.

* improve comments.

* use `MedianGroupsAccumulator`.

* remove unused import.

* add `group_median_table` to test group median.

* complete group median test cases in aggregate slt.

* fix type of state.

* Clippy

* Fmt

* add fuzzy tests for median.

* fix decimal.

* fix clippy.

* improve comments.

* add median cases with nulls.

* Update datafusion/functions-aggregate/src/median.rs

Co-authored-by: Andrew Lamb <[email protected]>

* use `OffsetBuffer::new_unchecked` in `convert_to_state`.

* add todo.

* remove assert and switch to i32 try from.

* return error when try from failed.

---------

Co-authored-by: Daniël Heres <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
@berkaysynnada
Copy link
Collaborator

apache#14271 merged, closing this one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.