Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer data type from schema for Values and add struct coercion to coalesce #12864

Merged
merged 16 commits into from
Oct 24, 2024
1 change: 1 addition & 0 deletions datafusion/expr-common/src/type_coercion/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ fn type_union_resolution_coercion(
}
}

/// Handle type union resolution including struct type and others.
pub fn try_type_union_resolution(data_types: &[DataType]) -> Result<Vec<DataType>> {
jayzhan211 marked this conversation as resolved.
Show resolved Hide resolved
let err = match try_type_union_resolution_with_struct(data_types) {
Ok(struct_types) => return Ok(struct_types),
Expand Down
39 changes: 34 additions & 5 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,29 @@ impl LogicalPlanBuilder {
})))
}

pub fn values(values: Vec<Vec<Expr>>) -> Result<Self> {
if values.is_empty() {
return plan_err!("Values list cannot be empty");
}
let n_cols = values[0].len();
if n_cols == 0 {
return plan_err!("Values list cannot be zero length");
}
for (i, row) in values.iter().enumerate() {
if row.len() != n_cols {
return plan_err!(
"Inconsistent data length across values list: got {} values in row {} but expected {}",
row.len(),
i,
n_cols
);
}
}

// Infer from data itself
Self::infer_data(values)
}

/// Create a values list based relation, and the schema is inferred from data itself or table schema if provided, consuming
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand All @@ -179,7 +202,10 @@ impl LogicalPlanBuilder {
/// so it's usually better to override the default names with a table alias list.
///
/// If the values include params/binders such as $1, $2, $3, etc, then the `param_data_types` should be provided.
pub fn values(values: Vec<Vec<Expr>>, schema: Option<&DFSchemaRef>) -> Result<Self> {
pub fn values_with_schema(
values: Vec<Vec<Expr>>,
schema: Option<&DFSchemaRef>,
) -> Result<Self> {
if values.is_empty() {
return plan_err!("Values list cannot be empty");
}
Expand All @@ -200,14 +226,17 @@ impl LogicalPlanBuilder {

// Check the type of value against the schema
if let Some(schema) = schema {
Self::infer_from_schema(values, schema)
Self::infer_values_from_schema(values, schema)
} else {
// Infer from data itself
Self::infer_data(values)
}
}

fn infer_from_schema(values: Vec<Vec<Expr>>, schema: &DFSchema) -> Result<Self> {
fn infer_values_from_schema(
values: Vec<Vec<Expr>>,
schema: &DFSchema,
) -> Result<Self> {
let n_cols = values[0].len();
let mut field_types: Vec<DataType> = Vec::with_capacity(n_cols);
for j in 0..n_cols {
Expand Down Expand Up @@ -2364,10 +2393,10 @@ mod tests {
fn test_union_after_join() -> Result<()> {
let values = vec![vec![lit(1)]];

let left = LogicalPlanBuilder::values(values.clone(), None)?
let left = LogicalPlanBuilder::values_with_schema(values.clone(), None)?
.alias("left")?
.build()?;
let right = LogicalPlanBuilder::values(values, None)?
let right = LogicalPlanBuilder::values_with_schema(values, None)?
.alias("right")?
.build()?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ mod tests {
Box::new(lit(1)),
));
let values = vec![vec![expr1, expr2]];
let plan = LogicalPlanBuilder::values(values, None)?.build()?;
let plan = LogicalPlanBuilder::values_with_schema(values, None)?.build()?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just values(values) since schema is None?


let expected = "\
Values: (Int32(3) AS Int32(1) + Int32(2), Int32(1) AS Int32(2) - Int32(1))";
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.map_err(|e| e.into())
}?;

LogicalPlanBuilder::values(values, None)?.build()
LogicalPlanBuilder::values_with_schema(values, None)?.build()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just values(values) since schema is None?

}
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan =
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub struct PlannerContext {
/// FROM clauses, this should become a suffix of the `outer_query_schema`.
outer_from_schema: Option<DFSchemaRef>,
/// The query schema defined by the table
table_schema: Option<DFSchemaRef>,
create_table_schema: Option<DFSchemaRef>,
}

impl Default for PlannerContext {
Expand All @@ -156,7 +156,7 @@ impl PlannerContext {
ctes: HashMap::new(),
outer_query_schema: None,
outer_from_schema: None,
table_schema: None,
create_table_schema: None,
}
}

Expand Down Expand Up @@ -188,12 +188,12 @@ impl PlannerContext {
&mut self,
mut schema: Option<DFSchemaRef>,
) -> Option<DFSchemaRef> {
std::mem::swap(&mut self.table_schema, &mut schema);
std::mem::swap(&mut self.create_table_schema, &mut schema);
schema
}

pub fn table_schema(&self) -> Option<DFSchemaRef> {
self.table_schema.clone()
self.create_table_schema.clone()
}

// Return a clone of the outer FROM schema
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<_>>>()?;

if schema.fields().is_empty() {
LogicalPlanBuilder::values(values, None)?.build()
LogicalPlanBuilder::values_with_schema(values, None)?.build()
} else {
LogicalPlanBuilder::values(values, Some(&schema))?.build()
LogicalPlanBuilder::values_with_schema(values, Some(&schema))?.build()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This remains correct only for VALUES that are direct input to CREATE TABLE.

}
}
}