Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support distributed EXPLAIN ANALYZE #3908

Merged
merged 7 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod columnar_value;
pub mod error;
mod function;
pub mod logical_plan;
pub mod physical_plan;
pub mod prelude;
mod signature;

use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

Expand All @@ -20,14 +28,6 @@ use api::greptime_proto::v1::AddColumnLocation as Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use physical_plan::PhysicalPlan;
use serde::{Deserialize, Serialize};

pub mod columnar_value;
pub mod error;
mod function;
pub mod logical_plan;
pub mod physical_plan;
pub mod prelude;
mod signature;
use sqlparser_derive::{Visit, VisitMut};

/// new Output struct with output data(previously Output) and output meta
Expand Down
122 changes: 103 additions & 19 deletions src/common/recordbatch/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
Expand All @@ -22,7 +23,10 @@ use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream};
use datafusion::physical_plan::{
accept, displayable, ExecutionPlan, ExecutionPlanVisitor,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::arrow::error::ArrowError;
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -228,7 +232,7 @@ impl RecordBatchStream for RecordBatchStreamAdapter {

/// Returns the collected metrics once they are resolved, or `None`
/// while metrics are unavailable or not yet resolved.
fn metrics(&self) -> Option<RecordBatchMetrics> {
    match &self.metrics_2 {
        // `RecordBatchMetrics` owns a `Vec` of per-plan metrics and is
        // therefore `Clone` but not `Copy` — an explicit clone is required.
        Metrics::Resolved(metrics) => Some(metrics.clone()),
        Metrics::Unavailable | Metrics::Unresolved(_) => None,
    }
}
Expand Down Expand Up @@ -259,11 +263,9 @@ impl Stream for RecordBatchStreamAdapter {
}
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metrics_holder = RecordBatchMetrics::default();
collect_metrics(df_plan, &mut metrics_holder);
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
self.metrics_2 = Metrics::Resolved(metrics_holder);
}
let mut metric_collector = MetricCollector::default();
accept(df_plan.as_ref(), &mut metric_collector).unwrap();
self.metrics_2 = Metrics::Resolved(metric_collector.record_batch_metrics);
}
Poll::Ready(None)
}
Expand All @@ -276,28 +278,110 @@ impl Stream for RecordBatchStreamAdapter {
}
}

fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMetrics) {
if let Some(metrics) = df_plan.metrics() {
metrics.iter().for_each(|m| match m.value() {
MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(),
MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(),
_ => {}
});
/// An [ExecutionPlanVisitor] to collect metrics from a [ExecutionPlan].
#[derive(Default)]
pub struct MetricCollector {
current_level: usize,
pub record_batch_metrics: RecordBatchMetrics,
}

impl ExecutionPlanVisitor for MetricCollector {
type Error = !;
waynexia marked this conversation as resolved.
Show resolved Hide resolved

fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
// skip if no metric available
let Some(metric) = plan.metrics() else {
self.record_batch_metrics.plan_metrics.push(PlanMetrics {
plan: plan.name().to_string(),
level: self.current_level,
metrics: vec![],
});
return Ok(true);
};

// scrape plan metrics
let metric = metric
.aggregate_by_name()
.sorted_for_display()
.timestamps_removed();
let mut plan_metric = PlanMetrics {
plan: displayable(plan).one_line().to_string(),
level: self.current_level,
metrics: Vec::with_capacity(metric.iter().size_hint().0),
};
for m in metric.iter() {
plan_metric
.metrics
.push((m.value().name().to_string(), m.value().as_usize()));

// aggregate high-level metrics
match m.value() {
MetricValue::ElapsedCompute(ec) => {
self.record_batch_metrics.elapsed_compute += ec.value()
}
MetricValue::CurrentMemoryUsage(m) => {
self.record_batch_metrics.memory_usage += m.value()
}
_ => {}
}
}
self.record_batch_metrics.plan_metrics.push(plan_metric);

self.current_level += 1;
Ok(true)
}

for child in df_plan.children() {
collect_metrics(&child, result);
fn post_visit(&mut self, _plan: &dyn ExecutionPlan) -> std::result::Result<bool, Self::Error> {
// the last miuns will underflow
waynexia marked this conversation as resolved.
Show resolved Hide resolved
self.current_level = self.current_level.wrapping_sub(1);
Ok(true)
}
}

/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, Copy)]
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct RecordBatchMetrics {
// cpu consumption in nanoseconds
// High-level aggregated metrics
/// CPU consumption in nanoseconds
pub elapsed_compute: usize,
// memory used by the plan in bytes
/// Memory used by the plan in bytes
pub memory_usage: usize,
// Detailed per-plan metrics
/// An ordered list of plan metrics, from top to bottom in post-order.
pub plan_metrics: Vec<PlanMetrics>,
}

/// Renders only `plan_metrics`, one line per plan node, indented by two
/// spaces per tree level. High-level aggregates are not printed here.
impl Display for RecordBatchMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for plan_metric in self.plan_metrics.iter() {
            // Minimum-width padding of a single space: at level 0 this
            // still emits one leading space.
            let width = plan_metric.level * 2;
            write!(f, "{:width$}{} metrics=[", " ", plan_metric.plan.trim_end())?;
            for (name, val) in plan_metric.metrics.iter() {
                write!(f, "{name}: {val}, ")?;
            }
            writeln!(f, "]")?;
        }

        Ok(())
    }
}

/// Metrics of a single plan node. Serializable so it can travel inside
/// [`RecordBatchMetrics`] (which is sent over gRPC per its doc comment).
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct PlanMetrics {
    /// The plan name
    pub plan: String,
    /// The level of the plan, starts from 0
    pub level: usize,
    /// An ordered key-value list of metrics.
    /// Key is metric label and value is metric value.
    pub metrics: Vec<(String, usize)>,
}

enum AsyncRecordBatchStreamAdapterState {
Expand Down
4 changes: 3 additions & 1 deletion src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(never_type)]

pub mod adapter;
pub mod error;
pub mod filter;
Expand Down Expand Up @@ -260,7 +262,7 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
}

/// Returns a snapshot of the stream's metrics, or `None` if none are set.
fn metrics(&self) -> Option<RecordBatchMetrics> {
    // NOTE(review): `load()` looks like an arc-swap style atomic read of an
    // optional shared value — confirm against the field's declaration.
    // `RecordBatchMetrics` is no longer `Copy` (it owns a `Vec`), so the
    // inner value must be cloned out of the shared handle.
    self.metrics.load().as_ref().map(|s| s.as_ref().clone())
}
}

Expand Down
Loading