
fix: Handle predicates on non-nullable columns without stats #700

Merged (6 commits) on Feb 19, 2025

33 changes: 30 additions & 3 deletions kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
let (predicate, referenced_schema) = physical_predicate?;
debug!("Creating a data skipping filter for {:#?}", predicate);

// Convert all fields into nullable, as stats may not be available for all columns
// (and usually aren't for partition columns).
struct NullableStatsTransform;
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
fn transform_struct_field(
&mut self,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
use Cow::*;
let field = match self.transform(&field.data_type)? {
Borrowed(_) if field.is_nullable() => Borrowed(field),
data_type => Owned(StructField {
name: field.name.clone(),
data_type: data_type.into_owned(),
nullable: true,
metadata: field.metadata.clone(),
}),
};
Some(field)
}
}

// Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
struct NullCountStatsTransform;
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
Some(Cow::Owned(PrimitiveType::Long))
}
}
- let nullcount_schema = NullCountStatsTransform
+
+ let stats_schema = NullableStatsTransform
.transform_struct(&referenced_schema)?
.into_owned();
+
+ let nullcount_schema = NullCountStatsTransform
+ .transform_struct(&stats_schema)?
+ .into_owned();
let stats_schema = Arc::new(StructType::new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", nullcount_schema),
- StructField::nullable("minValues", referenced_schema.clone()),
- StructField::nullable("maxValues", referenced_schema),
+ StructField::nullable("minValues", stats_schema.clone()),
+ StructField::nullable("maxValues", stats_schema),
]));

// Skipping happens in several steps:
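For readers skimming the diff: the sketch below is a standalone toy model (stand-in `Field`/`DataType` types, not the kernel's actual `StructField`/`SchemaTransform` API) of the stats schema shape built above. Every field is forced nullable so that an add action whose `stats` JSON omits a column, as is typical for partition columns, still parses, and `nullCount` leaves are widened to LONG.

```rust
// Toy stand-ins for the kernel's schema types, for illustration only.
#[derive(Clone, Debug)]
enum DataType {
    Long,
    String,
    Struct(Vec<Field>),
}

#[derive(Clone, Debug)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}

// Analogue of NullableStatsTransform: force every field (recursively) to be nullable,
// because a file's `stats` JSON may omit any of them.
fn as_nullable(field: &Field) -> Field {
    let data_type = match &field.data_type {
        DataType::Struct(children) => DataType::Struct(children.iter().map(as_nullable).collect()),
        other => other.clone(),
    };
    Field { name: field.name.clone(), data_type, nullable: true }
}

// Analogue of NullCountStatsTransform: every leaf becomes a LONG null counter.
fn as_null_count(field: &Field) -> Field {
    let data_type = match &field.data_type {
        DataType::Struct(children) => {
            DataType::Struct(children.iter().map(as_null_count).collect())
        }
        _ => DataType::Long,
    };
    Field { name: field.name.clone(), data_type, nullable: true }
}

fn main() {
    // Columns referenced by the predicate; `id` is non-nullable in the table schema.
    let referenced = vec![Field {
        name: "id".to_string(),
        data_type: DataType::Long,
        nullable: false,
    }];

    let stats_fields: Vec<Field> = referenced.iter().map(as_nullable).collect();
    let null_count_fields: Vec<Field> = stats_fields.iter().map(as_null_count).collect();

    // Shape of the skipping stats schema after this change: every piece is nullable, so
    // an add action whose stats omit `id` (common for partition columns) still parses.
    let stats_schema = vec![
        Field { name: "numRecords".to_string(), data_type: DataType::Long, nullable: true },
        Field { name: "nullCount".to_string(), data_type: DataType::Struct(null_count_fields), nullable: true },
        Field { name: "minValues".to_string(), data_type: DataType::Struct(stats_fields.clone()), nullable: true },
        Field { name: "maxValues".to_string(), data_type: DataType::Struct(stats_fields), nullable: true },
    ];
    println!("{stats_schema:#?}");
}
```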
127 changes: 126 additions & 1 deletion kernel/tests/read.rs
@@ -15,10 +15,12 @@ use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo,
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use itertools::Itertools;
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::{
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
- record_batch_to_bytes, IntoArray, TestAction, METADATA,
+ record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
};
use url::Url;

@@ -906,6 +908,129 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
// Test for https://github.com/delta-io/delta-kernel-rs/issues/698
[Review comment from a collaborator]: nice, thanks for adding test!

let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
];

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from("id=1").child(PARQUET_FILE1),
record_batch_to_bytes(&batch).into(),
)
.await?;
storage
.put(
&Path::from("id=2").child(PARQUET_FILE2),
record_batch_to_bytes(&batch).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("id"), 2);
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let mut result_batch = into_record_batch(engine_data?.raw_data?);
let _ = result_batch.remove_column(result_batch.schema().index_of("id")?);
assert_eq!(&batch, &result_batch);
files_scanned += 1;
}
// Partition pruning is not yet implemented, so we still read the data for both partitions
assert_eq!(2, files_scanned);
Ok(())
}

#[tokio::test]
async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box<dyn std::error::Error>>
{
let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?;

let storage = Arc::new(InMemory::new());
let actions = [
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(),
// Add one file with stats, one file without
format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#),
format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#),
];

// Disable writing Parquet statistics so these cannot be used for pruning row groups
let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.build();

add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?;
storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes_with_props(&batch_2, writer_props).into(),
)
.await?;

let location = Url::parse("memory:///")?;
let table = Table::new(location);

let engine = Arc::new(DefaultEngine::new(
storage.clone(),
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
));
let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

let predicate = Expression::eq(column_expr!("val"), "g");
let scan = snapshot
.scan_builder()
.with_predicate(Arc::new(predicate))
.build()?;

let stream = scan.execute(engine)?;

let mut files_scanned = 0;
for engine_data in stream {
let result_batch = into_record_batch(engine_data?.raw_data?);
assert_eq!(&batch_2, &result_batch);
files_scanned += 1;
}
// Only the file without stats is scanned: missing stats mean we can't prove the predicate is unsatisfied, so that file must be read
assert_eq!(1, files_scanned);

Ok(())
}

#[test]
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
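To make the expected file counts in the two new tests concrete, here is a rough, assumed simplification of the skipping decision (not the kernel's actual expression evaluator): a file is skipped only when its stats prove the predicate cannot match; missing stats yield an unknown result, so the file is read.

```rust
// A file may be skipped only if its stats *prove* no row can match.
// Missing min/max stats make the answer unknown, so the file is kept.
fn can_skip(min: Option<&str>, max: Option<&str>, literal: &str) -> bool {
    let no_row_can_match = match (min, max) {
        (Some(min), Some(max)) => Some(literal < min || literal > max),
        _ => None, // stats missing: unknown
    };
    no_row_can_match == Some(true)
}

fn main() {
    // predicate_on_non_nullable_partition_column: the predicate is `id = 2`, but neither
    // add action carries min/max stats for `id` (and partition pruning is not implemented
    // yet), so both files are read.
    assert!(!can_skip(None, None, "2"));

    // predicate_on_non_nullable_column_missing_stats: the predicate is `val = "g"`.
    // File 1 has stats ["a", "c"], so it is skipped; file 2 has no stats, so it is read.
    assert!(can_skip(Some("a"), Some("c"), "g"));
    assert!(!can_skip(None, None, "g"));

    println!("two files scanned in the first test, one in the second");
}
```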
12 changes: 10 additions & 2 deletions test-utils/src/lib.rs
@@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec<TestAction>) -> String {
/// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign
/// types
pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec<u8> {
- let mut data: Vec<u8> = Vec::new();
let props = WriterProperties::builder().build();
- let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap();
+ record_batch_to_bytes_with_props(batch, props)
+ }
+
+ pub fn record_batch_to_bytes_with_props(
+ batch: &RecordBatch,
+ writer_properties: WriterProperties,
+ ) -> Vec<u8> {
+ let mut data: Vec<u8> = Vec::new();
+ let mut writer =
+ ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap();
writer.write(batch).expect("Writing batch");
// writer must be closed to write footer
writer.close().unwrap();
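As a usage note, here is a minimal sketch of calling the new `record_batch_to_bytes_with_props` helper the way the new test does, with Parquet statistics disabled. The `arrow_array`/`arrow_schema` crate names and the `test_utils` import path are assumptions for illustration, not taken from this diff.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use test_utils::record_batch_to_bytes_with_props;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a small batch with one non-nullable column.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Write the batch without any Parquet statistics, as the new test does, so that
    // row-group pruning cannot stand in for the missing Delta file stats.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    let bytes = record_batch_to_bytes_with_props(&batch, props);
    println!("wrote {} Parquet bytes with statistics disabled", bytes.len());
    Ok(())
}
```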