apply a schema to fix column names #331
Merged

Changes from all commits (54 commits)
5a7c419 apply a schema to fix column names
31e1977 remove an unwrap
5acbdb2 cleanup
1ea7554 checkpoint with working rename
54493aa cleanup unneeded complexity, fmt
532a870 add map types
a5feb17 fmt
91ca99c only apply if needed
29b09f7 fmt+clippy
nicklan 451a76d Merge branch 'main' into fix-cm-name-take1
nicklan ada2f7b checkpoint
nicklan 1633e49 Merge branch 'main' into fix-cm-name-take1
nicklan 4af1bbd checkpoint
nicklan e0e3ed6 refactor for helpers
nicklan ce4d3f3 small bug fix
nicklan 9d3b49d arg order
nicklan 69c1e27 checkpoint
nicklan e232c77 Merge branch 'main' into fix-cm-name-take1
nicklan 40833db match main
nicklan f7652d5 use transform
nicklan 4113603 match main
nicklan 7ca2705 golden test not ready
nicklan 39cfe0f also align nullability
nicklan 07c56ca Also fix metadata
nicklan f2f75b8 use as_string instead
nicklan 3dda376 ok_or_else all the things
nicklan e67360c address comments
nicklan 085bd19 remove unneeded arg
nicklan 88b4db3 Adjust flow for comment
nicklan 09bc974 also update metadata
nicklan 29645a7 validate type on non-struct exprs
nicklan 927bbaf Update doc comment
nicklan b605e5e fmt
nicklan 2d00469 make column mapping test work :)
nicklan 0b80c0e Merge branch 'main' into fix-cm-name-take1
nicklan a4c6de6 Merge branch 'main' into fix-cm-name-take1
nicklan 38456d1 address some comments
nicklan 5e65247 address comments
nicklan 9020a25 add comment
nicklan 9cbd992 fix comment
nicklan 17a5303 comment
nicklan ae67479 more consistent naming
nicklan b2e143a Merge branch 'main' into fix-cm-name-take1
nicklan 0b48b4c add tests
nicklan 65b6218 address some comments
nicklan 479758d ArrayRef all the things
nicklan efc5193 refactor to new_field_with_metadata
nicklan e02357a move ensure_data_types to own module
nicklan eb786d8 avoid feature flag repetition
nicklan d9af456 switch to struct for ensure
nicklan 622c03f address final comments, and make feature flags at least consistent
nicklan dd65781 Merge branch 'main' into fix-cm-name-take1
nicklan ecbd668 Apply suggestions from code review
nicklan 929a187 final comments
@@ -1,10 +1,12 @@
//! Expression handling based on arrow-rs compute kernels.
+use std::borrow::Borrow;
+use std::collections::HashMap;
use std::sync::Arc;

use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene};
use arrow_arith::numeric::{add, div, mul, sub};
use arrow_array::cast::AsArray;
-use arrow_array::types::*;
+use arrow_array::{types::*, MapArray};
use arrow_array::{
    Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array,
    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch,
@@ -21,20 +23,21 @@ use arrow_select::concat::concat;
use itertools::Itertools;

use super::arrow_conversion::LIST_ARRAY_ROOT;
use super::arrow_utils::make_arrow_error;
use crate::engine::arrow_data::ArrowEngineData;
-use crate::engine::arrow_utils::ensure_data_types;
use crate::engine::arrow_utils::prim_array_cmp;
+use crate::engine::ensure_data_types::ensure_data_types;
use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
-use crate::schema::{DataType, PrimitiveType, SchemaRef};
+use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField};
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};

// TODO leverage scalars / Datum

fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> {
    arr.as_any()
        .downcast_ref::<BooleanArray>()
-        .ok_or(Error::generic("expected boolean array"))
+        .ok_or_else(|| Error::generic("expected boolean array"))
}

impl Scalar {
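One recurring change in this PR (commit 3dda376, "ok_or_else all the things") swaps `ok_or` for `ok_or_else`. The distinction: `ok_or`'s argument is evaluated eagerly even on the `Some` path, while `ok_or_else` takes a closure that runs only on `None`, so the error (and any allocation it performs) is built only on failure. A minimal stdlib sketch of the pattern; the `Error` type below is a hypothetical stand-in for the kernel's error type, not the real one:

```rust
// Stand-in for the crate's error type; `generic` mimics an allocating constructor.
#[derive(Debug, PartialEq)]
struct Error(String);

impl Error {
    fn generic(msg: &str) -> Self {
        // Allocates a String, so we only want to pay this cost on failure.
        Error(msg.to_string())
    }
}

fn downcast(found: Option<i32>) -> Result<i32, Error> {
    // `ok_or_else` defers constructing the error until it is actually needed.
    found.ok_or_else(|| Error::generic("expected boolean array"))
}

fn main() {
    assert_eq!(downcast(Some(1)), Ok(1));
    assert!(downcast(None).is_err());
    println!("ok");
}
```

For cheap `Copy` error values the two forms behave the same; the closure form matters precisely when constructing the error allocates, as `Error::generic` does in the diff above.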
@@ -128,21 +131,21 @@ impl Scalar {
}

fn wrap_comparison_result(arr: BooleanArray) -> ArrayRef {
-    Arc::new(arr) as Arc<dyn Array>
+    Arc::new(arr) as _
}

trait ProvidesColumnByName {
-    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>>;
+    fn column_by_name(&self, name: &str) -> Option<&ArrayRef>;
}

impl ProvidesColumnByName for RecordBatch {
-    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
+    fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
        self.column_by_name(name)
    }
}

impl ProvidesColumnByName for StructArray {
-    fn column_by_name(&self, name: &str) -> Option<&Arc<dyn Array>> {
+    fn column_by_name(&self, name: &str) -> Option<&ArrayRef> {
        self.column_by_name(name)
    }
}
@@ -201,12 +204,11 @@ fn evaluate_expression(
            .iter()
            .zip(output_schema.fields())
            .map(|(expr, field)| evaluate_expression(expr, batch, Some(field.data_type())));
-        let output_cols: Vec<Arc<dyn Array>> = columns.try_collect()?;
+        let output_cols: Vec<ArrayRef> = columns.try_collect()?;
        let output_fields: Vec<ArrowField> = output_cols
            .iter()
            .zip(output_schema.fields())
            .map(|(output_col, output_field)| -> DeltaResult<_> {
-                ensure_data_types(output_field.data_type(), output_col.data_type())?;
                Ok(ArrowField::new(
                    output_field.name(),
                    output_col.data_type().clone(),
@@ -306,7 +308,7 @@ fn evaluate_expression(
            let left_arr = evaluate_expression(left.as_ref(), batch, None)?;
            let right_arr = evaluate_expression(right.as_ref(), batch, None)?;

-            type Operation = fn(&dyn Datum, &dyn Datum) -> Result<Arc<dyn Array>, ArrowError>;
+            type Operation = fn(&dyn Datum, &dyn Datum) -> Result<ArrayRef, ArrowError>;
            let eval: Operation = match op {
                Plus => add,
                Minus => sub,
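The `Operation` alias above is a plain function-pointer type: every arrow kernel named in the `match` (`add`, `sub`, `mul`, `div`) must share the exact same signature, and the selected pointer is then invoked once on the evaluated operands. A small stdlib sketch of the same select-then-call pattern; the `Op`, `add`, and `sub` names here are invented for illustration, not the arrow kernels:

```rust
// Function-pointer alias: every arm of the match below must match this signature.
type Operation = fn(i64, i64) -> Result<i64, String>;

fn add(a: i64, b: i64) -> Result<i64, String> {
    a.checked_add(b).ok_or_else(|| "overflow".into())
}

fn sub(a: i64, b: i64) -> Result<i64, String> {
    a.checked_sub(b).ok_or_else(|| "overflow".into())
}

enum Op {
    Plus,
    Minus,
}

fn eval(op: Op, l: i64, r: i64) -> Result<i64, String> {
    // Select the kernel once, then call through the pointer -- this mirrors
    // `let eval: Operation = match op { Plus => add, Minus => sub, ... }` in the diff.
    let f: Operation = match op {
        Op::Plus => add,
        Op::Minus => sub,
    };
    f(l, r)
}

fn main() {
    assert_eq!(eval(Op::Plus, 2, 3), Ok(5));
    assert_eq!(eval(Op::Minus, 2, 3), Ok(-1));
    println!("ok");
}
```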
@@ -350,6 +352,164 @@ fn evaluate_expression(
    }
}

// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch` where
// the field names, nullability, and metadata in the struct have been transformed to match those
// in the schema specified by `schema`.
fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> {
    let DataType::Struct(struct_schema) = schema else {
        return Err(Error::generic(
            "apply_schema at top-level must be passed a struct schema",
        ));
    };
    let applied = apply_schema_to_struct(array, struct_schema)?;
    Ok(applied.into())
}
// Helper to build an arrow field with the given name, type, and nullability, attaching the
// provided metadata if present.
fn new_field_with_metadata(
    field_name: &str,
    data_type: &ArrowDataType,
    nullable: bool,
    metadata: Option<HashMap<String, String>>,
) -> ArrowField {
    let mut field = ArrowField::new(field_name, data_type.clone(), nullable);
    if let Some(metadata) = metadata {
        field.set_metadata(metadata);
    };
    field
}
// A helper that takes apart the passed struct array, transforms each column to match its
// corresponding target field, and then puts the struct back together. The transformation is
// ordinal: the number and order of fields in `target_fields` _must_ match the number and order
// of the columns in `struct_array`.
fn transform_struct(
    struct_array: &StructArray,
    target_fields: impl Iterator<Item = impl Borrow<StructField>>,
) -> DeltaResult<StructArray> {
    let (_, arrow_cols, nulls) = struct_array.clone().into_parts();
    let input_col_count = arrow_cols.len();
    let result_iter = arrow_cols
        .into_iter()
        .zip(target_fields)
        .map(|(sa_col, target_field)| -> DeltaResult<_> {
            let target_field = target_field.borrow();
            let transformed_col = apply_schema_to(&sa_col, target_field.data_type())?;
            let transformed_field = new_field_with_metadata(
                &target_field.name,
                transformed_col.data_type(),
                target_field.nullable,
                Some(target_field.metadata_with_string_values()),
            );
            Ok((transformed_field, transformed_col))
        });
    let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<ArrayRef>) =
        result_iter.process_results(|iter| iter.unzip())?;
    if transformed_cols.len() != input_col_count {
        return Err(Error::InternalError(format!(
            "Passed struct had {input_col_count} columns, but transformed column has {}",
            transformed_cols.len()
        )));
    }
    Ok(StructArray::try_new(
        transformed_fields.into(),
        transformed_cols,
        nulls,
    )?)
}
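`transform_struct` leans on itertools' `process_results` to unzip an iterator of `DeltaResult<(field, column)>` pairs while short-circuiting on the first error. Roughly the same shape can be had with only the stdlib by collecting into a `Result<Vec<_>, _>` first, at the cost of one intermediate `Vec` that `process_results` avoids. A sketch; the `unzip_results` name is invented:

```rust
// Split an iterator of fallible (field, column) pairs into two Vecs, or return the
// first error. A stdlib analogue of itertools' `process_results(|it| it.unzip())`.
fn unzip_results<F, C, E>(
    iter: impl Iterator<Item = Result<(F, C), E>>,
) -> Result<(Vec<F>, Vec<C>), E> {
    // Collecting into Result<Vec<_>, E> short-circuits on the first Err.
    let pairs: Vec<(F, C)> = iter.collect::<Result<_, _>>()?;
    Ok(pairs.into_iter().unzip())
}

fn main() {
    let ok: Vec<Result<(&str, i32), String>> = vec![Ok(("a", 1)), Ok(("b", 2))];
    let (names, cols) = unzip_results(ok.into_iter()).unwrap();
    assert_eq!(names, vec!["a", "b"]);
    assert_eq!(cols, vec![1, 2]);

    let bad: Vec<Result<(&str, i32), String>> = vec![Ok(("a", 1)), Err("boom".to_string())];
    assert!(unzip_results(bad.into_iter()).is_err());
    println!("ok");
}
```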
// Transform a struct array. The data is in `array`, and the target fields are in `kernel_fields`.
fn apply_schema_to_struct(array: &dyn Array, kernel_fields: &Schema) -> DeltaResult<StructArray> {
    let Some(sa) = array.as_struct_opt() else {
        return Err(make_arrow_error(
            "Arrow claimed to be a struct but isn't a StructArray",
        ));
    };
    transform_struct(sa, kernel_fields.fields())
}
// Deconstruct the array, then rebuild the mapped version.
fn apply_schema_to_list(
    array: &dyn Array,
    target_inner_type: &ArrayType,
) -> DeltaResult<ListArray> {
    let Some(la) = array.as_list_opt() else {
        return Err(make_arrow_error(
            "Arrow claimed to be a list but isn't a ListArray",
        ));
    };
    let (field, offset_buffer, values, nulls) = la.clone().into_parts();

    let transformed_values = apply_schema_to(&values, &target_inner_type.element_type)?;
    let transformed_field = ArrowField::new(
        field.name(),
        transformed_values.data_type().clone(),
        target_inner_type.contains_null,
    );
    Ok(ListArray::try_new(
        Arc::new(transformed_field),
        offset_buffer,
        transformed_values,
        nulls,
    )?)
}
// Deconstruct a map, and rebuild it with the specified target kernel type.
fn apply_schema_to_map(array: &dyn Array, kernel_map_type: &MapType) -> DeltaResult<MapArray> {
    let Some(ma) = array.as_map_opt() else {
        return Err(make_arrow_error(
            "Arrow claimed to be a map but isn't a MapArray",
        ));
    };
    let (map_field, offset_buffer, map_struct_array, nulls, ordered) = ma.clone().into_parts();
    let target_fields = map_struct_array
        .fields()
        .iter()
        .zip([&kernel_map_type.key_type, &kernel_map_type.value_type])
        .zip([false, kernel_map_type.value_contains_null])
        .map(|((arrow_field, target_type), nullable)| {
            StructField::new(arrow_field.name(), target_type.clone(), nullable)
        });

    // Arrow puts the key type/val as the first field/col and the value type/val as the second. So
    // we just transform like a 'normal' struct, but we know there are two fields/cols and we
    // specify the key/value types as the target type iterator.
    let transformed_map_struct_array = transform_struct(&map_struct_array, target_fields)?;

    let transformed_map_field = ArrowField::new(
        map_field.name().clone(),
        transformed_map_struct_array.data_type().clone(),
        map_field.is_nullable(),
    );
    Ok(MapArray::try_new(
        Arc::new(transformed_map_field),
        offset_buffer,
        transformed_map_struct_array,
        nulls,
        ordered,
    )?)
}

Review comment: Just double checking -- no metadata here, because this is a hidden/internal struct that isn't part of the user-visible schema?
Reply: Yep
// Apply `schema` to `array`. This handles renaming, and adjusting nullability and metadata. If
// the actual data types don't match, this will return an error.
fn apply_schema_to(array: &ArrayRef, schema: &DataType) -> DeltaResult<ArrayRef> {
    use DataType::*;
    let array: ArrayRef = match schema {
        Struct(stype) => Arc::new(apply_schema_to_struct(array, stype)?),
        Array(atype) => Arc::new(apply_schema_to_list(array, atype)?),
        Map(mtype) => Arc::new(apply_schema_to_map(array, mtype)?),
        _ => {
            ensure_data_types(schema, array.data_type(), true)?;
            array.clone()
        }
    };
    Ok(array)
}
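`apply_schema_to` is the recursive heart of the change: struct, array, and map schemas recurse into their children, while every other (leaf) type is merely checked via `ensure_data_types`. To make the shape of that recursion concrete without pulling in arrow, here is a toy sketch over invented `Schema`/`Value` enums; only the rename-by-ordinal-position idea carries over from the PR, none of the real types do:

```rust
// Toy schema and value types illustrating the recursion pattern in `apply_schema_to`
// (the real code works on kernel DataTypes and arrow arrays; these names are invented).
#[derive(Clone, Debug, PartialEq)]
enum Schema {
    Int,
    List(Box<Schema>),
    Struct(Vec<(String, Schema)>), // target field names are carried by the schema
}

#[derive(Clone, Debug, PartialEq)]
enum Value {
    Int(i64),
    List(Vec<Value>),
    Struct(Vec<(String, Value)>), // field names carried by the data, possibly "wrong"
}

// Rename fields in `value` to match `schema`, recursing through nested types.
// Leaves are only type-checked, mirroring the `ensure_data_types` fallback arm.
fn apply_schema(value: &Value, schema: &Schema) -> Result<Value, String> {
    match (value, schema) {
        (Value::Int(i), Schema::Int) => Ok(Value::Int(*i)),
        (Value::List(items), Schema::List(inner)) => Ok(Value::List(
            items
                .iter()
                .map(|v| apply_schema(v, inner))
                .collect::<Result<_, _>>()?,
        )),
        (Value::Struct(fields), Schema::Struct(targets)) if fields.len() == targets.len() => {
            // Ordinal transformation: take the name from the schema, recurse on the value.
            let out = fields
                .iter()
                .zip(targets)
                .map(|((_, v), (name, s))| Ok((name.clone(), apply_schema(v, s)?)))
                .collect::<Result<_, String>>()?;
            Ok(Value::Struct(out))
        }
        _ => Err("schema/data mismatch".to_string()),
    }
}

fn main() {
    let data = Value::Struct(vec![("col-1".into(), Value::Int(7))]);
    let schema = Schema::Struct(vec![("renamed".into(), Schema::Int)]);
    let fixed = apply_schema(&data, &schema).unwrap();
    assert_eq!(fixed, Value::Struct(vec![("renamed".into(), Value::Int(7))]));
    println!("ok");
}
```

The design choice mirrored here is that the data's shape is trusted for everything except names, nullability, and metadata; a genuine type mismatch at any depth surfaces as an error rather than a silent cast.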
#[derive(Debug)]
pub struct ArrowExpressionHandler;
@@ -380,7 +540,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
        let batch = batch
            .as_any()
            .downcast_ref::<ArrowEngineData>()
-            .ok_or(Error::engine_data_type("ArrowEngineData"))?
+            .ok_or_else(|| Error::engine_data_type("ArrowEngineData"))?
            .record_batch();
        let _input_schema: ArrowSchema = self.input_schema.as_ref().try_into()?;
        // TODO: make sure we have matching schemas for validation
@@ -392,13 +552,11 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator {
        //     )));
        // };
        let array_ref = evaluate_expression(&self.expression, batch, Some(&self.output_type))?;
-        let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?;
        let batch: RecordBatch = if let DataType::Struct(_) = self.output_type {
-            array_ref
-                .as_struct_opt()
-                .ok_or(Error::unexpected_column_type("Expected a struct array"))?
-                .into()
+            apply_schema(&array_ref, &self.output_type)?
        } else {
+            let array_ref = apply_schema_to(&array_ref, &self.output_type)?;
+            let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?;
            let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]);
            RecordBatch::try_new(Arc::new(schema), vec![array_ref])?
        };
Review comment: Rescuing #331 (comment): Down to 7% now...
Reply: Yep, I've added some more tests and it's up now