-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
apply a schema to fix column names #331
Changes from 34 commits
5a7c419
31e1977
5acbdb2
1ea7554
54493aa
532a870
a5feb17
91ca99c
29b09f7
451a76d
ada2f7b
1633e49
4af1bbd
e0e3ed6
ce4d3f3
9d3b49d
69c1e27
e232c77
40833db
f7652d5
4113603
7ca2705
39cfe0f
07c56ca
f2f75b8
3dda376
e67360c
085bd19
88b4db3
09bc974
29645a7
927bbaf
b605e5e
2d00469
0b80c0e
a4c6de6
38456d1
5e65247
9020a25
9cbd992
17a5303
ae67479
b2e143a
0b48b4c
65b6218
479758d
efc5193
e02357a
eb786d8
d9af456
622c03f
dd65781
ecbd668
929a187
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,11 @@ | ||
//! Expression handling based on arrow-rs compute kernels. | ||
use std::collections::HashMap; | ||
use std::sync::Arc; | ||
|
||
use arrow_arith::boolean::{and_kleene, is_null, not, or_kleene}; | ||
use arrow_arith::numeric::{add, div, mul, sub}; | ||
use arrow_array::cast::AsArray; | ||
use arrow_array::types::*; | ||
use arrow_array::{types::*, MapArray}; | ||
use arrow_array::{ | ||
Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Datum, Decimal128Array, Float32Array, | ||
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, RecordBatch, | ||
|
@@ -21,20 +22,21 @@ use arrow_select::concat::concat; | |
use itertools::Itertools; | ||
|
||
use super::arrow_conversion::LIST_ARRAY_ROOT; | ||
use super::arrow_utils::make_arrow_error; | ||
use crate::engine::arrow_data::ArrowEngineData; | ||
use crate::engine::arrow_utils::ensure_data_types; | ||
use crate::engine::arrow_utils::prim_array_cmp; | ||
use crate::error::{DeltaResult, Error}; | ||
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator}; | ||
use crate::schema::{DataType, PrimitiveType, SchemaRef}; | ||
use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField}; | ||
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler}; | ||
|
||
// TODO leverage scalars / Datum | ||
|
||
fn downcast_to_bool(arr: &dyn Array) -> DeltaResult<&BooleanArray> { | ||
arr.as_any() | ||
.downcast_ref::<BooleanArray>() | ||
.ok_or(Error::generic("expected boolean array")) | ||
.ok_or_else(|| Error::generic("expected boolean array")) | ||
} | ||
|
||
impl Scalar { | ||
|
@@ -154,10 +156,7 @@ fn extract_column<'array, 'path>( | |
) -> Result<&'array Arc<dyn Array>, ArrowError> { | ||
let child = array | ||
.column_by_name(path_step) | ||
.ok_or(ArrowError::SchemaError(format!( | ||
"No such field: {}", | ||
path_step, | ||
)))?; | ||
.ok_or_else(|| ArrowError::SchemaError(format!("No such field: {}", path_step,)))?; | ||
if let Some(next_path_step) = remaining_path_steps.next() { | ||
// This is not the last path step. Drill deeper. | ||
extract_column( | ||
|
@@ -176,10 +175,10 @@ fn column_as_struct<'a>( | |
column: &Option<&'a Arc<dyn Array>>, | ||
) -> Result<&'a StructArray, ArrowError> { | ||
column | ||
.ok_or(ArrowError::SchemaError(format!("No such column: {}", name)))? | ||
.ok_or_else(|| ArrowError::SchemaError(format!("No such column: {}", name)))? | ||
.as_any() | ||
.downcast_ref::<StructArray>() | ||
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name))) | ||
.ok_or_else(|| ArrowError::SchemaError(format!("{} is not a struct", name))) | ||
} | ||
|
||
fn evaluate_expression( | ||
|
@@ -201,7 +200,7 @@ fn evaluate_expression( | |
} else { | ||
batch | ||
.column_by_name(name) | ||
.ok_or(Error::missing_column(name)) | ||
.ok_or_else(|| Error::missing_column(name)) | ||
.cloned() | ||
} | ||
} | ||
|
@@ -215,7 +214,6 @@ fn evaluate_expression( | |
.iter() | ||
.zip(output_schema.fields()) | ||
.map(|(output_col, output_field)| -> DeltaResult<_> { | ||
ensure_data_types(output_field.data_type(), output_col.data_type())?; | ||
Ok(ArrowField::new( | ||
output_field.name(), | ||
output_col.data_type().clone(), | ||
|
@@ -366,6 +364,193 @@ fn evaluate_expression( | |
} | ||
} | ||
|
||
// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch` where | ||
// the names of fields, nullability, and metadata in the struct have been transformed to match | ||
// those in the schema specified by `schema`. | ||
fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> { | ||
let DataType::Struct(struct_schema) = schema else { | ||
return Err(Error::generic( | ||
"apply_schema at top-level must be passed a struct schema", | ||
))?; | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
let applied = apply_schema_to_struct(array, struct_schema)?; | ||
Ok(applied.into()) | ||
} | ||
|
||
// helper to transform an arrow field+col into the specified target type. If `rename` is specified | ||
// the field will be renamed to the contained `str`. | ||
fn transform_field_and_col( | ||
arrow_field: &Arc<ArrowField>, | ||
nicklan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
arrow_col: Arc<dyn Array>, | ||
nicklan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
target_type: &DataType, | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
nullable: bool, | ||
rename: Option<&str>, | ||
metadata: Option<HashMap<String, String>>, | ||
) -> DeltaResult<(ArrowField, Arc<dyn Array>)> { | ||
let transformed_col = apply_schema_to(&arrow_col, target_type)?.unwrap_or(arrow_col); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. clarifying: if we fail to apply schema to |
||
let transformed_field = arrow_field | ||
.as_ref() | ||
.clone() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this a non-Arc clone? that is, we actually deep clone and the make changes? interesting it isn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clone is never mut, the whole point is to copy a borrowed reference? But at least struct fields break the clone recursion by being Arc. Maybe we should consider doing that as well, so that schema manipulation doesn't deep-clone the whole tree every time? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. However, this current code will clone the original data type twice (!) before replacing it. Recommend instead to just create a new field directly: let mut transformed_field = ArrowField::new(
rename.unwrap_or_else(|| arrow_field.name()),
transformed_col.data_type().clone(),
arrow_field.nullable(),
);
if let Some(metadata) = metadata {
transformed_field.set_metadata(metadata);
}; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Filed as #404 |
||
.with_nullable(nullable) | ||
.with_data_type(transformed_col.data_type().clone()); | ||
let transformed_field = match rename { | ||
Some(name) => transformed_field.with_name(name), | ||
None => transformed_field, | ||
}; | ||
let transformed_field = match metadata { | ||
Some(metadata) => transformed_field.with_metadata(metadata), | ||
None => transformed_field, | ||
}; | ||
Ok((transformed_field, transformed_col)) | ||
} | ||
|
||
// A helper that is a wrapper over `transform_field_and_col`. This will take apart the passed struct | ||
// and use that method to transform each column and then put the struct back together. Target types | ||
// and names for each column should be passed in `target_types_and_names`. The number of elements in | ||
// the `target_types_and_names` iterator _must_ be the same as the number of columns in | ||
// `struct_array`. The transformation is ordinal. That is, the order of fields in `target_fields` | ||
// _must_ match the order of the columns in `struct_array`. | ||
fn transform_struct<'a>( | ||
struct_array: &StructArray, | ||
target_fields: impl Iterator<Item = &'a StructField>, | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) -> DeltaResult<StructArray> { | ||
let (arrow_fields, arrow_cols, nulls) = struct_array.clone().into_parts(); | ||
let input_col_count = arrow_cols.len(); | ||
let result_iter = arrow_fields | ||
.into_iter() | ||
.zip(arrow_cols) | ||
.zip(target_fields) | ||
.map(|((sa_field, sa_col), target_field)| { | ||
transform_field_and_col( | ||
sa_field, | ||
sa_col, | ||
target_field.data_type(), | ||
target_field.nullable, | ||
Some(target_field.name.as_str()), | ||
Some(target_field.metadata_as_string()), | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
}); | ||
let (transformed_fields, transformed_cols): (Vec<ArrowField>, Vec<Arc<dyn Array>>) = | ||
result_iter.process_results(|iter| iter.unzip())?; | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if transformed_cols.len() != input_col_count { | ||
return Err(Error::InternalError(format!( | ||
"Passed struct had {input_col_count} columns, but transformed column has {}", | ||
transformed_cols.len() | ||
))); | ||
} | ||
Ok(StructArray::try_new( | ||
transformed_fields.into(), | ||
transformed_cols, | ||
nulls, | ||
)?) | ||
} | ||
|
||
// Transform a struct array. The data is in `array`, and the target fields are in | ||
// `kernel_fields`. | ||
nicklan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fn apply_schema_to_struct(array: &dyn Array, kernel_fields: &Schema) -> DeltaResult<StructArray> { | ||
let Some(sa) = array.as_struct_opt() else { | ||
return Err(make_arrow_error( | ||
"Arrow claimed to be a struct but isn't a StructArray", | ||
)); | ||
}; | ||
transform_struct(sa, kernel_fields.fields()) | ||
} | ||
|
||
// deconstruct the array, then rebuild the mapped version | ||
fn apply_schema_to_list( | ||
array: &dyn Array, | ||
target_inner_type: &ArrayType, | ||
) -> DeltaResult<ListArray> { | ||
let Some(la) = array.as_list_opt() else { | ||
return Err(make_arrow_error( | ||
"Arrow claimed to be a list but isn't a ListArray", | ||
)); | ||
}; | ||
let (field, offset_buffer, values, nulls) = la.clone().into_parts(); | ||
let (transformed_field, transformed_values) = transform_field_and_col( | ||
&field, | ||
values, | ||
&target_inner_type.element_type, | ||
target_inner_type.contains_null, | ||
None, | ||
None, | ||
)?; | ||
Ok(ListArray::try_new( | ||
Arc::new(transformed_field), | ||
offset_buffer, | ||
transformed_values, | ||
nulls, | ||
)?) | ||
} | ||
|
||
// deconstruct a map, and rebuild it with the specified target kernel type | ||
fn apply_schema_to_map(array: &dyn Array, kernel_map_type: &MapType) -> DeltaResult<MapArray> { | ||
let Some(ma) = array.as_map_opt() else { | ||
return Err(make_arrow_error( | ||
"Arrow claimed to be a map but isn't a MapArray", | ||
)); | ||
}; | ||
|
||
let ArrowDataType::Map(arrow_map_type, _) = array.data_type() else { | ||
return Err(make_arrow_error( | ||
"Arrow claimed to be a map but doesn't have map DataType", | ||
)); | ||
}; | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let (map_field, offset_buffer, map_struct_array, nulls, ordered) = ma.clone().into_parts(); | ||
|
||
let ArrowDataType::Struct(_) = arrow_map_type.data_type() else { | ||
return Err(make_arrow_error("Arrow map type wasn't a struct.")); | ||
}; | ||
|
||
let target_fields: Vec<StructField> = map_struct_array | ||
.fields() | ||
.iter() | ||
.zip([ | ||
(&kernel_map_type.key_type, false), | ||
( | ||
&kernel_map_type.value_type, | ||
kernel_map_type.value_contains_null, | ||
), | ||
]) | ||
.map(|(arrow_field, (target_type, nullable))| { | ||
StructField::new(arrow_field.name(), target_type.clone(), nullable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just double checking -- no metadata here, because this is a hidden/internal struct that isn't part of the user-visible schema? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep |
||
}) | ||
.collect(); | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Arrow puts the key type/val as the first field/col and the value type/val as the second. So | ||
// we just transform like a 'normal' struct, but we know there are two fields/cols and we | ||
// specify the key/value types as the target type iterator. | ||
let transformed_map_struct_array = transform_struct(&map_struct_array, target_fields.iter())?; | ||
|
||
let transformed_map_field = Arc::new( | ||
map_field | ||
.as_ref() | ||
.clone() | ||
.with_data_type(transformed_map_struct_array.data_type().clone()), | ||
); | ||
Ok(MapArray::try_new( | ||
transformed_map_field, | ||
nicklan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
offset_buffer, | ||
transformed_map_struct_array, | ||
nulls, | ||
ordered, | ||
)?) | ||
} | ||
|
||
// Make `array` look like the specified kernel `schema` type. For now this only handles name | ||
// transforms. If the actual data types don't match, this will return an error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about nullability? IIRC we have to read parquet with everything nullable, because parquet can't express the concept of a non-nullable field nesting inside a nullable field. Or did we handle that already by just making everything nullable in our action schema? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We made all the top level structs nullable in the action schema (which is correct). But we do have non-nullable things inside. So I suspect maybe the final things don't match on nullability if the parquet schema really says everything is nullable. I will test and if we get mismatch we can fix that here too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we do decide to allow non-nullable fields nested inside nullable fields, we'll have to verify that our default engine's row visitor handles it gracefully? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do indeed have nullability mismatches in that the arrow schema says things can be null and our schema says they cannot. I feel like this is something that might need a little thought to fix as it'll slow down metadata parsing if we have to go through and fix up the schema every time with nullability changes, although that will never be exposed to the connector. There's also the issue of if we should adjust the metadata of each schema element, although that's less semantically important. Thoughts? There was a problem hiding this comment. 
Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code now really makes the schema match the target, both for nullability and for metadata. This isn't cheap so we need to discuss it a bit probably There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we add to this comment and give the result<option<...>> semantics? that is (AFAICT) we return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comment -- I think we can simplify the method contract |
||
fn apply_schema_to(array: &dyn Array, schema: &DataType) -> DeltaResult<Option<ArrayRef>> { | ||
use DataType::*; | ||
let array: ArrayRef = match schema { | ||
Struct(stype) => Arc::new(apply_schema_to_struct(array, stype)?), | ||
Array(atype) => Arc::new(apply_schema_to_list(array, atype)?), | ||
Map(mtype) => Arc::new(apply_schema_to_map(array, mtype)?), | ||
_ => return ensure_data_types(schema, array.data_type(), true).map(|_| None), | ||
}; | ||
Ok(Some(array)) | ||
} | ||
|
||
#[derive(Debug)] | ||
pub struct ArrowExpressionHandler; | ||
|
||
|
@@ -396,7 +581,7 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator { | |
let batch = batch | ||
.as_any() | ||
.downcast_ref::<ArrowEngineData>() | ||
.ok_or(Error::engine_data_type("ArrowEngineData"))? | ||
.ok_or_else(|| Error::engine_data_type("ArrowEngineData"))? | ||
.record_batch(); | ||
let _input_schema: ArrowSchema = self.input_schema.as_ref().try_into()?; | ||
// TODO: make sure we have matching schemas for validation | ||
|
@@ -408,13 +593,14 @@ impl ExpressionEvaluator for DefaultExpressionEvaluator { | |
// ))); | ||
// }; | ||
let array_ref = evaluate_expression(&self.expression, batch, Some(&self.output_type))?; | ||
let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?; | ||
let batch: RecordBatch = if let DataType::Struct(_) = self.output_type { | ||
array_ref | ||
.as_struct_opt() | ||
.ok_or(Error::unexpected_column_type("Expected a struct array"))? | ||
.into() | ||
apply_schema(&array_ref, &self.output_type)? | ||
} else { | ||
let array_ref = match apply_schema_to(&array_ref, &self.output_type)? { | ||
Some(transformed) => transformed, | ||
None => array_ref, // Was a primitive type; we just validated it | ||
}; | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let arrow_type: ArrowDataType = ArrowDataType::try_from(&self.output_type)?; | ||
let schema = ArrowSchema::new(vec![ArrowField::new("output", arrow_type, true)]); | ||
RecordBatch::try_new(Arc::new(schema), vec![array_ref])? | ||
scovich marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rescuing #331 (comment):
Down to 7% now...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I've added some more tests and it's up now