Merge branch 'main' into expose-partition-cols-from-snapshot-ffi
nicklan committed Feb 20, 2025
2 parents fe7adba + 72b585d commit ac5fbf3
Showing 9 changed files with 378 additions and 31 deletions.
2 changes: 1 addition & 1 deletion ffi/examples/read-table/schema.h
@@ -273,7 +273,7 @@ void print_schema(SharedSnapshot* snapshot)
.visit_timestamp = visit_timestamp,
.visit_timestamp_ntz = visit_timestamp_ntz,
};
uintptr_t schema_list_id = visit_schema(snapshot, &visitor);
uintptr_t schema_list_id = visit_snapshot_schema(snapshot, &visitor);
#ifdef VERBOSE
printf("Schema returned in list %" PRIxPTR "\n", schema_list_id);
#endif
27 changes: 24 additions & 3 deletions ffi/src/schema.rs
@@ -1,6 +1,6 @@
use std::os::raw::c_void;

use crate::scan::CStringMap;
use crate::scan::{CStringMap, SharedSchema};
use crate::{handle::Handle, kernel_string_slice, KernelStringSlice, SharedSnapshot};
use delta_kernel::schema::{ArrayType, DataType, MapType, PrimitiveType, StructType};

@@ -201,11 +201,32 @@ pub struct EngineSchemaVisitor {
///
/// Caller is responsible for passing a valid snapshot handle and schema visitor.
#[no_mangle]
pub unsafe extern "C" fn visit_schema(
pub unsafe extern "C" fn visit_snapshot_schema(
snapshot: Handle<SharedSnapshot>,
visitor: &mut EngineSchemaVisitor,
) -> usize {
let snapshot = unsafe { snapshot.as_ref() };
visit_schema_impl(snapshot.schema(), visitor)
}

/// Visit the given `schema` using the provided `visitor`. See the documentation of
/// [`EngineSchemaVisitor`] for a description of how this visitor works.
///
/// This method returns the id of the list allocated to hold the top level schema columns.
///
/// # Safety
///
/// Caller is responsible for passing a valid schema handle and schema visitor.
#[no_mangle]
pub unsafe extern "C" fn visit_schema(
schema: Handle<SharedSchema>,
visitor: &mut EngineSchemaVisitor,
) -> usize {
let schema = unsafe { schema.as_ref() };
visit_schema_impl(schema, visitor)
}

fn visit_schema_impl(schema: &StructType, visitor: &mut EngineSchemaVisitor) -> usize {
// Visit all the fields of a struct and return the list of children
fn visit_struct_fields(visitor: &EngineSchemaVisitor, s: &StructType) -> usize {
let child_list_id = (visitor.make_field_list)(visitor.data, s.fields.len());
@@ -316,5 +337,5 @@ pub unsafe extern "C" fn visit_schema(
}
}

visit_struct_fields(visitor, snapshot.schema())
visit_struct_fields(visitor, schema)
}
14 changes: 13 additions & 1 deletion kernel/src/engine/arrow_data.rs
@@ -12,7 +12,13 @@ use tracing::debug;

use std::collections::{HashMap, HashSet};

/// ArrowEngineData holds an Arrow RecordBatch, implements `EngineData` so the kernel can extract from it.
pub use crate::engine::arrow_utils::fix_nested_null_masks;

/// ArrowEngineData holds an Arrow `RecordBatch`, implements `EngineData` so the kernel can extract from it.
///
/// WARNING: Row visitors require that all leaf columns of the record batch have correctly computed
/// NULL masks. The arrow parquet reader is known to produce incomplete NULL masks, for
/// example. When in doubt, call [`fix_nested_null_masks`] first.
pub struct ArrowEngineData {
data: RecordBatch,
}
@@ -43,6 +49,12 @@ impl From<RecordBatch> for ArrowEngineData {
}
}

impl From<StructArray> for ArrowEngineData {
fn from(value: StructArray) -> Self {
ArrowEngineData::new(value.into())
}
}

impl From<ArrowEngineData> for RecordBatch {
fn from(value: ArrowEngineData) -> Self {
value.data
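A minimal usage sketch, not part of this commit, of how the re-exported `fix_nested_null_masks` and the new `From<StructArray>` impl fit together: repair the nested NULL masks of a freshly read batch before wrapping it as engine data. It assumes the crate feature that exposes `delta_kernel::engine::arrow_data`, and the helper name `to_engine_data` is made up for illustration.

use arrow_array::{RecordBatch, StructArray};
use delta_kernel::engine::arrow_data::{fix_nested_null_masks, ArrowEngineData};

// Union nested NULL masks down to the leaves, then hand the batch to the kernel.
// `fix_nested_null_masks` operates on a StructArray; a RecordBatch converts to and
// from one losslessly.
fn to_engine_data(batch: RecordBatch) -> ArrowEngineData {
    let fixed: StructArray = fix_nested_null_masks(batch.into());
    ArrowEngineData::from(fixed)
}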
170 changes: 168 additions & 2 deletions kernel/src/engine/arrow_utils.rs
@@ -13,9 +13,10 @@ use crate::{
};

use arrow_array::{
cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
RecordBatch, StringArray, StructArray,
cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
OffsetSizeTrait, RecordBatch, StringArray, StructArray,
};
use arrow_buffer::NullBuffer;
use arrow_json::{LineDelimitedWriter, ReaderBuilder};
use arrow_schema::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
@@ -62,6 +63,21 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
}

/// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to
/// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
/// accurate null masks that row visitors rely on for correctness.
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
) -> DeltaResult<T>
where
StructArray: Into<T>,
{
let data = reorder_struct_array(batch.into(), requested_ordering)?;
let data = fix_nested_null_masks(data);
Ok(data.into())
}

/*
* The code below implements proper pruning of columns when reading parquet, reordering of columns to
* match the specified schema, and insertion of null columns if the requested schema includes a
@@ -609,6 +625,53 @@ fn reorder_list<O: OffsetSizeTrait>(
}
}

/// Use this function to recursively compute properly unioned null masks for all nested
/// columns of a record batch, making it safe to project out and consume nested columns.
///
/// Arrow does not guarantee that the null masks associated with nested columns are accurate --
/// instead, the reader must consult the union of the logical null masks of the column and all of
/// its ancestors. The parquet reader stopped doing this automatically as of arrow-53.3, for example.
pub fn fix_nested_null_masks(batch: StructArray) -> StructArray {
compute_nested_null_masks(batch, None)
}

/// Splits a StructArray into its parts, unions in the parent null mask, and uses the result to
/// recursively update the children as well before putting everything back together.
fn compute_nested_null_masks(sa: StructArray, parent_nulls: Option<&NullBuffer>) -> StructArray {
let (fields, columns, nulls) = sa.into_parts();
let nulls = NullBuffer::union(parent_nulls, nulls.as_ref());
let columns = columns
.into_iter()
.map(|column| match column.as_struct_opt() {
Some(sa) => Arc::new(compute_nested_null_masks(sa.clone(), nulls.as_ref())) as _,
None => {
let data = column.to_data();
let nulls = NullBuffer::union(nulls.as_ref(), data.nulls());
let builder = data.into_builder().nulls(nulls);
// Use an unchecked build to avoid paying a redundant O(k) validation cost for a
// `RecordBatch` with k leaf columns.
//
// SAFETY: The builder was constructed from an `ArrayData` we extracted from the
// column. The change we make is the null buffer, via `NullBuffer::union` with input
// null buffers that were _also_ extracted from the column and its parent. A union
// can only _grow_ the set of NULL rows, so data validity is preserved. Even if the
// `parent_nulls` somehow had a length mismatch --- which it never should, having
// also been extracted from our grandparent --- the mismatch would have already
// caused `NullBuffer::union` to panic.
let data = unsafe { builder.build_unchecked() };
make_array(data)
}
})
.collect();

// Use an unchecked constructor to avoid paying a redundant O(n*k) null buffer validation cost
// for a `RecordBatch` with n rows and k leaf columns.
//
// SAFETY: We are simply reassembling the input `StructArray` we previously broke apart, with
// updated null buffers. See above for details about null buffer safety.
unsafe { StructArray::new_unchecked(fields, columns, nulls) }
}

/// Arrow lacks the functionality to json-parse a string column into a struct column -- even tho the
/// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
pub(crate) fn parse_json(
Expand Down Expand Up @@ -1432,4 +1495,107 @@ mod tests {
);
Ok(())
}

#[test]
fn test_arrow_broken_nested_null_masks() {
use crate::engine::arrow_utils::fix_nested_null_masks;
use arrow::datatypes::{DataType, Field, Fields, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Parse some JSON into a nested schema
let schema = Arc::new(Schema::new(vec![Field::new(
"outer",
DataType::Struct(Fields::from(vec![
Field::new(
"inner_nullable",
DataType::Struct(Fields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
true,
),
Field::new(
"inner_non_null",
DataType::Struct(Fields::from(vec![
Field::new("leaf_non_null", DataType::Int32, false),
Field::new("leaf_nullable", DataType::Int32, true),
])),
false,
),
])),
true,
)]));
let json_string = r#"
{ }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 1 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 2, "leaf_nullable" : 3 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } }
{ "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } }
"#;
let batch1 = arrow::json::ReaderBuilder::new(schema.clone())
.build(json_string.as_bytes())
.unwrap()
.next()
.unwrap()
.unwrap();
println!("Batch 1: {batch1:?}");

macro_rules! assert_nulls {
( $column: expr, $nulls: expr ) => {
assert_eq!($column.nulls().unwrap(), &NullBuffer::from(&$nulls[..]));
};
}

// If any of these tests ever fail, it means the arrow JSON reader started producing
// incomplete nested NULL masks. If that happens, we need to update all JSON reads to call
// `fix_nested_null_masks`.
let outer_1 = batch1.column(0).as_struct();
assert_nulls!(outer_1, [false, true, true, true, true]);
let inner_nullable_1 = outer_1.column(0).as_struct();
assert_nulls!(inner_nullable_1, [false, false, false, true, true]);
let nullable_leaf_non_null_1 = inner_nullable_1.column(0);
assert_nulls!(nullable_leaf_non_null_1, [false, false, false, true, true]);
let nullable_leaf_nullable_1 = inner_nullable_1.column(1);
assert_nulls!(nullable_leaf_nullable_1, [false, false, false, false, true]);
let inner_non_null_1 = outer_1.column(1).as_struct();
assert_nulls!(inner_non_null_1, [false, true, true, true, true]);
let non_null_leaf_non_null_1 = inner_non_null_1.column(0);
assert_nulls!(non_null_leaf_non_null_1, [false, true, true, true, true]);
let non_null_leaf_nullable_1 = inner_non_null_1.column(1);
assert_nulls!(non_null_leaf_nullable_1, [false, false, true, false, false]);

// Write the batch to a parquet file and read it back
let mut buffer = vec![];
let mut writer =
parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
writer.write(&batch1).unwrap();
writer.close().unwrap(); // writer must be closed to write footer
let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer))
.unwrap()
.build()
.unwrap()
.next()
.unwrap()
.unwrap();
println!("Batch 2 before: {batch2:?}");

// Starting from arrow-53.3, the parquet reader started returning broken nested NULL masks.
let batch2 = RecordBatch::from(fix_nested_null_masks(batch2.into()));

// Verify the data survived the round trip
let outer_2 = batch2.column(0).as_struct();
assert_eq!(outer_2, outer_1);
let inner_nullable_2 = outer_2.column(0).as_struct();
assert_eq!(inner_nullable_2, inner_nullable_1);
let nullable_leaf_non_null_2 = inner_nullable_2.column(0);
assert_eq!(nullable_leaf_non_null_2, nullable_leaf_non_null_1);
let nullable_leaf_nullable_2 = inner_nullable_2.column(1);
assert_eq!(nullable_leaf_nullable_2, nullable_leaf_nullable_1);
let inner_non_null_2 = outer_2.column(1).as_struct();
assert_eq!(inner_non_null_2, inner_non_null_1);
let non_null_leaf_non_null_2 = inner_non_null_2.column(0);
assert_eq!(non_null_leaf_non_null_2, non_null_leaf_non_null_1);
let non_null_leaf_nullable_2 = inner_non_null_2.column(1);
assert_eq!(non_null_leaf_nullable_2, non_null_leaf_nullable_1);
}
}
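For intuition about what the mask repair computes, here is a small standalone sketch, not part of this commit, of `NullBuffer::union`, the arrow-buffer primitive that `compute_nested_null_masks` applies at every level: a nested slot is logically valid only if it and every ancestor slot are valid.

use arrow_buffer::NullBuffer;

fn main() {
    // Validity bits: true = valid, false = NULL.
    let parent = NullBuffer::from(&[true, false, true, true][..]);
    let child = NullBuffer::from(&[true, true, false, true][..]);
    // "Union of nulls" is the intersection of validity: a child value is present
    // only when both it and its parent are valid.
    let fixed = NullBuffer::union(Some(&parent), Some(&child)).unwrap();
    assert_eq!(fixed, NullBuffer::from(&[true, false, false, true][..]));
}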
16 changes: 3 additions & 13 deletions kernel/src/engine/default/parquet.rs
@@ -18,7 +18,7 @@ use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::default::executor::TaskExecutor;
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
@@ -281,12 +281,7 @@ impl FileOpener for ParquetOpener {

let stream = builder.with_batch_size(batch_size).build()?;

let stream = stream.map(move |rbr| {
// re-order each batch if needed
rbr.map_err(Error::Parquet).and_then(|rb| {
reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
})
});
let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
Ok(stream.boxed())
}))
}
@@ -355,12 +350,7 @@ impl FileOpener for PresignedUrlOpener {
let reader = builder.with_batch_size(batch_size).build()?;

let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
// re-order each batch if needed
rbr.map_err(Error::Arrow).and_then(|rb| {
reorder_struct_array(rb.into(), &requested_ordering).map(Into::into)
})
});
let stream = stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering));
Ok(stream.boxed())
}))
}
8 changes: 3 additions & 5 deletions kernel/src/engine/sync/parquet.rs
@@ -5,7 +5,7 @@ use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReader

use super::read_files;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array};
use crate::engine::arrow_utils::{fixup_parquet_read, generate_mask, get_requested_indices};
use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping;
use crate::schema::SchemaRef;
use crate::{DeltaResult, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler};
@@ -28,10 +28,8 @@ fn try_create_from_parquet(
if let Some(predicate) = predicate {
builder = builder.with_row_group_filter(predicate.as_ref());
}
Ok(builder.build()?.map(move |data| {
let reordered = reorder_struct_array(data?.into(), &requested_ordering)?;
Ok(ArrowEngineData::new(reordered.into()))
}))
let stream = builder.build()?;
Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering)))
}

impl ParquetHandler for SyncParquetHandler {
33 changes: 30 additions & 3 deletions kernel/src/scan/data_skipping.rs
@@ -75,6 +75,28 @@ impl DataSkippingFilter {
let (predicate, referenced_schema) = physical_predicate?;
debug!("Creating a data skipping filter for {:#?}", predicate);

// Convert all fields into nullable, as stats may not be available for all columns
// (and usually aren't for partition columns).
struct NullableStatsTransform;
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
fn transform_struct_field(
&mut self,
field: &'a StructField,
) -> Option<Cow<'a, StructField>> {
use Cow::*;
let field = match self.transform(&field.data_type)? {
Borrowed(_) if field.is_nullable() => Borrowed(field),
data_type => Owned(StructField {
name: field.name.clone(),
data_type: data_type.into_owned(),
nullable: true,
metadata: field.metadata.clone(),
}),
};
Some(field)
}
}

// Convert a min/max stats schema into a nullcount schema (all leaf fields are LONG)
struct NullCountStatsTransform;
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
@@ -85,14 +107,19 @@ impl DataSkippingFilter {
Some(Cow::Owned(PrimitiveType::Long))
}
}
let nullcount_schema = NullCountStatsTransform

let stats_schema = NullableStatsTransform
.transform_struct(&referenced_schema)?
.into_owned();

let nullcount_schema = NullCountStatsTransform
.transform_struct(&stats_schema)?
.into_owned();
let stats_schema = Arc::new(StructType::new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", nullcount_schema),
StructField::nullable("minValues", referenced_schema.clone()),
StructField::nullable("maxValues", referenced_schema),
StructField::nullable("minValues", stats_schema.clone()),
StructField::nullable("maxValues", stats_schema),
]));

// Skipping happens in several steps:
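To make the motivation for `NullableStatsTransform` concrete, here is a hand-written sketch, not part of this commit, of its effect on a referenced schema that contains a non-nullable partition column. It uses the kernel's public schema types; `DataType::STRING` and a three-argument `StructField::new(name, data_type, nullable)` constructor are assumed to exist alongside the `StructField::nullable` and `DataType::LONG` used above.

use delta_kernel::schema::{DataType, StructField, StructType};

fn main() {
    // Schema referenced by the skipping predicate: the partition column is non-nullable.
    let referenced = StructType::new([
        StructField::new("part", DataType::STRING, false),
        StructField::nullable("value", DataType::LONG),
    ]);
    // Add-file stats usually omit min/max for partition columns, so the schema used to
    // parse minValues/maxValues must accept NULL everywhere. This is the shape that
    // NullableStatsTransform produces for the schema above.
    let stats_fields = StructType::new([
        StructField::nullable("part", DataType::STRING),
        StructField::nullable("value", DataType::LONG),
    ]);
    println!("referenced: {referenced:?}");
    println!("stats:      {stats_fields:?}");
}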