AIP-84 Migrate Trigger Dag Run endpoint to FastAPI #43875

Open
wants to merge 15 commits into
base: main
from
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -302,6 +302,7 @@ def get_dag_runs_batch(*, session: Session = NEW_SESSION) -> APIResponse:
     return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, total_entries=total_entries))


+@mark_fastapi_migration_done
 @security.requires_access_dag("POST", DagAccessEntity.RUN)
 @action_logging
 @provide_session
11 changes: 11 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -65,3 +65,14 @@ class DAGRunResponse(BaseModel):
     triggered_by: DagRunTriggeredByType
     conf: dict
     note: str | None
+
+
+class TriggerDAGRunPostBody(BaseModel):
+    """Trigger DAG Run Serializer for POST body."""
+
+    dag_run_id: str | None
+    logical_date: datetime
Collaborator Author

@rawwar rawwar Nov 20, 2024

According to the legacy endpoint implementation, logical_date can't be None, so I had to remove datetime | None.

Contributor

I think we want to remove this entirely and let the backend interpret the logical_date. I assume that has to happen at a deeper layer than FastAPI; is that right, @uranusjr?

Collaborator Author

I referred to the implementation and updated this PR

Member

Yeah, a deeper layer should infer the logical date from the data interval, or use now() if the interval is not provided. This should not take the argument at all.

rawwar marked this conversation as resolved.

+    data_interval_start: datetime | None
+    data_interval_end: datetime | None
+    conf: dict | None
+    note: str | None
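
The review thread above suggests dropping logical_date from the request body and resolving it at a deeper layer. A minimal standard-library sketch of that fallback follows. The helper name resolve_logical_date is hypothetical, and using the start of the data interval as the logical date is an assumption made for illustration; the thread does not say how the inference should work.

```python
from __future__ import annotations

from datetime import datetime, timezone


def resolve_logical_date(
    data_interval_start: datetime | None,
    data_interval_end: datetime | None,
    now: datetime | None = None,
) -> datetime:
    """Hypothetical helper: pick a logical date when the client sends none.

    Assumption: when a full data interval is given, its start is used as the
    logical date. Otherwise the current UTC time is used.
    """
    if data_interval_start is not None and data_interval_end is not None:
        return data_interval_start
    # `now` is injectable so the fallback can be tested deterministically.
    return now if now is not None else datetime.now(timezone.utc)
```

With a helper like this in a deeper layer, the POST body model would only need the optional interval fields, conf, and note.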
104 changes: 104 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -1483,6 +1483,67 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /public/dags/{dag_id}/dagRuns:
+    post:
+      tags:
+      - DagRun
+      summary: Trigger Dag Run
+      description: Trigger a DAG.
+      operationId: trigger_dag_run
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          title: Dag Id
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/TriggerDAGRunPostBody'
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema: {}
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '400':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Bad Request
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '409':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Conflict
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dagSources/{file_token}:
     get:
       tags:
@@ -6605,6 +6666,49 @@ components:
       - microseconds
       title: TimeDelta
       description: TimeDelta can be used to interact with datetime.timedelta objects.
+    TriggerDAGRunPostBody:
+      properties:
+        dag_run_id:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Dag Run Id
+        logical_date:
+          type: string
+          format: date-time
+          title: Logical Date
+        data_interval_start:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Data Interval Start
+        data_interval_end:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Data Interval End
+        conf:
+          anyOf:
+          - type: object
+          - type: 'null'
+          title: Conf
+        note:
+          anyOf:
+          - type: string
+          - type: 'null'
+          title: Note
+      type: object
+      required:
+      - dag_run_id
+      - logical_date
+      - data_interval_start
+      - data_interval_end
+      - conf
+      - note
+      title: TriggerDAGRunPostBody
+      description: Trigger DAG Run Serializer for POST body.
     TriggerResponse:
       properties:
         id:
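
In the generated schema above, every property of TriggerDAGRunPostBody is listed under required, so the nullable fields still have to be present in the JSON, with null as the value. A minimal client-side sketch of a conforming request follows. The helper name build_trigger_request and the base URL are hypothetical, authentication is left out, and the request is only constructed, never sent.

```python
from __future__ import annotations

import json
import urllib.request
from datetime import datetime, timezone
from urllib.parse import quote


def build_trigger_request(
    base_url: str,
    dag_id: str,
    logical_date: datetime,
    dag_run_id: str | None = None,
    conf: dict | None = None,
    note: str | None = None,
) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) the trigger POST request."""
    # All six keys are sent explicitly because the schema marks them required;
    # None is serialized as JSON null for the nullable ones.
    body = {
        "dag_run_id": dag_run_id,
        "logical_date": logical_date.isoformat(),
        "data_interval_start": None,
        "data_interval_end": None,
        "conf": conf,
        "note": note,
    }
    url = f"{base_url}/public/dags/{quote(dag_id, safe='')}/dagRuns"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage: the base URL here is a placeholder, not a documented default.
request = build_trigger_request(
    "http://localhost:8080",
    "example_dag",
    datetime(2024, 11, 20, tzinfo=timezone.utc),
)
```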
86 changes: 84 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -19,8 +19,9 @@

 from typing import Annotated

+import pendulum
 from fastapi import Depends, HTTPException, Query, Request, status
-from sqlalchemy import select
+from sqlalchemy import or_, select
 from sqlalchemy.orm import Session

 from airflow.api.common.mark_tasks import (
@@ -36,13 +37,18 @@
     DAGRunPatchBody,
     DAGRunPatchStates,
     DAGRunResponse,
+    TriggerDAGRunPostBody,
 )
 from airflow.api_fastapi.core_api.datamodels.task_instances import (
     TaskInstanceCollectionResponse,
     TaskInstanceResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.models import DAG, DagRun
+from airflow.models import DAG, DagModel, DagRun
+from airflow.models.dag_version import DagVersion
+from airflow.timetables.base import DataInterval
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType

 dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")

@@ -228,3 +234,79 @@ def clear_dag_run(
     )
     dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
     return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True)
+
+
+@dag_run_router.post(
+    "",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
Member

This 409 needs to stay, though, so that it is reflected in the documentation.

+        ]
+    ),
+)
+def trigger_dag_run(
+    dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)]
+):
+    """Trigger a DAG."""
+    dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
+    if not dm:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")
+
+    if dm.has_import_errors:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
+        )
+
+    logical_date = pendulum.instance(body.logical_date)
+    run_id = body.dag_run_id
+    dagrun_instance = session.scalar(
+        select(DagRun)
+        .where(
+            DagRun.dag_id == dag_id,
+            or_(DagRun.run_id == run_id, DagRun.logical_date == logical_date),
+        )
+        .limit(1)
+    )
+    if not dagrun_instance:
+        dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+        if body.data_interval_start and body.data_interval_end:
+            data_interval = DataInterval(
+                start=pendulum.instance(body.data_interval_start),
+                end=pendulum.instance(body.data_interval_end),
+            )
+        else:
+            data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+        dag_version = DagVersion.get_latest_version(dag.dag_id)
+        dag_run = dag.create_dagrun(
+            run_type=DagRunType.MANUAL,
+            run_id=run_id,
+            logical_date=logical_date,
+            data_interval=data_interval,
+            state=DagRunState.QUEUED,
+            conf=body.conf,
+            external_trigger=True,
+            dag_version=dag_version,
+            session=session,
+            triggered_by=DagRunTriggeredByType.REST_API,
+        )
+        dag_run_note = body.note
+        if dag_run_note:
+            current_user_id = None  # refer to https://github.com/apache/airflow/issues/43534
+            dag_run.note = (dag_run_note, current_user_id)
+        return DAGRunResponse.model_validate(dag_run, from_attributes=True)
+
+    if dagrun_instance.logical_date == logical_date:
+        raise HTTPException(
+            status.HTTP_409_CONFLICT,
+            f"DAGRun with DAG ID: '{dag_id}' and "
+            f"DAGRun logical date: '{logical_date.isoformat(sep=' ')}' already exists",
+        )
+
+    raise HTTPException(
+        status.HTTP_409_CONFLICT,
+        f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists",
+    )
Comment on lines +369 to +372
Member

DB duplicate-entry exceptions are already handled by the application.

You can always just try to execute this code; if the database raises a duplicate-entry error, a 409 response is returned automatically.
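
The suggestion above amounts to inserting unconditionally and letting the database's uniqueness constraint report the conflict, instead of querying for an existing run first. A minimal sketch of that pattern using the standard-library sqlite3 module follows. The table layout and the helper names make_db and create_run are made up for illustration, and the try/except stands in for the application-level exception handler the reviewer refers to, which is not shown in this PR.

```python
from __future__ import annotations

import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory table with a uniqueness constraint (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run ("
        "dag_id TEXT NOT NULL, run_id TEXT NOT NULL, UNIQUE (dag_id, run_id))"
    )
    return conn


def create_run(conn: sqlite3.Connection, dag_id: str, run_id: str) -> tuple[int, str]:
    """Insert without a prior existence check; map a duplicate to HTTP 409."""
    try:
        with conn:  # commits on success, rolls back if the insert fails
            conn.execute(
                "INSERT INTO dag_run (dag_id, run_id) VALUES (?, ?)",
                (dag_id, run_id),
            )
    except sqlite3.IntegrityError:
        # Stand-in for a shared handler that turns duplicate-entry errors into 409.
        return 409, f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists"
    return 200, "created"
```

Relying on the constraint also avoids the window between the existence check and the insert in which a concurrent request could create the same run.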

3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
@@ -1122,6 +1122,9 @@ export type ConnectionServiceTestConnectionMutationResult = Awaited<
 export type DagRunServiceClearDagRunMutationResult = Awaited<
   ReturnType<typeof DagRunService.clearDagRun>
 >;
+export type DagRunServiceTriggerDagRunMutationResult = Awaited<
+  ReturnType<typeof DagRunService.triggerDagRun>
+>;
 export type PoolServicePostPoolMutationResult = Awaited<
   ReturnType<typeof PoolService.postPool>
 >;
44 changes: 44 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
@@ -41,6 +41,7 @@ import {
   PoolPatchBody,
   PoolPostBody,
   TaskInstancesBatchBody,
+  TriggerDAGRunPostBody,
   VariableBody,
 } from "../requests/types.gen";
 import * as Common from "./common";
@@ -1997,6 +1998,49 @@ export const useDagRunServiceClearDagRun = <
     }) as unknown as Promise<TData>,
     ...options,
   });
+/**
+ * Trigger Dag Run
+ * Trigger a DAG.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.requestBody
+ * @returns unknown Successful Response
+ * @throws ApiError
+ */
+export const useDagRunServiceTriggerDagRun = <
+  TData = Common.DagRunServiceTriggerDagRunMutationResult,
+  TError = unknown,
+  TContext = unknown,
+>(
+  options?: Omit<
+    UseMutationOptions<
+      TData,
+      TError,
+      {
+        dagId: unknown;
+        requestBody: TriggerDAGRunPostBody;
+      },
+      TContext
+    >,
+    "mutationFn"
+  >,
+) =>
+  useMutation<
+    TData,
+    TError,
+    {
+      dagId: unknown;
+      requestBody: TriggerDAGRunPostBody;
+    },
+    TContext
+  >({
+    mutationFn: ({ dagId, requestBody }) =>
+      DagRunService.triggerDagRun({
+        dagId,
+        requestBody,
+      }) as unknown as Promise<TData>,
+    ...options,
+  });
 /**
  * Post Pool
  * Create a Pool.