diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index b02b9b8bd70d..4f453a1ea43b 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -21,7 +21,6 @@ use catalog::CatalogManagerRef; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::{Output, OutputData}; use common_recordbatch::RecordBatches; use common_telemetry::tracing; @@ -312,17 +311,26 @@ pub async fn labels_query( if queries.is_empty() { queries = form_params.matches.0; } - if queries.is_empty() { - match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await { - Ok(labels) => { - return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels)) - } - Err(e) => { - return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg()) - } + + // Fetch all tag columns. It will be used as white-list for tag names. + let mut labels = match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await + { + Ok(labels) => labels, + Err(e) => { + return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg()) } + }; + // insert the special metric name label + let _ = labels.insert(METRIC_NAME.to_string()); + + // Fetch all columns if no query matcher is provided + if queries.is_empty() { + let mut labels_vec = labels.into_iter().collect::>(); + labels_vec.sort_unstable(); + return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels_vec)); } + // Otherwise, run queries and extract column name from result set. let start = params .start .or(form_params.start) @@ -331,14 +339,13 @@ pub async fn labels_query( .end .or(form_params.end) .unwrap_or_else(current_time_rfc3339); - let lookback = params .lookback .or(form_params.lookback) .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()); - let mut labels = HashSet::new(); - let _ = labels.insert(METRIC_NAME.to_string()); + let mut fetched_labels = HashSet::new(); + let _ = fetched_labels.insert(METRIC_NAME.to_string()); let mut merge_map = HashMap::new(); for query in queries { @@ -352,7 +359,8 @@ pub async fn labels_query( let result = handler.do_query(&prom_query, query_ctx.clone()).await; if let Err(err) = - retrieve_labels_name_from_query_result(result, &mut labels, &mut merge_map).await + retrieve_labels_name_from_query_result(result, &mut fetched_labels, &mut merge_map) + .await { // Prometheus won't report error if querying nonexist label and metric if err.status_code() != StatusCode::TableNotFound @@ -366,10 +374,11 @@ pub async fn labels_query( } } - let _ = labels.remove(GREPTIME_TIMESTAMP); - let _ = labels.remove(GREPTIME_VALUE); + // intersect `fetched_labels` with `labels` to filter out non-tag columns + fetched_labels.retain(|l| labels.contains(l)); + let _ = labels.insert(METRIC_NAME.to_string()); - let mut sorted_labels: Vec = labels.into_iter().collect(); + let mut sorted_labels: Vec = fetched_labels.into_iter().collect(); sorted_labels.sort(); let merge_map = merge_map .into_iter() @@ -380,11 +389,12 @@ pub async fn labels_query( resp } +/// Get all tag column name of the given schema async fn get_all_column_names( catalog: &str, schema: &str, manager: &CatalogManagerRef, -) -> std::result::Result, catalog::error::Error> { +) -> std::result::Result, catalog::error::Error> { let table_names = manager.table_names(catalog, schema).await?; let mut labels = HashSet::new(); @@ -392,15 +402,12 @@ async fn get_all_column_names( let Some(table) = manager.table(catalog, schema, &table_name).await? else { continue; }; - let schema = table.schema(); - for column in schema.column_schemas() { - labels.insert(column.name.to_string()); + for column in table.primary_key_columns() { + labels.insert(column.name); } } - let mut labels_vec = labels.into_iter().collect::>(); - labels_vec.sort_unstable(); - Ok(labels_vec) + Ok(labels) } async fn retrieve_series_from_query_result( diff --git a/src/table/src/table.rs b/src/table/src/table.rs index a0a45a07395f..44406c24b239 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use common_query::logical_plan::Expr; use common_recordbatch::SendableRecordBatchStream; -use datatypes::schema::SchemaRef; +use datatypes::schema::{ColumnSchema, SchemaRef}; use snafu::ResultExt; use store_api::data_source::DataSourceRef; use store_api::storage::ScanRequest; @@ -81,4 +81,13 @@ impl Table { pub fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { Ok(vec![self.filter_pushdown; filters.len()]) } + + /// Get primary key columns in the definition order. + pub fn primary_key_columns(&self) -> impl Iterator + '_ { + self.table_info + .meta + .primary_key_indices + .iter() + .map(|i| self.table_info.meta.schema.column_schemas()[*i].clone()) + } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7a3a1a43c55c..d269859b869a 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -463,15 +463,19 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(body.status, "success"); assert_eq!( body.data, - serde_json::from_value::(json!([ - "__name__", "cpu", "host", "memory", "ts" - ])) - .unwrap() + serde_json::from_value::(json!(["__name__", "host",])).unwrap() ); // labels without match[] param let res = client.get("/v1/prometheus/api/v1/labels").send().await; assert_eq!(res.status(), StatusCode::OK); + let body = serde_json::from_str::(&res.text().await).unwrap(); + assert_eq!(body.status, "success"); + assert_eq!( + body.data, + serde_json::from_value::(json!(["__name__", "host", "number",])) + .unwrap() + ); // labels query with multiple match[] params let res = client