Skip to content

Commit

Permalink
chore: decide tag column in log api follow table schema if table exists
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Dec 11, 2024
1 parent 3d1b8c4 commit 0704eb6
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 13 deletions.
12 changes: 12 additions & 0 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::Table;

use crate::instance::Instance;

Expand Down Expand Up @@ -84,6 +85,17 @@ impl PipelineHandler for Instance {
.await
.context(PipelineSnafu)
}

async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
}
}

impl Instance {
Expand Down
34 changes: 25 additions & 9 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod coerce;

use std::collections::HashSet;
use std::sync::Arc;

use ahash::HashMap;
use api::helper::proto_value_type;
Expand Down Expand Up @@ -374,13 +375,16 @@ fn json_value_to_row(
/// 3. The pipeline assumes that the json format is fixed
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
pub fn identity_pipeline(
array: Vec<serde_json::Value>,
table: Option<Arc<table::Table>>,
) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema = SchemaInfo::default();
let mut schema_info = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map)?;
let row = json_value_to_row(&mut schema_info, map)?;
rows.push(row);
}
}
Expand All @@ -395,17 +399,29 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
let column_count = schema_info.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
schema_info.schema.push(greptime_timestamp_schema);

// set the semantic type of the row key column to Tag
if let Some(table) = table {
let table_schema = &table.table_info().meta;
table_schema
.row_key_column_names()
.for_each(|tag_column_name| {
if let Some(index) = schema_info.index.get(tag_column_name) {
schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
}
});
}
Ok(Rows {
schema: schema.schema,
schema: schema_info.schema,
rows,
})
}
Expand Down Expand Up @@ -437,7 +453,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
Expand Down Expand Up @@ -465,7 +481,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
Expand Down Expand Up @@ -493,7 +509,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
Expand Down
15 changes: 11 additions & 4 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
Expand Down Expand Up @@ -612,10 +612,17 @@ async fn ingest_logs_inner(
let mut results = Vec::with_capacity(pipeline_data.len());
let transformed_data: Rows;
if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
let rows = pipeline::identity_pipeline(pipeline_data)
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
let table = state
.get_table(catalog, &schema, &table_name)
.await
.context(CatalogSnafu)?;
let rows = pipeline::identity_pipeline(pipeline_data, table)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
transformed_data = rows;

transformed_data = rows
} else {
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
Expand Down
7 changes: 7 additions & 0 deletions src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,11 @@ pub trait PipelineHandler {
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>>;

async fn get_table(
&self,
catalog: &str,
schema: &str,
table: &str,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
}

0 comments on commit 0704eb6

Please sign in to comment.