From 3787cd30d01c901b6b067d971d5f9e11f6bb7f64 Mon Sep 17 00:00:00 2001
From: WUJingdi
Date: Tue, 16 Apr 2024 10:18:38 +0800
Subject: [PATCH] fix: return NaN in scalar

---
 src/promql/src/extension_plan/planner.rs      |   2 +-
 .../src/extension_plan/scalar_calculate.rs    | 168 +++++++----
 src/promql/src/planner.rs                     |  48 +++-
 .../standalone/common/promql/scalar.result    | 261 ++++++++++++------
 .../cases/standalone/common/promql/scalar.sql |  23 +-
 5 files changed, 347 insertions(+), 155 deletions(-)

diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs
index 279b392b7307..10b6efd438b8 100644
--- a/src/promql/src/extension_plan/planner.rs
+++ b/src/promql/src/extension_plan/planner.rs
@@ -49,7 +49,7 @@ impl ExtensionPlanner for PromExtensionPlanner {
         } else if let Some(node) = node.as_any().downcast_ref::<EmptyMetric>() {
             Ok(Some(node.to_execution_plan(session_state, planner)?))
         } else if let Some(node) = node.as_any().downcast_ref::<ScalarCalculate>() {
-            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
+            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())?))
         } else if let Some(node) = node.as_any().downcast_ref::<HistogramFold>() {
             Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else if let Some(node) = node.as_any().downcast_ref::<UnionDistinctOn>() {
diff --git a/src/promql/src/extension_plan/scalar_calculate.rs b/src/promql/src/extension_plan/scalar_calculate.rs
index 0c8dd48ffa5d..b3249d26d9cf 100644
--- a/src/promql/src/extension_plan/scalar_calculate.rs
+++ b/src/promql/src/extension_plan/scalar_calculate.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use datafusion::common::{DFField, DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
+use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion::physical_expr::PhysicalSortExpr;
@@ -27,53 +28,51 @@ use datafusion::physical_plan::{
     SendableRecordBatchStream,
 };
 use datafusion::prelude::Expr;
-use datatypes::arrow::array::{Array, StringArray};
-use datatypes::arrow::compute::concat_batches;
-use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
+use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
+use datatypes::arrow::compute::{cast_with_options, concat_batches, CastOptions};
+use datatypes::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datatypes::arrow::record_batch::RecordBatch;
 use futures::{ready, Stream, StreamExt};
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;
 
+use super::Millisecond;
 use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, Result};
 
 /// `ScalarCalculate` is the custom logical plan to calculate
 /// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
-/// in PromQL, return empty when have multiple time series.
-/// The behavior here is different from prometheus. Prometheus will return `NaN`, but we returned empty data.
-/// The main reason is to support multi-type calculations (because only `float` type has `NaN`).
+/// in PromQL, returning NaN when there are multiple time series.
 /// return the time series as scalar value when only have one time series.
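A minimal sketch of that NaN path, built from the same arrow types this file imports; `schema` is assumed to be the plan's two-column `[ts, scalar(val)]` output schema with a millisecond timestamp column, and the helper name is illustrative, not part of the patch:

    use std::sync::Arc;
    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::SchemaRef;
    use datatypes::arrow::error::ArrowError;
    use datatypes::arrow::record_batch::RecordBatch;

    /// One aligned row per step in `[start, end]`, every value NaN.
    fn nan_filled_batch(
        schema: SchemaRef,
        start: i64,
        end: i64,
        interval: i64,
    ) -> Result<RecordBatch, ArrowError> {
        let timestamps: Vec<i64> = (start..=end).step_by(interval as usize).collect();
        let values = Float64Array::from(vec![f64::NAN; timestamps.len()]);
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(timestamps)),
                Arc::new(values),
            ],
        )
    }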
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct ScalarCalculate {
-    schema: DFSchemaRef,
-    input: LogicalPlan,
-    project_index: (usize, usize),
+    start: Millisecond,
+    end: Millisecond,
+    interval: Millisecond,
+
+    time_index: String,
     tag_columns: Vec<String>,
-    exprs: Vec<Expr>,
+    field_column: String,
+    input: LogicalPlan,
+    output_schema: DFSchemaRef,
 }
 
 impl ScalarCalculate {
     /// create a new `ScalarCalculate` plan
     pub fn new(
+        start: Millisecond,
+        end: Millisecond,
+        interval: Millisecond,
         input: LogicalPlan,
-        time_index_column: &str,
+        time_index: &str,
         tag_colunms: &[String],
         field_column: &str,
     ) -> Result<Self> {
         let input_schema = input.schema();
-        let ts_index = input_schema
-            .index_of_column_by_name(None, time_index_column)
-            .unwrap_or(None)
-            .context(ColumnNotFoundSnafu {
-                col: time_index_column,
-            })?;
-        let val_index = input_schema
-            .index_of_column_by_name(None, field_column)
-            .unwrap_or(None)
-            .context(ColumnNotFoundSnafu { col: field_column })?;
-        let ts_field = input_schema.field(ts_index);
+        let Ok(ts_field) = input_schema.field_with_unqualified_name(time_index) else {
+            return ColumnNotFoundSnafu { col: time_index }.fail();
+        };
         let val_field = DFField::new_unqualified(
             &format!("scalar({})", field_column),
-            input_schema.field(val_index).data_type().clone(),
+            DataType::Float64,
             true,
         );
         let schema = DFSchema::new_with_metadata(
@@ -81,17 +80,16 @@ impl ScalarCalculate {
             input_schema.metadata().clone(),
         )
         .context(DataFusionPlanningSnafu)?;
-        let exprs = vec![
-            Expr::Column(ts_field.qualified_column()),
-            Expr::Column(val_field.qualified_column()),
-        ];
 
         Ok(Self {
-            schema: Arc::new(schema),
-            input,
-            project_index: (ts_index, val_index),
+            start,
+            end,
+            interval,
+            time_index: time_index.to_string(),
             tag_columns: tag_colunms.to_vec(),
-            exprs,
+            field_column: field_column.to_string(),
+            input,
+            output_schema: Arc::new(schema),
         })
     }
@@ -100,21 +98,34 @@
     }
 
     /// Create a new execution plan from ScalarCalculate
-    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+    pub fn to_execution_plan(
+        &self,
+        exec_input: Arc<dyn ExecutionPlan>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
         let fields: Vec<_> = self
-            .schema
+            .output_schema
             .fields()
             .iter()
             .map(|field| Field::new(field.name(), field.data_type().clone(), field.is_nullable()))
             .collect();
+        let input_schema = exec_input.schema();
+        let ts_index = input_schema
+            .index_of(&self.time_index)
+            .map_err(DataFusionError::ArrowError)?;
+        let val_index = input_schema
+            .index_of(&self.field_column)
+            .map_err(DataFusionError::ArrowError)?;
 
-        Arc::new(ScalarCalculateExec {
+        Ok(Arc::new(ScalarCalculateExec {
+            start: self.start,
+            end: self.end,
+            interval: self.interval,
             schema: Arc::new(Schema::new(fields)),
             input: exec_input,
-            project_index: self.project_index,
+            project_index: (ts_index, val_index),
             tag_columns: self.tag_columns.clone(),
             metric: ExecutionPlanMetricsSet::new(),
-        })
+        }))
     }
 }
 
@@ -128,31 +139,37 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
     }
 
     fn schema(&self) -> &DFSchemaRef {
-        &self.schema
+        &self.output_schema
     }
 
     fn expressions(&self) -> Vec<Expr> {
-        self.exprs.clone()
+        vec![]
     }
 
     fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "ScalarCalculate: tags={:?}", self.tag_columns)
     }
 
-    fn from_template(&self, expr: &[Expr], inputs: &[LogicalPlan]) -> Self {
+    fn from_template(&self, _expr: &[Expr], inputs: &[LogicalPlan]) -> Self {
         assert!(!inputs.is_empty());
 
         ScalarCalculate {
-            schema: self.schema.clone(),
-            input: inputs[0].clone(),
-            project_index: self.project_index,
+            start: self.start,
+            end: self.end,
+            interval: self.interval,
+            time_index: self.time_index.clone(),
             tag_columns: self.tag_columns.clone(),
-            exprs: expr.to_vec(),
+            field_column: self.field_column.clone(),
+            input: inputs[0].clone(),
+            output_schema: self.output_schema.clone(),
         }
     }
 }
 
 #[derive(Debug, Clone)]
 struct ScalarCalculateExec {
+    start: Millisecond,
+    end: Millisecond,
+    interval: Millisecond,
     schema: SchemaRef,
     project_index: (usize, usize),
     input: Arc<dyn ExecutionPlan>,
@@ -194,6 +211,9 @@ impl ExecutionPlan for ScalarCalculateExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(ScalarCalculateExec {
+            start: self.start,
+            end: self.end,
+            interval: self.interval,
             schema: self.schema.clone(),
             project_index: self.project_index,
             tag_columns: self.tag_columns.clone(),
@@ -222,6 +242,9 @@ impl ExecutionPlan for ScalarCalculateExec {
             .collect();
 
         Ok(Box::pin(ScalarCalculateStream {
+            start: self.start,
+            end: self.end,
+            interval: self.interval,
             schema: self.schema.clone(),
             project_index: self.project_index,
             metric: baseline_metric,
@@ -239,9 +262,19 @@ impl ExecutionPlan for ScalarCalculateExec {
     }
 
     fn statistics(&self) -> Statistics {
+        let input_stats = self.input.statistics();
+
+        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
+        let estimated_total_bytes = input_stats
+            .total_byte_size
+            .zip(input_stats.num_rows)
+            .map(|(size, rows)| (size as f64 / rows as f64) * estimated_row_num)
+            .map(|size| size.floor() as _);
+
         Statistics {
-            num_rows: None,
-            total_byte_size: None,
+            num_rows: Some(estimated_row_num.floor() as _),
+            total_byte_size: estimated_total_bytes,
+            // TODO(ruihang): support this column statistics
             column_statistics: None,
             is_exact: false,
         }
@@ -259,10 +292,14 @@ impl DisplayAs for ScalarCalculateExec {
 }
 
 struct ScalarCalculateStream {
+    start: Millisecond,
+    end: Millisecond,
+    interval: Millisecond,
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     metric: BaselineMetrics,
     tag_indices: Vec<usize>,
+    /// with format `(ts_index, field_index)`
     project_index: (usize, usize),
     have_multi_series: bool,
     done: bool,
@@ -324,7 +361,13 @@ impl ScalarCalculateStream {
     }
 
     fn append_batch(&mut self, input_batch: RecordBatch) -> DataFusionResult<()> {
-        let input_batch = input_batch.project(&[self.project_index.0, self.project_index.1])?;
+        let ts_column = input_batch.column(self.project_index.0).clone();
+        let val_column = cast_with_options(
+            input_batch.column(self.project_index.1),
+            &DataType::Float64,
+            &CastOptions::default(),
+        )?;
+        let input_batch = RecordBatch::try_new(self.schema.clone(), vec![ts_column, val_column])?;
         if let Some(batch) = &self.batch {
             self.batch = Some(concat_batches(&self.schema, vec![batch, &input_batch])?);
         } else {
@@ -353,7 +396,20 @@ impl Stream for ScalarCalculateStream {
                     self.done = true;
                     return match self.batch.take() {
                         Some(batch) if !self.have_multi_series => Poll::Ready(Some(Ok(batch))),
-                        _ => Poll::Ready(None),
+                        _ => {
+                            let time_array = (self.start..=self.end)
+                                .step_by(self.interval as _)
+                                .collect::<Vec<_>>();
+                            let nums = time_array.len();
+                            let nan_batch = RecordBatch::try_new(
+                                self.schema.clone(),
+                                vec![
+                                    Arc::new(TimestampMillisecondArray::from(time_array)),
+                                    Arc::new(Float64Array::from(vec![f64::NAN; nums])),
+                                ],
+                            )?;
+                            Poll::Ready(Some(Ok(nan_batch)))
+                        }
                     };
                 }
             };
@@ -417,6 +473,9 @@ mod test {
     async fn run_test(diff_series: bool, expected: &str) {
         let memory_exec = Arc::new(prepare_test_data(diff_series));
        let scalar_exec = Arc::new(ScalarCalculateExec {
+            start: 0,
+            end: 15_000,
+            interval: 5000,
             tag_columns: vec!["tag1".to_string(), "tag2".to_string()],
             input: memory_exec,
             schema: Arc::new(Schema::new(vec![
@@ -454,6 +513,17 @@
 
     #[tokio::test]
     async fn diff_series() {
-        run_test(true, "++\n++").await
+        run_test(
+            true,
+            "+---------------------+-----+\
+            \n| ts                  | val |\
+            \n+---------------------+-----+\
+            \n| 1970-01-01T00:00:00 | NaN |\
+            \n| 1970-01-01T00:00:05 | NaN |\
+            \n| 1970-01-01T00:00:10 | NaN |\
+            \n| 1970-01-01T00:00:15 | NaN |\
+            \n+---------------------+-----+",
+        )
+        .await
     }
 }
diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs
index 0e691a7bea97..36d86cce0a06 100644
--- a/src/promql/src/planner.rs
+++ b/src/promql/src/planner.rs
@@ -97,7 +97,7 @@ impl LiteralExpr {
         match self {
             LiteralExpr::Const(expr) => expr.clone(),
             // the plan is `ScalarCalculate` plan with schema like `[ts, expr]`, so return the second expr
-            LiteralExpr::Runtime(plan) => plan.expressions()[1].clone(),
+            LiteralExpr::Runtime(plan) => DfExpr::Column(plan.schema().field(1).qualified_column()),
         }
     }
 }
@@ -339,6 +339,8 @@ impl PromPlanner {
                     is_comparison_op,
                     should_return_bool,
                     *op,
+                    true,
+                    true,
                 )?
             }
             // e.g. scalar(...) + scalar(...)
@@ -371,6 +373,8 @@ impl PromPlanner {
                     is_comparison_op,
                     should_return_bool,
                     *op,
+                    false,
+                    false,
                 )?
             }
             // e.g. metircs + scalar(...)
@@ -385,8 +389,6 @@ impl PromPlanner {
                     .qualifier()
                     .map(|x| x.to_owned_reference())
                     .unwrap_or_else(|| OwnedTableReference::bare("rhs"));
-                // clear the tag value on lhs_plan to avoid join on tag column
-                self.ctx.tag_columns.clear();
                 let left_field_columns = self.ctx.field_columns.clone();
                 let right_field_columns = vec![rhs_plan.schema().field(1).name().clone()];
                 self.join_time_series(
@@ -399,6 +401,8 @@
                     is_comparison_op,
                     should_return_bool,
                     *op,
+                    true,
+                    false,
                 )?
             }
             // e.g. scalar(...) + metircs
@@ -413,8 +417,6 @@ impl PromPlanner {
                 let right_table_ref = self
                     .table_ref()
                     .unwrap_or_else(|_| OwnedTableReference::bare("rhs"));
-                // clear the tag value on lhs_plan to avoid join on tag column
-                self.ctx.tag_columns.clear();
                 let left_field_columns = vec![lhs_plan.schema().field(1).name().clone()];
                 let right_field_columns = self.ctx.field_columns.clone();
                 self.join_time_series(
@@ -427,6 +429,8 @@
                     is_comparison_op,
                     should_return_bool,
                     *op,
+                    false,
+                    true,
                 )?
             }
             // e.g. `scalar(...) + 1`
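The four arms above hand `join_time_series` a new pair of booleans recording which side still carries tag columns (a `scalar(...)` sub-plan has schema `[ts, val]` and therefore none). A compact restatement of that mapping, as an illustrative sketch whose function and parameter names are not from this patch:

    /// (is_tag_on_left_plan, is_tag_on_right_plan) for a PromQL binary op.
    fn tag_flags(lhs_is_vector: bool, rhs_is_vector: bool) -> (bool, bool) {
        // vector + vector      -> (true,  true)
        // scalar + scalar      -> (false, false)
        // metrics + scalar(..) -> (true,  false)
        // scalar(..) + metrics -> (false, true)
        (lhs_is_vector, rhs_is_vector)
    }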
@@ -748,13 +752,19 @@ impl PromPlanner {
         is_comparison_op: bool,
         should_return_bool: bool,
         op: TokenType,
+        is_tag_on_left_plan: bool,
+        is_tag_on_right_plan: bool,
     ) -> Result<LogicalPlan> {
         // normal join
         if left_table_ref == right_table_ref {
             // rename table references to avoid ambiguity
             left_table_ref = OwnedTableReference::bare("lhs");
             right_table_ref = OwnedTableReference::bare("rhs");
-            self.ctx.table_name = Some("lhs".to_string());
+            if is_tag_on_left_plan {
+                self.ctx.table_name = Some("lhs".to_string());
+            } else {
+                self.ctx.table_name = Some("rhs".to_string());
+            }
         }
         let mut field_columns = left_field_columns.zip(right_field_columns);
         let join_plan = self.join_on_non_field_columns(
             left_input,
             right_input,
             left_table_ref.clone(),
             right_table_ref.clone(),
+            !is_tag_on_left_plan || !is_tag_on_right_plan,
         )?;
         let join_plan_schema = join_plan.schema().clone();
@@ -1610,6 +1621,9 @@ impl PromPlanner {
         );
         let scalar_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(ScalarCalculate::new(
+                self.ctx.start,
+                self.ctx.end,
+                self.ctx.interval,
                 input,
                 self.ctx.time_index_column.as_ref().unwrap(),
                 &self.ctx.tag_columns,
@@ -1678,6 +1692,8 @@ impl PromPlanner {
                         is_comparison_op,
                         should_return_bool,
                         *op,
+                        false,
+                        false,
                     )?;
                     return Ok(Some(LiteralExpr::Runtime(plan)));
                 }
@@ -1698,7 +1714,8 @@
                     }
                     (LiteralExpr::Const(_), LiteralExpr::Runtime(plan))
                     | (LiteralExpr::Runtime(plan), LiteralExpr::Const(_)) => {
-                        let time_index = plan.expressions()[0].clone();
+                        let time_index =
+                            DfExpr::Column(plan.schema().field(0).qualified_column());
                         self.ctx.field_columns =
                             vec![expr.display_name().context(DataFusionPlanningSnafu)?];
                         LiteralExpr::Runtime(
@@ -1812,19 +1829,24 @@ impl PromPlanner {
     }
 
     /// Build a inner join on time index column and tag columns to concat two logical plans.
+    /// When `only_join_time_index == true`, join on the time index only, because the two plans may not have the same tag columns
     fn join_on_non_field_columns(
         &self,
         left: LogicalPlan,
         right: LogicalPlan,
         left_table_ref: OwnedTableReference,
         right_table_ref: OwnedTableReference,
+        only_join_time_index: bool,
     ) -> Result<LogicalPlan> {
-        let mut tag_columns = self
-            .ctx
-            .tag_columns
-            .iter()
-            .map(Column::from_name)
-            .collect::<Vec<_>>();
+        let mut tag_columns = if only_join_time_index {
+            vec![]
+        } else {
+            self.ctx
+                .tag_columns
+                .iter()
+                .map(Column::from_name)
+                .collect::<Vec<_>>()
+        };
 
         // push time index column if it exist
         if let Some(time_index_column) = &self.ctx.time_index_column {
diff --git a/tests/cases/standalone/common/promql/scalar.result b/tests/cases/standalone/common/promql/scalar.result
index 03216c7faff9..0a3d5be95a62 100644
--- a/tests/cases/standalone/common/promql/scalar.result
+++ b/tests/cases/standalone/common/promql/scalar.result
@@ -25,10 +25,10 @@ TQL EVAL (0, 15, '5s') scalar(host{host="host1"});
 +---------------------+-------------+
 | ts                  | scalar(val) |
 +---------------------+-------------+
-| 1970-01-01T00:00:00 | 1           |
-| 1970-01-01T00:00:05 | 3           |
-| 1970-01-01T00:00:10 | 5           |
-| 1970-01-01T00:00:15 | 7           |
+| 1970-01-01T00:00:00 | 1.0         |
+| 1970-01-01T00:00:05 | 3.0         |
+| 1970-01-01T00:00:10 | 5.0         |
+| 1970-01-01T00:00:15 | 7.0         |
 +---------------------+-------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -61,68 +61,68 @@ TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"});
 +---------------------+-----------------------------------+
 | ts                  | lhs.scalar(val) + rhs.scalar(val) |
 +---------------------+-----------------------------------+
-| 1970-01-01T00:00:00 | 3                                 |
-| 1970-01-01T00:00:05 | 7                                 |
-| 1970-01-01T00:00:10 | 11                                |
-| 1970-01-01T00:00:15 | 15                                |
+| 1970-01-01T00:00:00 | 3.0                               |
+| 1970-01-01T00:00:05 | 7.0                               |
+| 1970-01-01T00:00:10 | 11.0                              |
+| 1970-01-01T00:00:15 | 15.0                              |
 +---------------------+-----------------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"});
 
-+---------------------+---------------------------+
-| ts                  | lhs.val + rhs.scalar(val) |
-+---------------------+---------------------------+
-| 1970-01-01T00:00:00 | 3                         |
-| 1970-01-01T00:00:05 | 7                         |
-| 1970-01-01T00:00:10 | 11                        |
-| 1970-01-01T00:00:15 | 15                        |
-+---------------------+---------------------------+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.val + rhs.scalar(val) |
++-------+---------------------+---------------------------+
+| host1 | 1970-01-01T00:00:00 | 3.0                       |
+| host1 | 1970-01-01T00:00:05 | 7.0                       |
+| host1 | 1970-01-01T00:00:10 | 11.0                      |
+| host1 | 1970-01-01T00:00:15 | 15.0                      |
++-------+---------------------+---------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
-TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
+TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"};
 
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.scalar(val) + rhs.val |
++-------+---------------------+---------------------------+
+| host2 | 1970-01-01T00:00:00 | 3.0                       |
+| host2 | 1970-01-01T00:00:05 | 7.0                       |
+| host2 | 1970-01-01T00:00:10 | 11.0                      |
+| host2 | 1970-01-01T00:00:15 | 15.0                      |
++-------+---------------------+---------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
-
-TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"};
+TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
 
-+---------------------+---------------------------+
-| ts                  | lhs.scalar(val) + rhs.val |
-+---------------------+---------------------------+
-| 1970-01-01T00:00:00 | 3                         |
-| 1970-01-01T00:00:00 | 4                         |
-| 1970-01-01T00:00:05 | 7                         |
-| 1970-01-01T00:00:05 | 8                         |
-| 1970-01-01T00:00:10 | 11                        |
-| 1970-01-01T00:00:10 | 12                        |
-| 1970-01-01T00:00:15 | 15                        |
-| 1970-01-01T00:00:15 | 16                        |
-+---------------------+---------------------------+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.val + rhs.scalar(val) |
++-------+---------------------+---------------------------+
+| host1 | 1970-01-01T00:00:00 | 3.0                       |
+| host1 | 1970-01-01T00:00:05 | 7.0                       |
+| host1 | 1970-01-01T00:00:10 | 11.0                      |
+| host1 | 1970-01-01T00:00:15 | 15.0                      |
+| host2 | 1970-01-01T00:00:00 | 4.0                       |
+| host2 | 1970-01-01T00:00:05 | 8.0                       |
+| host2 | 1970-01-01T00:00:10 | 12.0                      |
+| host2 | 1970-01-01T00:00:15 | 16.0                      |
++-------+---------------------+---------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
+
 TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host;
 
-+---------------------+---------------------------+
-| ts                  | lhs.scalar(val) + rhs.val |
-+---------------------+---------------------------+
-| 1970-01-01T00:00:00 | 2                         |
-| 1970-01-01T00:00:00 | 3                         |
-| 1970-01-01T00:00:05 | 6                         |
-| 1970-01-01T00:00:05 | 7                         |
-| 1970-01-01T00:00:10 | 10                        |
-| 1970-01-01T00:00:10 | 11                        |
-| 1970-01-01T00:00:15 | 14                        |
-| 1970-01-01T00:00:15 | 15                        |
-+---------------------+---------------------------+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.scalar(val) + rhs.val |
++-------+---------------------+---------------------------+
+| host1 | 1970-01-01T00:00:00 | 2.0                       |
+| host1 | 1970-01-01T00:00:05 | 6.0                       |
+| host1 | 1970-01-01T00:00:10 | 10.0                      |
+| host1 | 1970-01-01T00:00:15 | 14.0                      |
+| host2 | 1970-01-01T00:00:00 | 3.0                       |
+| host2 | 1970-01-01T00:00:05 | 7.0                       |
+| host2 | 1970-01-01T00:00:10 | 11.0                      |
+| host2 | 1970-01-01T00:00:15 | 15.0                      |
++-------+---------------------+---------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -130,10 +130,10 @@ TQL EVAL (0, 15, '5s') scalar(count(count(host) by (host)));
 
 +---------------------+--------------------------------+
 | ts                  | scalar(COUNT(COUNT(host.val))) |
 +---------------------+--------------------------------+
-| 1970-01-01T00:00:00 | 2                              |
-| 1970-01-01T00:00:05 | 2                              |
-| 1970-01-01T00:00:10 | 2                              |
-| 1970-01-01T00:00:15 | 2                              |
+| 1970-01-01T00:00:00 | 2.0                            |
+| 1970-01-01T00:00:05 | 2.0                            |
+| 1970-01-01T00:00:10 | 2.0                            |
+| 1970-01-01T00:00:15 | 2.0                            |
 +---------------------+--------------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -142,10 +142,10 @@ TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host{host="host2"}));
 
 +---------------------+-----------------------------------+
 | ts                  | scalar(lhs.val + rhs.scalar(val)) |
 +---------------------+-----------------------------------+
-| 1970-01-01T00:00:00 | 3                                 |
-| 1970-01-01T00:00:05 | 7                                 |
-| 1970-01-01T00:00:10 | 11                                |
-| 1970-01-01T00:00:15 | 15                                |
+| 1970-01-01T00:00:00 | 3.0                               |
+| 1970-01-01T00:00:05 | 7.0                               |
+| 1970-01-01T00:00:10 | 11.0                              |
+| 1970-01-01T00:00:15 | 15.0                              |
 +---------------------+-----------------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -154,10 +154,10 @@ TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host{host="host1"});
 
 +---------------------+-----------------------------------+
 | ts                  | scalar(lhs.scalar(val) + rhs.val) |
 +---------------------+-----------------------------------+
-| 1970-01-01T00:00:00 | 3                                 |
-| 1970-01-01T00:00:05 | 7                                 |
-| 1970-01-01T00:00:10 | 11                                |
-| 1970-01-01T00:00:15 | 15                                |
+| 1970-01-01T00:00:00 | 3.0                               |
+| 1970-01-01T00:00:05 | 7.0                               |
+| 1970-01-01T00:00:10 | 11.0                              |
+| 1970-01-01T00:00:15 | 15.0                              |
 +---------------------+-----------------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -166,14 +166,10 @@ TQL EVAL (0, 15, '5s') scalar(host + scalar(host{host="host2"}));
 
 +---------------------+-----------------------------------+
 | ts                  | scalar(lhs.val + rhs.scalar(val)) |
 +---------------------+-----------------------------------+
-| 1970-01-01T00:00:00 | 3                                 |
-| 1970-01-01T00:00:00 | 4                                 |
-| 1970-01-01T00:00:05 | 7                                 |
-| 1970-01-01T00:00:05 | 8                                 |
-| 1970-01-01T00:00:10 | 11                                |
-| 1970-01-01T00:00:10 | 12                                |
-| 1970-01-01T00:00:15 | 15                                |
-| 1970-01-01T00:00:15 | 16                                |
+| 1970-01-01T00:00:00 | NaN                               |
+| 1970-01-01T00:00:05 | NaN                               |
+| 1970-01-01T00:00:10 | NaN                               |
+| 1970-01-01T00:00:15 | NaN                               |
 +---------------------+-----------------------------------+
 
 -- SQLNESS SORT_RESULT 3 1
@@ -182,41 +178,128 @@ TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host);
 
 +---------------------+-----------------------------------+
 | ts                  | scalar(lhs.scalar(val) + rhs.val) |
 +---------------------+-----------------------------------+
-| 1970-01-01T00:00:00 | 3                                 |
-| 1970-01-01T00:00:00 | 4                                 |
-| 1970-01-01T00:00:05 | 7                                 |
-| 1970-01-01T00:00:05 | 8                                 |
-| 1970-01-01T00:00:10 | 11                                |
-| 1970-01-01T00:00:10 | 12                                |
-| 1970-01-01T00:00:15 | 15                                |
-| 1970-01-01T00:00:15 | 16                                |
+| 1970-01-01T00:00:00 | NaN                               |
+| 1970-01-01T00:00:05 | NaN                               |
+| 1970-01-01T00:00:10 | NaN                               |
+| 1970-01-01T00:00:15 | NaN                               |
 +---------------------+-----------------------------------+
 
 -- case have multiple time series, scalar return NaN
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host);
 
-++
-++
++---------------------+-------------+
+| ts                  | scalar(val) |
++---------------------+-------------+
+| 1970-01-01T00:00:00 | NaN         |
+| 1970-01-01T00:00:05 | NaN         |
+| 1970-01-01T00:00:10 | NaN         |
+| 1970-01-01T00:00:15 | NaN         |
++---------------------+-------------+
 
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host) + 1;
 
-++
-++
++---------------------+--------------------------+
+| ts                  | scalar(val) + Float64(1) |
++---------------------+--------------------------+
+| 1970-01-01T00:00:00 | NaN                      |
+| 1970-01-01T00:00:05 | NaN                      |
+| 1970-01-01T00:00:10 | NaN                      |
+| 1970-01-01T00:00:15 | NaN                      |
++---------------------+--------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') 1 + scalar(host);
+
++---------------------+--------------------------+
+| ts                  | Float64(1) + scalar(val) |
++---------------------+--------------------------+
+| 1970-01-01T00:00:00 | NaN                      |
+| 1970-01-01T00:00:05 | NaN                      |
+| 1970-01-01T00:00:10 | NaN                      |
+| 1970-01-01T00:00:15 | NaN                      |
++---------------------+--------------------------+
 
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host) + scalar(host);
 
-++
-++
++---------------------+-----------------------------------+
+| ts                  | lhs.scalar(val) + rhs.scalar(val) |
++---------------------+-----------------------------------+
+| 1970-01-01T00:00:00 | NaN                               |
+| 1970-01-01T00:00:05 | NaN                               |
+| 1970-01-01T00:00:10 | NaN                               |
+| 1970-01-01T00:00:15 | NaN                               |
++---------------------+-----------------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') host + scalar(host);
 
-++
-++
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.val + rhs.scalar(val) |
++-------+---------------------+---------------------------+
+| host1 | 1970-01-01T00:00:00 | NaN                       |
+| host1 | 1970-01-01T00:00:05 | NaN                       |
+| host1 | 1970-01-01T00:00:10 | NaN                       |
+| host1 | 1970-01-01T00:00:15 | NaN                       |
+| host2 | 1970-01-01T00:00:00 | NaN                       |
+| host2 | 1970-01-01T00:00:05 | NaN                       |
+| host2 | 1970-01-01T00:00:10 | NaN                       |
+| host2 | 1970-01-01T00:00:15 | NaN                       |
++-------+---------------------+---------------------------+
 
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') scalar(host) + host;
+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.scalar(val) + rhs.val |
++-------+---------------------+---------------------------+
+| host1 | 1970-01-01T00:00:00 | NaN                       |
+| host1 | 1970-01-01T00:00:05 | NaN                       |
+| host1 | 1970-01-01T00:00:10 | NaN                       |
+| host1 | 1970-01-01T00:00:15 | NaN                       |
+| host2 | 1970-01-01T00:00:00 | NaN                       |
+| host2 | 1970-01-01T00:00:05 | NaN                       |
+| host2 | 1970-01-01T00:00:10 | NaN                       |
+| host2 | 1970-01-01T00:00:15 | NaN                       |
++-------+---------------------+---------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host);
+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.val + rhs.scalar(val) |
++-------+---------------------+---------------------------+
+| host2 | 1970-01-01T00:00:00 | NaN                       |
+| host2 | 1970-01-01T00:00:05 | NaN                       |
+| host2 | 1970-01-01T00:00:10 | NaN                       |
+| host2 | 1970-01-01T00:00:15 | NaN                       |
++-------+---------------------+---------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"};
+
++-------+---------------------+---------------------------+
+| host  | ts                  | lhs.scalar(val) + rhs.val |
++-------+---------------------+---------------------------+
+| host2 | 1970-01-01T00:00:00 | NaN                       |
+| host2 | 1970-01-01T00:00:05 | NaN                       |
+| host2 | 1970-01-01T00:00:10 | NaN                       |
+| host2 | 1970-01-01T00:00:15 | NaN                       |
++-------+---------------------+---------------------------+
+
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));
 
-++
-++
++---------------------+-----------------------------------+
+| ts                  | scalar(lhs.val + rhs.scalar(val)) |
++---------------------+-----------------------------------+
+| 1970-01-01T00:00:00 | NaN                               |
+| 1970-01-01T00:00:05 | NaN                               |
+| 1970-01-01T00:00:10 | NaN                               |
+| 1970-01-01T00:00:15 | NaN                               |
++---------------------+-----------------------------------+
 
 -- error case
 TQL EVAL (0, 15, '5s') scalar(1 + scalar(host{host="host2"}));
diff --git a/tests/cases/standalone/common/promql/scalar.sql b/tests/cases/standalone/common/promql/scalar.sql
index a0a9c8f5d7d6..3c4516422657 100644
--- a/tests/cases/standalone/common/promql/scalar.sql
+++ b/tests/cases/standalone/common/promql/scalar.sql
@@ -31,12 +31,12 @@ TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + scalar(host{host="host2"});
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') host{host="host1"} + scalar(host{host="host2"});
 
--- SQLNESS SORT_RESULT 3 1
-TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
-
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host{host="host2"};
 
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host + scalar(host{host="host2"});
+
 -- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host{host="host1"}) + host;
 
@@ -57,14 +57,31 @@ TQL EVAL (0, 15, '5s') scalar(scalar(host{host="host2"}) + host);
 
 -- case have multiple time series, scalar return NaN
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host);
 
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host) + 1;
 
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') 1 + scalar(host);
+
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host) + scalar(host);
 
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') host + scalar(host);
 
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') scalar(host) + host;
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') host{host="host2"} + scalar(host);
+
+-- SQLNESS SORT_RESULT 3 1
+TQL EVAL (0, 15, '5s') scalar(host) + host{host="host2"};
+
+-- SQLNESS SORT_RESULT 3 1
 TQL EVAL (0, 15, '5s') scalar(host{host="host1"} + scalar(host));
 
 -- error case
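The new `ScalarCalculateExec::statistics` hunk is worth a worked example. Two small helpers replaying its arithmetic (hypothetical names, a sketch rather than code from the patch):

    /// Row estimate used for planning: one output row per step of the range.
    fn estimate_rows(start: i64, end: i64, interval: i64) -> usize {
        ((end - start) as f64 / interval as f64).floor() as usize
    }

    /// Scale the input's average row width by the estimated row count.
    fn estimate_bytes(input_bytes: usize, input_rows: usize, estimated_rows: usize) -> usize {
        ((input_bytes as f64 / input_rows as f64) * estimated_rows as f64).floor() as usize
    }

With the test fixture's range, `estimate_rows(0, 15_000, 5_000)` returns 3 while the inclusive alignment actually materializes four steps (0s, 5s, 10s, 15s); the plan reports `is_exact: false`, so the optimizer treats the estimate as a hint rather than a guarantee.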