feat: impl label_join and label_replace for promql (#5153)
* feat: impl label_join and label_replace for promql

* chore: style

* fix: dst_label is equal to src_label

* fix: forgot to sort the results

* fix: processing empty source label
killme2008 authored Dec 18, 2024
1 parent 266919c commit e662c24
Showing 3 changed files with 508 additions and 4 deletions.
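For readers unfamiliar with these PromQL functions, here is a minimal standalone sketch of their semantics, assuming the `regex` crate. The helper functions below are illustrative only and are not the implementation in this commit, which instead plans label_join onto DataFusion's concat_ws and label_replace onto regexp_replace (see planner.rs below).

// Illustrative sketch only (not code from this commit): PromQL-style label
// operations over a single series' label set, assuming the `regex` crate.
use std::collections::BTreeMap;

use regex::Regex;

type Labels = BTreeMap<String, String>;

// label_join(v, dst, sep, src_1, src_2, ...): join the source label values with `sep`
// and store the result in `dst`. Missing source labels contribute empty strings.
fn label_join(labels: &mut Labels, dst: &str, sep: &str, srcs: &[&str]) {
    let joined = srcs
        .iter()
        .map(|s| labels.get(*s).cloned().unwrap_or_default())
        .collect::<Vec<_>>()
        .join(sep);
    labels.insert(dst.to_string(), joined);
}

// label_replace(v, dst, replacement, src, regex): if `regex` matches the value of
// `src`, set `dst` to the replacement with capture groups ($1, ...) expanded.
// (Prometheus anchors the regex against the whole value; omitted here for brevity.)
fn label_replace(labels: &mut Labels, dst: &str, replacement: &str, src: &str, re: &str) {
    let re = Regex::new(re).expect("valid regex");
    let value = labels.get(src).cloned().unwrap_or_default();
    if re.is_match(&value) {
        let new = re.replace(&value, replacement).into_owned();
        labels.insert(dst.to_string(), new);
    }
}

fn main() {
    let mut labels = Labels::from([
        ("tag_0".to_string(), "a:c".to_string()),
        ("tag_1".to_string(), "b".to_string()),
    ]);
    label_join(&mut labels, "joined", ",", &["tag_0", "tag_1"]);
    label_replace(&mut labels, "foo", "$1", "tag_0", "(.*):.*");
    println!("{labels:?}"); // joined="a:c,b", foo="a"
}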
258 changes: 254 additions & 4 deletions src/query/src/promql/planner.rs
@@ -1213,7 +1213,7 @@ impl PromPlanner {
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as quantile, but found {:?}", other),
desc: format!("expected f64 literal as quantile, but found {:?}", other),
}
.fail()?,
};
@@ -1224,7 +1224,7 @@
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expect i64 literal as t, but found {:?}", other),
desc: format!("expected i64 literal as t, but found {:?}", other),
}
.fail()?,
};
@@ -1235,7 +1235,7 @@
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expect f64 literal as smoothing factor, but found {:?}",
"expected f64 literal as smoothing factor, but found {:?}",
other
),
}
@@ -1244,7 +1244,10 @@
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!("expect f64 literal as trend factor, but found {:?}", other),
desc: format!(
"expected f64 literal as trend factor, but found {:?}",
other
),
}
.fail()?,
};
@@ -1331,6 +1334,47 @@ impl PromPlanner {
exprs.push(date_part_expr);
ScalarFunc::GeneratedExpr
}

"label_join" => {
let (concat_expr, dst_label) =
Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;

// Keep the current field columns, except the `dst_label`.
for value in &self.ctx.field_columns {
if *value != dst_label {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}
}

// Remove it from tag columns
self.ctx.tag_columns.retain(|tag| *tag != dst_label);

// Add the new label expr
exprs.push(concat_expr);

ScalarFunc::GeneratedExpr
}
"label_replace" => {
let (replace_expr, dst_label) =
Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;

// Keep the current field columns, except the `dst_label`.
for value in &self.ctx.field_columns {
if *value != dst_label {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}
}

// Remove it from tag columns
self.ctx.tag_columns.retain(|tag| *tag != dst_label);

// Add the new label expr
exprs.push(replace_expr);

ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1411,6 +1455,7 @@ impl PromPlanner {

// update value columns' names, and alias them to remove qualifiers
let mut new_field_columns = Vec::with_capacity(exprs.len());

exprs = exprs
.into_iter()
.map(|expr| {
@@ -1420,11 +1465,146 @@
})
.collect::<std::result::Result<Vec<_>, _>>()
.context(DataFusionPlanningSnafu)?;

self.ctx.field_columns = new_field_columns;

Ok(exprs)
}

/// Build expr for `label_replace` function
fn build_regexp_replace_label_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<(DfExpr, String)> {
// label_replace(vector, dst_label, replacement, src_label, regex)
let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expected dst_label string literal, but found {:?}", other),
}
.fail()?,
};
let replacement = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expected replacement string literal, but found {:?}", other),
}
.fail()?,
};
let src_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
other => UnexpectedPlanExprSnafu {
desc: format!("expected src_label string literal, but found {:?}", other),
}
.fail()?,
};
let regex = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expected regex string literal, but found {:?}", other),
}
.fail()?,
};

let func = session_state
.scalar_functions()
.get("regexp_replace")
.context(UnsupportedExprSnafu {
name: "regexp_replace",
})?;

// regexp_replace(src_label, regex, replacement)
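// e.g. label_replace(up, "foo", "$1", "tag_0", "(.*):.*") is planned as
// regexp_replace(up.tag_0, "(.*):.*", "$1") AS foo (see test_label_replace below).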
let args = vec![
if src_label.is_empty() {
DfExpr::Literal(ScalarValue::Null)
} else {
DfExpr::Column(Column::from_name(src_label))
},
DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
];

Ok((
DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(&dst_label),
dst_label,
))
}

/// Build expr for `label_join` function
fn build_concat_labels_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<(DfExpr, String)> {
// label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)

let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expected dst_label string literal, but found {:?}", other),
}
.fail()?,
};
let separator = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expected separator string literal, but found {:?}", other),
}
.fail()?,
};
let src_labels = other_input_exprs
.clone()
.into_iter()
.map(|expr| {
// Cast source label into column
match expr {
DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
if label.is_empty() {
Ok(DfExpr::Literal(ScalarValue::Null))
} else {
Ok(DfExpr::Column(Column::from_name(label)))
}
}
other => UnexpectedPlanExprSnafu {
desc: format!(
"expected source label string literal, but found {:?}",
other
),
}
.fail(),
}
})
.collect::<Result<Vec<_>>>()?;
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join",
}
);

let func = session_state
.scalar_functions()
.get("concat_ws")
.context(UnsupportedExprSnafu { name: "concat_ws" })?;

// concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
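// e.g. label_join(up, "foo", ",", "tag_1", "tag_2", "tag_3") is planned as
// concat_ws(",", up.tag_1, up.tag_2, up.tag_3) AS foo (see test_label_join below).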
let mut args = Vec::with_capacity(1 + src_labels.len());
args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
args.extend(src_labels);

Ok((
DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(&dst_label),
dst_label,
))
}

fn create_time_index_column_expr(&self) -> Result<DfExpr> {
Ok(DfExpr::Column(Column::from_name(
self.ctx
@@ -3267,4 +3447,74 @@ mod test {
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}

#[tokio::test]
async fn test_label_join() {
let prom_expr = parser::parse(
"label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
)
.unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.tag_1 DESC NULLS LAST, up.tag_2 DESC NULLS LAST, up.tag_3 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn test_label_replace() {
let prom_expr = parser::parse(
"label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
)
.unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8("(.*):.*"), Utf8("$1")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
}