Skip to content

Commit

Permalink
fix: post process result on query full column name of prom labels API (
Browse files Browse the repository at this point in the history
…GreptimeTeam#3793)

* fix: post process result on query full column name of prom labels API

Signed-off-by: Ruihang Xia <[email protected]>

* only preserve tag column

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Apr 24, 2024
1 parent df01ac0 commit 1272bc9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
53 changes: 30 additions & 23 deletions src/servers/src/http/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::{Output, OutputData};
use common_recordbatch::RecordBatches;
use common_telemetry::tracing;
Expand Down Expand Up @@ -312,17 +311,26 @@ pub async fn labels_query(
if queries.is_empty() {
queries = form_params.matches.0;
}
if queries.is_empty() {
match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await {
Ok(labels) => {
return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels))
}
Err(e) => {
return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg())
}

// Fetch all tag columns. It will be used as white-list for tag names.
let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await
{
Ok(labels) => labels,
Err(e) => {
return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg())
}
};
// insert the special metric name label
let _ = labels.insert(METRIC_NAME.to_string());

// Fetch all columns if no query matcher is provided
if queries.is_empty() {
let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
labels_vec.sort_unstable();
return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec));
}

// Otherwise, run queries and extract column name from result set.
let start = params
.start
.or(form_params.start)
Expand All @@ -331,14 +339,13 @@ pub async fn labels_query(
.end
.or(form_params.end)
.unwrap_or_else(current_time_rfc3339);

let lookback = params
.lookback
.or(form_params.lookback)
.unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string());

let mut labels = HashSet::new();
let _ = labels.insert(METRIC_NAME.to_string());
let mut fetched_labels = HashSet::new();
let _ = fetched_labels.insert(METRIC_NAME.to_string());

let mut merge_map = HashMap::new();
for query in queries {
Expand All @@ -352,7 +359,8 @@ pub async fn labels_query(

let result = handler.do_query(&prom_query, query_ctx.clone()).await;
if let Err(err) =
retrieve_labels_name_from_query_result(result, &mut labels, &mut merge_map).await
retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map)
.await
{
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() != StatusCode::TableNotFound
Expand All @@ -366,10 +374,11 @@ pub async fn labels_query(
}
}

let _ = labels.remove(GREPTIME_TIMESTAMP);
let _ = labels.remove(GREPTIME_VALUE);
// intersect `fetched_labels` with `labels` to filter out non-tag columns
fetched_labels.retain(|l| labels.contains(l));
let _ = labels.insert(METRIC_NAME.to_string());

let mut sorted_labels: Vec<String> = labels.into_iter().collect();
let mut sorted_labels: Vec<String> = fetched_labels.into_iter().collect();
sorted_labels.sort();
let merge_map = merge_map
.into_iter()
Expand All @@ -380,27 +389,25 @@ pub async fn labels_query(
resp
}

/// Get all tag column name of the given schema
async fn get_all_column_names(
catalog: &str,
schema: &str,
manager: &CatalogManagerRef,
) -> std::result::Result<Vec<String>, catalog::error::Error> {
) -> std::result::Result<HashSet<String>, catalog::error::Error> {
let table_names = manager.table_names(catalog, schema).await?;

let mut labels = HashSet::new();
for table_name in table_names {
let Some(table) = manager.table(catalog, schema, &table_name).await? else {
continue;
};
let schema = table.schema();
for column in schema.column_schemas() {
labels.insert(column.name.to_string());
for column in table.primary_key_columns() {
labels.insert(column.name);
}
}

let mut labels_vec = labels.into_iter().collect::<Vec<_>>();
labels_vec.sort_unstable();
Ok(labels_vec)
Ok(labels)
}

async fn retrieve_series_from_query_result(
Expand Down
11 changes: 10 additions & 1 deletion src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use datatypes::schema::{ColumnSchema, SchemaRef};
use snafu::ResultExt;
use store_api::data_source::DataSourceRef;
use store_api::storage::ScanRequest;
Expand Down Expand Up @@ -81,4 +81,13 @@ impl Table {
pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
Ok(vec![self.filter_pushdown; filters.len()])
}

/// Get primary key columns in the definition order.
pub fn primary_key_columns(&self) -> impl Iterator<Item = ColumnSchema> + '_ {
self.table_info
.meta
.primary_key_indices
.iter()
.map(|i| self.table_info.meta.schema.column_schemas()[*i].clone())
}
}
12 changes: 8 additions & 4 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,15 +463,19 @@ pub async fn test_prom_http_api(store_type: StorageType) {
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!([
"__name__", "cpu", "host", "memory", "ts"
]))
.unwrap()
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host",])).unwrap()
);

// labels without match[] param
let res = client.get("/v1/prometheus/api/v1/labels").send().await;
assert_eq!(res.status(), StatusCode::OK);
let body = serde_json::from_str::<PrometheusJsonResponse>(&res.text().await).unwrap();
assert_eq!(body.status, "success");
assert_eq!(
body.data,
serde_json::from_value::<PrometheusResponse>(json!(["__name__", "host", "number",]))
.unwrap()
);

// labels query with multiple match[] params
let res = client
Expand Down

0 comments on commit 1272bc9

Please sign in to comment.