Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl label_join and label_replace for promql #5153

Merged
merged 5 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 227 additions & 4 deletions src/query/src/promql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ impl PromPlanner {
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as quantile, but found {:?}", other),
desc: format!("expected f64 literal as quantile, but found {:?}", other),
}
.fail()?,
};
Expand All @@ -1224,7 +1224,7 @@ impl PromPlanner {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
desc: format!("expected i64 literal as t, but found {:?}", other),
}
.fail()?,
};
Expand All @@ -1235,7 +1235,7 @@ impl PromPlanner {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expect f64 literal as smoothing factor, but found {:?}",
"expected f64 literal as smoothing factor, but found {:?}",
other
),
}
Expand All @@ -1244,7 +1244,10 @@ impl PromPlanner {
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as trend factor, but found {:?}", other),
desc: format!(
"expected f64 literal as trend factor, but found {:?}",
other
),
}
.fail()?,
};
Expand Down Expand Up @@ -1331,6 +1334,35 @@ impl PromPlanner {
exprs.push(date_part_expr);
ScalarFunc::GeneratedExpr
}

"label_join" => {
// Preserve the current field columns
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}

let concat_expr =
Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;
// Add the new label expr
exprs.push(concat_expr);

ScalarFunc::GeneratedExpr
}
"label_replace" => {
// Preserve the current field columns
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}

let replace_expr =
Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;
// Add the new label expr
exprs.push(replace_expr);

ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
Expand Down Expand Up @@ -1425,6 +1457,127 @@ impl PromPlanner {
Ok(exprs)
}

/// Build expr for `label_replace` function
fn build_regexp_replace_label_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<DfExpr> {
// label_replace(vector, dst_label, replacement, src_label, regex)
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expected dst_label string literal, but found {:?}", other),
}
.fail()?,
};
let replacement = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expected replacement string literal, but found {:?}", other),
}
.fail()?,
};
let src_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
other => UnexpectedPlanExprSnafu {
desc: format!("expected src_label string literal, but found {:?}", other),
}
.fail()?,
};
let regex = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expected regex string literal, but found {:?}", other),
}
.fail()?,
};

let func = session_state
.scalar_functions()
.get("regexp_replace")
.context(UnsupportedExprSnafu {
name: "regexp_replace",
})?;

// regexp_replace(src_label, regex, replacement)
let args = vec![
DfExpr::Column(Column::from_name(src_label)),
DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
];

Ok(DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(dst_label))
}

/// Builds the projection expression backing PromQL's `label_join`.
///
/// Pops two string literals from the front of `other_input_exprs`
/// (`dst_label`, then `separator`). Every remaining element is read — without
/// being removed from the deque — as the name of a source label and turned
/// into a column reference. The returned expression is
/// `concat_ws(separator, src_label_1, src_label_2, ...)` aliased as
/// `dst_label`.
///
/// # Errors
///
/// Returns an error built from `UnexpectedPlanExprSnafu` when `dst_label`,
/// `separator` or any source label is missing or is not a non-null UTF-8
/// literal, one built from `FunctionInvalidArgumentSnafu` when no source
/// label is given, and one built from `UnsupportedExprSnafu` when `concat_ws`
/// is not registered in `session_state`.
fn build_concat_labels_expr(
    other_input_exprs: &mut VecDeque<DfExpr>,
    session_state: &SessionState,
) -> Result<DfExpr> {
    // label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)

    let dst_label = match other_input_exprs.pop_front() {
        Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
        other => UnexpectedPlanExprSnafu {
            desc: format!("expected dst_label string literal, but found {:?}", other),
        }
        .fail()?,
    };
    let separator = match other_input_exprs.pop_front() {
        Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
        other => UnexpectedPlanExprSnafu {
            desc: format!("expected separator string literal, but found {:?}", other),
        }
        .fail()?,
    };

    // Borrow the remaining arguments instead of cloning the whole deque: only
    // the label names are needed, and the deque itself is left as it was.
    let src_labels = other_input_exprs
        .iter()
        .map(|expr| match expr {
            // Each source label literal becomes a reference to the column of that name.
            DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
                Ok(DfExpr::Column(Column::from_name(label.as_str())))
            }
            other => UnexpectedPlanExprSnafu {
                desc: format!(
                    "expected source label string literal, but found {:?}",
                    other
                ),
            }
            .fail(),
        })
        .collect::<Result<Vec<_>>>()?;
    // At least one source label is required.
    ensure!(
        !src_labels.is_empty(),
        FunctionInvalidArgumentSnafu {
            fn_name: "label_join",
        }
    );

    let func = session_state
        .scalar_functions()
        .get("concat_ws")
        .context(UnsupportedExprSnafu { name: "concat_ws" })?;

    // concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
    let mut args = Vec::with_capacity(1 + src_labels.len());
    args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
    args.extend(src_labels);

    Ok(DfExpr::ScalarFunction(ScalarFunction {
        func: func.clone(),
        args,
    })
    .alias(dst_label))
}

