Skip to content

Commit

Permalink
feat: explain with statistics (#7459)
Browse files Browse the repository at this point in the history
* explain with statistics

* sqllogictest for parquet with statistics
  • Loading branch information
korowa authored Sep 8, 2023
1 parent 93c209f commit 73d6d5f
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 8 deletions.
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ config_namespace! {

/// When set to true, the explain statement will only print physical plans
pub physical_plan_only: bool, default = false

/// When set to true, the explain statement will print operator statistics
/// for physical plans
pub show_statistics: bool, default = false
}
}

Expand Down
21 changes: 21 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! This module provides data structures to represent statistics
use std::fmt::Display;

use crate::ScalarValue;

/// Statistics for a relation
Expand All @@ -37,6 +39,25 @@ pub struct Statistics {
pub is_exact: bool,
}

impl Display for Statistics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.num_rows.is_none() && self.total_byte_size.is_none() && !self.is_exact {
return Ok(());
}

let rows = self
.num_rows
.map_or_else(|| "None".to_string(), |v| v.to_string());
let bytes = self
.total_byte_size
.map_or_else(|| "None".to_string(), |v| v.to_string());

write!(f, "rows={}, bytes={}, exact={}", rows, bytes, self.is_exact)?;

Ok(())
}
}

/// Statistics for a column within a relation
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ColumnStatistics {
Expand Down
18 changes: 16 additions & 2 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use datafusion_execution::TaskContext;
pub struct AnalyzeExec {
/// control how much extra to print
verbose: bool,
/// if statistics should be displayed
show_statistics: bool,
/// The input plan (the plan being analyzed)
pub(crate) input: Arc<dyn ExecutionPlan>,
/// The output schema for RecordBatches of this exec node
Expand All @@ -47,9 +49,15 @@ pub struct AnalyzeExec {

impl AnalyzeExec {
/// Create a new AnalyzeExec
pub fn new(verbose: bool, input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
pub fn new(
verbose: bool,
show_statistics: bool,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> Self {
AnalyzeExec {
verbose,
show_statistics,
input,
schema,
}
Expand Down Expand Up @@ -111,6 +119,7 @@ impl ExecutionPlan for AnalyzeExec {
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(
self.verbose,
self.show_statistics,
children.pop().unwrap(),
self.schema.clone(),
)))
Expand Down Expand Up @@ -143,6 +152,7 @@ impl ExecutionPlan for AnalyzeExec {
let captured_input = self.input.clone();
let captured_schema = self.schema.clone();
let verbose = self.verbose;
let show_statistics = self.show_statistics;

// future that gathers the results from all the tasks in the
// JoinSet that computes the overall row count and final
Expand All @@ -157,6 +167,7 @@ impl ExecutionPlan for AnalyzeExec {
let duration = Instant::now() - start;
create_output_batch(
verbose,
show_statistics,
total_rows,
duration,
captured_input,
Expand All @@ -179,6 +190,7 @@ impl ExecutionPlan for AnalyzeExec {
/// Creates the ouput of AnalyzeExec as a RecordBatch
fn create_output_batch(
verbose: bool,
show_statistics: bool,
total_rows: usize,
duration: std::time::Duration,
input: Arc<dyn ExecutionPlan>,
Expand All @@ -191,6 +203,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Metrics");

let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
Expand All @@ -201,6 +214,7 @@ fn create_output_batch(
type_builder.append_value("Plan with Full Metrics");

let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
.set_show_statistics(show_statistics)
.indent(verbose)
.to_string();
plan_builder.append_value(annotated_plan);
Expand Down Expand Up @@ -245,7 +259,7 @@ mod tests {

let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
let refs = blocking_exec.refs();
let analyze_exec = Arc::new(AnalyzeExec::new(true, blocking_exec, schema));
let analyze_exec = Arc::new(AnalyzeExec::new(true, false, blocking_exec, schema));

let fut = collect(analyze_exec, task_ctx);
let mut fut = fut.boxed();
Expand Down
54 changes: 49 additions & 5 deletions datafusion/core/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,38 +42,49 @@ pub struct DisplayableExecutionPlan<'a> {
inner: &'a dyn ExecutionPlan,
/// How to show metrics
show_metrics: ShowMetrics,
/// If statistics should be displayed
show_statistics: bool,
}

impl<'a> DisplayableExecutionPlan<'a> {
/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways
pub fn new(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::None,
show_statistics: false,
}
}

/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways that also shows aggregated
/// metrics
pub fn with_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::Aggregated,
show_statistics: false,
}
}

/// Create a wrapper around an [`'ExecutionPlan'] which can be
/// Create a wrapper around an [`ExecutionPlan`] which can be
/// pretty printed in a variety of ways that also shows all low
/// level metrics
pub fn with_full_metrics(inner: &'a dyn ExecutionPlan) -> Self {
Self {
inner,
show_metrics: ShowMetrics::Full,
show_statistics: false,
}
}

/// Enable display of statistics
pub fn set_show_statistics(mut self, show_statistics: bool) -> Self {
self.show_statistics = show_statistics;
self
}

/// Return a `format`able structure that produces a single line
/// per node.
///
Expand All @@ -94,6 +105,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
format_type: DisplayFormatType,
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
show_statistics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand All @@ -102,6 +114,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
f,
indent: 0,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
};
accept(self.plan, &mut visitor)
}
Expand All @@ -110,6 +123,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
format_type,
plan: self.inner,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
}
}

Expand All @@ -128,6 +142,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
show_statistics: bool,
}
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand All @@ -137,6 +152,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
f,
t,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
graphviz_builder: GraphvizBuilder::default(),
parents: Vec::new(),
};
Expand All @@ -153,6 +169,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
Wrapper {
plan: self.inner,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
}
}

Expand All @@ -162,6 +179,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
struct Wrapper<'a> {
plan: &'a dyn ExecutionPlan,
show_metrics: ShowMetrics,
show_statistics: bool,
}

impl<'a> fmt::Display for Wrapper<'a> {
Expand All @@ -171,6 +189,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
t: DisplayFormatType::Default,
indent: 0,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
};
visitor.pre_visit(self.plan)?;
Ok(())
Expand All @@ -180,6 +199,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
Wrapper {
plan: self.inner,
show_metrics: self.show_metrics,
show_statistics: self.show_statistics,
}
}

Expand Down Expand Up @@ -215,6 +235,8 @@ struct IndentVisitor<'a, 'b> {
indent: usize,
/// How to show metrics
show_metrics: ShowMetrics,
/// If statistics should be displayed
show_statistics: bool,
}

impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
Expand Down Expand Up @@ -244,6 +266,9 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b> {
}
}
}
if self.show_statistics {
write!(self.f, ", statistics=[{}]", plan.statistics())?;
}
writeln!(self.f)?;
self.indent += 1;
Ok(true)
Expand All @@ -261,6 +286,9 @@ struct GraphvizVisitor<'a, 'b> {
t: DisplayFormatType,
/// How to show metrics
show_metrics: ShowMetrics,
/// If statistics should be displayed
show_statistics: bool,

graphviz_builder: GraphvizBuilder,
/// Used to record parent node ids when visiting a plan.
parents: Vec<usize>,
Expand Down Expand Up @@ -318,8 +346,24 @@ impl ExecutionPlanVisitor for GraphvizVisitor<'_, '_> {
}
};

