Skip to content

Commit

Permalink
chore: add more test for greptime_identity pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Dec 12, 2024
1 parent 0704eb6 commit 074079f
Showing 1 changed file with 86 additions and 19 deletions.
105 changes: 86 additions & 19 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,11 @@ fn json_value_to_row(
Ok(Row { values: row })
}

/// Identity pipeline for Greptime
/// This pipeline will convert the input JSON array to Greptime Rows
/// 1. The pipeline will add a default timestamp column to the schema
/// 2. The pipeline does not resolve NULL values
/// 3. The pipeline assumes that the json format is fixed
/// 4. The pipeline will return an error if the same column datatype is mismatched
/// 5. The pipeline will analyze the schema of each json record and merge them to get the final schema.
pub fn identity_pipeline(
fn identity_pipeline_inner(
array: Vec<serde_json::Value>,
table: Option<Arc<table::Table>>,
tag_column_names: Option<Vec<&String>>,
) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());

let mut schema_info = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
Expand Down Expand Up @@ -410,24 +402,46 @@ pub fn identity_pipeline(
schema_info.schema.push(greptime_timestamp_schema);

// set the semantic type of the row key column to Tag
if let Some(table) = table {
let table_schema = &table.table_info().meta;
table_schema
.row_key_column_names()
.for_each(|tag_column_name| {
if let Some(index) = schema_info.index.get(tag_column_name) {
schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
}
});
if let Some(tag_column_names) = tag_column_names {
tag_column_names.iter().for_each(|tag_column_name| {
if let Some(index) = schema_info.index.get(*tag_column_name) {
schema_info.schema[*index].semantic_type = SemanticType::Tag as i32;
}
});
}
Ok(Rows {
schema: schema_info.schema,
rows,
})
}

/// Identity pipeline for Greptime.
///
/// Converts the input JSON array into Greptime `Rows`. When `table` is
/// provided, its row key column names are forwarded so that the matching
/// columns get the `Tag` semantic type; when it is `None`, no tag column
/// names are forwarded.
///
/// Behavior of the pipeline:
/// 1. A default timestamp column is added to the schema.
/// 2. NULL values are not resolved.
/// 3. The JSON format is assumed to be fixed.
/// 4. An error is returned if the datatype of the same column is mismatched.
/// 5. The schema of each JSON record is analyzed and merged to get the final schema.
pub fn identity_pipeline(
    array: Vec<serde_json::Value>,
    table: Option<Arc<table::Table>>,
) -> Result<Rows> {
    // Without a table there are no row key columns to forward.
    let Some(table) = table else {
        return identity_pipeline_inner(array, None);
    };

    // Bind the table info to a local: the collected `&String` names are
    // obtained through it and must remain valid for the call below.
    let table_info = table.table_info();
    let row_key_names = table_info.meta.row_key_column_names().collect();
    identity_pipeline_inner(array, Some(row_key_names))
}

#[cfg(test)]
mod tests {
use api::v1::SemanticType;

use crate::etl::transform::transformer::greptime::identity_pipeline_inner;
use crate::identity_pipeline;

#[test]
Expand Down Expand Up @@ -517,5 +531,58 @@ mod tests {
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
{
// Case: call `identity_pipeline_inner` directly with explicit tag column
// names (no table) and check that exactly those columns are marked as Tag.
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88.5,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let tag_column_names = ["name".to_string(), "address".to_string()];
let rows = identity_pipeline_inner(array, Some(tag_column_names.iter().collect()));
assert!(rows.is_ok());
let rows = rows.unwrap();
// NOTE(review): 8 columns presumably means the seven non-null keys plus the
// default timestamp column, with the null-valued "woshinull" key producing
// no column — confirm against the pipeline implementation.
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
// Every row is expected to have one value per schema column.
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
// Both requested columns must carry the Tag semantic type.
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "name")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
assert_eq!(
rows.schema
.iter()
.find(|x| x.column_name == "address")
.unwrap()
.semantic_type,
SemanticType::Tag as i32
);
// No column other than the two requested ones may be marked as Tag.
assert_eq!(
rows.schema
.iter()
.filter(|x| x.semantic_type == SemanticType::Tag as i32)
.count(),
2
);
}
}
}

0 comments on commit 074079f

Please sign in to comment.