diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs
index 11181863d..057574744 100644
--- a/kernel/src/scan/data_skipping.rs
+++ b/kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
         let (predicate, referenced_schema) = physical_predicate?;
         debug!("Creating a data skipping filter for {:#?}", predicate);
 
+        // Convert all fields into nullable, as stats may not be available for all columns
+        // (and usually aren't for partition columns).
+        struct NullableStatsTransform;
+        impl<'a> SchemaTransform<'a> for NullableStatsTransform {
+            fn transform_struct_field(
+                &mut self,
+                field: &'a StructField,
+            ) -> Option<Cow<'a, StructField>> {
+                use Cow::*;
+                let field = match self.transform(&field.data_type)? {
+                    Borrowed(_) if field.is_nullable() => Borrowed(field),
+                    data_type => Owned(StructField {
+                        name: field.name.clone(),
+                        data_type: data_type.into_owned(),
+                        nullable: true,
+                        metadata: field.metadata.clone(),
+                    }),
+                };
+                Some(field)
+            }
+        }
+
         // Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
         struct NullCountStatsTransform;
         impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
                 Some(Cow::Owned(PrimitiveType::Long))
             }
         }
-        let nullcount_schema = NullCountStatsTransform
+
+        let stats_schema = NullableStatsTransform
             .transform_struct(&referenced_schema)?
             .into_owned();
+
+        let nullcount_schema = NullCountStatsTransform
+            .transform_struct(&stats_schema)?
+            .into_owned();
         let stats_schema = Arc::new(StructType::new([
             StructField::nullable("numRecords", DataType::LONG),
             StructField::nullable("nullCount", nullcount_schema),
-            StructField::nullable("minValues", referenced_schema.clone()),
-            StructField::nullable("maxValues", referenced_schema),
+            StructField::nullable("minValues", stats_schema.clone()),
+            StructField::nullable("maxValues", stats_schema),
         ]));
 
         // Skipping happens in several steps:
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 9d5d24314..1ce9b9017 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -15,10 +15,12 @@ use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo,
 use delta_kernel::scan::Scan;
 use delta_kernel::schema::{DataType, Schema};
 use delta_kernel::{Engine, FileMeta, Table};
+use itertools::Itertools;
 use object_store::{memory::InMemory, path::Path, ObjectStore};
+use parquet::file::properties::{EnabledStatistics, WriterProperties};
 use test_utils::{
     actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
-    record_batch_to_bytes, IntoArray, TestAction, METADATA,
+    record_batch_to_bytes, record_batch_to_bytes_with_props, IntoArray, TestAction, METADATA,
 };
 use url::Url;
 