self.graphviz_builder
.add_node(self.f, id, &label, Some(&metrics))?;
let statistics = if self.show_statistics {
format!("statistics=[{}]", plan.statistics())
} else {
"".to_string()
};

let delimiter = if !metrics.is_empty() && !statistics.is_empty() {
", "
} else {
""
};

self.graphviz_builder.add_node(
self.f,
id,
&label,
Some(&format!("{}{}{}", metrics, delimiter, statistics)),
)?;

if let Some(parent_node_id) = self.parents.last() {
self.graphviz_builder
Expand Down
34 changes: 33 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1892,6 +1892,7 @@ impl DefaultPhysicalPlanner {
Ok(input) => {
stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, InitialPhysicalPlan),
);

Expand All @@ -1903,12 +1904,14 @@ impl DefaultPhysicalPlanner {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans.push(
displayable(plan)
.set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, plan_type),
);
},
) {
Ok(input) => stringified_plans.push(
displayable(input.as_ref())
.set_show_statistics(config.show_statistics)
.to_stringified(e.verbose, FinalPhysicalPlan),
),
Err(DataFusionError::Context(optimizer_name, e)) => {
Expand All @@ -1932,7 +1935,13 @@ impl DefaultPhysicalPlanner {
} else if let LogicalPlan::Analyze(a) = logical_plan {
let input = self.create_physical_plan(&a.input, session_state).await?;
let schema = SchemaRef::new((*a.schema).clone().into());
Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
let show_statistics = session_state.config_options().explain.show_statistics;
Ok(Some(Arc::new(AnalyzeExec::new(
a.verbose,
show_statistics,
input,
schema,
))))
} else {
Ok(None)
}
Expand Down Expand Up @@ -2716,4 +2725,27 @@ digraph {

assert_eq!(expected_graph, generated_graph);
}

#[tokio::test]
async fn test_display_graphviz_with_statistics() {
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

let logical_plan = scan_empty(Some("employee"), &schema, None)
.unwrap()
.project(vec![col("id") + lit(2)])
.unwrap()
.build()
.unwrap();

let plan = plan(&logical_plan).await.unwrap();

let expected_tooltip = ", tooltip=\"statistics=[";

let generated_graph = format!(
"{}",
displayable(&*plan).set_show_statistics(true).graphviz()
);

assert_contains!(generated_graph, expected_tooltip);
}
}
18 changes: 18 additions & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,3 +807,21 @@ async fn explain_physical_plan_only() {
]];
assert_eq!(expected, actual);
}

#[tokio::test]
async fn csv_explain_analyze_with_statistics() {
let mut config = ConfigOptions::new();
config.explain.physical_plan_only = true;
config.explain.show_statistics = true;
let ctx = SessionContext::with_config(config.into());
register_aggregate_csv_by_sql(&ctx).await;

let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100";
let actual = execute_to_batches(&ctx, sql).await;
let formatted = arrow::util::pretty::pretty_format_batches(&actual)
.unwrap()
.to_string();

// should contain scan statistics
assert_contains!(&formatted, ", statistics=[]");
}
Loading

0 comments on commit 73d6d5f

Please sign in to comment.