From 074079f6a7e63f1ea32b22160551205d4a0c5d93 Mon Sep 17 00:00:00 2001 From: paomian Date: Thu, 12 Dec 2024 15:14:48 +0800 Subject: [PATCH] chore: add more test for greptime_identity pipeline --- .../src/etl/transform/transformer/greptime.rs | 105 ++++++++++++++---- 1 file changed, 86 insertions(+), 19 deletions(-) diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 99c750dc3ef8..309f19cac321 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -368,19 +368,11 @@ fn json_value_to_row( Ok(Row { values: row }) } -/// Identity pipeline for Greptime -/// This pipeline will convert the input JSON array to Greptime Rows -/// 1. The pipeline will add a default timestamp column to the schema -/// 2. The pipeline not resolve NULL value -/// 3. The pipeline assumes that the json format is fixed -/// 4. The pipeline will return an error if the same column datatype is mismatched -/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. -pub fn identity_pipeline( +fn identity_pipeline_inner( array: Vec, - table: Option>, + tag_column_names: Option>, ) -> Result { let mut rows = Vec::with_capacity(array.len()); - let mut schema_info = SchemaInfo::default(); for value in array { if let serde_json::Value::Object(map) = value { @@ -410,15 +402,12 @@ pub fn identity_pipeline( schema_info.schema.push(greptime_timestamp_schema); // set the semantic type of the row key column to Tag - if let Some(table) = table { - let table_schema = &table.table_info().meta; - table_schema - .row_key_column_names() - .for_each(|tag_column_name| { - if let Some(index) = schema_info.index.get(tag_column_name) { - schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; - } - }); + if let Some(tag_column_names) = tag_column_names { + tag_column_names.iter().for_each(|tag_column_name| { + if let Some(index) = schema_info.index.get(*tag_column_name) { + schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; + } + }); } Ok(Rows { schema: schema_info.schema, @@ -426,8 +415,33 @@ pub fn identity_pipeline( }) } +/// Identity pipeline for Greptime +/// This pipeline will convert the input JSON array to Greptime Rows +/// params table is used to set the semantic type of the row key column to Tag +/// 1. The pipeline will add a default timestamp column to the schema +/// 2. The pipeline not resolve NULL value +/// 3. The pipeline assumes that the json format is fixed +/// 4. The pipeline will return an error if the same column datatype is mismatched +/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema. +pub fn identity_pipeline( + array: Vec, + table: Option>, +) -> Result { + match table { + Some(table) => { + let table_info = table.table_info(); + let tag_column_names = table_info.meta.row_key_column_names().collect(); + identity_pipeline_inner(array, Some(tag_column_names)) + } + None => identity_pipeline_inner(array, None), + } +} + #[cfg(test)] mod tests { + use api::v1::SemanticType; + + use crate::etl::transform::transformer::greptime::identity_pipeline_inner; use crate::identity_pipeline; #[test] @@ -517,5 +531,58 @@ mod tests { assert_eq!(8, rows.rows[0].values.len()); assert_eq!(8, rows.rows[1].values.len()); } + { + let array = vec![ + serde_json::json!({ + "woshinull": null, + "name": "Alice", + "age": 20, + "is_student": true, + "score": 99.5, + "hobbies": "reading", + "address": "Beijing", + }), + serde_json::json!({ + "name": "Bob", + "age": 21, + "is_student": false, + "score": 88.5, + "hobbies": "swimming", + "address": "Shanghai", + "gaga": "gaga" + }), + ]; + let tag_column_names = ["name".to_string(), "address".to_string()]; + let rows = identity_pipeline_inner(array, Some(tag_column_names.iter().collect())); + assert!(rows.is_ok()); + let rows = rows.unwrap(); + assert_eq!(rows.schema.len(), 8); + assert_eq!(rows.rows.len(), 2); + assert_eq!(8, rows.rows[0].values.len()); + assert_eq!(8, rows.rows[1].values.len()); + assert_eq!( + rows.schema + .iter() + .find(|x| x.column_name == "name") + .unwrap() + .semantic_type, + SemanticType::Tag as i32 + ); + assert_eq!( + rows.schema + .iter() + .find(|x| x.column_name == "address") + .unwrap() + .semantic_type, + SemanticType::Tag as i32 + ); + assert_eq!( + rows.schema + .iter() + .filter(|x| x.semantic_type == SemanticType::Tag as i32) + .count(), + 2 + ); + } } }