From f456d47598caebe4128a49715648c4f9832dc73a Mon Sep 17 00:00:00 2001 From: paomian Date: Wed, 11 Dec 2024 17:19:48 +0800 Subject: [PATCH] chore: pipeline dryrun api can currently receive pipeline raw content --- src/frontend/src/instance/log_handler.rs | 5 + src/pipeline/benches/processor.rs | 2 +- src/pipeline/src/etl.rs | 18 +-- src/pipeline/src/manager/pipeline_operator.rs | 5 + src/pipeline/src/manager/table.rs | 2 +- src/pipeline/tests/common.rs | 2 +- src/pipeline/tests/dissect.rs | 2 +- src/pipeline/tests/pipeline.rs | 10 +- src/servers/src/http.rs | 9 +- src/servers/src/http/event.rs | 141 ++++++++++++++---- src/servers/src/query_handler.rs | 3 + 11 files changed, 145 insertions(+), 54 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index c3422066a387..566c475a08c1 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; +use pipeline::pipeline_operator::PipelineOperator; use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion}; use servers::error::{ AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, @@ -84,6 +85,10 @@ impl PipelineHandler for Instance { .await .context(PipelineSnafu) } + + fn build_pipeline(&self, pipeline: &str) -> ServerResult> { + PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu) + } } impl Instance { diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index 09462753d892..8cf221af5b10 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -223,7 +223,7 @@ transform: type: uint32 "#; - parse(&Content::Yaml(pipeline_yaml.into())).unwrap() + parse(&Content::Yaml(pipeline_yaml)).unwrap() } fn criterion_benchmark(c: &mut 
Criterion) { diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 9bd47a899ec6..45feb4b02ff6 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -37,9 +37,9 @@ const PROCESSORS: &str = "processors"; const TRANSFORM: &str = "transform"; const TRANSFORMS: &str = "transforms"; -pub enum Content { - Json(String), - Yaml(String), +pub enum Content<'a> { + Json(&'a str), + Yaml(&'a str), } pub fn parse(input: &Content) -> Result> @@ -379,8 +379,7 @@ transform: - field: field2 type: uint32 "#; - let pipeline: Pipeline = - parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let mut payload = pipeline.init_intermediate_state(); pipeline.prepare(input_value, &mut payload).unwrap(); assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); @@ -432,8 +431,7 @@ transform: - field: ts type: timestamp, ns index: time"#; - let pipeline: Pipeline = - parse(&Content::Yaml(pipeline_str.into())).unwrap(); + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_str)).unwrap(); let mut payload = pipeline.init_intermediate_state(); pipeline .prepare(serde_json::Value::String(message), &mut payload) @@ -509,8 +507,7 @@ transform: type: uint32 "#; - let pipeline: Pipeline = - parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let mut payload = pipeline.init_intermediate_state(); pipeline.prepare(input_value, &mut payload).unwrap(); assert_eq!(&["my_field"].to_vec(), pipeline.required_keys()); @@ -554,8 +551,7 @@ transform: index: time "#; - let pipeline: Pipeline = - parse(&Content::Yaml(pipeline_yaml.into())).unwrap(); + let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let schema = pipeline.schemas().clone(); let mut result = pipeline.init_intermediate_state(); pipeline.prepare(input_value, &mut result).unwrap(); diff --git a/src/pipeline/src/manager/pipeline_operator.rs 
b/src/pipeline/src/manager/pipeline_operator.rs index 2e838144a483..4f43b89e2e74 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -243,4 +243,9 @@ impl PipelineOperator { }) .await } + + /// Compile a pipeline. + pub fn build_pipeline(pipeline: &str) -> Result> { + PipelineTable::compile_pipeline(pipeline) + } } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 7b3719b66707..c2a36c63ec6d 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -203,7 +203,7 @@ impl PipelineTable { /// Compile a pipeline from a string. pub fn compile_pipeline(pipeline: &str) -> Result> { - let yaml_content = Content::Yaml(pipeline.into()); + let yaml_content = Content::Yaml(pipeline); parse::(&yaml_content).context(CompilePipelineSnafu) } diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index aa96d14d5591..d825c91e4cb3 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -19,7 +19,7 @@ use pipeline::{parse, Content, GreptimeTransformer, Pipeline}; pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { let input_value = serde_json::from_str::(input_str).unwrap(); - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); let mut result = pipeline.init_intermediate_state(); diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index 7577d58080c7..56386d0e860a 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -270,7 +270,7 @@ transform: let input_value = serde_json::from_str::(input_str).unwrap(); - let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into()); + let yaml_content = pipeline::Content::Yaml(pipeline_yaml); let pipeline: pipeline::Pipeline = pipeline::parse(&yaml_content).expect("failed 
to parse pipeline"); let mut result = pipeline.init_intermediate_state(); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index e68c7b9e6a6e..de724e1a27d2 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -417,7 +417,7 @@ transform: .map(|(_, d)| GreptimeValue { value_data: d }) .collect::>(); - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); let mut stats = pipeline.init_intermediate_state(); @@ -487,7 +487,7 @@ transform: type: json "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); let mut status = pipeline.init_intermediate_state(); @@ -592,7 +592,7 @@ transform: type: json "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); let mut status = pipeline.init_intermediate_state(); @@ -655,7 +655,7 @@ transform: index: timestamp "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); let mut status = pipeline.init_intermediate_state(); @@ -691,7 +691,7 @@ transform: - message type: string "#; - let yaml_content = Content::Yaml(pipeline_yaml.into()); + let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); let mut status = pipeline.init_intermediate_state(); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d8d07ed31fa0..d6b1e98ab45a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -743,7 +743,14 @@ impl HttpServer { "/pipelines/:pipeline_name", routing::delete(event::delete_pipeline), ) - .route("/pipelines/dryrun", 
routing::post(event::pipeline_dryrun)) + .route( + "/pipelines/dryrun", + routing::post(event::pipeline_dryrun_v0), + ) + .route( + "/pipelines/v1/dryrun", + routing::post(event::pipeline_dryrun_v1), + ) .layer( ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 69498c209ab4..5582aa371276 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -38,7 +38,7 @@ use lazy_static::lazy_static; use loki_api::prost_types::Timestamp; use pipeline::error::PipelineTransformSnafu; use pipeline::util::to_pipeline_version; -use pipeline::PipelineVersion; +use pipeline::{GreptimeTransformer, PipelineVersion}; use prost::Message; use serde::{Deserialize, Serialize}; use serde_json::{Deserializer, Map, Value}; @@ -276,39 +276,11 @@ fn transform_ndjson_array_factory( }) } -#[axum_macros::debug_handler] -pub async fn pipeline_dryrun( - State(log_state): State, - Query(query_params): Query, - Extension(mut query_ctx): Extension, - TypedHeader(content_type): TypedHeader, - payload: String, +/// Dryrun pipeline with given data +fn dryrun_pipeline( + value: Vec, + pipeline: &pipeline::Pipeline, ) -> Result { - let handler = log_state.log_handler; - let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { - reason: "pipeline_name is required", - })?; - - let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?; - - let ignore_errors = query_params.ignore_errors.unwrap_or(false); - - let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; - - ensure!( - value.len() <= 10, - InvalidParameterSnafu { - reason: "too many rows for dryrun", - } - ); - - query_ctx.set_channel(Channel::Http); - let query_ctx = Arc::new(query_ctx); - - let pipeline = handler - .get_pipeline(&pipeline_name, version, query_ctx.clone()) - .await?; - let mut intermediate_state = pipeline.init_intermediate_state(); let 
mut results = Vec::with_capacity(value.len()); @@ -387,6 +359,109 @@ pub async fn pipeline_dryrun( Ok(Json(result).into_response()) } +/// Pipeline info +/// PipelineName: pipeline_name and pipeline_version stored in greptime_private.pipelines +/// PipelineContent: pipeline raw content +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PipelineInfo { + PipelineName { + pipeline_name: String, + pipeline_version: Option, + }, + PipelineContent(String), +} + +impl Default for PipelineInfo { + fn default() -> Self { + Self::PipelineName { + pipeline_name: GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string(), + pipeline_version: None, + } + } +} + +/// Dryrun pipeline with given data +/// pipeline_info to specify pipeline +/// data to specify data +/// data might be list of string or list of object +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct PipelineDryrunParams { + pub pipeline_info: PipelineInfo, + pub data: Vec, +} + +/// Dryrun pipeline with given data v1 +#[axum_macros::debug_handler] +pub async fn pipeline_dryrun_v1( + State(log_state): State, + Extension(query_ctx): Extension, + Json(payload): Json, +) -> Result { + let pipeline_info = payload.pipeline_info; + let data = payload.data; + match pipeline_info { + PipelineInfo::PipelineName { + pipeline_name, + pipeline_version, + } => { + let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?; + let pipeline = log_state + .log_handler + .get_pipeline(&pipeline_name, version, Arc::new(query_ctx)) + .await?; + dryrun_pipeline(data, &pipeline) + } + PipelineInfo::PipelineContent(pipeline) => { + let pipeline = log_state.log_handler.build_pipeline(&pipeline)?; + dryrun_pipeline(data, &pipeline) + } + } +} + +/// Dryrun pipeline with given data +/// deprecated +/// TODO(paomian): should be removed in the future +/// step to migrate: +/// 1. add v1 url path for pipeline_dryrun_v1 +/// 2. set original path to pipeline_dryrun_v1 +/// 3. 
delete v1 path for pipeline_dryrun_v1, keep original path +#[axum_macros::debug_handler] +pub async fn pipeline_dryrun_v0( + State(log_state): State, + Query(query_params): Query, + Extension(mut query_ctx): Extension, + TypedHeader(content_type): TypedHeader, + payload: String, +) -> Result { + let handler = log_state.log_handler; + let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { + reason: "pipeline_name is required", + })?; + + let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?; + + let ignore_errors = query_params.ignore_errors.unwrap_or(false); + + let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; + + ensure!( + value.len() <= 10, + InvalidParameterSnafu { + reason: "too many rows for dryrun", + } + ); + + query_ctx.set_channel(Channel::Http); + let query_ctx = Arc::new(query_ctx); + + let pipeline = handler + .get_pipeline(&pipeline_name, version, query_ctx.clone()) + .await?; + + dryrun_pipeline(value, &pipeline) +} + #[axum_macros::debug_handler] pub async fn loki_ingest( State(log_state): State, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 58812e9350bc..b120e7dd2bc3 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -164,4 +164,7 @@ pub trait PipelineHandler { version: PipelineVersion, query_ctx: QueryContextRef, ) -> Result>; + + /// Build a pipeline from a string. + fn build_pipeline(&self, pipeline: &str) -> Result>; }