Skip to content

Commit

Permalink
chore: decide tag column in log api follow table schema if table exis…
Browse files Browse the repository at this point in the history
…ts (#5138)

* chore: decide tag column in log api follow table schema if table exists

* chore: add more test for greptime_identity pipeline

* chore: change pipeline get_table function signature

* chore: change identity_pipeline_inner tag_column_names type
  • Loading branch information
paomian authored and evenyag committed Dec 20, 2024
1 parent 4a3ef2d commit 10b3e3d
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 23 deletions.
15 changes: 14 additions & 1 deletion src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use servers::error::{
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use table::Table;

use crate::instance::Instance;

Expand Down Expand Up @@ -84,6 +85,18 @@ impl PipelineHandler for Instance {
.await
.context(PipelineSnafu)
}

async fn get_table(
&self,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
let catalog = query_ctx.current_catalog();
let schema = query_ctx.current_schema();
self.catalog_manager
.table(catalog, &schema, table, None)
.await
}
}

impl Instance {
Expand Down
117 changes: 100 additions & 17 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod coerce;

use std::collections::HashSet;
use std::sync::Arc;

use ahash::HashMap;
use api::helper::proto_value_type;
Expand Down Expand Up @@ -367,20 +368,15 @@ fn json_value_to_row(
Ok(Row { values: row })
}

/// Identity pipeline for Greptime
/// This pipeline will convert the input JSON array to Greptime Rows
/// 1. The pipeline will add a default timestamp column to the schema
/// 2. The pipeline not resolve NULL value
/// 3. The pipeline assumes that the json format is fixed
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
fn identity_pipeline_inner<'a>(
array: Vec<serde_json::Value>,
tag_column_names: Option<impl Iterator<Item = &'a String>>,
) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema = SchemaInfo::default();
let mut schema_info = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map)?;
let row = json_value_to_row(&mut schema_info, map)?;
rows.push(row);
}
}
Expand All @@ -395,23 +391,57 @@ pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
let column_count = schema_info.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
schema_info.schema.push(greptime_timestamp_schema);

// set the semantic type of the row key column to Tag
if let Some(tag_column_names) = tag_column_names {
tag_column_names.for_each(|tag_column_name| {
if let Some(index) = schema_info.index.get(tag_column_name) {
schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
}
});
}
Ok(Rows {
schema: schema.schema,
schema: schema_info.schema,
rows,
})
}

/// Identity pipeline for Greptime
/// This pipeline will convert the input JSON array to Greptime Rows
/// params table is used to set the semantic type of the row key column to Tag
/// 1. The pipeline will add a default timestamp column to the schema
/// 2. The pipeline not resolve NULL value
/// 3. The pipeline assumes that the json format is fixed
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(
array: Vec<serde_json::Value>,
table: Option<Arc<table::Table>>,
) -> Result<Rows> {
match table {
Some(table) => {
let table_info = table.table_info();
let tag_column_names = table_info.meta.row_key_column_names();
identity_pipeline_inner(array, Some(tag_column_names))
}
None => identity_pipeline_inner(array, None::<std::iter::Empty<&String>>),
}
}

#[cfg(test)]
mod tests {
use api::v1::SemanticType;

use crate::etl::transform::transformer::greptime::identity_pipeline_inner;
use crate::identity_pipeline;

#[test]
Expand All @@ -437,7 +467,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
Expand Down Expand Up @@ -465,7 +495,7 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
Expand Down Expand Up @@ -493,13 +523,66 @@ mod tests {
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
let rows = identity_pipeline(array, None);
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88.5,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let tag_column_names = ["name".to_string(), "address".to_string()];
let rows = identity_pipeline_inner(array, Some(tag_column_names.iter()));
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "name")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "address")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
assert_eq!(
rows.schema
.iter()
.filter(|x| x.semantic_type == SemanticType::Tag as i32)
.count(),
2
);
}
}
}
13 changes: 9 additions & 4 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu, ParseJsonSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::extractor::LogTableName;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
Expand Down Expand Up @@ -612,10 +612,15 @@ async fn ingest_logs_inner(
let mut results = Vec::with_capacity(pipeline_data.len());
let transformed_data: Rows;
if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
let rows = pipeline::identity_pipeline(pipeline_data)
let table = state
.get_table(&table_name, &query_ctx)
.await
.context(CatalogSnafu)?;
let rows = pipeline::identity_pipeline(pipeline_data, table)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
transformed_data = rows;

transformed_data = rows
} else {
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
Expand Down
8 changes: 7 additions & 1 deletion src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay};
use serde_json::Value;
use session::context::QueryContextRef;
use session::context::{QueryContext, QueryContextRef};

use crate::error::Result;
use crate::influxdb::InfluxdbRequest;
Expand Down Expand Up @@ -164,4 +164,10 @@ pub trait PipelineHandler {
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>>;

async fn get_table(
&self,
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
}

0 comments on commit 10b3e3d

Please sign in to comment.