AIP-84 Migrate Trigger Dag Run endpoint to FastAPI #43875
base: main
Changes from all commits
ed49619
028c9e5
f6df0fc
7c06d2b
1a3a96c
faab1d4
b07d3b3
c426fcb
c8ee434
e6964b8
2eb473d
b82b1ae
d2d45bd
e6f863e
f730881
```diff
@@ -20,9 +20,12 @@
 from datetime import datetime
 from enum import Enum

-from pydantic import Field
+from fastapi import HTTPException, status
+from pydantic import AwareDatetime, Field, computed_field, model_validator

 from airflow.api_fastapi.core_api.base import BaseModel
+from airflow.models import DagRun
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
```
```diff
@@ -73,3 +76,37 @@ class DAGRunCollectionResponse(BaseModel):

     dag_runs: list[DAGRunResponse]
     total_entries: int
+
+
+class TriggerDAGRunPostBody(BaseModel):
+    """Trigger DAG Run Serializer for POST body."""
+
+    dag_run_id: str | None = None
+    data_interval_start: AwareDatetime | None = None
+    data_interval_end: AwareDatetime | None = None
+
+    conf: dict = Field(default_factory=dict)
+    note: str | None = None
+
+    model_config = {"extra": "forbid"}
+
+    @model_validator(mode="after")
+    def check_data_intervals(cls, values):
+        if (values.data_interval_start is None) != (values.data_interval_end is None):
+            raise HTTPException(
+                status.HTTP_422_UNPROCESSABLE_ENTITY,
+                "Either both data_interval_start and data_interval_end must be provided or both must be None",
+            )
+        return values
+
+    @model_validator(mode="after")
+    def validate_dag_run_id(self):
+        if not self.dag_run_id:
+            self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date)
+        return self
+
+    # Mypy issue https://github.com/python/mypy/issues/1362
+    @computed_field  # type: ignore[misc]
+    @property
+    def logical_date(self) -> datetime:
+        return timezone.utcnow()
```

**Review comment** (on lines +96 to +99, the `HTTPException` raised in `check_data_intervals`): Pydantic takes care of formatting 422 errors correctly; those have a whole specific structure. You can just raise …

**Review thread** (on `validate_dag_run_id`):

- @uranusjr If we remove the logical_date, then we have … I assume this will be updated later when the …
- For now, shall I just remove `logical_date` from the request body and use the current time by default to generate the `run_id`? Once the updates are made to `create_dagrun`, this can also be updated.
- Yes, let's remove it from the request body.
```diff
@@ -19,6 +19,7 @@

 from typing import Annotated, cast

+import pendulum
 from fastapi import Depends, HTTPException, Query, Request, status
 from sqlalchemy import select
 from sqlalchemy.orm import Session
```
```diff
@@ -45,13 +46,18 @@
     DAGRunPatchBody,
     DAGRunPatchStates,
     DAGRunResponse,
+    TriggerDAGRunPostBody,
 )
 from airflow.api_fastapi.core_api.datamodels.task_instances import (
     TaskInstanceCollectionResponse,
     TaskInstanceResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
-from airflow.models import DAG, DagRun
+from airflow.models import DAG, DagModel, DagRun
+from airflow.models.dag_version import DagVersion
+from airflow.timetables.base import DataInterval
 from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType

 dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns")
```
```diff
@@ -296,3 +302,71 @@ def get_dag_runs(
         dag_runs=dag_runs,
         total_entries=total_entries,
     )
+
+
+@dag_run_router.post(
+    "",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_400_BAD_REQUEST,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+        ]
+    ),
+)
+def trigger_dag_run(
+    dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)]
+) -> DAGRunResponse:
+    """Trigger a DAG."""
+    dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1))
+    if not dm:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found")
+
+    if dm.has_import_errors:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
+        )
+
+    run_id = body.dag_run_id
+    logical_date = pendulum.instance(body.logical_date)
+    dagrun_instance = session.scalar(
+        select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id).limit(1)
+    )
+
+    if not dagrun_instance:
+        try:
+            dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
+
+            if body.data_interval_start and body.data_interval_end:
+                data_interval = DataInterval(
+                    start=pendulum.instance(body.data_interval_start),
+                    end=pendulum.instance(body.data_interval_end),
+                )
+            else:
+                data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+            dag_run = dag.create_dagrun(
+                run_type=DagRunType.MANUAL,
+                run_id=run_id,
+                logical_date=logical_date,
+                data_interval=data_interval,
+                state=DagRunState.QUEUED,
+                conf=body.conf,
+                external_trigger=True,
+                dag_version=dag_version,
+                session=session,
+                triggered_by=DagRunTriggeredByType.REST_API,
+            )
+            dag_run_note = body.note
+            if dag_run_note:
+                current_user_id = None  # refer to https://github.com/apache/airflow/issues/43534
+                dag_run.note = (dag_run_note, current_user_id)
+            return dag_run
+        except ValueError as e:
+            raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
+
+    raise HTTPException(
+        status.HTTP_409_CONFLICT,
+        f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists",
+    )
```

**Review comment** (on the `status.HTTP_409_CONFLICT` entry in `responses`): This 409 needs to stay, though, to be reflected in the documentation.
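For reference, a request to the new endpoint might look like the following (values illustrative; the path follows the router's `/dags/{dag_id}/dagRuns` prefix, and the exact mount point depends on how the app is served). Per `TriggerDAGRunPostBody`, `data_interval_start` and `data_interval_end` must be supplied together or not at all, and `dag_run_id` is generated when omitted:

```
POST /dags/example_dag/dagRuns
Content-Type: application/json

{
  "dag_run_id": "manual__my_first_run",
  "conf": {"target_date": "2024-01-01"},
  "note": "triggered via the new FastAPI endpoint"
}
```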
**Review comment** (on lines +369 to +372, the final `HTTP_409_CONFLICT` raise): DB duplicate-entry exceptions are already handled by the application. You can just always try to execute this code; if the database fails with a duplicate entry, a nice 409 error will automatically be returned.

**Review thread** (on `model_config = {"extra": "forbid"}`):

- I suppose it would be great to keep this consistent across all APIs. The legacy API throws 400 Bad Request when extra properties are passed, while, since FastAPI and Pydantic ignore extra properties by default, the new APIs don't throw an error yet. We could either forbid extra properties for all models or for none of them, so users have a consistent view of the APIs. In the new APIs, I believe forbidding is currently done only in `CreateAssetEventsBody`.
- Yeah. I created a draft to start a conversation on this: #44306. Forgot to link this.
- Nice. Maybe remove it in this PR so there is no special case; #44306 will introduce it for all or none.