Skip to content

Commit

Permalink
chore: remove dryrun v1 and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Dec 11, 2024
1 parent f456d47 commit 375b733
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 146 deletions.
6 changes: 1 addition & 5 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,11 +745,7 @@ impl HttpServer {
)
.route(
"/pipelines/dryrun",
routing::post(event::pipeline_dryrun_v0),
)
.route(
"/pipelines/v1/dryrun",
routing::post(event::pipeline_dryrun_v1),
routing::post(event::pipeline_dryrun),
)
.layer(
ServiceBuilder::new()
Expand Down
106 changes: 52 additions & 54 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,75 +391,73 @@ pub struct PipelineDryrunParams {
pub data: Vec<Value>,
}

/// Dryrun pipeline with given data v1
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun_v1(
State(log_state): State<LogState>,
Extension(query_ctx): Extension<QueryContext>,
Json(paylod): Json<PipelineDryrunParams>,
) -> Result<Response> {
let pipeline_info = paylod.pipeline_info;
let data = paylod.data;
match pipeline_info {
PipelineInfo::PipelineName {
pipeline_name,
pipeline_version,
} => {
let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?;
let pipeline = log_state
.log_handler
.get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
.await?;
dryrun_pipeline(data, &pipeline)
}
PipelineInfo::PipelineContent(pipeline) => {
let pipeline = log_state.log_handler.build_pipeline(&pipeline)?;
dryrun_pipeline(data, &pipeline)
}
}
}

/// Dryrun pipeline with given data
/// deprecated
/// TODO(paomian): should be removed in the future
/// step to migrate:
/// 1. add v1 url path for pipeline_dryrun_v1
/// 2. set original path to pipeline_dryrun_v1
/// 3. delete v1 path for pipeline_dryrun_v1, keep original path
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun_v0(
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<Response> {
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
match serde_json::from_str::<PipelineDryrunParams>(&payload) {
Ok(params) => {
let pipeline_info = params.pipeline_info;
let data = params.data;

ensure!(
data.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);

let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
match pipeline_info {
PipelineInfo::PipelineName {
pipeline_name,
pipeline_version,
} => {
let version = to_pipeline_version(pipeline_version).context(PipelineSnafu)?;
let pipeline = handler
.get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
.await?;
dryrun_pipeline(data, &pipeline)
}
PipelineInfo::PipelineContent(pipeline) => {
let pipeline = handler.build_pipeline(&pipeline)?;
dryrun_pipeline(data, &pipeline)
}
}
}
Err(_) => {
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;

let ignore_errors = query_params.ignore_errors.unwrap_or(false);
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;

let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);

ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);
let value =
extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;

query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);

let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);

dryrun_pipeline(value, &pipeline)
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;

dryrun_pipeline(value, &pipeline)
}
}
}

#[axum_macros::debug_handler]
Expand Down
Loading

0 comments on commit 375b733

Please sign in to comment.