diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 105e652a7056..2a4b74b0e21f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -640,12 +640,14 @@ mod tests { #[test] fn row_group_pruning_predicate_null_expr() { use datafusion_expr::{col, lit}; - // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 + let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); + + // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0 let expr = col("c1").gt(lit(15)).and(col("c2").is_null()); let expr = logical2physical(&expr, &schema); let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); @@ -664,19 +666,38 @@ mod tests { ), vec![1] ); + + let expr = col("c1").lt(lit(5)).and(col("c2").is_null()); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + + let metrics = parquet_file_metrics(); + // First row group was filtered out because it contains no null value on "c2". 
+ assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &groups, + None, + Some(&pruning_predicate), + &metrics + ), + Vec::<usize>::new() + ); } #[test] fn row_group_pruning_predicate_eq_null_expr() { use datafusion_expr::{col, lit}; // test row group predicate with an unknown (Null) expr - // - // int > 1 and bool = NULL => c1_max > 1 and null let schema = Arc::new(Schema::new(vec![ Field::new("c1", DataType::Int32, false), Field::new("c2", DataType::Boolean, false), ])); let schema_descr = arrow_to_parquet_schema(&schema).unwrap(); + + // int > 1 and bool = NULL => c1_max > 1 and null let expr = col("c1") .gt(lit(15)) .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); @@ -698,6 +719,28 @@ mod tests { ), vec![1] ); + + let expr = col("c1") + .lt(lit(5)) + .and(col("c2").eq(lit(ScalarValue::Boolean(None)))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let groups = gen_row_group_meta_data_for_pruning_predicate(); + + let metrics = parquet_file_metrics(); + // bool = NULL always evaluates to NULL (and thus will not + // pass predicates). Ideally these should both be false + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &groups, + None, + Some(&pruning_predicate), + &metrics, + ), + Vec::<usize>::new() + ); } #[test] @@ -895,11 +938,6 @@ mod tests { .with_precision(18) .with_byte_len(16); let schema_descr = get_test_schema_descr(vec![field]); - // cast the type of c1 to decimal(28,3) - let left = cast(col("c1"), DataType::Decimal128(28, 3)); - let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. 
let rgm1 = get_row_group_meta_data( &schema_descr, @@ -920,9 +958,9 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::fixed_len_byte_array( - // 5.00 + // 10.00 Some(FixedLenByteArray::from(ByteArray::from( - 500i128.to_be_bytes().to_vec(), + 1000i128.to_be_bytes().to_vec(), ))), // 200.00 Some(FixedLenByteArray::from(ByteArray::from( @@ -933,25 +971,80 @@ mod tests { false, )], ); - let rgm3 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::fixed_len_byte_array( None, None, None, 0, false, )], ); + let rgms = [rgm1, rgm2, rgm3]; + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( &schema, &schema_descr, - &[rgm1, rgm2, rgm3], + &rgms, None, Some(&pruning_predicate), &metrics ), vec![1, 2] ); + // c1 in (8, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + false, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 2] + ); + // c1 not in (8, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + 
lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + true, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 1, 2] + ); // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2) // the type of parquet is decimal(18,2) @@ -969,11 +1062,6 @@ mod tests { .with_precision(18) .with_byte_len(16); let schema_descr = get_test_schema_descr(vec![field]); - // cast the type of c1 to decimal(28,3) - let left = cast(col("c1"), DataType::Decimal128(28, 3)); - let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let expr = logical2physical(&expr, &schema); - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); // we must use the big-endian when encode the i128 to bytes or vec[u8]. 
let rgm1 = get_row_group_meta_data( &schema_descr, @@ -990,8 +1078,8 @@ mod tests { let rgm2 = get_row_group_meta_data( &schema_descr, vec![ParquetStatistics::byte_array( - // 5.00 - Some(ByteArray::from(500i128.to_be_bytes().to_vec())), + // 10.00 + Some(ByteArray::from(1000i128.to_be_bytes().to_vec())), // 200.00 Some(ByteArray::from(20000i128.to_be_bytes().to_vec())), None, @@ -1003,18 +1091,74 @@ mod tests { &schema_descr, vec![ParquetStatistics::byte_array(None, None, None, 0, false)], ); + let rgms = [rgm1, rgm2, rgm3]; + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); let metrics = parquet_file_metrics(); assert_eq!( prune_row_groups_by_statistics( &schema, &schema_descr, - &[rgm1, rgm2, rgm3], + &rgms, None, Some(&pruning_predicate), &metrics ), vec![1, 2] ); + // c1 in (8, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + false, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 2] + ); + // c1 not in (8, 300, 400) + // cast the type of c1 to decimal(28,3) + let left = cast(col("c1"), DataType::Decimal128(28, 3)); + let expr = left.in_list( + vec![ + lit(ScalarValue::Decimal128(Some(8000), 28, 3)), + lit(ScalarValue::Decimal128(Some(300000), 28, 3)), + 
lit(ScalarValue::Decimal128(Some(400000), 28, 3)), + ], + true, + ); + let expr = logical2physical(&expr, &schema); + let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap(); + let metrics = parquet_file_metrics(); + assert_eq!( + prune_row_groups_by_statistics( + &schema, + &schema_descr, + &rgms, + None, + Some(&pruning_predicate), + &metrics + ), + vec![0, 1, 2] + ); } fn get_row_group_meta_data(