diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index 0ddd1f0a4112..29be76e56d38 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -33,8 +33,8 @@ use std::fmt;
 use std::str::FromStr;
 
 /// A named reference to a qualified field in a schema.
-#[derive(Debug, Clone, Derivative)]
-#[derivative(PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[derive(Debug, Derivative)]
+#[derivative(PartialEq, Eq, Hash, PartialOrd, Ord, Clone)]
 pub struct Column {
     /// relation/table reference.
     pub relation: Option<TableReference>,
@@ -254,8 +254,7 @@ impl Column {
                     .flat_map(|s| s.columns_with_unqualified_name(&self.name))
                     .collect::<Vec<_>>();
                 for using_col in using_columns {
-                    let all_matched =
-                        columns.iter().all(|c| using_col.contains(c));
+                    let all_matched = columns.iter().all(|c| using_col.contains(c));
                     // All matched fields belong to the same using column set, in other words
                     // the same join clause. We simply pick the qualifier from the first match.
                     if all_matched {
diff --git a/datafusion/common/src/dfschema/fields_spans.rs b/datafusion/common/src/dfschema/fields_spans.rs
index bbfa9f2783a7..c4fd0d8406a0 100644
--- a/datafusion/common/src/dfschema/fields_spans.rs
+++ b/datafusion/common/src/dfschema/fields_spans.rs
@@ -9,7 +9,7 @@ pub struct FieldsSpans(Vec<Vec<Span>>);
 
 impl FieldsSpans {
     pub fn empty(field_count: usize) -> Self {
-        Self((0..field_count).into_iter().map(|_| Vec::new()).collect())
+        Self((0..field_count).map(|_| Vec::new()).collect())
     }
 
     pub fn iter(&self) -> impl Iterator<Item = &Vec<Span>> {
@@ -20,7 +20,7 @@ impl FieldsSpans {
         &self,
         other: &FieldsSpans,
         join_type: &JoinType,
-        left_cols_len: usize,
+        _left_cols_len: usize,
     ) -> FieldsSpans {
         match join_type {
             JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
diff --git a/datafusion/common/src/dfschema/mod.rs b/datafusion/common/src/dfschema/mod.rs
index aadcfc423d12..eb07d2705c81 100644
--- a/datafusion/common/src/dfschema/mod.rs
+++ b/datafusion/common/src/dfschema/mod.rs
@@ -20,7 +20,6 @@ mod fields_spans;
 
 pub use fields_spans::FieldsSpans;
 
-use std::backtrace::Backtrace;
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 861fff185c75..46387c982eba 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;
 
 use crate::diagnostic::Diagnostic;
 use crate::utils::quote_identifier;
-use crate::{Column, DFSchema, DiagnosticEntry, DiagnosticEntryKind, TableReference};
+use crate::{Column, DFSchema, TableReference};
 #[cfg(feature = "avro")]
 use apache_avro::Error as AvroError;
 use arrow::error::ArrowError;
@@ -514,8 +514,7 @@
         if let Some(source) = self
             .head
             .source()
-            .map(|source| source.downcast_ref::<DataFusionError>())
-            .flatten()
+            .and_then(|source| source.downcast_ref::<DataFusionError>())
         {
             self.head = source;
         } else {
@@ -528,7 +527,9 @@
         DiagnosticsIterator { head: self }
     }
 
-    pub fn get_individual_errors(&self) -> impl Iterator<Item = &Self> + '_ {
+    pub fn get_individual_errors(
+        &self,
+    ) -> impl Iterator<Item = (Vec<Diagnostic>, &Self)> + '_ {
         fn contains_collection(err: &DataFusionError) -> bool {
             let mut head = err;
             loop {
@@ -538,8 +539,7 @@
 
                 if let Some(source) = head
                     .source()
-                    .map(|source| source.downcast_ref::<DataFusionError>())
-                    .flatten()
+                    .and_then(|source| source.downcast_ref::<DataFusionError>())
                 {
                     head = source;
                 } else {
@@ -549,29 +549,34 @@
             }
         }
 
         struct IndividualErrorsIterator<'a> {
-            queue: Vec<&'a DataFusionError>,
+            queue: Vec<(&'a DataFusionError, Vec<Diagnostic>)>,
         }
 
         impl<'a> Iterator for IndividualErrorsIterator<'a> {
-            type Item = &'a DataFusionError;
+            type Item = (Vec<Diagnostic>, &'a DataFusionError);
 
             fn next(&mut self) -> Option<Self::Item> {
-                while let Some(err) = self.queue.pop() {
+                while let Some((err, mut diagnostics_prefix)) = self.queue.pop() {
                     if !contains_collection(err) {
-                        return Some(err);
+                        return Some((diagnostics_prefix, err));
                     }
 
                     if let DataFusionError::Collection(errs) = err {
-                        self.queue.extend(errs.iter());
+                        self.queue.extend(
+                            errs.iter().map(|err| (err, diagnostics_prefix.clone())),
+                        );
                         continue;
                     }
 
+                    if let DataFusionError::Diagnostic(diagnostics, _) = err {
+                        diagnostics_prefix.push(diagnostics.clone());
+                    }
+
                     if let Some(source) = err
                         .source()
-                        .map(|source| source.downcast_ref::<DataFusionError>())
-                        .flatten()
+                        .and_then(|source| source.downcast_ref::<DataFusionError>())
                     {
-                        self.queue.push(source);
+                        self.queue.push((source, diagnostics_prefix));
                     }
                 }
@@ -579,7 +584,9 @@
             }
         }
 
-        IndividualErrorsIterator { queue: vec![self] }
+        IndividualErrorsIterator {
+            queue: vec![(self, vec![])],
+        }
     }
 }
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 72ce99f78d00..46dea48e1787 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -33,6 +33,7 @@ pub mod alias;
 pub mod cast;
 pub mod config;
 pub mod cse;
+pub mod diagnostic;
 pub mod display;
 pub mod error;
 pub mod file_options;
@@ -47,15 +48,15 @@ pub mod test_util;
 pub mod tree_node;
 pub mod types;
 pub mod utils;
-pub mod diagnostic;
 pub mod with_span;
 
 /// Reexport arrow crate
 pub use arrow;
 pub use column::Column;
 pub use dfschema::{
-    qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema, FieldsSpans,
+    qualified_name, DFSchema, DFSchemaRef, ExprSchema, FieldsSpans, SchemaExt, ToDFSchema,
 };
+pub use diagnostic::{Diagnostic, DiagnosticEntry, DiagnosticEntryKind};
 pub use error::{
     field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
     SharedResult,
@@ -78,7 +79,6 @@ pub use stats::{ColumnStatistics, Statistics};
 pub use table_reference::{ResolvedTableReference, TableReference};
 pub use unnest::{RecursionUnnestOption, UnnestOptions};
 pub use utils::project_schema;
-pub use diagnostic::{Diagnostic, DiagnosticEntry, DiagnosticEntryKind};
 pub use with_span::WithSpans;
 
 // These are hidden from docs purely to avoid polluting the public view of what this crate exports.
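Note: the reworked get_individual_errors above now yields each leaf error together with the Diagnostic values collected on the way down to it, instead of a bare error reference. A minimal sketch of a caller, assuming the item type is (Vec<Diagnostic>, &DataFusionError) as the new signature suggests and that Diagnostic implements Debug; this is patch-local API, not a released DataFusion interface:

use datafusion_common::DataFusionError;

// Walk every leaf error inside a (possibly nested) DataFusionError::Collection
// and print it together with the diagnostics gathered while descending to it.
// The (Vec<Diagnostic>, &DataFusionError) item shape is an assumption based on
// the signature change in this patch.
fn report_all(err: &DataFusionError) {
    for (diagnostics, leaf) in err.get_individual_errors() {
        eprintln!("error: {leaf}");
        for diagnostic in &diagnostics {
            // Debug formatting is assumed here purely for illustration.
            eprintln!("  with diagnostic: {diagnostic:?}");
        }
    }
}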
diff --git a/datafusion/common/src/with_span.rs b/datafusion/common/src/with_span.rs index d0a953b40578..d0e3c6d5e82b 100644 --- a/datafusion/common/src/with_span.rs +++ b/datafusion/common/src/with_span.rs @@ -1,5 +1,4 @@ use std::{ - cmp::Ordering, fmt::{self, Debug, Display}, ops::{Deref, DerefMut}, }; diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 847fd6339b22..40fbf6c272bd 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -34,7 +34,6 @@ use datafusion_common::{ Diagnostic, DiagnosticEntry, DiagnosticEntryKind, Result, WithSpans, }; use itertools::Itertools; -use sqlparser::tokenizer::Span; /// The type signature of an instantiation of binary operator expression such as /// `lhs + rhs` diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 438662e0642b..fcaf96791952 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -56,6 +56,7 @@ serde_json = { workspace = true } sqlparser = { workspace = true } strum = { version = "0.26.1", features = ["derive"] } strum_macros = "0.26.0" +derivative = { workspace = true } [dev-dependencies] ctor = { workspace = true } diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 5fd4faa06242..f2e5891feb4d 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -38,6 +38,7 @@ use datafusion_common::{ plan_err, Column, DFSchema, HashMap, Result, ScalarValue, TableReference, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; +use derivative::Derivative; use sqlparser::ast::{ display_comma_separated, ExceptSelectItem, ExcludeSelectItem, IlikeSelectItem, NullTreatment, RenameSelectItem, ReplaceSelectElement, @@ -401,11 +402,19 @@ impl Unnest { } /// Alias expression -#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)] +#[derive(Clone, Derivative, Debug)] +#[derivative(PartialEq, Eq, PartialOrd, Hash)] pub struct Alias { pub expr: Box, pub relation: Option, pub name: String, + #[derivative( + PartialEq = "ignore", + Hash = "ignore", + PartialOrd = "ignore", + Ord = "ignore" + )] + pub span: Span, } impl Alias { @@ -419,8 +428,13 @@ impl Alias { expr: Box::new(expr), relation: relation.map(|r| r.into()), name: name.into(), + span: Span::empty(), } } + + pub fn with_span(self, span: Span) -> Self { + Self { span, ..self } + } } /// Binary expression @@ -1128,6 +1142,7 @@ impl Expr { relation, name, spans: _, + .. }) => (relation.clone(), name.clone()), Expr::Alias(Alias { relation, name, .. }) => (relation.clone(), name.clone()), _ => (None, self.schema_name().to_string()), @@ -1681,11 +1696,22 @@ impl Expr { } } - pub fn get_spans(&self) -> Option<&Vec> { + pub fn get_span(&self) -> Span { match self { - Expr::Column(Column { spans, .. }) => Some(spans), - Expr::Alias(Alias { expr, .. }) => expr.get_spans(), - _ => None, + Expr::Column(Column { spans, .. }) => match spans.as_slice() { + [] => panic!("No spans for column expr"), + [span] => *span, + _ => panic!("Column expr has more than one span"), + }, + Expr::Alias(Alias { + expr, + span: alias_span, + .. + }) => { + let span = expr.get_span(); + span.union(alias_span) + } + _ => Span::empty(), } } } @@ -1701,6 +1727,7 @@ impl HashNode for Expr { expr: _expr, relation, name, + .. 
}) => { relation.hash(state); name.hash(state); diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 2c5c83999038..f2557fb11281 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -186,6 +186,7 @@ pub fn create_col_from_scalar_expr( relation: _, name, spans, + .. }) => Ok( Column::new(Some::(subqry_alias.into()), name) .with_spans(spans.iter().copied()), diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index 35d0fc2807db..258991914012 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -23,7 +23,6 @@ use crate::{expr::Sort, Cast, Expr, LogicalPlan, TryCast}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{Column, Result}; -use sqlparser::tokenizer::Span; /// Rewrite sort on aggregate expressions to sort on the column of aggregate output /// For example, `max(x)` is written to `col("max(x)")` diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 6bb909a93111..0a28bc4707c0 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -33,6 +33,7 @@ use datafusion_common::{ }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; use recursive::recursive; +use sqlparser::tokenizer::Span; use std::collections::HashMap; use std::sync::Arc; @@ -405,16 +406,16 @@ impl ExprSchemable for Expr { }) => { let (left_type, left_is_nullable) = left.data_type_and_nullable(schema)?; - let left_type = if let Some(spans) = left.get_spans() { - WithSpans::new(&left_type, spans.iter().copied()) + let left_type = if left.get_span() != Span::empty() { + WithSpans::new(&left_type, [left.get_span()]) } else { (&left_type).into() }; let (right_type, right_is_nullable) = right.data_type_and_nullable(schema)?; - let right_type = if let Some(spans) = right.get_spans() { - WithSpans::new(&right_type, spans.iter().copied()) + let right_type = if right.get_span() != Span::empty() { + WithSpans::new(&right_type, [right.get_span()]) } else { (&right_type).into() }; diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 445cf3dea3a6..75a0b3433089 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1543,14 +1543,20 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result Result> { - let metadata = input.schema().metadata().clone(); - let fields_spans = exprs - .iter() - .map(|e| e.get_spans().cloned().unwrap_or_else(|| vec![])) - .collect(); + let fields_spans = exprs.iter().map(|e| vec![e.get_span()]).collect(); let schema = DFSchema::new_with_metadata(exprlist_to_fields(exprs, input)?, metadata)? diff --git a/datafusion/expr/src/tree_node.rs b/datafusion/expr/src/tree_node.rs index eacace5ed046..a017953fd72d 100644 --- a/datafusion/expr/src/tree_node.rs +++ b/datafusion/expr/src/tree_node.rs @@ -127,7 +127,16 @@ impl TreeNode for Expr { expr, relation, name, - }) => f(*expr)?.update_data(|e| e.alias_qualified(relation, name)), + span, + .. 
+ }) => f(*expr)?.update_data(|e| { + let e = e.alias_qualified(relation, name); + if let Expr::Alias(alias) = e { + Expr::Alias(alias.with_span(span)) + } else { + unreachable!(); + } + }), Expr::InSubquery(InSubquery { expr, subquery, diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 225b6098a9eb..bd883b2c2033 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -124,28 +124,23 @@ fn expand_exprlist(input: &LogicalPlan, expr: Vec) -> Result> { ref relation, ref name, ref spans, + .. }) => { if name.eq("*") { let expanded_columns = if let Some(qualifier) = relation { - expand_qualified_wildcard( - qualifier, - input.schema(), - None, - )? + expand_qualified_wildcard(qualifier, input.schema(), None)? } else { - expand_wildcard( - input.schema(), - input, - None, - )? + expand_wildcard(input.schema(), input, None)? }; - projected_expr.extend(expanded_columns.into_iter().map(|mut expr| { - if let Expr::Column(c) = &mut expr { - c.spans = spans.clone(); - } - expr - })); + projected_expr.extend(expanded_columns.into_iter().map( + |mut expr| { + if let Expr::Column(c) = &mut expr { + c.spans = spans.clone(); + } + expr + }, + )); } else { projected_expr.push(e.clone()); } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index b585beb1e7d5..904face71bec 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1001,6 +1001,7 @@ fn project_with_column_index( relation: _, ref name, spans: _, + .. }) if name != schema.field(i).name() => e.alias(schema.field(i).name()), Expr::Alias { .. } | Expr::Column { .. } => e, _ => e.alias(schema.field(i).name()), diff --git a/datafusion/optimizer/src/decorrelate.rs b/datafusion/optimizer/src/decorrelate.rs index 5c519dce8fbb..050084964dba 100644 --- a/datafusion/optimizer/src/decorrelate.rs +++ b/datafusion/optimizer/src/decorrelate.rs @@ -532,7 +532,12 @@ fn proj_exprs_evaluation_result_on_empty_batch( let result_expr = simplifier.simplify(result_expr)?; let expr_name = match expr { Expr::Alias(Alias { name, .. }) => name.to_string(), - Expr::Column(Column { relation: _, name, spans: _ }) => name.to_string(), + Expr::Column(Column { + relation: _, + name, + spans: _, + .. 
+ }) => name.to_string(), _ => expr.schema_name().to_string(), }; expr_result_map_for_count_bug.insert(expr_name, result_expr); diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1519c54dbf68..81e60ffd9da5 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -493,6 +493,7 @@ fn merge_consecutive_projections(proj: Projection) -> Result rewrite_expr(*expr, &prev_projection).map(|result| { result.update_data(|expr| Expr::Alias(Alias::new(expr, relation, name))) }), diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 4787ba8d4ca7..992a926cd91f 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -157,10 +157,15 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { .iter() .skip(expr_cnt) .zip(schema.iter()) - .map(|((new_qualifier, new_field, _), (old_qualifier, old_field, _))| { - col(Column::from((new_qualifier, new_field))) - .alias_qualified(old_qualifier.cloned(), old_field.name()) - }) + .map( + |( + (new_qualifier, new_field, _), + (old_qualifier, old_field, _), + )| { + col(Column::from((new_qualifier, new_field))) + .alias_qualified(old_qualifier.cloned(), old_field.name()) + }, + ) .collect::>(); let plan = LogicalPlanBuilder::from(plan) diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 1b9583516ced..adad67031487 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -266,7 +266,7 @@ impl TryFrom<&DFSchema> for protobuf::DfSchema { fn try_from(s: &DFSchema) -> Result { let columns = s .iter() - .map(|(qualifier, field)| { + .map(|(qualifier, field, _)| { Ok(protobuf::DfField { field: Some(field.as_ref().try_into()?), qualifier: qualifier.map(|r| protobuf::ColumnRelation { diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 735221eae6d5..7ee5bd45a058 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -20,8 +20,7 @@ use sqlparser::ast::{Expr as SQLExpr, Ident}; use datafusion_common::{ internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, - DataFusionError, Diagnostic, DiagnosticEntry, DiagnosticEntryKind, Result, - TableReference, + DataFusionError, Result, TableReference, }; use datafusion_expr::planner::PlannerResult; use datafusion_expr::{Case, Expr}; @@ -114,7 +113,7 @@ impl SqlToRel<'_, S> { })?; Ok(Expr::ScalarVariable(ty, var_names)) } else { - let span = ids.last().map(|id| id.span).unwrap_or(Span::empty()); + let span = Span::union_iter(ids.iter().map(|id| id.span)); let ids = ids .into_iter() .map(|id| self.ident_normalizer.normalize(id)) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index c08a9fd44e04..aa4dbb37e117 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -157,7 +157,9 @@ impl SqlToRel<'_, S> { } _ => false, }) { - Some((qualifier, df_field, _)) => Expr::from((qualifier, df_field)), + Some((qualifier, df_field, _)) => { + Expr::from((qualifier, df_field)) + } None => Expr::Column(col), } } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 33b3b97044af..ad30fc3bbb5d 100644 --- a/datafusion/sql/src/planner.rs +++ 
b/datafusion/sql/src/planner.rs @@ -394,7 +394,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { col.name, r ), DiagnosticEntryKind::Error, - col.spans.first().copied().unwrap_or(Span::empty()), + col.spans + .first() + .copied() + .unwrap_or(Span::empty()), )]) }) }, diff --git a/datafusion/sql/src/relation/join.rs b/datafusion/sql/src/relation/join.rs index 2ed1197e8fbf..faec947ce5d8 100644 --- a/datafusion/sql/src/relation/join.rs +++ b/datafusion/sql/src/relation/join.rs @@ -16,8 +16,8 @@ // under the License. use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; -use datafusion_common::{not_impl_err, Column, Result}; -use datafusion_expr::{JoinType, LogicalPlan, LogicalPlanBuilder}; +use datafusion_common::{not_impl_err, Column, DataFusionError, Result, ScalarValue}; +use datafusion_expr::{Expr, JoinType, LogicalPlan, LogicalPlanBuilder}; use sqlparser::ast::{Join, JoinConstraint, JoinOperator, TableFactor, TableWithJoins}; use std::collections::HashSet; @@ -26,19 +26,21 @@ impl SqlToRel<'_, S> { &self, t: TableWithJoins, planner_context: &mut PlannerContext, - ) -> Result { + ) -> Result<(LogicalPlan, Option)> { let mut left = if is_lateral(&t.relation) { self.create_relation_subquery(t.relation, planner_context)? } else { self.create_relation(t.relation, planner_context)? }; + let mut ignorable_error = None; let old_outer_from_schema = planner_context.outer_from_schema(); for join in t.joins { planner_context.extend_outer_from_schema(left.schema())?; - left = self.parse_relation_join(left, join, planner_context)?; + (left, ignorable_error) = + self.parse_relation_join(left, join, planner_context)?; } planner_context.set_outer_from_schema(old_outer_from_schema); - Ok(left) + Ok((left, ignorable_error)) } fn parse_relation_join( @@ -46,7 +48,7 @@ impl SqlToRel<'_, S> { left: LogicalPlan, join: Join, planner_context: &mut PlannerContext, - ) -> Result { + ) -> Result<(LogicalPlan, Option)> { let right = if is_lateral_join(&join)? { self.create_relation_subquery(join.relation, planner_context)? } else { @@ -93,7 +95,10 @@ impl SqlToRel<'_, S> { JoinOperator::FullOuter(constraint) => { self.parse_join(left, right, constraint, JoinType::Full, planner_context) } - JoinOperator::CrossJoin => self.parse_cross_join(left, right), + JoinOperator::CrossJoin => { + let plan = self.parse_cross_join(left, right)?; + Ok((plan, None)) + } other => not_impl_err!("Unsupported JOIN operator {other:?}"), } } @@ -113,24 +118,32 @@ impl SqlToRel<'_, S> { constraint: JoinConstraint, join_type: JoinType, planner_context: &mut PlannerContext, - ) -> Result { + ) -> Result<(LogicalPlan, Option)> { match constraint { JoinConstraint::On(sql_expr) => { let join_schema = left.schema().join(right.schema())?; // parse ON expression - let expr = self.sql_to_expr(sql_expr, &join_schema, planner_context)?; - LogicalPlanBuilder::from(left) + let (expr, ignorable_error) = + match self.sql_to_expr(sql_expr, &join_schema, planner_context) { + Ok(expr) => (expr, None), + Err(e) => { + (Expr::Literal(ScalarValue::Boolean(Some(false))), Some(e)) + } + }; + let plan = LogicalPlanBuilder::from(left) .join_on(right, join_type, Some(expr))? - .build() + .build()?; + Ok((plan, ignorable_error)) } JoinConstraint::Using(idents) => { let keys: Vec = idents .into_iter() .map(|x| Column::from_name(self.ident_normalizer.normalize(x))) .collect(); - LogicalPlanBuilder::from(left) + let plan = LogicalPlanBuilder::from(left) .join_using(right, join_type, keys)? 
- .build() + .build()?; + Ok((plan, None)) } JoinConstraint::Natural => { let left_cols: HashSet<&String> = @@ -143,17 +156,21 @@ impl SqlToRel<'_, S> { .filter(|f| left_cols.contains(f)) .map(Column::from_name) .collect(); - if keys.is_empty() { - self.parse_cross_join(left, right) + let plan = if keys.is_empty() { + self.parse_cross_join(left, right)? } else { LogicalPlanBuilder::from(left) .join_using(right, join_type, keys)? - .build() - } + .build()? + }; + Ok((plan, None)) + } + JoinConstraint::None => { + let plan = LogicalPlanBuilder::from(left) + .join_on(right, join_type, [])? + .build()?; + Ok((plan, None)) } - JoinConstraint::None => LogicalPlanBuilder::from(left) - .join_on(right, join_type, [])? - .build(), } } } diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 45a617daae96..56402aa0353a 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -100,7 +100,14 @@ impl SqlToRel<'_, S> { table_with_joins, alias, } => ( - self.plan_table_with_joins(*table_with_joins, planner_context)?, + { + let (plan, ignorable_error) = + self.plan_table_with_joins(*table_with_joins, planner_context)?; + if let Some(e) = ignorable_error { + return Err(e); + } + plan + }, alias, ), TableFactor::UNNEST { diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index b9ffce0b2af6..af3ed62ed7a8 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -27,7 +27,8 @@ use crate::utils::{ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{ - not_impl_err, plan_err, Diagnostic, DiagnosticEntry, DiagnosticEntryKind, Result, + not_impl_err, plan_err, DataFusionError, Diagnostic, DiagnosticEntry, + DiagnosticEntryKind, Result, }; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; @@ -75,8 +76,17 @@ impl SqlToRel<'_, S> { return not_impl_err!("SORT BY"); } + let mut errs = vec![]; + // Process `from` clause - let plan = self.plan_from_tables(select.from, planner_context)?; + let plan = { + let (plan, ignorable_error) = + self.plan_from_tables(select.from, planner_context)?; + if let Some(err) = ignorable_error { + errs.push(err); + } + plan + }; let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); // Process `where` clause @@ -198,13 +208,21 @@ impl SqlToRel<'_, S> { .is_empty() || !aggr_exprs.is_empty() { - self.aggregate( + match self.aggregate( &base_plan, &select_exprs, having_expr_opt.as_ref(), &group_by_exprs, &aggr_exprs, - )? 
+ ) { + Err(err) => { + errs.push(err); + (base_plan.clone(), select_exprs.clone(), having_expr_opt) + } + Ok((plan, select_exprs_post_aggr, having_expr_post_aggr)) => { + (plan, select_exprs_post_aggr, having_expr_post_aggr) + } + } } else { match having_expr_opt { Some(having_expr) => return plan_err!("HAVING clause references: {having_expr} must appear in the GROUP BY clause or be used in an aggregate function"), @@ -288,7 +306,13 @@ impl SqlToRel<'_, S> { plan }; - self.order_by(plan, order_by_rex) + let plan = self.order_by(plan, order_by_rex)?; + + if !errs.is_empty() { + Err(DataFusionError::Collection(errs)) + } else { + Ok(plan) + } } /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection @@ -537,9 +561,12 @@ impl SqlToRel<'_, S> { &self, mut from: Vec, planner_context: &mut PlannerContext, - ) -> Result { + ) -> Result<(LogicalPlan, Option)> { match from.len() { - 0 => Ok(LogicalPlanBuilder::empty(true).build()?), + 0 => { + let plan = LogicalPlanBuilder::empty(true).build()?; + Ok((plan, None)) + } 1 => { let input = from.remove(0); self.plan_table_with_joins(input, planner_context) @@ -547,24 +574,49 @@ impl SqlToRel<'_, S> { _ => { let mut from = from.into_iter(); - let mut left = LogicalPlanBuilder::from({ - let input = from.next().unwrap(); - self.plan_table_with_joins(input, planner_context)? - }); + let mut ignorable_error = None; + let mut add_ignorable_error = |new_ignorable_err: Option< + DataFusionError, + >| match new_ignorable_err { + None => (), + Some(new_ignorable_err) => { + if ignorable_error.is_none() { + ignorable_error = Some(DataFusionError::Collection(vec![])); + } + if let Some(DataFusionError::Collection(ignorable_errors)) = + &mut ignorable_error + { + ignorable_errors.push(new_ignorable_err); + } + } + }; + + let left = { + let (left, ignorable_error) = self + .plan_table_with_joins(from.next().unwrap(), planner_context)?; + add_ignorable_error(ignorable_error); + left + }; + let mut left = LogicalPlanBuilder::from(left); let old_outer_from_schema = { let left_schema = Some(Arc::clone(left.schema())); planner_context.set_outer_from_schema(left_schema) }; for input in from { // Join `input` with the current result (`left`). - let right = self.plan_table_with_joins(input, planner_context)?; + let right = { + let (right, ignorable_error) = + self.plan_table_with_joins(input, planner_context)?; + add_ignorable_error(ignorable_error); + right + }; left = left.cross_join(right)?; // Update the outer FROM schema. let left_schema = Some(Arc::clone(left.schema())); planner_context.set_outer_from_schema(left_schema); } planner_context.set_outer_from_schema(old_outer_from_schema); - left.build() + Ok((left.build()?, ignorable_error)) } } } @@ -606,6 +658,7 @@ impl SqlToRel<'_, S> { Ok(vec![col]) } SelectItem::ExprWithAlias { expr, alias } => { + let alias_span = alias.span; let select_expr = self.sql_to_expr(expr, plan.schema(), planner_context)?; let col = normalize_col_with_schemas_and_ambiguity_check( @@ -617,7 +670,14 @@ impl SqlToRel<'_, S> { // avoiding adding an alias if the column name is the same. 
let expr = match &col { Expr::Column(column) if column.name.eq(&name) => col, - _ => col.alias(name), + _ => { + let expr = col.alias(name); + if let Expr::Alias(alias) = expr { + Expr::Alias(alias.with_span(alias_span)) + } else { + unreachable!(); + } + } }; Ok(vec![expr]) } @@ -815,11 +875,7 @@ impl SqlToRel<'_, S> { Diagnostic::new([DiagnosticEntry::new( "GROUP BY clause is here", DiagnosticEntryKind::Note, - Span::union_iter( - group_by_exprs - .iter() - .flat_map(|e| e.get_spans().cloned().unwrap_or_default()), - ), + Span::union_iter(group_by_exprs.iter().map(|e| e.get_span())), )]) }) })?; diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 572ac8221b1c..330fdfc39e83 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1714,7 +1714,14 @@ impl SqlToRel<'_, S> { // Build scan, join with from table if it exists. let mut input_tables = vec![table]; input_tables.extend(from); - let scan = self.plan_from_tables(input_tables, &mut planner_context)?; + let scan = { + let (scan, ignorable_error) = + self.plan_from_tables(input_tables, &mut planner_context)?; + if let Some(ignorable_error) = ignorable_error { + return Err(ignorable_error); + } + scan + }; // Filter let source = match predicate_expr { diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 04b8a99eacc9..b4ada73143e4 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -924,11 +924,13 @@ impl Unparser<'_> { relation: _, name: left_name, spans: _, + .. }), Expr::Column(Column { relation: _, name: right_name, spans: _, + .. }), ) if left_name == right_name => { idents.push(self.new_ident_quoted_if_needs(left_name.to_string())); diff --git a/datafusion/sql/src/unparser/rewrite.rs b/datafusion/sql/src/unparser/rewrite.rs index 68af121a4117..4dc56beb5adc 100644 --- a/datafusion/sql/src/unparser/rewrite.rs +++ b/datafusion/sql/src/unparser/rewrite.rs @@ -311,6 +311,7 @@ pub(super) fn inject_column_aliases( expr: Box::new(expr.clone()), relation, name: col_alias.value, + span: col_alias.span, }) }) .collect::>(); diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 432d04510c6a..fa4ba20c4676 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -123,20 +123,25 @@ pub(crate) fn check_columns_satisfy_exprs( Expr::Column(_) => Ok(()), _ => internal_err!("Expr::Column are required"), })?; + let mut errs = vec![]; let column_exprs = find_column_exprs(exprs); for e in &column_exprs { match e { Expr::GroupingSet(GroupingSet::Rollup(exprs)) => { for e in exprs { - if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) { + if let Err(err) = + check_column_satisfies_expr(columns, e, call_purpose) + { errs.push(err); } } } Expr::GroupingSet(GroupingSet::Cube(exprs)) => { for e in exprs { - if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) { + if let Err(err) = + check_column_satisfies_expr(columns, e, call_purpose) + { errs.push(err); } } @@ -144,15 +149,19 @@ pub(crate) fn check_columns_satisfy_exprs( Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => { for exprs in lists_of_exprs { for e in exprs { - if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) { + if let Err(err) = + check_column_satisfies_expr(columns, e, call_purpose) + { errs.push(err); } } } } - _ => if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) { - errs.push(err); - }, + _ => { + if let 
Err(err) = check_column_satisfies_expr(columns, e, call_purpose) { + errs.push(err); + } + } } } if !errs.is_empty() { @@ -177,11 +186,11 @@ fn check_column_satisfies_expr( let message = match call_purpose { CheckColumnsSatisfyExprsPurpose::GroupBy => format!( "'{}' in projection does not appear in GROUP BY clause", - expr.to_string() + expr ), CheckColumnsSatisfyExprsPurpose::Having => format!( "'{}' in HAVING clause does not appear in GROUP BY clause", - expr.to_string() + expr ), }; err.with_diagnostic(|_| { @@ -189,12 +198,10 @@ fn check_column_satisfies_expr( DiagnosticEntry::new( message, DiagnosticEntryKind::Error, - expr.get_spans() - .and_then(|spans| spans.first().copied()) - .unwrap_or(Span::empty()), + expr.get_span(), ), DiagnosticEntry::new( - format!("Add '{}' to GROUP BY clause", expr.to_string()), + format!("Add '{}' to GROUP BY clause", expr), DiagnosticEntryKind::Help, Span::empty(), ),
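Note: a small sketch of how the Alias span plumbing added in this patch is meant to be used, assuming only the APIs introduced above (the new Alias::span field defaulting to Span::empty(), and Alias::with_span); alias_ident_span is a placeholder for the span of the alias identifier taken from the sqlparser AST:

use datafusion_expr::{col, Expr};
use sqlparser::tokenizer::Span;

// Attach the alias identifier's span to an aliased expression, mirroring the
// SelectItem::ExprWithAlias handling in select.rs above. col().alias() still
// builds the Alias with Span::empty(); with_span then records where the alias
// was written so later diagnostics can point at it.
fn alias_with_span(alias_ident_span: Span) -> Expr {
    let aliased = col("t.a").alias("b");
    match aliased {
        Expr::Alias(alias) => Expr::Alias(alias.with_span(alias_ident_span)),
        other => other,
    }
}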