Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(promql): parameterize lookback #3630

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" }
greptime-proto = { git = "https://github.com/etolbakov/greptime-proto.git", rev = "1da8c466f9920ec85141bc673c544c627fe79416" }
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
1 change: 1 addition & 0 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl Database {
start: start.to_string(),
end: end.to_string(),
step: step.to_string(),
lookback: "".to_string(),
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
})),
}))
.await
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ impl GrpcQueryHandler for Instance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await;
Expand Down
9 changes: 8 additions & 1 deletion src/operator/src/statement/tql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_query::Output;
use common_telemetry::tracing;
use query::parser::{
PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME,
EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
Expand All @@ -37,12 +37,16 @@ impl StatementExecutor {
end: eval.end,
step: eval.step,
query: eval.query,
lookback: eval.lookback.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
etolbakov marked this conversation as resolved.
Show resolved Hide resolved
};
QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)?
}
Tql::Explain(explain) => {
let promql = PromQuery {
query: explain.query,
lookback: explain
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
..PromQuery::default()
};
let explain_node_name = if explain.is_verbose {
Expand All @@ -63,6 +67,9 @@ impl StatementExecutor {
end: analyze.end,
step: analyze.step,
query: analyze.query,
lookback: analyze
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};
let analyze_node_name = if analyze.is_verbose {
ANALYZE_VERBOSE_NODE_NAME
Expand Down
17 changes: 14 additions & 3 deletions src/query/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::error::{
};
use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED};

const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m
pub const DEFAULT_LOOKBACK_STRING: &str = "5m";
pub const EXPLAIN_NODE_NAME: &str = "EXPLAIN";
pub const EXPLAIN_VERBOSE_NODE_NAME: &str = "EXPLAIN VERBOSE";
Expand Down Expand Up @@ -98,6 +97,7 @@ pub struct PromQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: String,
}

impl Default for PromQuery {
Expand All @@ -107,6 +107,7 @@ impl Default for PromQuery {
start: String::from("0"),
end: String::from("0"),
step: String::from("5m"),
lookback: String::from(DEFAULT_LOOKBACK_STRING),
}
}
}
Expand Down Expand Up @@ -165,13 +166,22 @@ impl QueryLanguageParser {
query: &query.query,
})?;

let lookback_delta = query
.lookback
.parse::<u64>()
.map(Duration::from_secs)
.or_else(|_| promql_parser::util::parse_duration(&query.lookback))
.map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments)))
.context(QueryParseSnafu {
query: &query.query,
})?;

let eval_stmt = EvalStmt {
expr,
start,
end,
interval: step,
// TODO(ruihang): provide a way to adjust this parameter.
lookback_delta: Duration::from_secs(DEFAULT_LOOKBACK),
lookback_delta,
};

Ok(QueryStatement::Promql(eval_stmt))
Expand Down Expand Up @@ -353,6 +363,7 @@ mod test {
start: "2022-02-13T17:14:00Z".to_string(),
end: "2023-02-13T17:14:00Z".to_string(),
step: "1d".to_string(),
lookback: "5m".to_string(),
};

#[cfg(not(windows))]
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub enum Error {

#[snafu(display("Failed to parse PromQL: {query:?}"))]
ParsePromQL {
query: PromQuery,
query: Box<PromQuery>,
location: Location,
source: query::error::Error,
},
Expand Down
2 changes: 2 additions & 0 deletions src/servers/src/grpc/prom_query_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: range_query.start,
end: range_query.end,
step: range_query.step,
lookback: range_query.lookback,
}
}
Promql::InstantQuery(instant_query) => {
Expand All @@ -71,6 +72,7 @@ impl PrometheusGateway for PrometheusGatewayService {
start: time.clone(),
end: time,
step: String::from("1s"),
lookback: instant_query.lookback,
}
}
};
Expand Down
6 changes: 5 additions & 1 deletion src/servers/src/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST;
use common_query::{Output, OutputData};
use common_recordbatch::util;
use common_telemetry::tracing;
use query::parser::PromQuery;
use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -205,6 +205,7 @@ pub struct PromqlQuery {
pub start: String,
pub end: String,
pub step: String,
pub lookback: Option<String>,
pub db: Option<String>,
}

