Skip to content

Commit

Permalink
fix: pipeline prepare loop break detects a conditional error
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Aug 20, 2024
1 parent b25a2b1 commit e416c8a
Showing 1 changed file with 70 additions and 28 deletions.
98 changes: 70 additions & 28 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ where
let mut search_from = 0;
// because of the key in the json map is ordered
for (payload_key, payload_value) in map.into_iter() {
if search_from >= self.required_keys.len() - 1 {
if search_from >= self.required_keys.len() {
break;
}

Expand Down Expand Up @@ -359,15 +359,16 @@ mod tests {

#[test]
fn test_pipeline_prepare() {
let input_value_str = r#"
{
let input_value_str = r#"
{
"my_field": "1,2",
"foo": "bar"
}
"#;
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();

let pipeline_yaml = r#"
let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat
Expand All @@ -381,32 +382,73 @@ transform:
- field: field2
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(
&["greptime_timestamp", "my_field"].to_vec(),
pipeline.required_keys()
);
assert_eq!(
payload,
vec![
Value::Null,
Value::String("1,2".to_string()),
Value::Null,
Value::Null
]
);
let result = pipeline.exec_mut(&mut payload).unwrap();
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(
&["greptime_timestamp", "my_field"].to_vec(),
pipeline.required_keys()
);
assert_eq!(
payload,
vec![
Value::Null,
Value::String("1,2".to_string()),
Value::Null,
Value::Null
]
);
let result = pipeline.exec_mut(&mut payload).unwrap();

assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
}
_ => panic!("expect null value"),
}
_ => panic!("expect null value"),
}
{
let input_value_str = r#"
{
"reqTimeSec": "1573840000.000"
}
"#;

let pipeline_yaml = r#"
---
description: Pipeline for Demo Log
processors:
- gsub:
field: reqTimeSec
pattern: "\\."
replacement: ""
- epoch:
field: reqTimeSec
resolution: millisecond
ignore_missing: true
transform:
- field: reqTimeSec
type: epoch, millisecond
index: timestamp
"#;
let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap();
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["reqTimeSec"].to_vec(), pipeline.required_keys());
assert_eq!(payload, vec![Value::String("1573840000.000".to_string())]);
let result = pipeline.exec_mut(&mut payload).unwrap();

assert_eq!(
result.values[0].value_data,
Some(ValueData::TimestampMillisecondValue(1573840000000))
);
}
}

Expand Down

0 comments on commit e416c8a

Please sign in to comment.