Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix check_not_null_constraints null detection #13033

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 144 additions & 7 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
use futures::stream::{StreamExt, TryStreamExt};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -852,7 +853,7 @@ pub fn execute_input_stream(
Ok(Box::pin(RecordBatchStreamAdapter::new(
sink_schema,
input_stream
.map(move |batch| check_not_null_contraits(batch?, &risky_columns)),
.map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
)))
}
}
Expand All @@ -872,7 +873,7 @@ pub fn execute_input_stream(
/// This function iterates over the specified column indices and ensures that none
/// of the columns contain null values. If any column contains null values, an error
/// is returned.
pub fn check_not_null_contraits(
pub fn check_not_null_constraints(
batch: RecordBatch,
column_indices: &Vec<usize>,
) -> Result<RecordBatch> {
Expand All @@ -885,7 +886,13 @@ pub fn check_not_null_contraits(
);
}

if batch.column(index).null_count() > 0 {
if batch
.column(index)
.logical_nulls()
.map(|nulls| nulls.null_count())
.unwrap_or_default()
> 0
Comment on lines +889 to +894
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified back if we had something like apache/arrow-rs#6608

{
return exec_err!(
"Invalid batch column at '{}' has null but schema specifies non-nullable",
index
Expand Down Expand Up @@ -920,11 +927,11 @@ pub enum CardinalityEffect {
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{DictionaryArray, Int32Array, NullArray, RunArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use std::any::Any;
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};

use datafusion_common::{Result, Statistics};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};

Expand Down Expand Up @@ -1068,6 +1075,136 @@ mod tests {
fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
let _ = plan.name();
}
}

// pub mod test;
/// A column made up solely of non-null values passes the constraint check.
#[test]
fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
    )?;

    // The returned batch itself is irrelevant; only success matters here.
    check_not_null_constraints(batch, &vec![0]).map(|_| ())
}

/// A plain null inside a primitive column must be rejected with the
/// "non-nullable" error.
#[test]
fn test_check_not_null_constraints_reject_null() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
    )?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());

    let error = outcome.err().unwrap();
    assert_starts_with(
        error.message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

/// Nulls stored in the values child of a run-end-encoded array must be
/// detected and rejected.
#[test]
fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
    // Four runs of length one; the second and fourth runs carry a null value.
    let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
    let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
    let run_end_array = RunArray::try_new(&run_ends, &values)?;

    // Capture the data type before the array is moved into the batch.
    let data_type = run_end_array.data_type().to_owned();
    let schema = Arc::new(Schema::new(vec![Field::new("a", data_type, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(run_end_array)])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());

    let error = outcome.err().unwrap();
    assert_starts_with(
        error.message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

/// A dictionary whose keys reference a null dictionary value must be
/// rejected.
#[test]
fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
    // Key 1 points at the null entry of the values array.
    let keys = Int32Array::from(vec![0, 1, 2, 3]);
    let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
    let dictionary = DictionaryArray::new(keys, values);

    // Capture the data type before the array is moved into the batch.
    let data_type = dictionary.data_type().to_owned();
    let schema = Arc::new(Schema::new(vec![Field::new("a", data_type, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dictionary)])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());

    let error = outcome.err().unwrap();
    assert_starts_with(
        error.message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

/// A null dictionary value that no key references is not a logical null, so
/// the batch must be accepted.
#[test]
fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
    // Index 1 (the null value) is deliberately absent from the keys.
    let keys = Int32Array::from(vec![0, /*1,*/ 2, 3]);
    let values = Arc::new(Int32Array::from(vec![
        Some(1),
        None, // never referenced by any key
        Some(3),
        Some(4),
    ]));
    let dictionary = DictionaryArray::new(keys, values);

    // Capture the data type before the array is moved into the batch.
    let data_type = dictionary.data_type().to_owned();
    let schema = Arc::new(Schema::new(vec![Field::new("a", data_type, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(dictionary)])?;

    // The returned batch itself is irrelevant; only success matters here.
    check_not_null_constraints(batch, &vec![0]).map(|_| ())
}

/// A non-empty column of the `Null` data type must be rejected.
#[test]
fn test_check_not_null_constraints_on_null_type() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(NullArray::new(3))])?;

    let outcome = check_not_null_constraints(batch, &vec![0]);
    assert!(outcome.is_err());

    let error = outcome.err().unwrap();
    assert_starts_with(
        error.message().as_ref(),
        "Invalid batch column at '0' has null but schema specifies non-nullable",
    );
    Ok(())
}

/// Test helper: panics unless `actual` begins with `expected_prefix`.
///
/// The panic message quotes both strings so a failing test shows what was
/// compared.
fn assert_starts_with(actual: impl AsRef<str>, expected_prefix: impl AsRef<str>) {
    let (text, prefix) = (actual.as_ref(), expected_prefix.as_ref());
    if !text.starts_with(prefix) {
        panic!("Expected '{}' to start with '{}'", text, prefix);
    }
}
}
Loading