Add nullable in StateFieldArgs #11433

Closed
jcsherin opened this issue Jul 12, 2024 · 17 comments

Comments

@jcsherin
Contributor

jcsherin commented Jul 12, 2024

I think, given the existing nth function, we should make nullable configurable. The nullability is actually for the list element, so we should add nullable to StateFieldArgs.

let mut fields = vec![Field::new_list(
    format_state_name(self.name(), "nth_value"),
    Field::new("item", args.input_type.clone(), self.nullable),
    false,
)];

@eejbyfeldt is working on it in #11063

Originally posted by @jayzhan211 in #11287 (comment)

@jayzhan211
Contributor

jayzhan211 commented Jul 12, 2024

Hold on, I think this is outdated, since we found that most SQL databases return NULL if no rows qualify. See #11299.

@jcsherin
Contributor Author

For comparison I ran a few queries from #11299 with nth_value. This looks right to me.

DataFusion CLI v40.0.0
> create table t(a int, b float, c bigint) as values (1, 1.2, 2);
0 row(s) fetched. 

> select nth_value(a, 1) from t where a > 2;
+-------------------------+
| nth_value(t.a,Int64(1)) |
+-------------------------+
|                         |
+-------------------------+
1 row(s) fetched.

> select nth_value(a, 1), count(1) from t where a > 3 group by a;
+-------------------------+-----------------+
| nth_value(t.a,Int64(1)) | count(Int64(1)) |
+-------------------------+-----------------+
+-------------------------+-----------------+
0 row(s) fetched.

> select nth_value(a, 1), count(1) from t where a > 3;
+-------------------------+-----------------+
| nth_value(t.a,Int64(1)) | count(Int64(1)) |
+-------------------------+-----------------+
|                         | 0               |
+-------------------------+-----------------+
1 row(s) fetched.

> select nth_value(a, 1 order by a desc), count(1) from t where a > 2;
+---------------------------------------------------------+-----------------+
| nth_value(t.a,Int64(1)) ORDER BY [t.a DESC NULLS FIRST] | count(Int64(1)) |
+---------------------------------------------------------+-----------------+
|                                                         | 0               |
+---------------------------------------------------------+-----------------+
1 row(s) fetched.   

@jcsherin
Contributor Author

This comment is also outdated because of #11299.

// TODO: The nullability of the list element should be configurable.
// The hard-coded `true` should be changed once the field for
// nullability is added to `StateFieldArgs` struct.
// See: https://github.com/apache/datafusion/pull/11063

@jayzhan211
Contributor

jayzhan211 commented Jul 12, 2024

This one looks good to me too. But it would be nice to have a use case that benefits from the nullability of the element (such as a query that is optimized based on nullability).

let mut fields = vec![Field::new_list(
    format_state_name(self.name(), "nth_value"),
    Field::new("item", args.input_type.clone(), self.nullable),
    true,
)];

Otherwise, for simplicity, we can assume it is nullable:

let mut fields = vec![Field::new_list(
    format_state_name(self.name(), "nth_value"),
    Field::new("item", args.input_type.clone(), true),
    true,
)];

@jayzhan211
Contributor

jayzhan211 commented Jul 12, 2024

This comment is also outdated because of #11299.

// TODO: The nullability of the list element should be configurable.
// The hard-coded `true` should be changed once the field for
// nullability is added to `StateFieldArgs` struct.
// See: https://github.com/apache/datafusion/pull/11063

Oh, so the current code is actually incorrect 😕 I thought that was the one you proposed to change. But then why is the result as expected 🤔

@jcsherin
Contributor Author

Sorry, I do not follow. Could you please elaborate on which part is incorrect?

@jayzhan211
Contributor

jayzhan211 commented Jul 12, 2024

As in #11299: to return NULL when no row qualifies, nullable for the list should be true, but the current code sets it to false, which is not what I expected.

@jcsherin
Contributor Author

For nth_value aggregation there are 3 possible states:

  1. The nth row exists and the value is returned.
  2. The nth row is out of range and NULL is returned.
  3. No rows qualified for aggregation and NULL is returned.

States 2 and 3 are covered by the else branch in Accumulator::evaluate. The result does not depend on the nullable flag of the list in the accumulator state.

    if let Some(idx) = nth_value_idx {
        Ok(self.values[idx].clone())
    } else {
        ScalarValue::try_from(self.datatypes[0].clone())
    }
}
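
To make the three states concrete, here is a minimal standalone sketch using only the Rust standard library. It is not the DataFusion implementation: the function name `nth_value`, the `Option<i64>` item type, and the choice to treat `n == 0` as out of range are all assumptions made for illustration.

```rust
/// Hypothetical stand-in for the accumulator's evaluate step.
/// `values` holds the rows collected for one group; `n` is 1-based.
/// `None` models SQL NULL.
fn nth_value(values: &[Option<i64>], n: usize) -> Option<i64> {
    // Assumption for this sketch: n == 0 is simply out of range.
    let idx = n.checked_sub(1)?;
    // `get` yields None when the index is out of range, which also
    // covers the case where no rows qualified (empty slice).
    // `flatten` maps a stored NULL item to the same None result.
    values.get(idx).copied().flatten()
}
```

Calling `nth_value(&[Some(10), Some(20)], 2)` hits state 1, `nth_value(&[Some(10)], 2)` hits state 2, and `nth_value(&[], 1)` hits state 3. Neither of the NULL results depends on whether the list itself is declared nullable.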

The nullable flag (line 141) for the list is effectively a no-op.

let mut fields = vec![Field::new_list(
    format_state_name(self.name(), "nth_value"),
    // TODO: The nullability of the list element should be configurable.
    // The hard-coded `true` should be changed once the field for
    // nullability is added to `StateFieldArgs` struct.
    // See: https://github.com/apache/datafusion/pull/11063
    Field::new("item", args.input_type.clone(), true),
    false,
)];

This possibly explains passing tests. Thoughts?

@jayzhan211
Contributor

My guess is that the query is not a multi-phase aggregate, so nothing checks the result against state_fields. Maybe we should come up with a more complex query that produces a state result for each aggregate partition.

@jcsherin
Contributor Author

When adding trace statements in nth_value aggregate I can see that the following are executed in order:

  • update_batch()
  • state()
  • merge_batch()
  • evaluate()
  1. nth_value always exists - In the query below, the 2nd item in C13 is always present because every group a..=e has a count of 18, 19, or 21.
SELECT C1
       , COUNT(C1) as n
       , NTH_VALUE(C13, 2 ORDER BY C1, C13 ASC) as nth -- get 2nd row
    FROM aggregate_test_100
   GROUP BY C1
   ORDER BY C1;
+----+----+--------------------------------+
| c1 | n  | nth                            |
+----+----+--------------------------------+
| a  | 21 | Amn2K87Db5Es3dFQO9cw9cvpAM6h35 |
| b  | 19 | 6FPJlLAcaQ5uokyOWZ9HGdLZObFvOZ |
| c  | 21 | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW |
| d  | 18 | 1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO |
| e  | 21 | 3BEOHQsMEFZ58VcNTOJYShTBpAPzbt |
+----+----+--------------------------------+
5 row(s) fetched.
  2. nth_value is sometimes out of bounds - Since groups 'b' and 'd' have only 19 and 18 values respectively, we can set N to 20.
SELECT C1
       , COUNT(C1) as n
       , NTH_VALUE(C13, 20 ORDER BY C1, C13 ASC) as nth -- get 20th row
    FROM aggregate_test_100
   GROUP BY C1
   ORDER BY C1;
+----+----+--------------------------------+
| c1 | n  | nth                            |
+----+----+--------------------------------+
| a  | 21 | waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs |
| b  | 19 |                                |
| c  | 21 | pLk3i59bZwd5KBZrI1FiweYTd5hteG |
| d  | 18 |                                |
| e  | 21 | ukOiFGGFnQJDHFgZxHMpvhD3zybF0M |
+----+----+--------------------------------+
5 row(s) fetched.

Create Table

CREATE EXTERNAL TABLE aggregate_test_100 (
  c1  VARCHAR NOT NULL,
  c2  TINYINT NOT NULL,
  c3  SMALLINT NOT NULL,
  c4  SMALLINT,
  c5  INT,
  c6  BIGINT NOT NULL,
  c7  SMALLINT NOT NULL,
  c8  INT NOT NULL,
  c9  BIGINT UNSIGNED NOT NULL,
  c10 VARCHAR NOT NULL,
  c11 FLOAT NOT NULL,
  c12 DOUBLE NOT NULL,
  c13 VARCHAR NOT NULL
)
STORED AS CSV
-- path is relative to `datafusion-cli`
LOCATION '../testing/data/csv/aggregate_test_100.csv'
OPTIONS ('format.has_header' 'true');
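
The call order traced above (update_batch, state, merge_batch, evaluate) can be modeled in a few lines of plain Rust. This is a simplified sketch, not the DataFusion `Accumulator` trait: the type `NthValueSketch`, the helper `two_phase`, and the `i64` item type are hypothetical, and ORDER BY handling is left out entirely.

```rust
/// Hypothetical, simplified accumulator (not the DataFusion trait).
struct NthValueSketch {
    n: usize,                 // 1-based position to return
    values: Vec<Option<i64>>, // rows seen so far; items may be NULL
}

impl NthValueSketch {
    fn new(n: usize) -> Self {
        Self { n, values: Vec::new() }
    }

    /// Partial phase: absorb a batch of input rows.
    fn update_batch(&mut self, batch: &[Option<i64>]) {
        self.values.extend_from_slice(batch);
    }

    /// Partial phase: emit the intermediate state.
    /// The list may be empty, but there is always a list.
    fn state(&self) -> Vec<Option<i64>> {
        self.values.clone()
    }

    /// Final phase: absorb the states emitted by partial accumulators.
    fn merge_batch(&mut self, states: &[Vec<Option<i64>>]) {
        for s in states {
            self.values.extend_from_slice(s);
        }
    }

    /// Final phase: NULL (None) when the nth row does not exist.
    fn evaluate(&self) -> Option<i64> {
        let idx = self.n.checked_sub(1)?;
        self.values.get(idx).copied().flatten()
    }
}

/// Runs one partial accumulator per partition, then merges the states.
fn two_phase(partitions: &[Vec<Option<i64>>], n: usize) -> Option<i64> {
    let states: Vec<Vec<Option<i64>>> = partitions
        .iter()
        .map(|rows| {
            let mut partial = NthValueSketch::new(n);
            partial.update_batch(rows);
            partial.state()
        })
        .collect();
    let mut final_acc = NthValueSketch::new(n);
    final_acc.merge_batch(&states);
    final_acc.evaluate()
}
```

In this model the intermediate state is the only thing that crosses the partial/final boundary, which is why the schema declared for it only matters once a query actually runs in multiple phases.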

@jcsherin
Contributor Author

jcsherin commented Jul 15, 2024

This is a recent change introduced in #11093.

ScalarValue::List(ScalarValue::new_list_nullable(
    values_slice,
    &self.datatypes[0],
))

In state_fields the nullability is set to false. But inside Accumulator::evaluate() the nullability of the list is overridden to always be true.

It is surprising to me that the schema defined in state_fields doesn't match the Accumulator and still this works.

@jayzhan211
Contributor

This is a recent change introduced in #11093.

ScalarValue::List(ScalarValue::new_list_nullable(
    values_slice,
    &self.datatypes[0],
))

In state_fields the nullability is set to false. But inside Accumulator::evaluate() the nullability of the list is overridden to always be true.

It is surprising to me that the schema defined in state_fields doesn't match the Accumulator and still this works.

Maybe the schema matching itself is not correct.

@jcsherin
Contributor Author

jcsherin commented Jul 18, 2024

I tested this out in multiple ways, each time making sure the query used multi-phase grouping so that the state fields are actually exercised. This is the understanding I've reached about the correct nullable value for both the list and the items in the list:

  1. It is impossible for the intermediate state of values (the list) to ever be null. It can be empty, but never null. So the current setting of nullable: false for the list of values is correct.
  2. The nullable flag for items in the list need not be configurable (as I initially thought). The computation of the nth row does not depend on the nullability defined in the schema of the input column expression. It should remain set to true so that a null value can exist as an item in the list. Adding this field to StateFieldArgs for the sake of nth_value is unnecessary in my opinion.
let mut fields = vec![Field::new_list(
	format_state_name(self.name(), "nth_value"),
	Field::new("item", args.input_type.clone(), true), // See [2]
	false, // See [1]
)];

It would benefit future readers of this code to add some comments here. It took me longer than should have been necessary to reach the conclusions above, because they are currently implicit in the code.

But other than that the good news is no code changes are required.
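
The two conclusions above can be restated as a small conformance check. This is a sketch in plain Rust under stated assumptions: `ListFieldSketch` and `conforms` are hypothetical names, not Arrow or DataFusion APIs, and an outer `Option` stands in for a nullable list while an inner `Option` stands in for a nullable item.

```rust
/// Hypothetical model of a list field's two nullability flags.
struct ListFieldSketch {
    list_nullable: bool, // may the list itself be NULL?
    item_nullable: bool, // may an item inside the list be NULL?
}

/// Returns true when a state value is allowed by the field's flags.
fn conforms(field: &ListFieldSketch, state: &Option<Vec<Option<i64>>>) -> bool {
    match state {
        // A NULL list is only valid when the list is declared nullable.
        None => field.list_nullable,
        // A present list is valid if items may be NULL,
        // or if no item actually is NULL.
        Some(items) => field.item_nullable || items.iter().all(|i| i.is_some()),
    }
}
```

With `list_nullable: false` and `item_nullable: true`, an empty list and a list containing a NULL item both conform, while a NULL list does not. That matches the observation that the intermediate state can be empty but never null.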

@jayzhan211
Contributor

jayzhan211 commented Jul 19, 2024

cc @eejbyfeldt and @alamb I think @jcsherin found something interesting: nullable for the list element should be true in order to allow computing with null elements. I think the idea here applies to array_agg too. We don't need a configurable nullable for the list element, and we could set nullable to false for the list itself in state_fields (intermediate result), while keeping it true for the list in fields (final result).

    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
        if args.is_distinct {
            return Ok(vec![Field::new_list(
                format_state_name(args.name, "distinct_array_agg"),
                Field::new("item", args.input_type.clone(), true), // keep as always true for nulls
                 // true
                 false,
            )]);
        }

        Ok(vec![Field::new_list(
            format_state_name(args.name, "array_agg"),
            Field::new("item", args.input_type.clone(), true), // keep as always true for nulls
            // true,
            false,
        )])
    }

I think we should definitely add docs about this; we have already had several PRs dealing with nullability. It would be helpful for others too.

@alamb
Contributor

alamb commented Jul 19, 2024

@jcsherin would you be willing to draft a PR updating the docs?

@jcsherin
Contributor Author

@jcsherin would you be willing to draft a PR updating the docs?

Yes, I'll be happy to take care of this.

@jcsherin
Contributor Author

Thanks again @jayzhan211.
