apply a schema to fix column names #331

Merged: 54 commits, Oct 21, 2024
Commits (54):

5a7c419 apply a schema to fix column names (Sep 10, 2024)
31e1977 remove an unwrap (Sep 10, 2024)
5acbdb2 cleanup (Sep 10, 2024)
1ea7554 checkpoint with working rename (Sep 12, 2024)
54493aa cleanup unneeded complexity, fmt (Sep 12, 2024)
532a870 add map types (Sep 12, 2024)
a5feb17 fmt (Sep 12, 2024)
91ca99c only apply if needed (Sep 12, 2024)
29b09f7 fmt+clippy (nicklan, Oct 8, 2024)
451a76d Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 10, 2024)
ada2f7b checkpoint (nicklan, Oct 10, 2024)
1633e49 Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 11, 2024)
4af1bbd checkpoint (nicklan, Oct 11, 2024)
e0e3ed6 refactor for helpers (nicklan, Oct 12, 2024)
ce4d3f3 small bug fix (nicklan, Oct 15, 2024)
9d3b49d arg order (nicklan, Oct 15, 2024)
69c1e27 checkpoint (nicklan, Oct 15, 2024)
e232c77 Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 15, 2024)
40833db match main (nicklan, Oct 15, 2024)
f7652d5 use transform (nicklan, Oct 15, 2024)
4113603 match main (nicklan, Oct 15, 2024)
7ca2705 golden test not ready (nicklan, Oct 15, 2024)
39cfe0f also align nullability (nicklan, Oct 16, 2024)
07c56ca Also fix metadata (nicklan, Oct 16, 2024)
f2f75b8 use as_string instead (nicklan, Oct 16, 2024)
3dda376 ok_or_else all the things (nicklan, Oct 16, 2024)
e67360c address comments (nicklan, Oct 16, 2024)
085bd19 remove unneeded arg (nicklan, Oct 16, 2024)
88b4db3 Adjust flow for comment (nicklan, Oct 16, 2024)
09bc974 also update metadata (nicklan, Oct 16, 2024)
29645a7 validate type on non-struct exprs (nicklan, Oct 16, 2024)
927bbaf Update doc comment (nicklan, Oct 16, 2024)
b605e5e fmt (nicklan, Oct 16, 2024)
2d00469 make column mapping test work :) (nicklan, Oct 17, 2024)
0b80c0e Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 17, 2024)
a4c6de6 Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 18, 2024)
38456d1 address some comments (nicklan, Oct 18, 2024)
5e65247 address comments (nicklan, Oct 18, 2024)
9020a25 add comment (nicklan, Oct 18, 2024)
9cbd992 fix comment (nicklan, Oct 18, 2024)
17a5303 comment (nicklan, Oct 18, 2024)
ae67479 more consistent naming (nicklan, Oct 18, 2024)
b2e143a Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 18, 2024)
0b48b4c add tests (nicklan, Oct 18, 2024)
65b6218 address some comments (nicklan, Oct 18, 2024)
479758d ArrayRef all the things (nicklan, Oct 18, 2024)
efc5193 refactor to new_field_with_metadata (nicklan, Oct 18, 2024)
e02357a move ensure_data_types to own module (nicklan, Oct 21, 2024)
eb786d8 avoid feature flag repetition (nicklan, Oct 21, 2024)
d9af456 switch to struct for ensure (nicklan, Oct 21, 2024)
622c03f address final comments, and make feature flags at least consistent (nicklan, Oct 21, 2024)
dd65781 Merge branch 'main' into fix-cm-name-take1 (nicklan, Oct 21, 2024)
ecbd668 Apply suggestions from code review (nicklan, Oct 21, 2024)
929a187 final comments (nicklan, Oct 21, 2024)
9 changes: 9 additions & 0 deletions kernel/examples/read-table-single-threaded/src/main.rs
@@ -41,6 +41,10 @@ struct Cli {
/// to the aws metadata server, which will fail unless you're on an ec2 instance.
#[arg(long)]
public: bool,

/// Only print the schema of the table
#[arg(long)]
schema_only: bool,
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
@@ -90,6 +94,11 @@ fn try_main() -> DeltaResult<()> {

let snapshot = table.snapshot(engine.as_ref(), None)?;

if cli.schema_only {
println!("{:#?}", snapshot.schema());
return Ok(());
}

let read_schema_opt = cli
.columns
.map(|cols| -> DeltaResult<_> {
204 changes: 194 additions & 10 deletions kernel/src/engine/arrow_expression.rs
Review comment (Collaborator):
Rescuing #331 (comment):

We're still only at 9% patch coverage. We might need to take @zachschuermann's advice and make a schema just for this?

Maybe you could reuse this one from the schema depth checker test?
https://github.com/delta-incubator/delta-kernel-rs/blob/main/kernel/src/schema.rs#L1005
It has a wide variety of nested structures, at least.

Down to 7% now...

Reply (Collaborator, Author):
Yep, I've added some more tests and it's up now

@@ -1,10 +1,12 @@
//! Expression handling based on arrow-rs compute kernels.
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::Arc;

use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
use arrow_arith::numeric::{add, div, mul, sub};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::{types::*, MapArray};
use arrow_array::{
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
@@ -21,20 +23,21 @@ use arrow_select::concat::concat;
use itertools::Itertools;

use super::arrow_conversion::LIST_ARRAY_ROOT;
use super::arrow_utils::make_arrow_error;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_utils::ensure_data_types;
use crate::engine::arrow_utils::prim_array_cmp;
use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
use crate::schema::{DataType, PrimitiveType, SchemaRef};
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

// TODO leverage scalars / Datum

fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
arr.as_any()
.downcast_ref::<BooleanArray>()
.ok_or(Error::generic("expected boolean array"))
.ok_or_else(|| Error::generic("expected boolean array"))
}
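One of the sweeping changes in this PR ("ok_or_else all the things") swaps `ok_or` for `ok_or_else` in lines like the one above. A minimal sketch of why, using hypothetical names (the counter trick is only for illustration, not part of the PR): `ok_or` constructs its error value eagerly even on the success path, while `ok_or_else` defers construction until a `None` is actually hit.

```rust
// Count how many times the error value gets built.
fn expensive_error(counter: &mut u32) -> String {
    *counter += 1;
    "expected boolean array".to_string()
}

// Eager: the error String is built even though the Option is Some.
fn eager(counter: &mut u32) -> Result<i32, String> {
    let msg = expensive_error(counter);
    Some(1).ok_or(msg)
}

// Lazy: the closure only runs on the None path, so nothing is built here.
fn lazy(counter: &mut u32) -> Result<i32, String> {
    Some(1).ok_or_else(|| expensive_error(counter))
}
```

On the success path, `eager` still pays for one error construction while `lazy` pays for none, which is the point of the change.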

impl Scalar {
@@ -206,7 +209,6 @@ fn evaluate_expression(
.iter()
.zip(output_schema.fields())
.map(|(output_col, output_field)| -> DeltaResult<_> {
ensure_data_types(output_field.data_type(), output_col.data_type())?;
Ok(ArrowField::new(
output_field.name(),
output_col.data_type().clone(),
@@ -350,6 +352,190 @@ fn evaluate_expression(
}
}

// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch` where
// the names, nullability, and metadata of the fields in the struct have been transformed to match
// those in the schema specified by `schema`.
fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> {
let DataType::Struct(struct_schema) = schema else {
return Err(Error::generic(
"apply_schema at top-level must be passed a struct schema",
));
};
let applied = apply_schema_to_struct(array, struct_schema)?;
Ok(applied.into())
}

// Helper to transform an arrow field+col into the specified target type. If `rename` is
// specified, the field is renamed to the contained `str`.
fn transform_field_and_col(
arrow_field: &Arc<ArrowField>,
arrow_col: Arc<dyn Array>,
target_type: &DataType,
nullable: bool,
rename: Option<&str>,
metadata: Option<HashMap<String, String>>,
) -> DeltaResult<(ArrowField, Arc<dyn Array>)> {
let transformed_col = apply_schema_to(&arrow_col, target_type)?;
let mut transformed_field = ArrowField::new(
rename.unwrap_or_else(|| arrow_field.name()),
transformed_col.data_type().clone(),
nullable,
);
if let Some(metadata) = metadata {
transformed_field.set_metadata(metadata);
};
Ok((transformed_field, transformed_col))
}
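The essence of `transform_field_and_col` is that the physical data is left alone while the field's name and nullability are taken from the target schema. A toy sketch with a made-up `Field` type (not the kernel's or arrow's actual types):

```rust
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    nullable: bool,
}

// Keep the data as-is; take the name (when a rename is given) and the
// nullability from the target side, mirroring the helper above.
fn transform_field(existing: &Field, nullable: bool, rename: Option<&str>) -> Field {
    Field {
        name: rename.unwrap_or(existing.name.as_str()).to_string(),
        nullable,
    }
}
```

When `rename` is `None` (as in the list case, where the inner field keeps arrow's name), only nullability changes.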

// A helper that wraps `transform_field_and_col`. It takes apart the passed struct, uses that
// method to transform each column, and then reassembles the struct. The target type and name for
// each column come from `target_fields`. The number of elements in the `target_fields` iterator
// _must_ equal the number of columns in `struct_array`, and the transformation is ordinal: the
// order of fields in `target_fields` _must_ match the order of the columns in `struct_array`.
fn transform_struct(
struct_array: &StructArray,
target_fields: impl Iterator<Item = impl Borrow<StructField>>,
) -> DeltaResult<StructArray> {
let (arrow_fields, arrow_cols, nulls) = struct_array.clone().into_parts();
let input_col_count = arrow_cols.len();
let result_iter = arrow_fields
.into_iter()
.zip(arrow_cols)
.zip(target_fields)
.map(|((sa_field, sa_col), target_field)| {
let target_field = target_field.borrow();
transform_field_and_col(
sa_field,
sa_col,
target_field.data_type(),
target_field.nullable,
Some(target_field.name.as_str()),
Some(target_field.metadata_with_string_values()),
)
});
let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<Arc<dyn Array>>) =
result_iter.process_results(|iter| iter.unzip())?;
if transformed_cols.len() != input_col_count {
return Err(Error::InternalError(format!(
"Passed struct had {input_col_count} columns, but transformed column has {}",
transformed_cols.len()
)));
}
Ok(StructArray::try_new(
transformed_fields.into(),
transformed_cols,
nulls,
)?)
}
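The length check at the end of `transform_struct` exists because `Iterator::zip` stops at the shorter input: a target-field iterator that is too short would silently drop trailing columns instead of failing. A self-contained sketch of the hazard, using plain name slices rather than the kernel's types:

```rust
// Rename `cols` ordinally from `targets`. Because `zip` truncates to the
// shorter side, a post-hoc length check is the only way to notice that
// `targets` didn't cover every column.
fn transform_names(cols: &[&str], targets: &[&str]) -> Result<Vec<String>, String> {
    let out: Vec<String> = cols
        .iter()
        .zip(targets)
        .map(|(_, target)| target.to_string())
        .collect();
    if out.len() != cols.len() {
        return Err(format!(
            "input had {} columns, but transformed output has {}",
            cols.len(),
            out.len()
        ));
    }
    Ok(out)
}
```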

// Transform a struct array. The data is in `array`, and the target fields are in `kernel_fields`.
fn apply_schema_to_struct(array: &dyn Array, kernel_fields: &Schema) -> DeltaResult<StructArray> {
let Some(sa) = array.as_struct_opt() else {
return Err(make_arrow_error(
"Arrow claimed to be a struct but isn't a StructArray",
));
};
transform_struct(sa, kernel_fields.fields())
}

// Deconstruct the list array, then rebuild it with the target element type.
fn apply_schema_to_list(
array: &dyn Array,
target_inner_type: &ArrayType,
) -> DeltaResult<ListArray> {
let Some(la) = array.as_list_opt() else {
return Err(make_arrow_error(
"Arrow claimed to be a list but isn't a ListArray",
));
};
let (field, offset_buffer, values, nulls) = la.clone().into_parts();
let (transformed_field, transformed_values) = transform_field_and_col(
&field,
values,
&target_inner_type.element_type,
target_inner_type.contains_null,
None,
None,
)?;
Ok(ListArray::try_new(
Arc::new(transformed_field),
offset_buffer,
transformed_values,
nulls,
)?)
}

// Deconstruct a map, and rebuild it with the specified target kernel type.
fn apply_schema_to_map(array: &dyn Array, kernel_map_type: &MapType) -> DeltaResult<MapArray> {
let Some(ma) = array.as_map_opt() else {
return Err(make_arrow_error(
"Arrow claimed to be a map but isn't a MapArray",
));
};

let ArrowDataType::Map(arrow_map_type, _) = array.data_type() else {
return Err(make_arrow_error(
"Arrow claimed to be a map but doesn't have map DataType",
));
};

let (map_field, offset_buffer, map_struct_array, nulls, ordered) = ma.clone().into_parts();

let ArrowDataType::Struct(_) = arrow_map_type.data_type() else {
return Err(make_arrow_error("Arrow map type wasn't a struct."));
};

let target_fields = map_struct_array
.fields()
.iter()
.zip([
(&kernel_map_type.key_type, false),
(
&kernel_map_type.value_type,
kernel_map_type.value_contains_null,
),
])
.map(|(arrow_field, (target_type, nullable))| {
StructField::new(arrow_field.name(), target_type.clone(), nullable)
Review comment (Collaborator):
Just double checking -- no metadata here, because this is a hidden/internal struct that isn't part of the user-visible schema?

Reply (Collaborator, Author):
Yep

});

// Arrow puts the key type/val as the first field/col and the value type/val as the second. So
// we just transform like a 'normal' struct, but we know there are two fields/cols and we
// specify the key/value types as the target type iterator.
let transformed_map_struct_array = transform_struct(&map_struct_array, target_fields)?;

let transformed_map_field = Arc::new(
map_field
.as_ref()
.clone()
.with_data_type(transformed_map_struct_array.data_type().clone()),
);
Ok(MapArray::try_new(
transformed_map_field,
offset_buffer,
transformed_map_struct_array,
nulls,
ordered,
)?)
}
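The `target_fields` construction above encodes the fixed shape of a map's entry struct: the key comes first and is never nullable, and the value comes second with nullability taken from `value_contains_null`. A toy sketch of just that pairing, with a simplified stand-in for the kernel's `MapType`:

```rust
// Simplified stand-in for the kernel's MapType, types as plain strings.
struct KernelMapType {
    key_type: &'static str,
    value_type: &'static str,
    value_contains_null: bool,
}

// Build the (name, type, nullable) targets for the two-column entry struct:
// key first and non-nullable, value second with its declared nullability.
fn entry_target_fields(m: &KernelMapType) -> [(&'static str, &'static str, bool); 2] {
    [
        ("key", m.key_type, false),
        ("value", m.value_type, m.value_contains_null),
    ]
}
```

Because the order is fixed, the entry struct can then be transformed "like a normal struct" with this two-element array as the ordinal target iterator.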

// Apply `schema` to `array`. This handles renaming, and adjusting nullability and metadata. If
// the actual data types don't match, this returns an error.
fn apply_schema_to(array: &ArrayRef, schema: &DataType) -> DeltaResult<ArrayRef> {
use DataType::*;
let array: ArrayRef = match schema {
Struct(stype) => Arc::new(apply_schema_to_struct(array, stype)?),
Array(atype) => Arc::new(apply_schema_to_list(array, atype)?),
Map(mtype) => Arc::new(apply_schema_to_map(array, mtype)?),
_ => {
ensure_data_types(schema, array.data_type(), true)?;
array.clone()
}
};
Ok(array)
}
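`apply_schema_to` is the recursive heart of the change: structs, lists, and maps recurse into their children, and primitives are only checked for type equality. A self-contained toy version of that walk over a made-up `Type` enum (none of these names are the kernel's; arrow-specific details like offset buffers and null masks are omitted):

```rust
#[derive(Debug, Clone, PartialEq)]
enum Type {
    Primitive(&'static str),
    Struct(Vec<(String, Type)>),
    List(Box<Type>),
}

// Rewrite every field name in `data` from the parallel `target` schema,
// recursing through nested structs and lists, and erroring when the
// underlying types don't line up.
fn apply_names(data: &Type, target: &Type) -> Result<Type, String> {
    match (data, target) {
        (Type::Struct(cols), Type::Struct(fields)) => {
            if cols.len() != fields.len() {
                return Err("column count mismatch".to_string());
            }
            let out = cols
                .iter()
                .zip(fields)
                .map(|((_, dt), (tname, tt))| Ok((tname.clone(), apply_names(dt, tt)?)))
                .collect::<Result<Vec<_>, String>>()?;
            Ok(Type::Struct(out))
        }
        (Type::List(d), Type::List(t)) => Ok(Type::List(Box::new(apply_names(d, t)?))),
        (Type::Primitive(a), Type::Primitive(b)) if a == b => Ok(Type::Primitive(a)),
        _ => Err("data types don't match".to_string()),
    }
}
```

This mirrors why the PR can fix column-mapping names: physical files may carry names like `col-1`, and the walk replaces them with the logical names from the table schema without touching the data.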

#[derive(Debug)]
pub struct ArrowExpressionHandler;

@@ -380,7 +566,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
let batch = batch
.as_any()
.downcast_ref::<ArrowEngineData>()
.ok_or(Error::engine_data_type("ArrowEngineData"))?
.ok_or_else(|| Error::engine_data_type("ArrowEngineData"))?
.record_batch();
let _input_schema: ArrowSchema = self.input_schema.as_ref().try_into()?;
// TODO: make sure we have matching schemas for validation
@@ -392,13 +578,11 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
// )));
// };
let array_ref = evaluate_expression(&self.expression, batch, Some(&self.output_type))?;
let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?;
let batch: RecordBatch = if let DataType::Struct(_) = self.output_type {
array_ref
.as_struct_opt()
.ok_or(Error::unexpected_column_type("Expected a struct array"))?
.into()
apply_schema(&array_ref, &self.output_type)?
} else {
let array_ref = apply_schema_to(&array_ref, &self.output_type)?;
let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?;
let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]);
RecordBatch::try_new(Arc::new(schema), vec![array_ref])?
};