fix: handle non-identical time index and field column in PromQL set operation (#3145)

* handle different field columns

Signed-off-by: Ruihang Xia <[email protected]>

* fix and/unless on different time index

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Jan 12, 2024
1 parent aad2afd commit 527e523
Showing 5 changed files with 149 additions and 36 deletions.
7 changes: 7 additions & 0 deletions src/promql/src/error.rs
@@ -129,6 +129,12 @@ pub enum Error {
right: Vec<String>,
location: Location,
},

+    #[snafu(display("Multi fields calculation is not supported in {}", operator))]
+    MultiFieldsNotSupported {
+        operator: String,
+        location: Location,
+    },
}

impl ErrorExt for Error {
Expand All @@ -149,6 +155,7 @@ impl ErrorExt for Error {
| UnsupportedVectorMatch { .. }
| CombineTableColumnMismatch { .. }
| DataFusionPlanning { .. }
+| MultiFieldsNotSupported { .. }
| UnexpectedPlanExpr { .. }
| IllegalRange { .. } => StatusCode::InvalidArguments,

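For reference, a minimal sketch of how such a variant is raised through snafu's generated context selector — the same pattern the planner change below uses with `ensure!`. This is a simplified stand-alone example: the real variant also carries the `location: Location` field and the crate-local `Result` alias.

use snafu::{ensure, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Multi fields calculation is not supported in {}", operator))]
    MultiFieldsNotSupported { operator: String },
}

// Mirrors the check added in `union` planning: PromQL set operations
// are only supported over a single field column per side.
fn check_single_field(field_columns: &[String]) -> Result<(), Error> {
    ensure!(
        field_columns.len() == 1,
        MultiFieldsNotSupportedSnafu {
            operator: "OR operator",
        }
    );
    Ok(())
}

fn main() {
    let err = check_single_field(&["a".into(), "b".into()]).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Multi fields calculation is not supported in OR operator"
    );
}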
18 changes: 15 additions & 3 deletions src/promql/src/extension_plan/union_distinct_on.rs
@@ -362,6 +362,7 @@ impl HashedData {
) -> DataFusionResult<Self> {
// Collect all batches from the input stream
let initial = (Vec::new(), 0);
+let schema = input.schema();
let (batches, _num_rows) = input
.try_fold(initial, |mut acc, batch| async {
// Update rowcount
@@ -399,7 +400,7 @@
}

// Finalize the hash map
-let batch = interleave_batches(batches, interleave_indices)?;
+let batch = interleave_batches(schema, batches, interleave_indices)?;

Ok(Self {
hash_map,
@@ -442,10 +443,19 @@

/// Utility function to interleave batches. Based on [interleave](datafusion::arrow::compute::interleave)
fn interleave_batches(
+schema: SchemaRef,
batches: Vec<RecordBatch>,
indices: Vec<(usize, usize)>,
) -> DataFusionResult<RecordBatch> {
-let schema = batches[0].schema();
+if batches.is_empty() {
+    if indices.is_empty() {
+        return Ok(RecordBatch::new_empty(schema));
+    } else {
+        return Err(DataFusionError::Internal(
+            "Cannot interleave empty batches with non-empty indices".to_string(),
+        ));
+    }
+}

// transform batches into arrays
let mut arrays = vec![vec![]; schema.fields().len()];
@@ -488,6 +498,8 @@ fn take_batch(batch: &RecordBatch, indices: &[usize]) -> DataFusionResult<Record

#[cfg(test)]
mod test {
+use std::sync::Arc;
+
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

@@ -529,7 +541,7 @@ mod test {

let batches = vec![batch1, batch2, batch3];
let indices = vec![(0, 0), (1, 0), (2, 0), (0, 1), (1, 1), (2, 1)];
-let result = interleave_batches(batches, indices).unwrap();
+let result = interleave_batches(Arc::new(schema.clone()), batches, indices).unwrap();

let expected = RecordBatch::try_new(
Arc::new(schema),
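The reason for threading `schema` through `interleave_batches` is the empty-input edge case the new guard handles: `batches[0].schema()` panics when the stream produced no batches, e.g. when one side of the set operation scans an empty metric such as `cpu_count` in the tests below. A minimal sketch of the fixed behavior, assuming only the `datafusion` crate (the arrow types are its re-exports):

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;

fn main() {
    let schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "greptime_value",
        DataType::Float64,
        true,
    )]));

    // No input batches, as when scanning an empty metric.
    let batches: Vec<RecordBatch> = vec![];

    // The old code would do `batches[0].schema()` here and panic with an
    // index-out-of-bounds. With the schema passed in by the caller, an
    // empty input yields a well-formed empty batch instead.
    if batches.is_empty() {
        let empty = RecordBatch::new_empty(schema);
        assert_eq!(empty.num_rows(), 0);
    }
}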
71 changes: 62 additions & 9 deletions src/promql/src/planner.rs
@@ -46,10 +46,11 @@ use table::table::adapter::DfTableProviderAdapter;

use crate::error::{
CatalogSnafu, ColumnNotFoundSnafu, CombineTableColumnMismatchSnafu, DataFusionPlanningSnafu,
-ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultipleMetricMatchersSnafu,
-MultipleVectorSnafu, NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu,
-TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
-UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
+ExpectRangeSelectorSnafu, FunctionInvalidArgumentSnafu, MultiFieldsNotSupportedSnafu,
+MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, Result,
+TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu,
+UnknownTableSnafu, UnsupportedExprSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
+ZeroRangeSelectorSnafu,
};
use crate::extension_plan::{
build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
@@ -1547,7 +1548,7 @@ impl PromPlanner {
fn set_op_on_non_field_columns(
&mut self,
left: LogicalPlan,
-right: LogicalPlan,
+mut right: LogicalPlan,
left_context: PromPlannerContext,
right_context: PromPlannerContext,
op: TokenType,
@@ -1619,11 +1620,36 @@
}
)
};
+let left_time_index = left_context.time_index_column.clone().unwrap();
+let right_time_index = right_context.time_index_column.clone().unwrap();
let join_keys = left_tag_col_set
.iter()
.cloned()
-.chain([self.ctx.time_index_column.clone().unwrap()])
+.chain([left_time_index.clone()])
.collect::<Vec<_>>();
+self.ctx.time_index_column = Some(left_time_index.clone());
+
+// alias right time index column if necessary
+if left_context.time_index_column != right_context.time_index_column {
+    let right_project_exprs = right
+        .schema()
+        .fields()
+        .iter()
+        .map(|field| {
+            if field.name() == &right_time_index {
+                DfExpr::Column(Column::from_name(&right_time_index)).alias(&left_time_index)
+            } else {
+                DfExpr::Column(Column::from_name(field.name()))
+            }
+        })
+        .collect::<Vec<_>>();
+
+    right = LogicalPlanBuilder::from(right)
+        .project(right_project_exprs)
+        .context(DataFusionPlanningSnafu)?
+        .build()
+        .context(DataFusionPlanningSnafu)?;
+}

// Generate join plan.
// All set operations in PromQL are "distinct"
@@ -1675,6 +1701,21 @@
right_context: PromPlannerContext,
modifier: &Option<BinModifier>,
) -> Result<LogicalPlan> {
+// checks
+ensure!(
+    left_context.field_columns.len() == right_context.field_columns.len(),
+    CombineTableColumnMismatchSnafu {
+        left: left_context.field_columns.clone(),
+        right: right_context.field_columns.clone()
+    }
+);
+ensure!(
+    left_context.field_columns.len() == 1,
+    MultiFieldsNotSupportedSnafu {
+        operator: "OR operator"
+    }
+);
+
// prepare hash sets
let all_tags = left_tag_cols_set
.union(&right_tag_cols_set)
@@ -1712,6 +1753,9 @@
.with_context(|| TimeIndexNotFoundSnafu {
table: right_qualifier_string.clone(),
})?;
+// Take the name of the first field column. The length is checked above.
+let left_field_col = left_context.field_columns.first().unwrap();
+let right_field_col = right_context.field_columns.first().unwrap();

// step 0: fill all columns in output schema
let mut all_columns_set = left
@@ -1724,6 +1768,10 @@
// remove time index column
all_columns_set.remove(&left_time_index_column);
all_columns_set.remove(&right_time_index_column);
+// remove field column in the right
+if left_field_col != right_field_col {
+    all_columns_set.remove(right_field_col);
+}
let mut all_columns = all_columns_set.into_iter().collect::<Vec<_>>();
// sort to ensure the generated schema is not volatile
all_columns.sort_unstable();
@@ -1735,19 +1783,24 @@
if tags_not_in_left.contains(col) {
DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
} else {
-DfExpr::Column(Column::new(left_qualifier.clone(), col))
+DfExpr::Column(Column::new(None::<String>, col))
}
});
let right_time_index_expr = DfExpr::Column(Column::new(
right_qualifier.clone(),
right_time_index_column,
))
.alias(left_time_index_column.clone());
// `skip(1)` to skip the time index column
let right_proj_exprs_without_time_index = all_columns.iter().skip(1).map(|col| {
-if tags_not_in_right.contains(col) {
+// expr
+if col == left_field_col && left_field_col != right_field_col {
+    // alias field in right side if necessary to handle different field name
+    DfExpr::Column(Column::new(right_qualifier.clone(), right_field_col))
+} else if tags_not_in_right.contains(col) {
DfExpr::Literal(ScalarValue::Utf8(None)).alias(col.to_string())
} else {
-DfExpr::Column(Column::new(right_qualifier.clone(), col))
+DfExpr::Column(Column::new(None::<String>, col))
}
});
let right_proj_exprs = [right_time_index_expr]
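The essence of the time-index fix is visible in `set_op_on_non_field_columns` above: when the two inputs name their time index differently, the right plan is wrapped in a projection that aliases its time index to the left side's name, so both inputs agree on the join key. A minimal DataFusion sketch of that re-projection pattern (the plans and column names here are illustrative, not GreptimeDB's actual ones):

use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::prelude::{col, lit};

fn main() -> Result<()> {
    // Stand-in for the right-hand input: its time index is named "time",
    // while the left-hand side of the set operation uses "ts".
    // (`values` auto-names its columns column1, column2, ...)
    let right = LogicalPlanBuilder::values(vec![vec![lit(3_000_000_i64), lit(10.0_f64)]])?
        .project(vec![
            col("column1").alias("time"),
            col("column2").alias("greptime_value"),
        ])?
        .build()?;

    // Re-project the right plan, renaming its time index to match the
    // left side before the join keys are assembled.
    let aliased = LogicalPlanBuilder::from(right)
        .project(vec![col("time").alias("ts"), col("greptime_value")])?
        .build()?;

    // The projected schema now exposes "ts", usable as a join key.
    println!("{}", aliased.display_indent());
    Ok(())
}

In the actual planner the projection list is built from `right.schema().fields()`, as the hunk above shows, so all non-time-index columns pass through unchanged.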
74 changes: 59 additions & 15 deletions tests/cases/standalone/common/promql/set_operation.result
@@ -25,7 +25,7 @@ insert into http_requests values
Affected Rows: 8

-- empty metric
-create table cpu_count(ts timestamp time index);
+create table cpu_count(ts timestamp time index, greptime_value double);

Affected Rows: 0

@@ -38,10 +38,9 @@ create table vector_matching_a(
Affected Rows: 0

insert into vector_matching_a values
(3000000, "x", 10),
(3000000, "y", 20);
(3000000, "x", 10);

-Affected Rows: 2
+Affected Rows: 1

-- eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"}
-- http_requests{group="canary", instance="0", job="api-server"} 300
@@ -157,7 +156,16 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} or http_requests{g="produc
-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{instance="1"};

-Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named http_requests.greptime_value. Valid fields are http_requests.job, http_requests.instance, http_requests.g, http_requests.ts, "greptime_value + Float64(1)".
++---------------------+------------+-----------------------------+----------+-----+
+| ts                  | g          | greptime_value + Float64(1) | instance | job |
++---------------------+------------+-----------------------------+----------+-----+
+| 1970-01-01T00:50:00 | canary     | 301.0                       | 0        | api |
+| 1970-01-01T00:50:00 | canary     | 401.0                       | 1        | api |
+| 1970-01-01T00:50:00 | canary     | 701.0                       | 0        | app |
+| 1970-01-01T00:50:00 | canary     | 801.0                       | 1        | app |
+| 1970-01-01T00:50:00 | production | 200.0                       | 1        | api |
+| 1970-01-01T00:50:00 | production | 600.0                       | 1        | app |
++---------------------+------------+-----------------------------+----------+-----+

-- # Matching only on instance excludes everything that has instance=0/1 but includes
-- # entries without the instance label.
@@ -168,11 +176,18 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N
-- {group="canary", instance="1", job="app-server"} 801
-- vector_matching_a{l="x"} 10
-- vector_matching_a{l="y"} 20
--- NOT SUPPORTED: union on different schemas
--- NOT SUPPORTED: `or`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a);

-Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.greptime_value. Valid fields are cpu_count.ts.
++---------------------+--------+-----------------------------+----------+-----+---+
+| ts                  | g      | greptime_value + Float64(1) | instance | job | l |
++---------------------+--------+-----------------------------+----------+-----+---+
+| 1970-01-01T00:50:00 |        | 10.0                        |          |     | x |
+| 1970-01-01T00:50:00 | canary | 301.0                       | 0        | api |   |
+| 1970-01-01T00:50:00 | canary | 401.0                       | 1        | api |   |
+| 1970-01-01T00:50:00 | canary | 701.0                       | 0        | app |   |
+| 1970-01-01T00:50:00 | canary | 801.0                       | 1        | app |   |
++---------------------+--------+-----------------------------+----------+-----+---+

-- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a)
-- {group="canary", instance="0", job="api-server"} 301
@@ -181,11 +196,18 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N
-- {group="canary", instance="1", job="app-server"} 801
-- vector_matching_a{l="x"} 10
-- vector_matching_a{l="y"} 20
--- NOT SUPPORTED: union on different schemas
--- NOT SUPPORTED: `or`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a);

-Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named cpu_count.greptime_value. Valid fields are cpu_count.ts.
++---------------------+--------+-----------------------------+----------+-----+---+
+| ts                  | g      | greptime_value + Float64(1) | instance | job | l |
++---------------------+--------+-----------------------------+----------+-----+---+
+| 1970-01-01T00:50:00 |        | 10.0                        |          |     | x |
+| 1970-01-01T00:50:00 | canary | 301.0                       | 0        | api |   |
+| 1970-01-01T00:50:00 | canary | 401.0                       | 1        | api |   |
+| 1970-01-01T00:50:00 | canary | 701.0                       | 0        | app |   |
+| 1970-01-01T00:50:00 | canary | 801.0                       | 1        | app |   |
++---------------------+--------+-----------------------------+----------+-----+---+

-- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
-- http_requests{group="canary", instance="1", job="api-server"} 400
@@ -248,10 +270,21 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} unless ignoring(g) http_re
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
--- NOT SUPPORTED: `vector()`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1);

-Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named time. Valid fields are http_requests.ts, http_requests.job, http_requests.instance, http_requests.g, http_requests.greptime_value.
++---------------------+-----+----------+------------+----------------+
+| ts                  | job | instance | g          | greptime_value |
++---------------------+-----+----------+------------+----------------+
+| 1970-01-01T00:50:00 | api | 0        | canary     | 300.0          |
+| 1970-01-01T00:50:00 | api | 0        | production | 100.0          |
+| 1970-01-01T00:50:00 | api | 1        | canary     | 400.0          |
+| 1970-01-01T00:50:00 | api | 1        | production | 200.0          |
+| 1970-01-01T00:50:00 | app | 0        | canary     | 700.0          |
+| 1970-01-01T00:50:00 | app | 0        | production | 500.0          |
+| 1970-01-01T00:50:00 | app | 1        | canary     | 800.0          |
+| 1970-01-01T00:50:00 | app | 1        | production | 600.0          |
++---------------------+-----+----------+------------+----------------+

-- eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1)
-- http_requests{group="canary", instance="0", job="api-server"} 300
@@ -262,10 +295,21 @@ Error: 1004(InvalidArguments), Internal error during building DataFusion plan: N
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
--- NOT SUPPORTED: `vector()`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests AND IGNORING (g, instance, job) vector(1);

-Error: 1004(InvalidArguments), Internal error during building DataFusion plan: No field named time. Valid fields are http_requests.ts, http_requests.job, http_requests.instance, http_requests.g, http_requests.greptime_value.
++---------------------+-----+----------+------------+----------------+
+| ts                  | job | instance | g          | greptime_value |
++---------------------+-----+----------+------------+----------------+
+| 1970-01-01T00:50:00 | api | 0        | canary     | 300.0          |
+| 1970-01-01T00:50:00 | api | 0        | production | 100.0          |
+| 1970-01-01T00:50:00 | api | 1        | canary     | 400.0          |
+| 1970-01-01T00:50:00 | api | 1        | production | 200.0          |
+| 1970-01-01T00:50:00 | app | 0        | canary     | 700.0          |
+| 1970-01-01T00:50:00 | app | 0        | production | 500.0          |
+| 1970-01-01T00:50:00 | app | 1        | canary     | 800.0          |
+| 1970-01-01T00:50:00 | app | 1        | production | 600.0          |
++---------------------+-----+----------+------------+----------------+

drop table http_requests;

15 changes: 6 additions & 9 deletions tests/cases/standalone/common/promql/set_operation.sql
@@ -22,7 +22,7 @@ insert into http_requests values
(3000000, "app", "1", "canary", 800);

-- empty metric
-create table cpu_count(ts timestamp time index);
+create table cpu_count(ts timestamp time index, greptime_value double);

create table vector_matching_a(
ts timestamp time index,
@@ -31,8 +31,7 @@ create table vector_matching_a(
);

insert into vector_matching_a values
(3000000, "x", 10),
(3000000, "y", 20);
(3000000, "x", 10);

-- eval instant at 50m http_requests{group="canary"} and http_requests{instance="0"}
-- http_requests{group="canary", instance="0", job="api-server"} 300
@@ -103,8 +102,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or http_requests{ins
-- {group="canary", instance="1", job="app-server"} 801
-- vector_matching_a{l="x"} 10
-- vector_matching_a{l="y"} 20
--- NOT SUPPORTED: union on different schemas
--- NOT SUPPORTED: `or`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (http_requests or cpu_count or vector_matching_a);

-- eval instant at 50m (http_requests{group="canary"} + 1) or ignoring(l, group, job) (http_requests or cpu_count or vector_matching_a)
@@ -114,8 +112,7 @@ tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or on(instance) (htt
-- {group="canary", instance="1", job="app-server"} 801
-- vector_matching_a{l="x"} 10
-- vector_matching_a{l="y"} 20
--- NOT SUPPORTED: union on different schemas
--- NOT SUPPORTED: `or`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') (http_requests{g="canary"} + 1) or ignoring(l, g, job) (http_requests or cpu_count or vector_matching_a);

-- eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
@@ -153,7 +150,7 @@ tql eval (3000, 3000, '1s') http_requests{g="canary"} unless ignoring(g) http_re
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
--- NOT SUPPORTED: `vector()`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1);

-- eval instant at 50m http_requests AND IGNORING (group, instance, job) vector(1)
Expand All @@ -165,7 +162,7 @@ tql eval (3000, 3000, '1s') http_requests AND ON (dummy) vector(1);
-- http_requests{group="production", instance="0", job="app-server"} 500
-- http_requests{group="production", instance="1", job="api-server"} 200
-- http_requests{group="production", instance="1", job="app-server"} 600
--- NOT SUPPORTED: `vector()`
+-- SQLNESS SORT_RESULT 3 1
tql eval (3000, 3000, '1s') http_requests AND IGNORING (g, instance, job) vector(1);

drop table http_requests;
