From 0704eb6d232297cecc46c597cdb3440cb70aa9be Mon Sep 17 00:00:00 2001 From: paomian Date: Wed, 11 Dec 2024 11:52:28 +0800 Subject: [PATCH] chore: decide tag column in log api follow table schema if table exists --- src/frontend/src/instance/log_handler.rs | 12 +++++++ .../src/etl/transform/transformer/greptime.rs | 34 ++++++++++++++----- src/servers/src/http/event.rs | 15 +++++--- src/servers/src/query_handler.rs | 7 ++++ 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index c3422066a387..6346efdab035 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -27,6 +27,7 @@ use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; use session::context::QueryContextRef; use snafu::ResultExt; +use table::Table; use crate::instance::Instance; @@ -84,6 +85,17 @@ impl PipelineHandler for Instance { .await .context(PipelineSnafu) } + + async fn get_table( + &self, + catalog: &str, + schema: &str, + table: &str, + ) -> std::result::Result>, catalog::error::Error> { + self.catalog_manager + .table(catalog, schema, table, None) + .await + } } impl Instance { diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 3b43696b5ab7..99c750dc3ef8 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -15,6 +15,7 @@ pub mod coerce; use std::collections::HashSet; +use std::sync::Arc; use ahash::HashMap; use api::helper::proto_value_type; @@ -374,13 +375,16 @@ fn json_value_to_row( /// 3. The pipeline assumes that the json format is fixed /// 4. The pipeline will return an error if the same column datatype is mismatched /// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. -pub fn identity_pipeline(array: Vec) -> Result { +pub fn identity_pipeline( + array: Vec, + table: Option>, +) -> Result { let mut rows = Vec::with_capacity(array.len()); - let mut schema = SchemaInfo::default(); + let mut schema_info = SchemaInfo::default(); for value in array { if let serde_json::Value::Object(map) = value { - let row = json_value_to_row(&mut schema, map)?; + let row = json_value_to_row(&mut schema_info, map)?; rows.push(row); } } @@ -395,7 +399,7 @@ pub fn identity_pipeline(array: Vec) -> Result { let ts = GreptimeValue { value_data: Some(ValueData::TimestampNanosecondValue(ns)), }; - let column_count = schema.schema.len(); + let column_count = schema_info.schema.len(); for row in rows.iter_mut() { let diff = column_count - row.values.len(); for _ in 0..diff { @@ -403,9 +407,21 @@ pub fn identity_pipeline(array: Vec) -> Result { } row.values.push(ts.clone()); } - schema.schema.push(greptime_timestamp_schema); + schema_info.schema.push(greptime_timestamp_schema); + + // set the semantic type of the row key column to Tag + if let Some(table) = table { + let table_schema = &table.table_info().meta; + table_schema + .row_key_column_names() + .for_each(|tag_column_name| { + if let Some(index) = schema_info.index.get(tag_column_name) { + schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; + } + }); + } Ok(Rows { - schema: schema.schema, + schema: schema_info.schema, rows, }) } @@ -437,7 +453,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -465,7 +481,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -493,7 +509,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline(array); + let rows = identity_pipeline(array, None); assert!(rows.is_ok()); let rows = rows.unwrap(); assert_eq!(rows.schema.len(), 8); diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 69498c209ab4..7e8a0e6e2981 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu, - PipelineSnafu, Result, UnsupportedContentTypeSnafu, + CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, + ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, }; use crate::http::extractor::LogTableName; use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; @@ -612,10 +612,17 @@ async fn ingest_logs_inner( let mut results = Vec::with_capacity(pipeline_data.len()); let transformed_data: Rows; if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - let rows = pipeline::identity_pipeline(pipeline_data) + let catalog = query_ctx.current_catalog(); + let schema = query_ctx.current_schema(); + let table = state + .get_table(catalog, &schema, &table_name) + .await + .context(CatalogSnafu)?; + let rows = pipeline::identity_pipeline(pipeline_data, table) .context(PipelineTransformSnafu) .context(PipelineSnafu)?; - transformed_data = rows; + + transformed_data = rows } else { let pipeline = state .get_pipeline(&pipeline_name, version, query_ctx.clone()) diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 58812e9350bc..8e841fa8af27 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -164,4 +164,11 @@ pub trait PipelineHandler { version: PipelineVersion, query_ctx: QueryContextRef, ) -> Result>; + + async fn get_table( + &self, + catalog: &str, + schema: &str, + table: &str, + ) -> std::result::Result>, catalog::error::Error>; }