physical_plan: move metrics to task context
To share physical plans across executions, metrics must be stored somewhere
other than the plan itself. This patch moves them into the task context: when
a plan is scanned, its metrics set is registered in the context, keyed by the
plan's address.
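
A minimal sketch of the new lookup, modeled on the call sites changed below
(the signature of `TaskContext::plan_metrics` is taken from its uses in this
diff; the helper itself is illustrative):

    use std::sync::Arc;
    use datafusion::execution::metrics::MetricValue;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::ExecutionPlan;

    /// Sum the "bytes_scanned" counters that `plan` registered in `ctx`
    /// during execution; returns None if the plan was never executed.
    fn bytes_scanned(plan: &Arc<dyn ExecutionPlan>, ctx: &Arc<TaskContext>) -> Option<usize> {
        let metrics = ctx.plan_metrics(plan.as_any())?;
        metrics
            .sum(|m| matches!(m.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
            .map(|v| v.as_usize())
    }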

To display a plan with its metrics, one must now provide the task context in
which the metrics associated with that plan are stored.
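
For example, mirroring the benchmark change below (`state` is a SessionState
and `physical_plan` an already-created plan; the surrounding async context is
assumed):

    let task_ctx = state.task_ctx();
    let result = collect(physical_plan.clone(), Arc::clone(&task_ctx)).await?;
    println!(
        "=== Physical plan with metrics ===\n{}\n",
        DisplayableExecutionPlan::with_metrics(physical_plan.as_ref(), task_ctx)
            .indent(true)
    );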

Also, fmt errors are fixed and several clippy suggestions are applied.
askalt committed Feb 4, 2025
1 parent 82ac356 commit d137f2a
Showing 94 changed files with 705 additions and 723 deletions.
5 changes: 3 additions & 2 deletions benchmarks/src/tpch/run.rs
@@ -219,11 +219,12 @@ impl RunOpt {
displayable(physical_plan.as_ref()).indent(true)
);
}
-let result = collect(physical_plan.clone(), state.task_ctx()).await?;
+let task_ctx = state.task_ctx();
+let result = collect(physical_plan.clone(), Arc::clone(&task_ctx)).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
-DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
+DisplayableExecutionPlan::with_metrics(physical_plan.as_ref(), task_ctx)
.indent(true)
);
if !result.is_empty() {
2 changes: 1 addition & 1 deletion datafusion-examples/examples/advanced_parquet_index.rs
@@ -28,6 +28,7 @@ use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
};
use datafusion::datasource::TableProvider;
+use datafusion::execution::metrics::ExecutionPlanMetricsSet;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -39,7 +40,6 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::{
2 changes: 1 addition & 1 deletion datafusion-examples/examples/csv_opener.rs
@@ -26,7 +26,7 @@ use datafusion::{
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
},
error::Result,
-physical_plan::metrics::ExecutionPlanMetricsSet,
+execution::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};

2 changes: 1 addition & 1 deletion datafusion-examples/examples/json_opener.rs
@@ -27,7 +27,7 @@ use datafusion::{
physical_plan::{FileScanConfig, FileStream, JsonOpener},
},
error::Result,
-physical_plan::metrics::ExecutionPlanMetricsSet,
+execution::metrics::ExecutionPlanMetricsSet,
};

use futures::StreamExt;
22 changes: 13 additions & 9 deletions datafusion-examples/examples/parquet_exec_visitor.rs
@@ -21,7 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, PartitionedFile};
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::execution::context::SessionContext;
-use datafusion::physical_plan::metrics::MetricValue;
+use datafusion::execution::metrics::MetricValue;
+use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
execute_stream, visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor,
};
@@ -52,20 +53,22 @@
let df = ctx.sql("SELECT * FROM my_table").await.unwrap();
let plan = df.create_physical_plan().await.unwrap();

-// Create empty visitor
-let mut visitor = ParquetExecVisitor {
-file_groups: None,
-bytes_scanned: None,
-};

// Make sure you execute the plan to collect actual execution statistics.
// For example, in this example the `file_scan_config` is known without executing
// but the `bytes_scanned` would be None if we did not execute.
-let mut batch_stream = execute_stream(plan.clone(), ctx.task_ctx()).unwrap();
+let task_ctx = ctx.task_ctx();
+let mut batch_stream = execute_stream(plan.clone(), Arc::clone(&task_ctx)).unwrap();
while let Some(batch) = batch_stream.next().await {
println!("Batch rows: {}", batch.unwrap().num_rows());
}

+// Create empty visitor
+let mut visitor = ParquetExecVisitor {
+file_groups: None,
+bytes_scanned: None,
+ctx: task_ctx,
+};

visit_execution_plan(plan.as_ref(), &mut visitor).unwrap();

println!(
@@ -85,6 +88,7 @@ async fn main() {
struct ParquetExecVisitor {
file_groups: Option<Vec<Vec<PartitionedFile>>>,
bytes_scanned: Option<MetricValue>,
+ctx: Arc<TaskContext>,
}

impl ExecutionPlanVisitor for ParquetExecVisitor {
@@ -99,7 +103,7 @@ impl ExecutionPlanVisitor for ParquetExecVisitor {
if let Some(parquet_exec) = maybe_parquet_exec {
self.file_groups = Some(parquet_exec.base_config().file_groups.clone());

-let metrics = match parquet_exec.metrics() {
+let metrics = match self.ctx.plan_metrics(plan.as_any()) {
None => return Ok(true),
Some(metrics) => metrics,
};
14 changes: 8 additions & 6 deletions datafusion/common/src/column.rs
@@ -109,21 +109,23 @@ impl Column {
/// where `"foo.BAR"` would be parsed to a reference to column named `foo.BAR`
pub fn from_qualified_name(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
-Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false))
-.unwrap_or_else(|| Self {
+Self::from_idents(&mut parse_identifiers_normalized(&flat_name, false)).unwrap_or(
+Self {
relation: None,
name: flat_name,
-})
+},
+)
}

/// Deserialize a fully qualified name string into a column preserving column text case
pub fn from_qualified_name_ignore_case(flat_name: impl Into<String>) -> Self {
let flat_name = flat_name.into();
-Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true))
-.unwrap_or_else(|| Self {
+Self::from_idents(&mut parse_identifiers_normalized(&flat_name, true)).unwrap_or(
+Self {
relation: None,
name: flat_name,
-})
+},
+)
}

/// return the column's name.
2 changes: 1 addition & 1 deletion datafusion/common/src/hash_utils.rs
@@ -63,7 +63,7 @@ pub trait HashValue {
fn hash_one(&self, state: &RandomState) -> u64;
}

-impl<'a, T: HashValue + ?Sized> HashValue for &'a T {
+impl<T: HashValue + ?Sized> HashValue for &'_ T {
fn hash_one(&self, state: &RandomState) -> u64 {
T::hash_one(self, state)
}
2 changes: 1 addition & 1 deletion datafusion/common/src/utils/mod.rs
@@ -354,7 +354,7 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}

-/// Array Utils
+// Array Utils

/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
26 changes: 0 additions & 26 deletions datafusion/core/benches/sql_planner.rs
@@ -27,10 +27,6 @@ use criterion::Bencher;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion_common::ScalarValue;
-use itertools::Itertools;
-use std::fs::File;
-use std::io::{BufRead, BufReader};
-use std::path::PathBuf;
use std::sync::Arc;
use test_utils::tpcds::tpcds_schemas;
use test_utils::tpch::tpch_schemas;
@@ -95,28 +91,6 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
ctx
}

-fn register_clickbench_hits_table() -> SessionContext {
-let ctx = SessionContext::new();
-let rt = Runtime::new().unwrap();
-
-// use an external table for clickbench benchmarks
-let path =
-if PathBuf::from(format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")).exists() {
-format!("{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}")
-} else {
-format!("{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}")
-};
-
-let sql = format!("CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'");
-
-rt.block_on(ctx.sql(&sql)).unwrap();
-
-let count =
-rt.block_on(async { ctx.table("hits").await.unwrap().count().await.unwrap() });
-assert!(count > 0);
-ctx
-}
-
/// Target of this benchmark: control that placeholders replacing does not get slower,
/// if the query does not contain placeholders at all.
fn benchmark_with_param_values_many_columns(ctx: &SessionContext, b: &mut Bencher) {
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
@@ -46,10 +46,10 @@ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
};
+use datafusion_execution::metrics::MetricsSet;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
-use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::Bytes;
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -45,9 +45,9 @@ use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
};
+use datafusion_execution::metrics::MetricsSet;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{Buf, Bytes};
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -45,9 +45,9 @@ use arrow_array::RecordBatch;
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
+use datafusion_execution::metrics::MetricsSet;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;

use async_trait::async_trait;
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -89,7 +89,7 @@ pub struct CsvReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
}

-impl<'a> Default for CsvReadOptions<'a> {
+impl Default for CsvReadOptions<'_> {
fn default() -> Self {
Self::new()
}
@@ -243,7 +243,7 @@ pub struct ParquetReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
}

-impl<'a> Default for ParquetReadOptions<'a> {
+impl Default for ParquetReadOptions<'_> {
fn default() -> Self {
Self {
file_extension: DEFAULT_PARQUET_EXTENSION,
@@ -312,7 +312,7 @@ pub struct ArrowReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
}

-impl<'a> Default for ArrowReadOptions<'a> {
+impl Default for ArrowReadOptions<'_> {
fn default() -> Self {
Self {
schema: None,
@@ -357,7 +357,7 @@ pub struct AvroReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
}

-impl<'a> Default for AvroReadOptions<'a> {
+impl Default for AvroReadOptions<'_> {
fn default() -> Self {
Self {
schema: None,
@@ -409,7 +409,7 @@ pub struct NdJsonReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
}

-impl<'a> Default for NdJsonReadOptions<'a> {
+impl Default for NdJsonReadOptions<'_> {
fn default() -> Self {
Self {
schema: None,
19 changes: 11 additions & 8 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -52,10 +52,10 @@ use datafusion_common::{
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
+use datafusion_execution::metrics::MetricsSet;
use datafusion_execution::TaskContext;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
@@ -1269,7 +1269,6 @@ mod tests {
use super::*;

use crate::datasource::file_format::parquet::test_util::store_parquet;
-use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_array::types::Int32Type;
@@ -1283,6 +1282,7 @@
use datafusion_common::config::ParquetOptions;
use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
+use datafusion_execution::metrics::MetricValue;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -1773,10 +1773,10 @@
let task_ctx = ctx.task_ctx();

let _ = collect(exec.clone(), task_ctx.clone()).await?;
-let _ = collect(exec_projected.clone(), task_ctx).await?;
+let _ = collect(exec_projected.clone(), Arc::clone(&task_ctx)).await?;

-assert_bytes_scanned(exec, 671);
-assert_bytes_scanned(exec_projected, 73);
+assert_bytes_scanned(exec, 671, &task_ctx);
+assert_bytes_scanned(exec_projected, 73, &task_ctx);

Ok(())
}
@@ -2182,9 +2182,12 @@
}
}

-fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
-let actual = exec
-.metrics()
+fn assert_bytes_scanned(
+exec: Arc<dyn ExecutionPlan>,
+expected: usize,
+ctx: &Arc<TaskContext>,
+) {
+let actual = ctx.plan_metrics(exec.as_any())
.expect("Metrics not recorded")
.sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
.map(|t| t.as_usize())
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
@@ -137,7 +137,7 @@ pub fn split_files(
partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));

// effectively this is div with rounding up instead of truncating
-let chunk_size = (partitioned_files.len() + n - 1) / n;
+let chunk_size = partitioned_files.len().div_ceil(n);
let mut chunks = Vec::with_capacity(n);
let mut current_chunk = Vec::with_capacity(chunk_size);
for file in partitioned_files.drain(..) {
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1032,8 +1032,8 @@ impl ListingTable {
/// This method first checks if the statistics for the given file are already cached.
/// If they are, it returns the cached statistics.
/// If they are not, it infers the statistics from the file and stores them in the cache.
-async fn do_collect_statistics<'a>(
-&'a self,
+async fn do_collect_statistics(
+&self,
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/url.rs
@@ -170,7 +170,7 @@ impl ListingTableUrl {
if ignore_subdirectory {
segments
.next()
-.map_or(false, |file_name| glob.matches(file_name))
+.is_some_and(|file_name| glob.matches(file_name))
} else {
let stripped = segments.join(DELIMITER);
glob.matches(&stripped)
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/memory.rs
@@ -38,8 +38,8 @@ use crate::physical_planner::create_physical_sort_exprs;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
+use datafusion_execution::metrics::MetricsSet;
use datafusion_execution::TaskContext;
-use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
use datafusion_catalog::Session;