From bf16422cee715d9c1f190be3f23cc0f66ea657ae Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 21 Aug 2024 14:20:09 +0800 Subject: [PATCH] fix: pipeline prepare loop break detects a conditional error (#4593) --- src/pipeline/src/etl.rs | 98 +++++++++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 28 deletions(-) diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 1a75866656a6..b2c8802dd52a 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -284,7 +284,7 @@ where let mut search_from = 0; // because of the key in the json map is ordered for (payload_key, payload_value) in map.into_iter() { - if search_from >= self.required_keys.len() - 1 { + if search_from >= self.required_keys.len() { break; } @@ -359,15 +359,16 @@ mod tests { #[test] fn test_pipeline_prepare() { - let input_value_str = r#" + { + let input_value_str = r#" { "my_field": "1,2", "foo": "bar" } "#; - let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); + let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); - let pipeline_yaml = r#" + let pipeline_yaml = r#" --- description: Pipeline for Apache Tomcat @@ -381,32 +382,73 @@ transform: - field: field2 type: uint32 "#; - let pipeline: Pipeline = - parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); - let mut payload = pipeline.init_intermediate_state(); - pipeline.prepare(input_value, &mut payload).unwrap(); - assert_eq!( - &["greptime_timestamp", "my_field"].to_vec(), - pipeline.required_keys() - ); - assert_eq!( - payload, - vec![ - Value::Null, - Value::String("1,2".to_string()), - Value::Null, - Value::Null - ] - ); - let result = pipeline.exec_mut(&mut payload).unwrap(); + let pipeline: Pipeline = + parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); + let mut payload = pipeline.init_intermediate_state(); + pipeline.prepare(input_value, &mut payload).unwrap(); + assert_eq!( + &["greptime_timestamp", "my_field"].to_vec(), + pipeline.required_keys() + ); + assert_eq!( + payload, + vec![ + Value::Null, + Value::String("1,2".to_string()), + Value::Null, + Value::Null + ] + ); + let result = pipeline.exec_mut(&mut payload).unwrap(); - assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); - assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); - match &result.values[2].value_data { - Some(ValueData::TimestampNanosecondValue(v)) => { - assert_ne!(*v, 0); + assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1))); + assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2))); + match &result.values[2].value_data { + Some(ValueData::TimestampNanosecondValue(v)) => { + assert_ne!(*v, 0); + } + _ => panic!("expect null value"), } - _ => panic!("expect null value"), + } + { + let input_value_str = r#" + { + "reqTimeSec": "1573840000.000" + } + "#; + + let pipeline_yaml = r#" +--- +description: Pipeline for Demo Log + +processors: + - gsub: + field: reqTimeSec + pattern: "\\." + replacement: "" + - epoch: + field: reqTimeSec + resolution: millisecond + ignore_missing: true + +transform: + - field: reqTimeSec + type: epoch, millisecond + index: timestamp +"#; + let input_value: serde_json::Value = serde_json::from_str(input_value_str).unwrap(); + let pipeline: Pipeline = + parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); + let mut payload = pipeline.init_intermediate_state(); + pipeline.prepare(input_value, &mut payload).unwrap(); + assert_eq!(&["reqTimeSec"].to_vec(), pipeline.required_keys()); + assert_eq!(payload, vec![Value::String("1573840000.000".to_string())]); + let result = pipeline.exec_mut(&mut payload).unwrap(); + + assert_eq!( + result.values[0].value_data, + Some(ValueData::TimestampMillisecondValue(1573840000000)) + ); } }