
feat: Add GroupColumn Decimal128Array #13564

Merged: 10 commits into apache:main, Dec 4, 2024

Conversation

jonathanc-n (Contributor):

Which issue does this PR close?

Closes #13505.

Rationale for this change

What changes are included in this PR?

Added a group column for Decimal128Array.

Are these changes tested?

Yes, SLT tests.

Are there any user-facing changes?

github-actions bot added the physical-expr (Physical Expressions) and sqllogictest (SQL Logic Tests (.slt)) labels Nov 26, 2024
github-actions bot added the core (Core DataFusion crate) label Nov 26, 2024
    // Set timezone information for timestamp
    Arc::new(arr.with_data_type(data_type))
    adjust_output_array(&data_type, array_ref)
Contributor:

If we need adjust_output_array, then I think we don't need with_data_type to set the timezone information, since that is handled by adjust_output_array as well.
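
For context, a minimal sketch of what with_data_type does for a timestamp (assuming the arrow-rs API; the values here are made up): it swaps the logical type, attaching the timezone, without touching the underlying buffer.

    use arrow::array::TimestampMillisecondArray;
    use arrow::datatypes::{DataType, TimeUnit};

    fn main() {
        // Raw epoch millis; the array starts with no timezone
        let arr = TimestampMillisecondArray::from(vec![1_000_i64]);
        // Attach a timezone; the stored i64 values are unchanged
        let arr = arr.with_data_type(DataType::Timestamp(
            TimeUnit::Millisecond,
            Some("+00:00".into()),
        ));
        assert_eq!(
            arr.data_type(),
            &DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into()))
        );
    }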

Contributor Author:

The precision and scale aren't kept in the generic when constructing the buffer, so I think I need to keep with_data_type.
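
A minimal sketch of the problem being described (assuming arrow-rs defaults; rebuild is a hypothetical helper): an array rebuilt from raw values carries Decimal128Type's default Decimal128(38, 10), so the declared precision and scale have to be re-attached.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Decimal128Array};
    use arrow::datatypes::DataType;

    // Hypothetical helper: rebuild a decimal array and restore its type
    fn rebuild(values: Vec<i128>, original_type: DataType) -> ArrayRef {
        let arr = Decimal128Array::from(values);
        // Fresh arrays default to the type's max precision and default scale
        assert_eq!(arr.data_type(), &DataType::Decimal128(38, 10));
        // with_data_type re-attaches the declared precision/scale cheaply
        Arc::new(arr.with_data_type(original_type))
    }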

Contributor:

It is kept in data_type. When you run adjust_output_array, the precision of Decimal should be set with with_precision_and_scale.

pub fn adjust_output_array(data_type: &DataType, array: ArrayRef) -> Result<ArrayRef> {
    let array = match data_type {
        DataType::Decimal128(p, s) => Arc::new(
            array
                .as_primitive::<Decimal128Type>()
                .clone()
                .with_precision_and_scale(*p, *s)?,
        ) as ArrayRef,
        // ... remaining match arms (Decimal256, timestamps, ...) elided
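
For reference, a hedged usage sketch of with_precision_and_scale (assuming the arrow-rs API): it returns a Result and errors if the requested precision or scale is out of bounds for Decimal128.

    use arrow::array::Decimal128Array;
    use arrow::datatypes::DataType;
    use arrow::error::ArrowError;

    fn demo() -> Result<(), ArrowError> {
        // 12345 at scale 2 represents 123.45
        let arr = Decimal128Array::from(vec![12345_i128])
            .with_precision_and_scale(10, 2)?;
        assert_eq!(arr.data_type(), &DataType::Decimal128(10, 2));
        Ok(())
    }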

Contributor Author:

Yeah, that is what I thought as well, but in ea6f77a it fails with precision/scale errors because the original array doesn't have .with_data_type applied to it.

Contributor:

The error you mentioned can be fixed by adding support for Decimal256

Contributor:

I tried backing out the use of adjust_data_type and I didn't see any failures locally 🤔

@alamb (Contributor) left a comment:

Thank you for working on this @jonathanc-n -- your code seems good, but something is going wonky with the data types:

  1. The fuzz test is failing intermittently
  2. The extra adjustment of the output type seems unnecessary 🤔

    @@ -87,7 +87,12 @@ impl DatasetGeneratorConfig {
        .iter()
        .filter_map(|d| {
            if d.column_type.is_numeric()
                && !matches!(d.column_type, DataType::Float32 | DataType::Float64)
                && !matches!(
Contributor:

Something is wrong here. This change effectively turns off fuzz testing for sum with decimal.

When I reverted this change, the fuzz tests fail occasionally like this:

    test fuzz_cases::aggregate_fuzz::test_sum ... FAILED
    ...
    Arrow error: Invalid argument error: column types must match schema types, expected Decimal128(21, -112) but found Decimal128(38, 10) at column index 1
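
This failure mode can be reproduced directly (a hedged sketch against the arrow-rs API; the schema and values are made up): RecordBatch::try_new checks each column's type against the schema, and a decimal array left at its default type no longer matches.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Decimal128Array};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() {
        // Schema declares one decimal type...
        let schema = Arc::new(Schema::new(vec![Field::new(
            "c1",
            DataType::Decimal128(21, 2),
            false,
        )]));
        // ...but the column, built without with_precision_and_scale,
        // keeps the default Decimal128(38, 10)
        let col: ArrayRef = Arc::new(Decimal128Array::from(vec![123_i128]));
        let err = RecordBatch::try_new(schema, vec![col]).unwrap_err();
        // "Invalid argument error: column types must match schema types, ..."
        println!("{err}");
    }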

Contributor:

From my perspective, other than this change, this PR is ready to go.

Thank you @jonathanc-n

Contributor Author:

@alamb thanks for the tip, I reverted that change! For the test_sum fuzz test, I removed it the same way I did for the Float types, due to casting to a DateType. This was the error I got after executing with a backtrace (many more of the same error):

    ERROR: Cast error: Failed to convert 39087111289254881.41 to datetime for Timestamp(Millisecond, None)

I'm getting this; should I still remove it?

@jayzhan211 (Contributor), Dec 2, 2024:

The real issue is that we have decimal(38, 10), which is the fixed precision for sum, and it mismatches with the fuzz test, which uses random precision:

    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        match &arg_types[0] {
            DataType::Int64 => Ok(DataType::Int64),
            DataType::UInt64 => Ok(DataType::UInt64),
            DataType::Float64 => Ok(DataType::Float64),
            DataType::Decimal128(precision, scale) => {
                // in the spark, the result type is DECIMAL(min(38,precision+10), s)
                // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66
                let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 10);
                Ok(DataType::Decimal128(new_precision, *scale))
            }
            DataType::Decimal256(precision, scale) => {
                // in the spark, the result type is DECIMAL(min(38,precision+10), s)
                // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66
                let new_precision = DECIMAL256_MAX_PRECISION.min(*precision + 10);
                Ok(DataType::Decimal256(new_precision, *scale))
            }
            other => {
                exec_err!("[return_type] SUM not supported for {}", other)
            }
        }
    }
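
To make the widening rule concrete, here is a hedged sketch (sum_return_type is a hypothetical stand-in for the Decimal128 match arm above, not a DataFusion API):

    use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION};

    // Mirror of the Decimal128 arm: widen precision by 10, capped at 38
    fn sum_return_type(precision: u8, scale: i8) -> DataType {
        let new_precision = DECIMAL128_MAX_PRECISION.min(precision.saturating_add(10));
        DataType::Decimal128(new_precision, scale)
    }

    fn main() {
        assert_eq!(sum_return_type(11, 2), DataType::Decimal128(21, 2)); // 11 + 10
        assert_eq!(sum_return_type(30, 3), DataType::Decimal128(38, 3)); // capped
    }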

Contributor:

Decimal128(38, 3) for normal precision, Decimal128(30, 3) for grouping. Not sure why there is a mismatch in the fuzz test. We should either align the precision for both cases or fix the fuzz schema check, if they don't need to have the same precision, as in this slt:

    query TT
    select arrow_typeof(sum(column1)), arrow_typeof(sum(distinct column1)) from t group by column2;
    ----
    Decimal128(38, 3) Decimal128(30, 3)
    Decimal128(38, 3) Decimal128(30, 3)

    query TT
    explain select sum(column1), sum(distinct column1) from t group by column2;
    ----
    logical_plan
    01)Projection: sum(alias2) AS sum(t.column1), sum(alias1) AS sum(DISTINCT t.column1)
    02)--Aggregate: groupBy=[[t.column2]], aggr=[[sum(alias2), sum(alias1)]]
    03)----Aggregate: groupBy=[[t.column2, t.column1 AS alias1]], aggr=[[sum(t.column1) AS alias2]]
    04)------TableScan: t projection=[column1, column2]
    physical_plan
    01)ProjectionExec: expr=[sum(alias2)@1 as sum(t.column1), sum(alias1)@2 as sum(DISTINCT t.column1)]
    02)--AggregateExec: mode=FinalPartitioned, gby=[column2@0 as column2], aggr=[sum(alias2), sum(alias1)]
    03)----CoalesceBatchesExec: target_batch_size=8192
    04)------RepartitionExec: partitioning=Hash([column2@0], 4), input_partitions=4
    05)--------AggregateExec: mode=Partial, gby=[column2@0 as column2], aggr=[sum(alias2), sum(alias1)]
    06)----------AggregateExec: mode=FinalPartitioned, gby=[column2@0 as column2, alias1@1 as alias1], aggr=[alias2]
    07)------------CoalesceBatchesExec: target_batch_size=8192
    08)--------------RepartitionExec: partitioning=Hash([column2@0, alias1@1], 4), input_partitions=4
    09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
    10)------------------AggregateExec: mode=Partial, gby=[column2@1 as column2, column1@0 as alias1], aggr=[alias2]
    11)--------------------MemoryExec: partitions=1, partition_sizes=[1]

@alamb (Contributor) commented Nov 27, 2024:

Here is a PR to deprecate adjust_output_array: #13585


jayzhan211 previously approved these changes Nov 28, 2024

@jayzhan211 (Contributor):

> For the test_sum fuzz test, I removed it the same way I did for the Float types due to casting to a DateType

I didn't see the removal you mentioned, but the change looks better to me now

jayzhan211 dismissed their stale review December 2, 2024 12:01

Decimal128 support for DatasetGeneratorConfig seems incorrect

@alamb (Contributor) commented Dec 2, 2024:

> @alamb thanks for the tip, I reverted that change! For the test_sum fuzz test, I removed it the same way I did for the Float types due to casting to a DateType. This was the error I got after executing with a backtrace (many more of this same error): ERROR: Cast error: Failed to convert 39087111289254881.41 to datetime for Timestamp(Millisecond, None)

I think I found the issue (thanks @jayzhan211 for the sleuthing):

(also shout out to @Rachelint for writing the first version of these fuzz testers. We just avoided a potential bug with it!)

github-actions bot removed the core (Core DataFusion crate) label Dec 3, 2024
@jonathanc-n (Contributor Author):

@alamb @jayzhan211 Thanks for the reviews and changes!

@jayzhan211 (Contributor) left a comment:

👍🏻

@alamb (Contributor) left a comment:

Thanks @jonathanc-n

jayzhan211 merged commit 143ef97 into apache:main Dec 4, 2024. 25 checks passed.
Labels: physical-expr (Physical Expressions), sqllogictest (SQL Logic Tests (.slt))

Successfully merging this pull request may close these issues: Implement GroupColumn Decimal128Array

3 participants