Skip to content

Commit

Permalink
feat: collect more errors
Browse files Browse the repository at this point in the history
  • Loading branch information
eliaperantoni committed Dec 10, 2024
1 parent 03afbf6 commit 2d3e2a1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 10 deletions.
64 changes: 64 additions & 0 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub enum DataFusionError {
/// or serializing/deserializing protobytes to Substrait plans
Substrait(String),
Diagnostic(Diagnostic, Box<DataFusionError>),
Collection(Vec<DataFusionError>),
}

#[macro_export]
Expand Down Expand Up @@ -319,6 +320,7 @@ impl Error for DataFusionError {
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
DataFusionError::Collection(_) => None,
}
}
}
Expand Down Expand Up @@ -442,6 +444,7 @@ impl DataFusionError {
DataFusionError::Context(_, _) => "",
DataFusionError::Substrait(_) => "Substrait error: ",
DataFusionError::Diagnostic(_, _) => "",
DataFusionError::Collection(_) => "",
}
}

Expand Down Expand Up @@ -483,6 +486,13 @@ impl DataFusionError {
}
DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
DataFusionError::Collection(ref v) => Cow::Owned(format!(
"[{}]",
v.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
)),
}
}

Expand Down Expand Up @@ -517,6 +527,60 @@ impl DataFusionError {

DiagnosticsIterator { head: self }
}

pub fn get_individual_errors(&self) -> impl Iterator<Item = &Self> + '_ {
fn contains_collection(err: &DataFusionError) -> bool {
let mut head = err;
loop {
if let DataFusionError::Collection(_) = head {
return true;
}

if let Some(source) = head
.source()
.map(|source| source.downcast_ref::<DataFusionError>())
.flatten()
{
head = source;
} else {
return false;
}
}
}

struct IndividualErrorsIterator<'a> {
queue: Vec<&'a DataFusionError>,
}

impl<'a> Iterator for IndividualErrorsIterator<'a> {
type Item = &'a DataFusionError;

fn next(&mut self) -> Option<Self::Item> {
while let Some(err) = self.queue.pop() {
if !contains_collection(err) {
return Some(err);
}

if let DataFusionError::Collection(errs) = err {
self.queue.extend(errs.iter());
continue;
}

if let Some(source) = err
.source()
.map(|source| source.downcast_ref::<DataFusionError>())
.flatten()
{
self.queue.push(source);
}
}

None
}
}

IndividualErrorsIterator { queue: vec![self] }
}
}

/// Unwrap an `Option` if possible. Otherwise return an `DataFusionError::Internal`.
Expand Down
20 changes: 14 additions & 6 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ pub fn exprlist_to_fields<'a>(
// Look for exact match in plan's output schema
let wildcard_schema = find_base_plan(plan).schema();
let input_schema = plan.schema();
let result = exprs
let (fields, errs) = exprs
.into_iter()
.map(|e| match e {
Expr::Wildcard { qualifier, options } => match qualifier {
Expand Down Expand Up @@ -759,11 +759,19 @@ pub fn exprlist_to_fields<'a>(
},
_ => Ok(vec![e.to_field(input_schema)?]),
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();
Ok(result)
.fold((vec![], vec![]), |(mut fields, mut errs), result| {
match result {
Ok(this_fields) => fields.extend(this_fields),
Err(err) => errs.push(err),
}
(fields, errs)
});

if !errs.is_empty() {
Err(DataFusionError::Collection(errs))
} else {
Ok(fields)
}
}

/// Find the suitable base plan to expand the wildcard expression recursively.
Expand Down
20 changes: 16 additions & 4 deletions datafusion/sql/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,41 @@ pub(crate) fn check_columns_satisfy_exprs(
Expr::Column(_) => Ok(()),
_ => internal_err!("Expr::Column are required"),
})?;
let mut errs = vec![];
let column_exprs = find_column_exprs(exprs);
for e in &column_exprs {
match e {
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
for e in exprs {
check_column_satisfies_expr(columns, e, call_purpose)?;
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
errs.push(err);
}
}
}
Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
for e in exprs {
check_column_satisfies_expr(columns, e, call_purpose)?;
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
errs.push(err);
}
}
}
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
for exprs in lists_of_exprs {
for e in exprs {
check_column_satisfies_expr(columns, e, call_purpose)?;
if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
errs.push(err);
}
}
}
}
_ => check_column_satisfies_expr(columns, e, call_purpose)?,
_ => if let Err(err) = check_column_satisfies_expr(columns, e, call_purpose) {
errs.push(err);
},
}
}
if !errs.is_empty() {
return Err(DataFusionError::Collection(errs));
}
Ok(())
}

Expand Down

0 comments on commit 2d3e2a1

Please sign in to comment.