Skip to content

Commit

Permalink
chore: pipeline dryrun api can now receive pipeline raw content
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Dec 11, 2024
1 parent 3d1b8c4 commit f456d47
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 54 deletions.
5 changes: 5 additions & 0 deletions src/frontend/src/instance/log_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
Expand Down Expand Up @@ -84,6 +85,10 @@ impl PipelineHandler for Instance {
.await
.context(PipelineSnafu)
}

/// Build (compile) a pipeline from its raw definition string, mapping any
/// compile failure into the server-side error type via `PipelineSnafu`.
fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline<GreptimeTransformer>> {
PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
}
}

impl Instance {
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/benches/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ transform:
type: uint32
"#;

parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
parse(&Content::Yaml(pipeline_yaml)).unwrap()
}

fn criterion_benchmark(c: &mut Criterion) {
Expand Down
18 changes: 7 additions & 11 deletions src/pipeline/src/etl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";

pub enum Content {
Json(String),
Yaml(String),
pub enum Content<'a> {
Json(&'a str),
Yaml(&'a str),
}

pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
Expand Down Expand Up @@ -379,8 +379,7 @@ transform:
- field: field2
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
Expand Down Expand Up @@ -432,8 +431,7 @@ transform:
- field: ts
type: timestamp, ns
index: time"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_str.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_str)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline
.prepare(serde_json::Value::String(message), &mut payload)
Expand Down Expand Up @@ -509,8 +507,7 @@ transform:
type: uint32
"#;

let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
Expand Down Expand Up @@ -554,8 +551,7 @@ transform:
index: time
"#;

let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let schema = pipeline.schemas().clone();
let mut result = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut result).unwrap();
Expand Down
5 changes: 5 additions & 0 deletions src/pipeline/src/manager/pipeline_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,9 @@ impl PipelineOperator {
})
.await
}

/// Compile a pipeline from its raw definition string.
///
/// Stateless associated function: it delegates to
/// `PipelineTable::compile_pipeline` and touches no stored pipeline state,
/// which is why it takes no `&self`.
pub fn build_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
PipelineTable::compile_pipeline(pipeline)
}
}
2 changes: 1 addition & 1 deletion src/pipeline/src/manager/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ impl PipelineTable {

/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline.into());
let yaml_content = Content::Yaml(pipeline);
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
}

Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline/tests/dissect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ transform:

let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();

let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();
Expand Down
10 changes: 5 additions & 5 deletions src/pipeline/tests/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ transform:
.map(|(_, d)| GreptimeValue { value_data: d })
.collect::<Vec<GreptimeValue>>();

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut stats = pipeline.init_intermediate_state();
Expand Down Expand Up @@ -487,7 +487,7 @@ transform:
type: json
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
Expand Down Expand Up @@ -592,7 +592,7 @@ transform:
type: json
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
Expand Down Expand Up @@ -655,7 +655,7 @@ transform:
index: timestamp
"#;

let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
Expand Down Expand Up @@ -691,7 +691,7 @@ transform:
- message
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();

let mut status = pipeline.init_intermediate_state();
Expand Down
9 changes: 8 additions & 1 deletion src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,14 @@ impl HttpServer {
"/pipelines/:pipeline_name",
routing::delete(event::delete_pipeline),
)
.route("/pipelines/dryrun", routing::post(event::pipeline_dryrun))
.route(
"/pipelines/dryrun",
routing::post(event::pipeline_dryrun_v0),
)
.route(
"/pipelines/v1/dryrun",
routing::post(event::pipeline_dryrun_v1),
)
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_error))
Expand Down
141 changes: 108 additions & 33 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineVersion;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
Expand Down Expand Up @@ -276,39 +276,11 @@ fn transform_ndjson_array_factory(
})
}

#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
/// Dryrun pipeline with given data
fn dryrun_pipeline(
value: Vec<Value>,
pipeline: &pipeline::Pipeline<GreptimeTransformer>,
) -> Result<Response> {
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;

let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let ignore_errors = query_params.ignore_errors.unwrap_or(false);

let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

let mut intermediate_state = pipeline.init_intermediate_state();

let mut results = Vec::with_capacity(value.len());
Expand Down Expand Up @@ -387,6 +359,109 @@ pub async fn pipeline_dryrun(
Ok(Json(result).into_response())
}

/// Identifies the pipeline a dry-run request should use.
///
/// - `PipelineName`: refer to a pipeline stored in
///   `greptime_private.pipelines` by name and optional version.
/// - `PipelineContent`: supply the raw pipeline definition inline.
///
/// With `#[serde(untagged)]`, a bare JSON string deserializes as
/// `PipelineContent`, while an object carrying `pipeline_name`
/// deserializes as `PipelineName`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PipelineInfo {
PipelineName {
pipeline_name: String,
// Version as stored; `None` selects the latest version.
pipeline_version: Option<String>,
},
PipelineContent(String),
}

impl Default for PipelineInfo {
/// Default to the internal identity pipeline name with no explicit
/// version (i.e. the latest stored version is used).
fn default() -> Self {
Self::PipelineName {
pipeline_name: GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string(),
pipeline_version: None,
}
}
}

/// Request body for the pipeline dry-run v1 API.
///
/// `pipeline_info` selects the pipeline to run (by stored name/version or
/// raw inline content); `data` holds the rows to feed through it. Each
/// element of `data` may be a string or an object.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
pub pipeline_info: PipelineInfo,
pub data: Vec<Value>,
}

/// Dryrun pipeline with given data v1
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun_v1(
State(log_state): State<LogState>,
Extension(query_ctx): Extension<QueryContext>,
Json(paylod): Json<PipelineDryrunParams>,
) -> Result<Response> {
let pipeline_info = paylod.pipeline_info;
let data = paylod.data;
match pipeline_info {
PipelineInfo::PipelineName {
pipeline_name,
pipeline_version,
} => {
let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?;
let pipeline = log_state
.log_handler
.get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
.await?;
dryrun_pipeline(data, &pipeline)
}
PipelineInfo::PipelineContent(pipeline) => {
let pipeline = log_state.log_handler.build_pipeline(&pipeline)?;
dryrun_pipeline(data, &pipeline)
}
}
}

/// Dry-run a pipeline identified by query parameters (legacy v0 API).
///
/// Deprecated; kept only during migration.
/// TODO(paomian): should be removed in the future.
/// Migration steps:
/// 1. add the v1 url path for `pipeline_dryrun_v1`
/// 2. point the original path at `pipeline_dryrun_v1`
/// 3. delete the v1 path, keeping the original path
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun_v0(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<Response> {
// The legacy endpoint requires an explicit pipeline name.
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let rows = extract_pipeline_value_by_content_type(
content_type,
payload,
query_params.ignore_errors.unwrap_or(false),
)?;

// Refuse oversized inputs: a dry run is for inspection, not ingestion.
ensure!(
rows.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

let pipeline = log_state
.log_handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

dryrun_pipeline(rows, &pipeline)
}

#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,
Expand Down
3 changes: 3 additions & 0 deletions src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,7 @@ pub trait PipelineHandler {
version: PipelineVersion,
query_ctx: QueryContextRef,
) -> Result<Option<()>>;

/// Build a pipeline from a raw definition string.
fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>;
}

0 comments on commit f456d47

Please sign in to comment.