Expand All @@ -215,6 +216,9 @@ impl From<PromqlQuery> for PromQuery {
start: query.start,
end: query.end,
step: query.step,
lookback: query
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/servers/src/http/prometheus.rs
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub async fn build_info_query() -> PrometheusJsonResponse {
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct InstantQuery {
query: Option<String>,
lookback: Option<String>,
time: Option<String>,
timeout: Option<String>,
db: Option<String>,
Expand All @@ -178,6 +179,10 @@ pub async fn instant_query(
start: time.clone(),
end: time,
step: "1s".to_string(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -196,6 +201,7 @@ pub struct RangeQuery {
start: Option<String>,
end: Option<String>,
step: Option<String>,
lookback: Option<String>,
timeout: Option<String>,
db: Option<String>,
}
Expand All @@ -216,6 +222,10 @@ pub async fn range_query(
start: params.start.or(form_params.start).unwrap_or_default(),
end: params.end.or(form_params.end).unwrap_or_default(),
step: params.step.or(form_params.step).unwrap_or_default(),
lookback: params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string()),
};

let result = handler.do_query(&prom_query, query_ctx).await;
Expand All @@ -235,6 +245,7 @@ struct Matches(Vec<String>);
pub struct LabelsQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -310,6 +321,11 @@ pub async fn labels_query(
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);

let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut labels = HashSet::new();
let _ = labels.insert(METRIC_NAME.to_string());

Expand All @@ -320,6 +336,7 @@ pub async fn labels_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};

let result = handler.do_query(&prom_query, query_ctx.clone()).await;
Expand Down Expand Up @@ -546,6 +563,7 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
pub struct LabelValueQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -587,6 +605,9 @@ pub async fn label_values_query(

let start = params.start.unwrap_or_else(yesterday_rfc3339);
let end = params.end.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut label_values = HashSet::new();

Expand All @@ -597,6 +618,7 @@ pub async fn label_values_query(
start: start.clone(),
end: end.clone(),
step: DEFAULT_LOOKBACK_STRING.to_string(),
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) =
Expand Down Expand Up @@ -695,6 +717,7 @@ async fn retrieve_label_values_from_record_batch(
pub struct SeriesQuery {
start: Option<String>,
end: Option<String>,
lookback: Option<String>,
#[serde(flatten)]
matches: Matches,
db: Option<String>,
Expand Down Expand Up @@ -726,6 +749,10 @@ pub async fn series_query(
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);
let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or(DEFAULT_LOOKBACK_STRING.to_string());

let mut series = Vec::new();
let mut merge_map = HashMap::new();
Expand All @@ -737,6 +764,7 @@ pub async fn series_query(
end: end.clone(),
// TODO: find a better value for step
step: DEFAULT_LOOKBACK_STRING.to_string(),
lookback: lookback.clone(),
};
let result = handler.do_query(&prom_query, query_ctx.clone()).await;

Expand Down
1 change: 1 addition & 0 deletions src/servers/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl GrpcQueryHandler for DummyInstance {
start: promql.start,
end: promql.end,
step: promql.step,
lookback: promql.lookback,
};
let mut result =
SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await;
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ CREATE TABLE {table_name} (
start: "1672557973".to_owned(),
end: "1672557978".to_owned(),
step: "1s".to_owned(),
lookback: "5m".to_string(),
})),
});
let output = query(instance, request).await;
Expand Down
3 changes: 3 additions & 0 deletions tests-integration/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
let instant_query = PromInstantQuery {
query: "test".to_string(),
time: "5".to_string(),
lookback: "5m".to_string(),
};
let instant_query_request = PromqlRequest {
header: Some(header.clone()),
Expand Down Expand Up @@ -555,6 +556,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
start: "0".to_string(),
end: "10".to_string(),
step: "5s".to_string(),
lookback: "5m".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header.clone()),
Expand Down Expand Up @@ -605,6 +607,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
start: "1000000000".to_string(),
end: "1000001000".to_string(),
step: "5s".to_string(),
lookback: "5m".to_string(),
};
let range_query_request: PromqlRequest = PromqlRequest {
header: Some(header),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[1000], time index=[j], REDACTED
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[2000], interval=[1000], time index=[j], REDACTED
tisonkun marked this conversation as resolved.
Show resolved Hide resolved
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
Expand Down
26 changes: 13 additions & 13 deletions tests/cases/standalone/common/tql-explain-analyze/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,21 @@ TQL EXPLAIN (0, 10, '5s') test;
-- SQLNESS REPLACE (peers.*) REDACTED
TQL EXPLAIN (0, 10, '1s', '2s') test;

+---------------+-----------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
+---------------+---------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | MergeScan [is_placeholder=false] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[2000], interval=[300000], time index=[j] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivideExec: tags=["k"] |
| | SortExec: expr=[k@2 ASC NULLS LAST] |
| | MergeScanExec: REDACTED
| | |
+---------------+-----------------------------------------------------------------------------------------------+
| | |
+---------------+---------------------------------------------------------------------------------------------+

-- explain at 0s, 5s and 10s. No point at 0s.
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
Expand Down
9 changes: 0 additions & 9 deletions tests/cases/standalone/common/tql/basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,13 @@ TQL EVAL (0, 10, '5s') test{k="a"};
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+---------------------+---+

-- 'lookback' parameter is not fully supported, the test has to be updated
TQL EVAL (0, 10, '1s', '2s') test{k="a"};

+-----+---------------------+---+
| i | j | k |
+-----+---------------------+---+
| 2.0 | 1970-01-01T00:00:01 | a |
| 2.0 | 1970-01-01T00:00:02 | a |
| 2.0 | 1970-01-01T00:00:03 | a |
etolbakov marked this conversation as resolved.
Show resolved Hide resolved
| 2.0 | 1970-01-01T00:00:04 | a |
| 2.0 | 1970-01-01T00:00:05 | a |
| 2.0 | 1970-01-01T00:00:06 | a |
| 2.0 | 1970-01-01T00:00:07 | a |
| 2.0 | 1970-01-01T00:00:08 | a |
| 2.0 | 1970-01-01T00:00:09 | a |
| 2.0 | 1970-01-01T00:00:10 | a |
+-----+---------------------+---+

TQL EVAL ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp + '10 seconds'::interval, '1s') test{k="a"};
Expand Down
Loading
Loading