fn create_time_index_column_expr(&self) -> Result<DfExpr> {
Ok(DfExpr::Column(Column::from_name(
self.ctx
Expand Down Expand Up @@ -3267,4 +3420,74 @@ mod test {
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}

// Plans a `label_join` query and compares the resulting logical plan, rendered
// with `display_indent_schema`, against a fixed expected string.
#[tokio::test]
async fn test_label_join() {
// Join `tag_1`, `tag_2` and `tag_3` with "," into a new label named `foo`.
let prom_expr = parser::parse(
"label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
)
.unwrap();
// Evaluate from the Unix epoch to 100_000 s later, with a 5 s step and a 1 s lookback.
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

// NOTE(review): the trailing `4, 1` presumably asks for four tag columns and
// one field column (the expected schema below lists tag_0..tag_3 and
// field_0) — confirm against `build_test_table_provider`.
let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

// The projection is expected to carry the `concat_ws(...)` call aliased as
// `foo`, and the outer filter to require `foo IS NOT NULL`.
let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.tag_1 DESC NULLS LAST, up.tag_2 DESC NULLS LAST, up.tag_3 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}

// Plans a `label_replace` query and compares the resulting logical plan,
// rendered with `display_indent_schema`, against a fixed expected string.
#[tokio::test]
async fn test_label_replace() {
// Write capture group 1 of `(.*):.*`, applied to `tag_0`, into a new label `foo`.
let prom_expr = parser::parse(
"label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
)
.unwrap();
// Evaluate from the Unix epoch to 100_000 s later, with a 5 s step and a 1 s lookback.
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

// NOTE(review): the trailing `1, 1` presumably asks for one tag column and
// one field column (the expected schema below lists tag_0 and field_0) —
// confirm against `build_test_table_provider`.
let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

// The projection is expected to carry the `regexp_replace(...)` call aliased
// as `foo`, with the pattern and replacement passed through as literals.
let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8("(.*):.*"), Utf8("$1")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
}
103 changes: 103 additions & 0 deletions tests/cases/standalone/common/promql/label.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2:zone1',3),
(5000, 'host2', 'idc2',4),
(10000, 'host1', 'idc3:zone2',5),
(10000, 'host2', 'idc3',6),
(15000, 'host1', 'idc4:zone3',7),
(15000, 'host2', 'idc4',8);

Affected Rows: 8

-- Missing source labels --
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-");

Error: 1004(InvalidArguments), Invalid function argument for label_join

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host");

+---------------------+-----+------------------+-------+------------+
| ts | val | new_host | host | idc |
+---------------------+-----+------------------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | host1 | idc4:zone3 |
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
+---------------------+-----+------------------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "$2", "idc", "(.*):(.*)");

+---------------------+-----+---------+-------+------------+
| ts | val | new_idc | host | idc |
+---------------------+-----+---------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | zone3 | host1 | idc4:zone3 |
+---------------------+-----+---------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "idc", "idc2.*");

+---------------------+-----+------------+-------+------------+
| ts | val | new_idc | host | idc |
+---------------------+-----+------------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | idc3:zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | idc3:zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | idc4:zone3 | host1 | idc4:zone3 |
+---------------------+-----+------------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)");

+---------------------+-----+---------+-------+------+
| ts | val | new_idc | host | idc |
+---------------------+-----+---------+-------+------+
| 1970-01-01T00:00:00 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:05 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:05 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:10 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:10 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:10 | 6 | idc3 | host2 | idc3 |
| 1970-01-01T00:00:15 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:15 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:15 | 6 | idc3 | host2 | idc3 |
| 1970-01-01T00:00:15 | 8 | idc4 | host2 | idc4 |
+---------------------+-----+---------+-------+------+

DROP TABLE test;

Affected Rows: 0

Loading
Loading