Skip to content

Commit

Permalink
Improve AggregationFuzzer error reporting (#12832)
Browse files Browse the repository at this point in the history
* Improve AggregationFuzzer error reporting

* simplify

* Update datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs

* fmt
  • Loading branch information
alamb authored Oct 15, 2024
1 parent fce331a commit 377a4c5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 33 deletions.
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn test_basic_prim_aggr_no_group() {
.table_name("fuzz_table")
.build();

fuzzer.run().await;
fuzzer.run().await
}

/// Fuzz test for `basic prim aggr(sum/sum distinct/max/min/count/avg)` + `group by single int64`
Expand Down
90 changes: 58 additions & 32 deletions datafusion/core/tests/fuzz_cases/aggregation_fuzzer/fuzzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use arrow::util::pretty::pretty_format_batches;
use arrow_array::RecordBatch;
use datafusion_common::{DataFusionError, Result};
use rand::{thread_rng, Rng};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -132,7 +133,20 @@ struct QueryGroup {
}

impl AggregationFuzzer {
/// Run the fuzzer, printing an error and panicking if any of the tasks fail
pub async fn run(&self) {
let res = self.run_inner().await;

if let Err(e) = res {
// Print the error via `Display` so that it displays nicely (the default `unwrap()`
// prints using `Debug` which escapes newlines, and makes multi-line messages
// hard to read
println!("{e}");
panic!("Error!");
}
}

async fn run_inner(&self) -> Result<()> {
let mut join_set = JoinSet::new();
let mut rng = thread_rng();

Expand All @@ -157,16 +171,20 @@ impl AggregationFuzzer {

let tasks = self.generate_fuzz_tasks(query_groups).await;
for task in tasks {
join_set.spawn(async move {
task.run().await;
});
join_set.spawn(async move { task.run().await });
}
}

while let Some(join_handle) = join_set.join_next().await {
// propagate errors
join_handle.unwrap();
join_handle.map_err(|e| {
DataFusionError::Internal(format!(
"AggregationFuzzer task error: {:?}",
e
))
})??;
}
Ok(())
}

async fn generate_fuzz_tasks(
Expand Down Expand Up @@ -237,45 +255,53 @@ struct AggregationFuzzTestTask {
}

impl AggregationFuzzTestTask {
async fn run(&self) {
async fn run(&self) -> Result<()> {
let task_result = run_sql(&self.sql, &self.ctx_with_params.ctx)
.await
.expect("should success to run sql");
self.check_result(&task_result, &self.expected_result);
.map_err(|e| e.context(self.context_error_report()))?;
self.check_result(&task_result, &self.expected_result)
}

// TODO: maybe we should persist the `expected_result` and `task_result`,
// because the readability is not so good if we just print it.
fn check_result(&self, task_result: &[RecordBatch], expected_result: &[RecordBatch]) {
let result = check_equality_of_batches(task_result, expected_result);
if let Err(e) = result {
fn check_result(
&self,
task_result: &[RecordBatch],
expected_result: &[RecordBatch],
) -> Result<()> {
check_equality_of_batches(task_result, expected_result).map_err(|e| {
// If we found inconsistent result, we print the test details for reproducing at first
println!(
"##### AggregationFuzzer error report #####
### Sql:\n{}\n\
### Schema:\n{}\n\
### Session context params:\n{:?}\n\
### Inconsistent row:\n\
- row_idx:{}\n\
- task_row:{}\n\
- expected_row:{}\n\
### Task total result:\n{}\n\
### Expected total result:\n{}\n\
### Input:\n{}\n\
",
self.sql,
self.dataset_ref.batches[0].schema_ref(),
self.ctx_with_params.params,
let message = format!(
"{}\n\
### Inconsistent row:\n\
- row_idx:{}\n\
- task_row:{}\n\
- expected_row:{}\n\
### Task total result:\n{}\n\
### Expected total result:\n{}\n\
",
self.context_error_report(),
e.row_idx,
e.lhs_row,
e.rhs_row,
pretty_format_batches(task_result).unwrap(),
pretty_format_batches(expected_result).unwrap(),
pretty_format_batches(&self.dataset_ref.batches).unwrap(),
);
DataFusionError::Internal(message)
})
}

// Then we just panic
panic!();
}
/// Returns a formatted error message
fn context_error_report(&self) -> String {
format!(
"##### AggregationFuzzer error report #####\n\
### Sql:\n{}\n\
### Schema:\n{}\n\
### Session context params:\n{:?}\n\
### Input:\n{}\n\
",
self.sql,
self.dataset_ref.batches[0].schema_ref(),
self.ctx_with_params.params,
pretty_format_batches(&self.dataset_ref.batches).unwrap(),
)
}
}

0 comments on commit 377a4c5

Please sign in to comment.