@@ -906,6 +908,129 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[tokio::test]
+async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {
+    // Test for https://github.com/delta-io/delta-kernel-rs/issues/698
+    let batch = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?;
+
+    let storage = Arc::new(InMemory::new());
+    let actions = [
+        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(),
+        r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"id\"]"},"isBlindAppend":true}}"#.to_string(),
r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":["id"],"configuration":{},"createdTime":1587968585495}}"#.to_string(), + format!(r#"{{"add":{{"path":"id=1/{PARQUET_FILE1}","partitionValues":{{"id":"1"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#), + format!(r#"{{"add":{{"path":"id=2/{PARQUET_FILE2}","partitionValues":{{"id":"2"}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#), + ]; + + add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?; + storage + .put( + &Path::from("id=1").child(PARQUET_FILE1), + record_batch_to_bytes(&batch).into(), + ) + .await?; + storage + .put( + &Path::from("id=2").child(PARQUET_FILE2), + record_batch_to_bytes(&batch).into(), + ) + .await?; + + let location = Url::parse("memory:///")?; + let table = Table::new(location); + + let engine = Arc::new(DefaultEngine::new( + storage.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + )); + let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?); + + let predicate = Expression::eq(column_expr!("id"), 2); + let scan = snapshot + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let stream = scan.execute(engine)?; + + let mut files_scanned = 0; + for engine_data in stream { + let mut result_batch = into_record_batch(engine_data?.raw_data?); + let _ = result_batch.remove_column(result_batch.schema().index_of("id")?); + assert_eq!(&batch, &result_batch); + files_scanned += 1; + } + // Partition pruning is not yet implemented, so we still read the data for both partitions + assert_eq!(2, files_scanned); + Ok(()) +} + +#[tokio::test] +async fn predicate_on_non_nullable_column_missing_stats() -> Result<(), Box> +{ + let batch_1 = generate_batch(vec![("val", vec!["a", "b", "c"].into_array())])?; + let batch_2 = generate_batch(vec![("val", vec!["d", "e", "f"].into_array())])?; + + let storage = Arc::new(InMemory::new()); + let actions = [ + r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string(), + r#"{"commitInfo":{"timestamp":1587968586154,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}"#.to_string(), + r#"{"metaData":{"id":"5fba94ed-9794-4965-ba6e-6ee3c0d22af9","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1587968585495}}"#.to_string(), + // Add one file with stats, one file without + format!(r#"{{"add":{{"path":"{PARQUET_FILE1}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, "stats":"{{\"numRecords\":3,\"nullCount\":{{\"val\":0}},\"minValues\":{{\"val\":\"a\"}},\"maxValues\":{{\"val\":\"c\"}}}}"}}}}"#), + format!(r#"{{"add":{{"path":"{PARQUET_FILE2}","partitionValues":{{}},"size":0,"modificationTime":1587968586000,"dataChange":true, 
"stats":"{{\"numRecords\":3,\"nullCount\":{{}},\"minValues\":{{}},\"maxValues\":{{}}}}"}}}}"#), + ]; + + // Disable writing Parquet statistics so these cannot be used for pruning row groups + let writer_props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + add_commit(storage.as_ref(), 0, actions.iter().join("\n")).await?; + storage + .put( + &Path::from(PARQUET_FILE1), + record_batch_to_bytes_with_props(&batch_1, writer_props.clone()).into(), + ) + .await?; + storage + .put( + &Path::from(PARQUET_FILE2), + record_batch_to_bytes_with_props(&batch_2, writer_props).into(), + ) + .await?; + + let location = Url::parse("memory:///")?; + let table = Table::new(location); + + let engine = Arc::new(DefaultEngine::new( + storage.clone(), + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + )); + let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?); + + let predicate = Expression::eq(column_expr!("val"), "g"); + let scan = snapshot + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let stream = scan.execute(engine)?; + + let mut files_scanned = 0; + for engine_data in stream { + let result_batch = into_record_batch(engine_data?.raw_data?); + assert_eq!(&batch_2, &result_batch); + files_scanned += 1; + } + // One file is scanned as stats are missing so we don't know the predicate isn't satisfied + assert_eq!(1, files_scanned); + + Ok(()) +} + #[test] fn short_dv() -> Result<(), Box> { let expected = vec![ diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 2605bea56..0aeee887d 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -37,9 +37,17 @@ pub fn actions_to_string(actions: Vec) -> String { /// convert a RecordBatch into a vector of bytes. We can't use `From` since these are both foreign /// types pub fn record_batch_to_bytes(batch: &RecordBatch) -> Vec { - let mut data: Vec = Vec::new(); let props = WriterProperties::builder().build(); - let mut writer = ArrowWriter::try_new(&mut data, batch.schema(), Some(props)).unwrap(); + record_batch_to_bytes_with_props(batch, props) +} + +pub fn record_batch_to_bytes_with_props( + batch: &RecordBatch, + writer_properties: WriterProperties, +) -> Vec { + let mut data: Vec = Vec::new(); + let mut writer = + ArrowWriter::try_new(&mut data, batch.schema(), Some(writer_properties)).unwrap(); writer.write(batch).expect("Writing batch"); // writer must be closed to write footer writer.close().unwrap();