Skip to content

Commit

Permalink
init trigger dag_run
Browse files Browse the repository at this point in the history
  • Loading branch information
rawwar committed Nov 11, 2024
1 parent 12c7dd4 commit b11977f
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_dag("POST", DagAccessEntity.RUN)
@action_logging
@provide_session
Expand Down
11 changes: 11 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ class DAGRunResponse(BaseModel):
triggered_by: DagRunTriggeredByType
conf: dict
note: str | None


class DAGRunPostBody(BaseModel):
"""DAG Run Serializer for POST body."""

dag_run_id: str | None
logical_date: datetime | None
data_interval_start: datetime | None
data_interval_end: datetime | None
conf: dict | None
note: str | None
106 changes: 106 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,67 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/:
post:
tags:
- DagRun
summary: Trigger Dag Run
description: Trigger a DAG Run.
operationId: trigger_dag_run
parameters:
- name: dag_id
in: path
required: true
schema:
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DAGRunPostBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema: {}
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dagSources/{file_token}:
get:
tags:
Expand Down Expand Up @@ -3714,6 +3775,51 @@ components:
- failed
title: DAGRunPatchStates
description: Enum for DAG Run states when updating a DAG Run.
DAGRunPostBody:
properties:
dag_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
conf:
anyOf:
- type: object
- type: 'null'
title: Conf
note:
anyOf:
- type: string
- type: 'null'
title: Note
type: object
required:
- dag_run_id
- logical_date
- data_interval_start
- data_interval_end
- conf
- note
title: DAGRunPostBody
description: DAG Run Serializer for POST body.
DAGRunResponse:
properties:
run_id:
Expand Down
18 changes: 18 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.api_fastapi.core_api.datamodels.dag_run import (
DAGRunPatchBody,
DAGRunPatchStates,
DAGRunPostBody,
DAGRunResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
Expand Down Expand Up @@ -147,3 +148,20 @@ def patch_dag_run(
dag_run = session.get(DagRun, dag_run.id)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)


@dag_run_router.post(
"/",
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
]
),
)
def trigger_dag_run(dag_id, post_body: DAGRunPostBody, session: Annotated[Session, Depends(get_session)]):
"""Trigger a DAG Run."""
pass
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,9 @@ export type BackfillServiceCreateBackfillMutationResult = Awaited<
export type ConnectionServicePostConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.postConnection>
>;
export type DagRunServiceTriggerDagRunMutationResult = Awaited<
ReturnType<typeof DagRunService.triggerDagRun>
>;
export type PoolServicePostPoolMutationResult = Awaited<
ReturnType<typeof PoolService.postPool>
>;
Expand Down
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
ConnectionBody,
DAGPatchBody,
DAGRunPatchBody,
DAGRunPostBody,
DagRunState,
DagWarningType,
PoolPatchBody,
Expand Down Expand Up @@ -1541,6 +1542,49 @@ export const useConnectionServicePostConnection = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Trigger Dag Run
* Trigger a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
export const useDagRunServiceTriggerDagRun = <
TData = Common.DagRunServiceTriggerDagRunMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
dagId: unknown;
requestBody: DAGRunPostBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
dagId: unknown;
requestBody: DAGRunPostBody;
},
TContext
>({
mutationFn: ({ dagId, requestBody }) =>
DagRunService.triggerDagRun({
dagId,
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Post Pool
* Create a Pool.
Expand Down
85 changes: 85 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,91 @@ export const $DAGRunPatchStates = {
description: "Enum for DAG Run states when updating a DAG Run.",
} as const;

export const $DAGRunPostBody = {
properties: {
dag_run_id: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Dag Run Id",
},
logical_date: {
anyOf: [
{
type: "string",
format: "date-time",
},
{
type: "null",
},
],
title: "Logical Date",
},
data_interval_start: {
anyOf: [
{
type: "string",
format: "date-time",
},
{
type: "null",
},
],
title: "Data Interval Start",
},
data_interval_end: {
anyOf: [
{
type: "string",
format: "date-time",
},
{
type: "null",
},
],
title: "Data Interval End",
},
conf: {
anyOf: [
{
type: "object",
},
{
type: "null",
},
],
title: "Conf",
},
note: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Note",
},
},
type: "object",
required: [
"dag_run_id",
"logical_date",
"data_interval_start",
"data_interval_end",
"conf",
"note",
],
title: "DAGRunPostBody",
description: "DAG Run Serializer for POST body.",
} as const;

export const $DAGRunResponse = {
properties: {
run_id: {
Expand Down
33 changes: 33 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ import type {
DeleteDagRunResponse,
PatchDagRunData,
PatchDagRunResponse,
TriggerDagRunData,
TriggerDagRunResponse,
GetDagSourceData,
GetDagSourceResponse,
GetEventLogData,
Expand Down Expand Up @@ -828,6 +830,37 @@ export class DagRunService {
},
});
}

/**
* Trigger Dag Run
* Trigger a DAG Run.
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
* @returns unknown Successful Response
* @throws ApiError
*/
public static triggerDagRun(
data: TriggerDagRunData,
): CancelablePromise<TriggerDagRunResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/dags/{dag_id}/dagRuns/",
path: {
dag_id: data.dagId,
},
body: data.requestBody,
mediaType: "application/json",
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
409: "Conflict",
422: "Validation Error",
},
});
}
}

export class DagSourceService {
Expand Down
Loading

0 comments on commit b11977f

Please sign in to comment.