fix: return NaN in scalar
Taylor-lagrange committed Apr 16, 2024
1 parent e3e7ad0 commit 3787cd3
Showing 5 changed files with 347 additions and 155 deletions.
2 changes: 1 addition & 1 deletion src/promql/src/extension_plan/planner.rs
@@ -49,7 +49,7 @@ impl ExtensionPlanner for PromExtensionPlanner {
} else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
Ok(Some(node.to_execution_plan(session_state, planner)?))
} else if let Some(node) = node.as_any().downcast_ref::<ScalarCalculate>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())?))
} else if let Some(node) = node.as_any().downcast_ref::<HistogramFold>() {
Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
} else if let Some(node) = node.as_any().downcast_ref::<UnionDistinctOn>() {
168 changes: 119 additions & 49 deletions src/promql/src/extension_plan/scalar_calculate.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
@@ -27,71 +28,68 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
};
use datafusion::prelude::Expr;
use datatypes::arrow::array::{Array, StringArray};
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::compute::{cast_with_options, concat_batches, CastOptions};
use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;

use super::Millisecond;
use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, Result};

/// `ScalarCalculate` is the custom logical plan to calculate
/// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
/// in PromQL, return empty when have multiple time series.
/// The behavior here is different from prometheus. Prometheus will return `NaN`, but we returned empty data.
/// The main reason is to support multi-type calculations (because only `float` type has `NaN`).
/// in PromQL. It returns NaN when the input has multiple time series,
/// and returns the single time series' value as a scalar when there is exactly one.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ScalarCalculate {
schema: DFSchemaRef,
input: LogicalPlan,
project_index: (usize, usize),
start: Millisecond,
end: Millisecond,
interval: Millisecond,

time_index: String,
tag_columns: Vec<String>,
exprs: Vec<Expr>,
field_column: String,
input: LogicalPlan,
output_schema: DFSchemaRef,
}

impl ScalarCalculate {
/// create a new `ScalarCalculate` plan
pub fn new(
start: Millisecond,
end: Millisecond,
interval: Millisecond,
input: LogicalPlan,
time_index_column: &str,
time_index: &str,
tag_colunms: &[String],
field_column: &str,
) -> Result<Self> {
let input_schema = input.schema();
let ts_index = input_schema
.index_of_column_by_name(None, time_index_column)
.unwrap_or(None)
.context(ColumnNotFoundSnafu {
col: time_index_column,
})?;
let val_index = input_schema
.index_of_column_by_name(None, field_column)
.unwrap_or(None)
.context(ColumnNotFoundSnafu { col: field_column })?;
let ts_field = input_schema.field(ts_index);
let Ok(ts_field) = input_schema.field_with_unqualified_name(time_index) else {
return ColumnNotFoundSnafu { col: time_index }.fail();
};
let val_field = DFField::new_unqualified(
&format!("scalar({})", field_column),
input_schema.field(val_index).data_type().clone(),
DataType::Float64,
true,
);
let schema = DFSchema::new_with_metadata(
vec![ts_field.clone(), val_field.clone()],
input_schema.metadata().clone(),
)
.context(DataFusionPlanningSnafu)?;
let exprs = vec![
Expr::Column(ts_field.qualified_column()),
Expr::Column(val_field.qualified_column()),
];

Ok(Self {
schema: Arc::new(schema),
input,
project_index: (ts_index, val_index),
start,
end,
interval,
time_index: time_index.to_string(),
tag_columns: tag_colunms.to_vec(),
exprs,
field_column: field_column.to_string(),
input,
output_schema: Arc::new(schema),
})
}
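The constructor above pins the output value field to Float64 regardless of the input column's type, which is what makes a NaN fallback representable; this matches Prometheus's scalar(): the single sample's value when exactly one series matches, NaN otherwise. Below is a minimal sketch of the corresponding cast the execution stream performs, written against the arrow crate directly (the repository reaches arrow through its datatypes::arrow re-export; the function name here is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::compute::{cast_with_options, CastOptions};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

/// Cast an arbitrary value column to Float64 so that NaN is representable.
fn value_column_as_f64(col: &ArrayRef) -> Result<ArrayRef, ArrowError> {
    cast_with_options(col.as_ref(), &DataType::Float64, &CastOptions::default())
}

fn main() -> Result<(), ArrowError> {
    // An integer value column becomes Float64 after the cast.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let floats = value_column_as_f64(&ints)?;
    assert_eq!(floats.data_type(), &DataType::Float64);
    Ok(())
}
```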

@@ -100,21 +98,34 @@ impl ScalarCalculate {
}

/// Create a new execution plan from ScalarCalculate
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
pub fn to_execution_plan(
&self,
exec_input: Arc<dyn ExecutionPlan>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let fields: Vec<_> = self
.schema
.output_schema
.fields()
.iter()
.map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
.collect();
let input_schema = exec_input.schema();
let ts_index = input_schema
.index_of(&self.time_index)
.map_err(DataFusionError::ArrowError)?;
let val_index = input_schema
.index_of(&self.field_column)
.map_err(DataFusionError::ArrowError)?;

Arc::new(ScalarCalculateExec {
Ok(Arc::new(ScalarCalculateExec {
start: self.start,
end: self.end,
interval: self.interval,
schema: Arc::new(Schema::new(fields)),
input: exec_input,
project_index: self.project_index,
project_index: (ts_index, val_index),
tag_columns: self.tag_columns.clone(),
metric: ExecutionPlanMetricsSet::new(),
})
}))
}
}

@@ -128,31 +139,37 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
}

fn schema(&self) -> &DFSchemaRef {
&self.schema
&self.output_schema
}

fn expressions(&self) -> Vec<Expr> {
self.exprs.clone()
vec![]
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "ScalarCalculate: tags={:?}", self.tag_columns)
}

fn from_template(&self, expr: &[Expr], inputs: &[LogicalPlan]) -> Self {
fn from_template(&self, _expr: &[Expr], inputs: &[LogicalPlan]) -> Self {
assert!(!inputs.is_empty());
ScalarCalculate {
schema: self.schema.clone(),
input: inputs[0].clone(),
project_index: self.project_index,
start: self.start,
end: self.end,
interval: self.interval,
time_index: self.time_index.clone(),
tag_columns: self.tag_columns.clone(),
exprs: expr.to_vec(),
field_column: self.field_column.clone(),
input: inputs[0].clone(),
output_schema: self.output_schema.clone(),
}
}
}

#[derive(Debug, Clone)]
struct ScalarCalculateExec {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
schema: SchemaRef,
project_index: (usize, usize),
input: Arc<dyn ExecutionPlan>,
@@ -194,6 +211,9 @@ impl ExecutionPlan for ScalarCalculateExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ScalarCalculateExec {
start: self.start,
end: self.end,
interval: self.interval,
schema: self.schema.clone(),
project_index: self.project_index,
tag_columns: self.tag_columns.clone(),
@@ -222,6 +242,9 @@ impl ExecutionPlan for ScalarCalculateExec {
.collect();

Ok(Box::pin(ScalarCalculateStream {
start: self.start,
end: self.end,
interval: self.interval,
schema: self.schema.clone(),
project_index: self.project_index,
metric: baseline_metric,
@@ -239,9 +262,19 @@ impl ExecutionPlan for ScalarCalculateExec {
}

fn statistics(&self) -> Statistics {
let input_stats = self.input.statistics();

let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
let estimated_total_bytes = input_stats
.total_byte_size
.zip(input_stats.num_rows)
.map(|(size, rows)| (size as f64 / rows as f64) * estimated_row_num)
.map(|size| size.floor() as _);

Statistics {
num_rows: None,
total_byte_size: None,
num_rows: Some(estimated_row_num.floor() as _),
total_byte_size: estimated_total_bytes,
// TODO(ruihang): support this column statistics
column_statistics: None,
is_exact: false,
}
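A quick worked example of the estimate above, with illustrative numbers: for start = 0 ms, end = 15_000 ms and interval = 5_000 ms, estimated_row_num = (15_000 - 0) / 5_000 = 3; if the input reports num_rows = 100 and total_byte_size = 1_600, the average row is 16 bytes, giving an estimated output size of 16 * 3 = 48 bytes. Both values remain flagged is_exact: false, since they are heuristics rather than measurements.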
@@ -259,10 +292,14 @@ impl DisplayAs for ScalarCalculateExec {
}

struct ScalarCalculateStream {
start: Millisecond,
end: Millisecond,
interval: Millisecond,
schema: SchemaRef,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
tag_indices: Vec<usize>,
/// with format `(ts_index, field_index)`
project_index: (usize, usize),
have_multi_series: bool,
done: bool,
@@ -324,7 +361,13 @@ impl ScalarCalculateStream {
}

fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
let input_batch = input_batch.project(&[self.project_index.0, self.project_index.1])?;
let ts_column = input_batch.column(self.project_index.0).clone();
let val_column = cast_with_options(
input_batch.column(self.project_index.1),
&DataType::Float64,
&CastOptions::default(),
)?;
let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
if let Some(batch) = &self.batch {
self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
} else {
@@ -353,7 +396,20 @@ impl Stream for ScalarCalculateStream {
self.done = true;
return match self.batch.take() {
Some(batch) if !self.have_multi_series => Poll::Ready(Some(Ok(batch))),
_ => Poll::Ready(None),
_ => {
let time_array = (self.start..=self.end)
.step_by(self.interval as _)
.collect::<Vec<_>>();
let nums = time_array.len();
let nan_batch = RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(TimestampMillisecondArray::from(time_array)),
Arc::new(Float64Array::from(vec![f64::NAN; nums])),
],
)?;
Poll::Ready(Some(Ok(nan_batch)))
}
};
}
};
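The fallback branch above is the core of the fix: when the input is empty or contains more than one series, the stream now emits one NaN sample per step of the query range instead of returning nothing. Here is a self-contained sketch of that batch construction, using the arrow crate directly with illustrative column names:

```rust
use std::sync::Arc;

use arrow::array::{Float64Array, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Build a record batch with one NaN value per step of the [start, end] range.
fn nan_batch(start: i64, end: i64, interval: i64) -> Result<RecordBatch, ArrowError> {
    let timestamps: Vec<i64> = (start..=end).step_by(interval as usize).collect();
    let nans = vec![f64::NAN; timestamps.len()];
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("val", DataType::Float64, true),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(TimestampMillisecondArray::from(timestamps)),
            Arc::new(Float64Array::from(nans)),
        ],
    )
}

fn main() -> Result<(), ArrowError> {
    // A 15 s range at 5 s resolution yields 4 NaN rows (0 s, 5 s, 10 s, 15 s),
    // matching the updated diff_series test below.
    let batch = nan_batch(0, 15_000, 5_000)?;
    assert_eq!(batch.num_rows(), 4);
    Ok(())
}
```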
@@ -417,6 +473,9 @@ mod test {
async fn run_test(diff_series: bool, expected: &str) {
let memory_exec = Arc::new(prepare_test_data(diff_series));
let scalar_exec = Arc::new(ScalarCalculateExec {
start: 0,
end: 15_000,
interval: 5000,
tag_columns: vec!["tag1".to_string(), "tag2".to_string()],
input: memory_exec,
schema: Arc::new(Schema::new(vec![
@@ -454,6 +513,17 @@

#[tokio::test]
async fn diff_series() {
run_test(true, "++\n++").await
run_test(
true,
"+---------------------+-----+\
\n| ts | val |\
\n+---------------------+-----+\
\n| 1970-01-01T00:00:00 | NaN |\
\n| 1970-01-01T00:00:05 | NaN |\
\n| 1970-01-01T00:00:10 | NaN |\
\n| 1970-01-01T00:00:15 | NaN |\
\n+---------------------+-----+",
)
.await
}
}
