diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index d6b1e98ab45a..1c53d8380a8b 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -745,11 +745,7 @@ impl HttpServer {
             )
             .route(
                 "/pipelines/dryrun",
-                routing::post(event::pipeline_dryrun_v0),
-            )
-            .route(
-                "/pipelines/v1/dryrun",
-                routing::post(event::pipeline_dryrun_v1),
+                routing::post(event::pipeline_dryrun),
             )
             .layer(
                 ServiceBuilder::new()
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 5582aa371276..526b40f5171a 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -391,43 +391,8 @@ pub struct PipelineDryrunParams {
     pub data: Vec<Value>,
 }
 
-/// Dryrun pipeline with given data v1
 #[axum_macros::debug_handler]
-pub async fn pipeline_dryrun_v1(
-    State(log_state): State<LogState>,
-    Extension(query_ctx): Extension<QueryContext>,
-    Json(paylod): Json<PipelineDryrunParams>,
-) -> Result<Response> {
-    let pipeline_info = paylod.pipeline_info;
-    let data = paylod.data;
-    match pipeline_info {
-        PipelineInfo::PipelineName {
-            pipeline_name,
-            pipeline_version,
-        } => {
-            let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?;
-            let pipeline = log_state
-                .log_handler
-                .get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
-                .await?;
-            dryrun_pipeline(data, &pipeline)
-        }
-        PipelineInfo::PipelineContent(pipeline) => {
-            let pipeline = log_state.log_handler.build_pipeline(&pipeline)?;
-            dryrun_pipeline(data, &pipeline)
-        }
-    }
-}
-
-/// Dryrun pipeline with given data
-/// deprecated
-/// TODO(paomian): should be removed in the future
-/// step to migrate:
-/// 1. add v1 url path for pipeline_dryrun_v1
-/// 2. set original path to pipeline_dryrun_v1
-/// 3. delete v1 path for pipeline_dryrun_v1, keep original path
-#[axum_macros::debug_handler]
-pub async fn pipeline_dryrun_v0(
+pub async fn pipeline_dryrun(
     State(log_state): State<LogState>,
     Query(query_params): Query<LogIngesterQueryParams>,
     Extension(mut query_ctx): Extension<QueryContext>,
@@ -435,31 +400,64 @@ pub async fn pipeline_dryrun_v0(
     payload: String,
 ) -> Result<Response> {
     let handler = log_state.log_handler;
-    let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
-        reason: "pipeline_name is required",
-    })?;
+    match serde_json::from_str::<PipelineDryrunParams>(&payload) {
+        Ok(params) => {
+            let pipeline_info = params.pipeline_info;
+            let data = params.data;
+
+            ensure!(
+                data.len() <= 10,
+                InvalidParameterSnafu {
+                    reason: "too many rows for dryrun",
+                }
+            );
 
-    let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
+            match pipeline_info {
+                PipelineInfo::PipelineName {
+                    pipeline_name,
+                    pipeline_version,
+                } => {
+                    let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?;
+                    let pipeline = handler
+                        .get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
+                        .await?;
+                    dryrun_pipeline(data, &pipeline)
+                }
+                PipelineInfo::PipelineContent(pipeline) => {
+                    let pipeline = handler.build_pipeline(&pipeline)?;
+                    dryrun_pipeline(data, &pipeline)
+                }
+            }
+        }
+        Err(_) => {
+            let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
+                reason: "pipeline_name is required",
+            })?;
 
-    let ignore_errors = query_params.ignore_errors.unwrap_or(false);
+            let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
 
-    let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
+            let ignore_errors = query_params.ignore_errors.unwrap_or(false);
 
-    ensure!(
-        value.len() <= 10,
-        InvalidParameterSnafu {
-            reason: "too many rows for dryrun",
-        }
-    );
+            let value =
+                extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
 
-    query_ctx.set_channel(Channel::Http);
-    let query_ctx = Arc::new(query_ctx);
+            ensure!(
+                value.len() <= 10,
+                InvalidParameterSnafu {
+                    reason: "too many rows for dryrun",
+                }
+            );
 
-    let pipeline = handler
-        .get_pipeline(&pipeline_name, version, query_ctx.clone())
-        .await?;
+            query_ctx.set_channel(Channel::Http);
+            let query_ctx = Arc::new(query_ctx);
 
-    dryrun_pipeline(value, &pipeline)
+            let pipeline = handler
+                .get_pipeline(&pipeline_name, version, query_ctx.clone())
+                .await?;
+
+            dryrun_pipeline(value, &pipeline)
+        }
+    }
 }
 
 #[axum_macros::debug_handler]
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 9d7b81f3919b..6206415d1b84 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1311,7 +1311,7 @@ pub async fn test_test_pipeline_api(store_type: StorageType) {
     // handshake
     let client = TestClient::new(app);
 
-    let body = r#"
+    let pipeline_content = r#"
 processors:
   - date:
       field: time
@@ -1338,7 +1338,7 @@ transform:
     let res = client
         .post("/v1/events/pipelines/test")
        .header("Content-Type", "application/x-yaml")
-        .body(body)
+        .body(pipeline_content)
         .send()
         .await;
 
@@ -1359,113 +1359,168 @@
     let pipeline = pipelines.first().unwrap();
     assert_eq!(pipeline.get("name").unwrap(), "test");
 
-    // 2. write data
-    let data_body = r#"
+    let dryrun_schema = json!([
+        {
+            "colume_type": "FIELD",
+            "data_type": "INT32",
+            "fulltext": false,
+            "name": "id1"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "INT32",
+            "fulltext": false,
+            "name": "id2"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "type"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "log"
+        },
+        {
+            "colume_type": "FIELD",
+            "data_type": "STRING",
+            "fulltext": false,
+            "name": "logger"
+        },
+        {
+            "colume_type": "TIMESTAMP",
+            "data_type": "TIMESTAMP_NANOSECOND",
+            "fulltext": false,
+            "name": "time"
+        }
+    ]);
+    let dryrun_rows = json!([
         [
-          {
-            "id1": "2436",
-            "id2": "2528",
-            "logger": "INTERACT.MANAGER",
-            "type": "I",
-            "time": "2024-05-25 20:16:37.217",
-            "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
-          }
-        ]
-    "#;
-    let res = client
-        .post("/v1/events/pipelines/dryrun?pipeline_name=test")
-        .header("Content-Type", "application/json")
-        .body(data_body)
-        .send()
-        .await;
-    assert_eq!(res.status(), StatusCode::OK);
-    let body: Value = res.json().await;
-    let schema = &body["schema"];
-    let rows = &body["rows"];
-    assert_eq!(
-        schema,
-        &json!([
             {
-                "colume_type": "FIELD",
                 "data_type": "INT32",
-                "fulltext": false,
-                "name": "id1"
+                "key": "id1",
+                "semantic_type": "FIELD",
+                "value": 2436
             },
             {
-                "colume_type": "FIELD",
                 "data_type": "INT32",
-                "fulltext": false,
-                "name": "id2"
+                "key": "id2",
+                "semantic_type": "FIELD",
+                "value": 2528
             },
             {
-                "colume_type": "FIELD",
                 "data_type": "STRING",
-                "fulltext": false,
-                "name": "type"
+                "key": "type",
+                "semantic_type": "FIELD",
+                "value": "I"
             },
             {
-                "colume_type": "FIELD",
                 "data_type": "STRING",
-                "fulltext": false,
-                "name": "log"
+                "key": "log",
+                "semantic_type": "FIELD",
+                "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
             },
             {
-                "colume_type": "FIELD",
                 "data_type": "STRING",
-                "fulltext": false,
-                "name": "logger"
+                "key": "logger",
+                "semantic_type": "FIELD",
+                "value": "INTERACT.MANAGER"
             },
             {
-                "colume_type": "TIMESTAMP",
                 "data_type": "TIMESTAMP_NANOSECOND",
-                "fulltext": false,
-                "name": "time"
+                "key": "time",
+                "semantic_type": "TIMESTAMP",
+                "value": "2024-05-25 20:16:37.217+0000"
             }
-        ])
-    );
-    assert_eq!(
-        rows,
-        &json!([
-            [
-                {
-                    "data_type": "INT32",
-                    "key": "id1",
-                    "semantic_type": "FIELD",
-                    "value": 2436
-                },
-                {
-                    "data_type": "INT32",
-                    "key": "id2",
-                    "semantic_type": "FIELD",
-                    "value": 2528
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "type",
-                    "semantic_type": "FIELD",
-                    "value": "I"
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "log",
-                    "semantic_type": "FIELD",
-                    "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
-                },
-                {
-                    "data_type": "STRING",
-                    "key": "logger",
-                    "semantic_type": "FIELD",
-                    "value": "INTERACT.MANAGER"
-                },
+        ]
+    ]);
+    {
+        let data_body = r#"
+        [
+          {
+            "id1": "2436",
+            "id2": "2528",
+            "logger": "INTERACT.MANAGER",
+            "type": "I",
+            "time": "2024-05-25 20:16:37.217",
+            "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+          }
+        ]
+        "#;
+        let res = client
+            .post("/v1/events/pipelines/dryrun?pipeline_name=test")
+            .header("Content-Type", "application/json")
+            .body(data_body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
+    {
+        let body = r#"
+        {
+            "pipeline_info": {
+                "pipeline_name": "test"
+            },
+            "data": [
                 {
-                    "data_type": "TIMESTAMP_NANOSECOND",
-                    "key": "time",
-                    "semantic_type": "TIMESTAMP",
-                    "value": "2024-05-25 20:16:37.217+0000"
+                    "id1": "2436",
+                    "id2": "2528",
+                    "logger": "INTERACT.MANAGER",
+                    "type": "I",
+                    "time": "2024-05-25 20:16:37.217",
+                    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
                 }
             ]
-        ])
-    );
+        }
+        "#;
+        let res = client
+            .post("/v1/events/pipelines/dryrun")
+            .header("Content-Type", "application/json")
+            .body(body)
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
+    {
+        let mut body = json!({
+            "data": [
+                {
+                    "id1": "2436",
+                    "id2": "2528",
+                    "logger": "INTERACT.MANAGER",
+                    "type": "I",
+                    "time": "2024-05-25 20:16:37.217",
+                    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                }
+            ]
+        });
+        body["pipeline_info"] = json!(pipeline_content);
+        let res = client
+            .post("/v1/events/pipelines/dryrun")
+            .header("Content-Type", "application/json")
+            .body(body.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body["schema"];
+        let rows = &body["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+    }
 
     guard.remove_all().await;
 }