Skip to content

Commit

Permalink
Fix count on all null VALUES clause (apache#13029)
Browse files Browse the repository at this point in the history
* Test Count accumulator with all-nulls

* Fix count on null values

Before the change, the `ValuesExec` containing `NullArray` would
incorrectly report column statistics as being non-null, which would
misinform `AggregateStatistics` optimizer and fold `count(always_null)`
into row count instead of 0.

This commit fixes the column statistics derivation for values with
`NullArray` and therefore fixes execution of logical plans with count
over such values.

Note that the bug was not reproducible using DataFusion SQL frontend,
because in DataFusion SQL the `VALUES (NULL)` doesn't have type
`DataType:Null` (it has some apparently arbitrarily picked type
instead).

As a follow-up, all usages of `Array:null_count` should be inspected.
The function can easily be misused (it returns "physical nulls", which
do not exist for null type).
  • Loading branch information
findepi authored Oct 21, 2024
1 parent 4fca0d5 commit 34fbe8e
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 1 deletion.
3 changes: 3 additions & 0 deletions datafusion/core/tests/core_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ mod dataframe;
/// Run all tests that are found in the `macro_hygiene` directory
mod macro_hygiene;

/// Run all tests that are found in the `execution` directory
mod execution;

/// Run all tests that are found in the `expr_api` directory
mod expr_api;

Expand Down
95 changes: 95 additions & 0 deletions datafusion/core/tests/execution/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::Int64Array;
use arrow_schema::{DataType, Field};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
use datafusion_execution::TaskContext;
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::logical_plan::{LogicalPlan, Values};
use datafusion_expr::{Aggregate, AggregateUDF, Expr};
use datafusion_functions_aggregate::count::Count;
use datafusion_physical_plan::collect;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

///! Logical plans need to provide stable semantics, as downstream projects
///! create them and depend on them. Test executable semantics of logical plans.
#[tokio::test]
async fn count_only_nulls() -> Result<()> {
// Input: VALUES (NULL), (NULL), (NULL) AS _(col)
let input_schema = Arc::new(DFSchema::from_unqualified_fields(
vec![Field::new("col", DataType::Null, true)].into(),
HashMap::new(),
)?);
let input = Arc::new(LogicalPlan::Values(Values {
schema: input_schema,
values: vec![
vec![Expr::Literal(ScalarValue::Null)],
vec![Expr::Literal(ScalarValue::Null)],
vec![Expr::Literal(ScalarValue::Null)],
],
}));
let input_col_ref = Expr::Column(Column {
relation: None,
name: "col".to_string(),
});

// Aggregation: count(col) AS count
let aggregate = LogicalPlan::Aggregate(Aggregate::try_new(
input,
vec![],
vec![Expr::AggregateFunction(AggregateFunction {
func: Arc::new(AggregateUDF::new_from_impl(Count::new())),
args: vec![input_col_ref],
distinct: false,
filter: None,
order_by: None,
null_treatment: None,
})],
)?);

// Execute and verify results
let session_state = SessionStateBuilder::new().build();
let physical_plan = session_state.create_physical_plan(&aggregate).await?;
let result =
collect(physical_plan, Arc::new(TaskContext::from(&session_state))).await?;

let result = only(result.as_slice());
let result_schema = result.schema();
let field = only(result_schema.fields().deref());
let column = only(result.columns());

assert_eq!(field.data_type(), &DataType::Int64); // TODO should be UInt64
assert_eq!(column.deref(), &Int64Array::from(vec![0]));

Ok(())
}

fn only<T>(elements: &[T]) -> &T
where
T: Debug,
{
let [element] = elements else {
panic!("Expected exactly one element, got {:?}", elements);
};
element
}
18 changes: 18 additions & 0 deletions datafusion/core/tests/execution/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod logical_plan;
14 changes: 14 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,17 @@ impl Accumulator for DistinctCountAccumulator {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::NullArray;

#[test]
fn count_accumulator_nulls() -> Result<()> {
let mut accumulator = CountAccumulator::new();
accumulator.update_batch(&[Arc::new(NullArray::new(10))])?;
assert_eq!(accumulator.evaluate()?, ScalarValue::Int64(Some(0)));
Ok(())
}
}
6 changes: 5 additions & 1 deletion datafusion/physical-plan/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ pub fn compute_record_batch_statistics(
for partition in batches.iter() {
for batch in partition {
for (stat_index, col_index) in projection.iter().enumerate() {
null_counts[stat_index] += batch.column(*col_index).null_count();
null_counts[stat_index] += batch
.column(*col_index)
.logical_nulls()
.map(|nulls| nulls.null_count())
.unwrap_or_default();
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions datafusion/physical-plan/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ mod tests {
use crate::test::{self, make_partition};

use arrow_schema::{DataType, Field};
use datafusion_common::stats::{ColumnStatistics, Precision};

#[tokio::test]
async fn values_empty_case() -> Result<()> {
Expand Down Expand Up @@ -269,4 +270,34 @@ mod tests {
let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]])
.unwrap_err();
}

#[test]
fn values_stats_with_nulls_only() -> Result<()> {
let data = vec![
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
vec![lit(ScalarValue::Null)],
];
let rows = data.len();
let values = ValuesExec::try_new(
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
data,
)?;

assert_eq!(
values.statistics()?,
Statistics {
num_rows: Precision::Exact(rows),
total_byte_size: Precision::Exact(8), // not important
column_statistics: vec![ColumnStatistics {
null_count: Precision::Exact(rows), // there are only nulls
distinct_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
},],
}
);

Ok(())
}
}

0 comments on commit 34fbe8e

Please sign in to comment.