From b4e4df9c6625578b1d51fbe674214555fbf187d4 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 12 Dec 2024 20:04:16 +0800 Subject: [PATCH 1/5] feat: impl label_join and label_replace for promql --- src/query/src/promql/planner.rs | 217 ++++++++++++++++++ .../standalone/common/promql/label.result | 103 +++++++++ .../cases/standalone/common/promql/label.sql | 35 +++ 3 files changed, 355 insertions(+) create mode 100644 tests/cases/standalone/common/promql/label.result create mode 100644 tests/cases/standalone/common/promql/label.sql diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 001e41ca9934..fd530c2499a6 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1331,6 +1331,35 @@ impl PromPlanner { exprs.push(date_part_expr); ScalarFunc::GeneratedExpr } + + "label_join" => { + // Reserve the current columns + for value in &self.ctx.field_columns { + let expr = DfExpr::Column(Column::from_name(value)); + exprs.push(expr); + } + + let concat_expr = + Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?; + // Add the new label expr + exprs.push(concat_expr); + + ScalarFunc::GeneratedExpr + } + "label_replace" => { + // Reserve the current columns + for value in &self.ctx.field_columns { + let expr = DfExpr::Column(Column::from_name(value)); + exprs.push(expr); + } + + let replace_expr = + Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?; + // Add the new label expr + exprs.push(replace_expr); + + ScalarFunc::GeneratedExpr + } _ => { if let Some(f) = session_state.scalar_functions().get(func.name) { ScalarFunc::DataFusionBuiltin(f.clone()) @@ -1425,6 +1454,124 @@ impl PromPlanner { Ok(exprs) } + /// Build expr for `label_replace` function + fn build_regexp_replace_label_expr( + other_input_exprs: &mut VecDeque, + session_state: &SessionState, + ) -> Result { + // label_replace(vector, dst_label, replacement, src_label, regex) + let dst_label = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, + other => UnexpectedPlanExprSnafu { + desc: format!("expect dst_label string literal, but found {:?}", other), + } + .fail()?, + }; + let replacement = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r, + other => UnexpectedPlanExprSnafu { + desc: format!("expect replacement string literal, but found {:?}", other), + } + .fail()?, + }; + let src_label = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s, + other => UnexpectedPlanExprSnafu { + desc: format!("expect src_label string literal, but found {:?}", other), + } + .fail()?, + }; + let regex = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r, + other => UnexpectedPlanExprSnafu { + desc: format!("expect regex string literal, but found {:?}", other), + } + .fail()?, + }; + + let func = session_state + .scalar_functions() + .get("regexp_replace") + .context(UnsupportedExprSnafu { + name: "regexp_replace", + })?; + + // regexp_replace(src_label, regex, replacement) + let args = vec![ + DfExpr::Column(Column::from_name(src_label)), + DfExpr::Literal(ScalarValue::Utf8(Some(regex))), + DfExpr::Literal(ScalarValue::Utf8(Some(replacement))), + ]; + + Ok(DfExpr::ScalarFunction(ScalarFunction { + func: func.clone(), + args, + }) + .alias(dst_label)) + } + + /// Build expr for `label_join` function + fn build_concat_labels_expr( + other_input_exprs: &mut VecDeque, + session_state: &SessionState, + ) -> Result { + // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...) + + let dst_label = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, + other => UnexpectedPlanExprSnafu { + desc: format!("expect dst_label string literal, but found {:?}", other), + } + .fail()?, + }; + let separator = match other_input_exprs.pop_front() { + Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, + other => UnexpectedPlanExprSnafu { + desc: format!("expect separator string literal, but found {:?}", other), + } + .fail()?, + }; + let src_labels = other_input_exprs + .clone() + .into_iter() + .map(|expr| { + // Cast source label into column + match expr { + DfExpr::Literal(ScalarValue::Utf8(Some(label))) => { + let expr = DfExpr::Column(Column::from_name(label)); + Ok(expr) + } + other => UnexpectedPlanExprSnafu { + desc: format!("expect source label string literal, but found {:?}", other), + } + .fail(), + } + }) + .collect::>>()?; + ensure!( + !src_labels.is_empty(), + FunctionInvalidArgumentSnafu { + fn_name: "label_join", + } + ); + + let func = session_state + .scalar_functions() + .get("concat_ws") + .context(UnsupportedExprSnafu { name: "concat_ws" })?; + + // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label + let mut args = Vec::with_capacity(1 + src_labels.len()); + args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)))); + args.extend(src_labels); + + Ok(DfExpr::ScalarFunction(ScalarFunction { + func: func.clone(), + args, + }) + .alias(dst_label)) + } + fn create_time_index_column_expr(&self) -> Result { Ok(DfExpr::Column(Column::from_name( self.ctx @@ -3267,4 +3414,74 @@ mod test { \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]" ); } + + #[tokio::test] + async fn test_label_join() { + let prom_expr = parser::parse( + "label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')", + ) + .unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = + build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1) + .await; + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); + + let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8] + Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8] + PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + Sort: up.tag_0 DESC NULLS LAST, up.tag_1 DESC NULLS LAST, up.tag_2 DESC NULLS LAST, up.tag_3 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_label_replace() { + let prom_expr = parser::parse( + "label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")", + ) + .unwrap(); + let eval_stmt = EvalStmt { + expr: prom_expr, + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + + let table_provider = + build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1) + .await; + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); + + let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8] + Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8("(.*):.*"), Utf8("$1")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8] + PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + Sort: up.tag_0 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N] + TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } } diff --git a/tests/cases/standalone/common/promql/label.result b/tests/cases/standalone/common/promql/label.result new file mode 100644 index 000000000000..b693b8b67b6f --- /dev/null +++ b/tests/cases/standalone/common/promql/label.result @@ -0,0 +1,103 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +INSERT INTO TABLE test VALUES + (0, 'host1', 'idc1', 1), + (0, 'host2', 'idc1', 2), + (5000, 'host1', 'idc2:zone1',3), + (5000, 'host2', 'idc2',4), + (10000, 'host1', 'idc3:zone2',5), + (10000, 'host2', 'idc3',6), + (15000, 'host1', 'idc4:zone3',7), + (15000, 'host2', 'idc4',8); + +Affected Rows: 8 + +-- Missing source labels -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); + +Error: 1004(InvalidArguments), Invalid function argument for label_join + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); + ++---------------------+-----+------------------+-------+------------+ +| ts | val | new_host | host | idc | ++---------------------+-----+------------------+-------+------------+ +| 1970-01-01T00:00:00 | 1 | idc1-host1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | idc1-host1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 1 | idc1-host1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | idc1-host1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | host1 | idc4:zone3 | ++---------------------+-----+------------------+-------+------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "$2", "idc", "(.*):(.*)"); + ++---------------------+-----+---------+-------+------------+ +| ts | val | new_idc | host | idc | ++---------------------+-----+---------+-------+------------+ +| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 3 | zone1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 3 | zone1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 5 | zone2 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 3 | zone1 | host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 5 | zone2 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | zone3 | host1 | idc4:zone3 | ++---------------------+-----+---------+-------+------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "idc", "idc2.*"); + ++---------------------+-----+------------+-------+------------+ +| ts | val | new_idc | host | idc | ++---------------------+-----+------------+-------+------------+ +| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 3 | idc99 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 3 | idc99 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 5 | idc3:zone2 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 3 | idc99 | host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 5 | idc3:zone2 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | idc4:zone3 | host1 | idc4:zone3 | ++---------------------+-----+------------+-------+------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)"); + ++---------------------+-----+---------+-------+------+ +| ts | val | new_idc | host | idc | ++---------------------+-----+---------+-------+------+ +| 1970-01-01T00:00:00 | 2 | idc1 | host2 | idc1 | +| 1970-01-01T00:00:05 | 2 | idc1 | host2 | idc1 | +| 1970-01-01T00:00:05 | 4 | idc2 | host2 | idc2 | +| 1970-01-01T00:00:10 | 2 | idc1 | host2 | idc1 | +| 1970-01-01T00:00:10 | 4 | idc2 | host2 | idc2 | +| 1970-01-01T00:00:10 | 6 | idc3 | host2 | idc3 | +| 1970-01-01T00:00:15 | 2 | idc1 | host2 | idc1 | +| 1970-01-01T00:00:15 | 4 | idc2 | host2 | idc2 | +| 1970-01-01T00:00:15 | 6 | idc3 | host2 | idc3 | +| 1970-01-01T00:00:15 | 8 | idc4 | host2 | idc4 | ++---------------------+-----+---------+-------+------+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/label.sql b/tests/cases/standalone/common/promql/label.sql new file mode 100644 index 000000000000..48349066f0ad --- /dev/null +++ b/tests/cases/standalone/common/promql/label.sql @@ -0,0 +1,35 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +INSERT INTO TABLE test VALUES + (0, 'host1', 'idc1', 1), + (0, 'host2', 'idc1', 2), + (5000, 'host1', 'idc2:zone1',3), + (5000, 'host2', 'idc2',4), + (10000, 'host1', 'idc3:zone2',5), + (10000, 'host2', 'idc3',6), + (15000, 'host1', 'idc4:zone3',7), + (15000, 'host2', 'idc4',8); + +-- Missing source labels -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "$2", "idc", "(.*):(.*)"); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "idc", "idc2.*"); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)"); + + +DROP TABLE test; From aeb1e10a742612fe05e6cbd4ab00d575b901e534 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Thu, 12 Dec 2024 22:05:22 +0800 Subject: [PATCH 2/5] chore: style --- src/query/src/promql/planner.rs | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index fd530c2499a6..7024efa0df85 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1213,7 +1213,7 @@ impl PromPlanner { let quantile_expr = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile, other => UnexpectedPlanExprSnafu { - desc: format!("expect f64 literal as quantile, but found {:?}", other), + desc: format!("expected f64 literal as quantile, but found {:?}", other), } .fail()?, }; @@ -1224,7 +1224,7 @@ impl PromPlanner { Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64, Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t, other => UnexpectedPlanExprSnafu { - desc: format!("expect i64 literal as t, but found {:?}", other), + desc: format!("expected i64 literal as t, but found {:?}", other), } .fail()?, }; @@ -1235,7 +1235,7 @@ impl PromPlanner { Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf, other => UnexpectedPlanExprSnafu { desc: format!( - "expect f64 literal as smoothing factor, but found {:?}", + "expected f64 literal as smoothing factor, but found {:?}", other ), } @@ -1244,7 +1244,10 @@ impl PromPlanner { let tf_exp = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf, other => UnexpectedPlanExprSnafu { - desc: format!("expect f64 literal as trend factor, but found {:?}", other), + desc: format!( + "expected f64 literal as trend factor, but found {:?}", + other + ), } .fail()?, }; @@ -1463,28 +1466,28 @@ impl PromPlanner { let dst_label = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, other => UnexpectedPlanExprSnafu { - desc: format!("expect dst_label string literal, but found {:?}", other), + desc: format!("expected dst_label string literal, but found {:?}", other), } .fail()?, }; let replacement = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r, other => UnexpectedPlanExprSnafu { - desc: format!("expect replacement string literal, but found {:?}", other), + desc: format!("expected replacement string literal, but found {:?}", other), } .fail()?, }; let src_label = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s, other => UnexpectedPlanExprSnafu { - desc: format!("expect src_label string literal, but found {:?}", other), + desc: format!("expected src_label string literal, but found {:?}", other), } .fail()?, }; let regex = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r, other => UnexpectedPlanExprSnafu { - desc: format!("expect regex string literal, but found {:?}", other), + desc: format!("expected regex string literal, but found {:?}", other), } .fail()?, }; @@ -1520,14 +1523,14 @@ impl PromPlanner { let dst_label = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, other => UnexpectedPlanExprSnafu { - desc: format!("expect dst_label string literal, but found {:?}", other), + desc: format!("expected dst_label string literal, but found {:?}", other), } .fail()?, }; let separator = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, other => UnexpectedPlanExprSnafu { - desc: format!("expect separator string literal, but found {:?}", other), + desc: format!("expected separator string literal, but found {:?}", other), } .fail()?, }; @@ -1542,7 +1545,10 @@ impl PromPlanner { Ok(expr) } other => UnexpectedPlanExprSnafu { - desc: format!("expect source label string literal, but found {:?}", other), + desc: format!( + "expected source label string literal, but found {:?}", + other + ), } .fail(), } From 9750e5fc83f6eeb4b502db00581540c03cf720f0 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 16 Dec 2024 21:15:18 +0800 Subject: [PATCH 3/5] fix: dst_label is eqauls to src_label --- src/query/src/promql/planner.rs | 65 ++++++++++++------- .../standalone/common/promql/label.result | 54 +++++++++++++++ .../cases/standalone/common/promql/label.sql | 8 +++ 3 files changed, 105 insertions(+), 22 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 7024efa0df85..1864f9844fbf 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1336,28 +1336,40 @@ impl PromPlanner { } "label_join" => { - // Reserve the current columns + let (concat_expr, dst_label) = + Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?; + + // Reserve the current field columns except the `dst_label`. for value in &self.ctx.field_columns { - let expr = DfExpr::Column(Column::from_name(value)); - exprs.push(expr); + if *value != dst_label { + let expr = DfExpr::Column(Column::from_name(value)); + exprs.push(expr); + } } - let concat_expr = - Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?; + // Remove it from tag columns + self.ctx.tag_columns.retain(|tag| *tag != dst_label); + // Add the new label expr exprs.push(concat_expr); ScalarFunc::GeneratedExpr } "label_replace" => { - // Reserve the current columns + let (replace_expr, dst_label) = + Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?; + + // Reserve the current field columns except the `dst_label`. for value in &self.ctx.field_columns { - let expr = DfExpr::Column(Column::from_name(value)); - exprs.push(expr); + if *value != dst_label { + let expr = DfExpr::Column(Column::from_name(value)); + exprs.push(expr); + } } - let replace_expr = - Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?; + // Remove it from tag columns + self.ctx.tag_columns.retain(|tag| *tag != dst_label); + // Add the new label expr exprs.push(replace_expr); @@ -1443,6 +1455,7 @@ impl PromPlanner { // update value columns' name, and alias them to remove qualifiers let mut new_field_columns = Vec::with_capacity(exprs.len()); + exprs = exprs .into_iter() .map(|expr| { @@ -1452,6 +1465,7 @@ impl PromPlanner { }) .collect::, _>>() .context(DataFusionPlanningSnafu)?; + self.ctx.field_columns = new_field_columns; Ok(exprs) @@ -1461,7 +1475,7 @@ impl PromPlanner { fn build_regexp_replace_label_expr( other_input_exprs: &mut VecDeque, session_state: &SessionState, - ) -> Result { + ) -> Result<(DfExpr, String)> { // label_replace(vector, dst_label, replacement, src_label, regex) let dst_label = match other_input_exprs.pop_front() { Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d, @@ -1506,18 +1520,21 @@ impl PromPlanner { DfExpr::Literal(ScalarValue::Utf8(Some(replacement))), ]; - Ok(DfExpr::ScalarFunction(ScalarFunction { - func: func.clone(), - args, - }) - .alias(dst_label)) + Ok(( + DfExpr::ScalarFunction(ScalarFunction { + func: func.clone(), + args, + }) + .alias(&dst_label), + dst_label, + )) } /// Build expr for `label_join` function fn build_concat_labels_expr( other_input_exprs: &mut VecDeque, session_state: &SessionState, - ) -> Result { + ) -> Result<(DfExpr, String)> { // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...) let dst_label = match other_input_exprs.pop_front() { @@ -1542,6 +1559,7 @@ impl PromPlanner { match expr { DfExpr::Literal(ScalarValue::Utf8(Some(label))) => { let expr = DfExpr::Column(Column::from_name(label)); + Ok(expr) } other => UnexpectedPlanExprSnafu { @@ -1571,11 +1589,14 @@ impl PromPlanner { args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator)))); args.extend(src_labels); - Ok(DfExpr::ScalarFunction(ScalarFunction { - func: func.clone(), - args, - }) - .alias(dst_label)) + Ok(( + DfExpr::ScalarFunction(ScalarFunction { + func: func.clone(), + args, + }) + .alias(&dst_label), + dst_label, + )) } fn create_time_index_column_expr(&self) -> Result { diff --git a/tests/cases/standalone/common/promql/label.result b/tests/cases/standalone/common/promql/label.result index b693b8b67b6f..b0bc4189f928 100644 --- a/tests/cases/standalone/common/promql/label.result +++ b/tests/cases/standalone/common/promql/label.result @@ -25,6 +25,42 @@ TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); Error: 1004(InvalidArguments), Invalid function argument for label_join +-- dst_label is equal to source label -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "host"); + ++---------------------+-----+-------+------------+ +| ts | val | host | idc | ++---------------------+-----+-------+------------+ +| 1970-01-01T00:00:15 | 7 | host1 | idc4:zone3 | +| 1970-01-01T00:00:05 | 3 | host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 3 | host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 3 | host1 | idc2:zone1 | +| 1970-01-01T00:00:00 | 1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 5 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 5 | host1 | idc3:zone2 | ++---------------------+-----+-------+------------+ + +-- dst_label is in source labels -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host"); + ++---------------------+-----+------------------+------------+ +| ts | val | host | idc | ++---------------------+-----+------------------+------------+ +| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | idc4:zone3 | +| 1970-01-01T00:00:00 | 1 | idc1-host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | idc1-host1 | idc1 | +| 1970-01-01T00:00:10 | 1 | idc1-host1 | idc1 | +| 1970-01-01T00:00:15 | 1 | idc1-host1 | idc1 | +| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | idc3:zone2 | +| 1970-01-01T00:00:05 | 3 | idc2:zone1-host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 3 | idc2:zone1-host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 3 | idc2:zone1-host1 | idc2:zone1 | ++---------------------+-----+------------------+------------+ + -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); @@ -97,6 +133,24 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", | 1970-01-01T00:00:15 | 8 | idc4 | host2 | idc4 | +---------------------+-----+---------+-------+------+ +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.*):(.*)"); + ++---------------------+-----+------+-------+ +| ts | val | idc | host | ++---------------------+-----+------+-------+ +| 1970-01-01T00:00:00 | 2 | idc1 | host2 | +| 1970-01-01T00:00:05 | 2 | idc1 | host2 | +| 1970-01-01T00:00:05 | 4 | idc2 | host2 | +| 1970-01-01T00:00:10 | 2 | idc1 | host2 | +| 1970-01-01T00:00:10 | 4 | idc2 | host2 | +| 1970-01-01T00:00:10 | 6 | idc3 | host2 | +| 1970-01-01T00:00:15 | 2 | idc1 | host2 | +| 1970-01-01T00:00:15 | 4 | idc2 | host2 | +| 1970-01-01T00:00:15 | 6 | idc3 | host2 | +| 1970-01-01T00:00:15 | 8 | idc4 | host2 | ++---------------------+-----+------+-------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/promql/label.sql b/tests/cases/standalone/common/promql/label.sql index 48349066f0ad..fd94836f251a 100644 --- a/tests/cases/standalone/common/promql/label.sql +++ b/tests/cases/standalone/common/promql/label.sql @@ -19,6 +19,12 @@ INSERT INTO TABLE test VALUES -- Missing source labels -- TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); +-- dst_label is equal to source label -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "host"); + +-- dst_label is in source labels -- +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host"); + -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); @@ -31,5 +37,7 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "id -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)"); +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.*):(.*)"); DROP TABLE test; From f809f28541b35b41a66321d035045a08b08e2ce7 Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Mon, 16 Dec 2024 21:43:27 +0800 Subject: [PATCH 4/5] fix: forgot to sort the results --- .../standalone/common/promql/label.result | 23 +++++++++++-------- .../cases/standalone/common/promql/label.sql | 3 +++ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/cases/standalone/common/promql/label.result b/tests/cases/standalone/common/promql/label.result index b0bc4189f928..505ecfd60b55 100644 --- a/tests/cases/standalone/common/promql/label.result +++ b/tests/cases/standalone/common/promql/label.result @@ -26,39 +26,41 @@ TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); Error: 1004(InvalidArguments), Invalid function argument for label_join -- dst_label is equal to source label -- +-- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "host"); +---------------------+-----+-------+------------+ | ts | val | host | idc | +---------------------+-----+-------+------------+ -| 1970-01-01T00:00:15 | 7 | host1 | idc4:zone3 | -| 1970-01-01T00:00:05 | 3 | host1 | idc2:zone1 | -| 1970-01-01T00:00:10 | 3 | host1 | idc2:zone1 | -| 1970-01-01T00:00:15 | 3 | host1 | idc2:zone1 | | 1970-01-01T00:00:00 | 1 | host1 | idc1 | | 1970-01-01T00:00:05 | 1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 3 | host1 | idc2:zone1 | | 1970-01-01T00:00:10 | 1 | host1 | idc1 | -| 1970-01-01T00:00:15 | 1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 3 | host1 | idc2:zone1 | | 1970-01-01T00:00:10 | 5 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 3 | host1 | idc2:zone1 | | 1970-01-01T00:00:15 | 5 | host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | host1 | idc4:zone3 | +---------------------+-----+-------+------------+ -- dst_label is in source labels -- +-- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host"); +---------------------+-----+------------------+------------+ | ts | val | host | idc | +---------------------+-----+------------------+------------+ -| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | idc4:zone3 | | 1970-01-01T00:00:00 | 1 | idc1-host1 | idc1 | | 1970-01-01T00:00:05 | 1 | idc1-host1 | idc1 | -| 1970-01-01T00:00:10 | 1 | idc1-host1 | idc1 | -| 1970-01-01T00:00:15 | 1 | idc1-host1 | idc1 | -| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | idc3:zone2 | -| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | idc3:zone2 | | 1970-01-01T00:00:05 | 3 | idc2:zone1-host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 1 | idc1-host1 | idc1 | | 1970-01-01T00:00:10 | 3 | idc2:zone1-host1 | idc2:zone1 | +| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | idc1-host1 | idc1 | | 1970-01-01T00:00:15 | 3 | idc2:zone1-host1 | idc2:zone1 | +| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | idc4:zone3 | +---------------------+-----+------------------+------------+ -- SQLNESS SORT_RESULT 3 1 @@ -133,6 +135,7 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", | 1970-01-01T00:00:15 | 8 | idc4 | host2 | idc4 | +---------------------+-----+---------+-------+------+ +-- dst_label is equal to source label -- -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.*):(.*)"); diff --git a/tests/cases/standalone/common/promql/label.sql b/tests/cases/standalone/common/promql/label.sql index fd94836f251a..e239af5998ca 100644 --- a/tests/cases/standalone/common/promql/label.sql +++ b/tests/cases/standalone/common/promql/label.sql @@ -20,9 +20,11 @@ INSERT INTO TABLE test VALUES TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-"); -- dst_label is equal to source label -- +-- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "host"); -- dst_label is in source labels -- +-- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host"); -- SQLNESS SORT_RESULT 3 1 @@ -37,6 +39,7 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "id -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)"); +-- dst_label is equal to source label -- -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.*):(.*)"); From 05e37f993a04994def57f4c4a876e8cbf13f674a Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Tue, 17 Dec 2024 15:38:16 +0800 Subject: [PATCH 5/5] fix: processing empty source label --- src/query/src/promql/planner.rs | 14 +++++-- .../standalone/common/promql/label.result | 39 +++++++++++++++++++ .../cases/standalone/common/promql/label.sql | 9 +++++ 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 1864f9844fbf..1e7bc27dab6a 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -1515,7 +1515,11 @@ impl PromPlanner { // regexp_replace(src_label, regex, replacement) let args = vec![ - DfExpr::Column(Column::from_name(src_label)), + if src_label.is_empty() { + DfExpr::Literal(ScalarValue::Null) + } else { + DfExpr::Column(Column::from_name(src_label)) + }, DfExpr::Literal(ScalarValue::Utf8(Some(regex))), DfExpr::Literal(ScalarValue::Utf8(Some(replacement))), ]; @@ -1558,9 +1562,11 @@ impl PromPlanner { // Cast source label into column match expr { DfExpr::Literal(ScalarValue::Utf8(Some(label))) => { - let expr = DfExpr::Column(Column::from_name(label)); - - Ok(expr) + if label.is_empty() { + Ok(DfExpr::Literal(ScalarValue::Null)) + } else { + Ok(DfExpr::Column(Column::from_name(label))) + } } other => UnexpectedPlanExprSnafu { desc: format!( diff --git a/tests/cases/standalone/common/promql/label.result b/tests/cases/standalone/common/promql/label.result index 505ecfd60b55..42ba33ca9253 100644 --- a/tests/cases/standalone/common/promql/label.result +++ b/tests/cases/standalone/common/promql/label.result @@ -63,6 +63,25 @@ TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host" | 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | idc4:zone3 | +---------------------+-----+------------------+------------+ +-- test the empty source label -- +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", ""); + ++---------------------+-----+------+------------+ +| ts | val | host | idc | ++---------------------+-----+------+------------+ +| 1970-01-01T00:00:00 | 1 | | idc1 | +| 1970-01-01T00:00:05 | 1 | | idc1 | +| 1970-01-01T00:00:05 | 3 | | idc2:zone1 | +| 1970-01-01T00:00:10 | 1 | | idc1 | +| 1970-01-01T00:00:10 | 3 | | idc2:zone1 | +| 1970-01-01T00:00:10 | 5 | | idc3:zone2 | +| 1970-01-01T00:00:15 | 1 | | idc1 | +| 1970-01-01T00:00:15 | 3 | | idc2:zone1 | +| 1970-01-01T00:00:15 | 5 | | idc3:zone2 | +| 1970-01-01T00:00:15 | 7 | | idc4:zone3 | ++---------------------+-----+------+------------+ + -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); @@ -154,6 +173,26 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(. | 1970-01-01T00:00:15 | 8 | idc4 | host2 | +---------------------+-----+------+-------+ +-- test the empty source label -- +-- TODO(dennis): we can't remove the label currently -- +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "", "", ""); + ++---------------------+-----+-----+-------+ +| ts | val | idc | host | ++---------------------+-----+-----+-------+ +| 1970-01-01T00:00:00 | 2 | | host2 | +| 1970-01-01T00:00:05 | 2 | | host2 | +| 1970-01-01T00:00:05 | 4 | | host2 | +| 1970-01-01T00:00:10 | 2 | | host2 | +| 1970-01-01T00:00:10 | 4 | | host2 | +| 1970-01-01T00:00:10 | 6 | | host2 | +| 1970-01-01T00:00:15 | 2 | | host2 | +| 1970-01-01T00:00:15 | 4 | | host2 | +| 1970-01-01T00:00:15 | 6 | | host2 | +| 1970-01-01T00:00:15 | 8 | | host2 | ++---------------------+-----+-----+-------+ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/promql/label.sql b/tests/cases/standalone/common/promql/label.sql index e239af5998ca..3b9058c27ed6 100644 --- a/tests/cases/standalone/common/promql/label.sql +++ b/tests/cases/standalone/common/promql/label.sql @@ -27,6 +27,10 @@ TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "host"); -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", "idc", "host"); +-- test the empty source label -- +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "host", "-", ""); + -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host"); @@ -43,4 +47,9 @@ TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", -- SQLNESS SORT_RESULT 3 1 TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "$2", "idc", "(.*):(.*)"); +-- test the empty source label -- +-- TODO(dennis): we can't remove the label currently -- +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "idc", "", "", ""); + DROP TABLE test;