-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-84 Migrate Trigger Dag Run endpoint to FastAPI #43875
base: main
Are you sure you want to change the base?
Changes from 2 commits
ed49619
028c9e5
f6df0fc
7c06d2b
1a3a96c
faab1d4
b07d3b3
c426fcb
c8ee434
e6964b8
2eb473d
b82b1ae
d2d45bd
e6f863e
f730881
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,8 +19,9 @@ | |
|
||
from typing import Annotated | ||
|
||
import pendulum | ||
from fastapi import Depends, HTTPException, Query, Request, status | ||
from sqlalchemy import select | ||
from sqlalchemy import or_, select | ||
from sqlalchemy.orm import Session | ||
|
||
from airflow.api.common.mark_tasks import ( | ||
|
@@ -36,13 +37,18 @@ | |
DAGRunPatchBody, | ||
DAGRunPatchStates, | ||
DAGRunResponse, | ||
TriggerDAGRunPostBody, | ||
) | ||
from airflow.api_fastapi.core_api.datamodels.task_instances import ( | ||
TaskInstanceCollectionResponse, | ||
TaskInstanceResponse, | ||
) | ||
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc | ||
from airflow.models import DAG, DagRun | ||
from airflow.models import DAG, DagModel, DagRun | ||
from airflow.models.dag_version import DagVersion | ||
from airflow.timetables.base import DataInterval | ||
from airflow.utils.state import DagRunState | ||
from airflow.utils.types import DagRunTriggeredByType, DagRunType | ||
|
||
dag_run_router = AirflowRouter(tags=["DagRun"], prefix="/dags/{dag_id}/dagRuns") | ||
|
||
|
@@ -228,3 +234,79 @@ def clear_dag_run( | |
) | ||
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) | ||
return DAGRunResponse.model_validate(dag_run_cleared, from_attributes=True) | ||
|
||
|
||
@dag_run_router.post( | ||
"", | ||
responses=create_openapi_http_exception_doc( | ||
[ | ||
status.HTTP_400_BAD_REQUEST, | ||
status.HTTP_404_NOT_FOUND, | ||
status.HTTP_409_CONFLICT, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This 409 needs to stay though to be reflected in the documentation. |
||
] | ||
), | ||
) | ||
def trigger_dag_run( | ||
dag_id, body: TriggerDAGRunPostBody, request: Request, session: Annotated[Session, Depends(get_session)] | ||
): | ||
"""Trigger a DAG.""" | ||
dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) | ||
if not dm: | ||
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found") | ||
|
||
if dm.has_import_errors: | ||
raise HTTPException( | ||
status.HTTP_400_BAD_REQUEST, | ||
f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", | ||
) | ||
|
||
logical_date = pendulum.instance(body.logical_date) | ||
run_id = body.dag_run_id | ||
dagrun_instance = session.scalar( | ||
select(DagRun) | ||
.where( | ||
DagRun.dag_id == dag_id, | ||
or_(DagRun.run_id == run_id, DagRun.logical_date == logical_date), | ||
) | ||
.limit(1) | ||
) | ||
if not dagrun_instance: | ||
dag: DAG = request.app.state.dag_bag.get_dag(dag_id) | ||
|
||
if body.data_interval_start and body.data_interval_end: | ||
data_interval = DataInterval( | ||
start=pendulum.instance(body.data_interval_start), | ||
end=pendulum.instance(body.data_interval_end), | ||
) | ||
else: | ||
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) | ||
dag_version = DagVersion.get_latest_version(dag.dag_id) | ||
dag_run = dag.create_dagrun( | ||
run_type=DagRunType.MANUAL, | ||
run_id=run_id, | ||
logical_date=logical_date, | ||
data_interval=data_interval, | ||
state=DagRunState.QUEUED, | ||
conf=body.conf, | ||
external_trigger=True, | ||
dag_version=dag_version, | ||
session=session, | ||
triggered_by=DagRunTriggeredByType.REST_API, | ||
) | ||
dag_run_note = body.note | ||
if dag_run_note: | ||
current_user_id = None # refer to https://github.com/apache/airflow/issues/43534 | ||
dag_run.note = (dag_run_note, current_user_id) | ||
return DAGRunResponse.model_validate(dag_run, from_attributes=True) | ||
|
||
if dagrun_instance.logical_date == logical_date: | ||
raise HTTPException( | ||
status.HTTP_409_CONFLICT, | ||
f"DAGRun with DAG ID: '{dag_id}' and " | ||
f"DAGRun logical date: '{logical_date.isoformat(sep=' ')}' already exists", | ||
) | ||
|
||
raise HTTPException( | ||
status.HTTP_409_CONFLICT, | ||
f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{body.dag_run_id}' already exists", | ||
) | ||
Comment on lines
+369
to
+372
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DB duplicate entry exceptions are already handled by the application. You can just always try to execute this code, if the database crashes with duplicate entry, a nice 409 errors will automatically be returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to legacy endpoint implementation, logical_date can't be None. So, I had to remove
datetime| None
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want to remove this entirely and let the backend interpret the logical_date. I assume that has to happen at a deeper layer than use
fastapi
is that right @uranusjr ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I referred to the implementation and updated this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah a deeper layer should infer logical date from the data interval, or use
now()
if those are not provided. This should not take the argument at all.