diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow/api_fastapi/execution_api/routes/task_instances.py index f1ab877ddfaa..3d6e032d1673 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -18,6 +18,7 @@ from __future__ import annotations import logging +from datetime import datetime from typing import TYPE_CHECKING, Annotated from uuid import UUID @@ -134,17 +135,13 @@ def ti_update_state( session.flush() ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none() - ti.state = State.DEFERRED - ti.trigger_id = trigger_row.id - ti.next_method = ti_patch_payload.next_method - ti.next_kwargs = ti_patch_payload.kwargs or {} - timeout = ti_patch_payload.timeout # Calculate timeout too if it was passed - if timeout is not None: - ti.trigger_timeout = timezone.utcnow() + timeout + trigger_timeout: datetime | None = None + if ti_patch_payload.timeout is not None: + trigger_timeout = timezone.utcnow() + ti_patch_payload.timeout else: - ti.trigger_timeout = None + trigger_timeout = None # If an execution_timeout is set, set the timeout to the minimum of # it and the trigger timeout @@ -154,14 +151,18 @@ def ti_update_state( if TYPE_CHECKING: assert ti.start_date if ti.trigger_timeout: - ti.trigger_timeout = min(ti.start_date + execution_timeout, ti.trigger_timeout) + trigger_timeout = min(ti.start_date + execution_timeout, ti.trigger_timeout) else: - ti.trigger_timeout = ti.start_date + execution_timeout + trigger_timeout = ti.start_date + execution_timeout - session.commit() - - log.info("TI %s state updated to: deferred", ti_id_str) - return + query = update(TI).where(TI.id == ti_id_str) + query = query.values( + state=State.DEFERRED, + trigger_id=trigger_row.id, + next_method=ti_patch_payload.next_method, + next_kwargs=ti_patch_payload.kwargs or {}, + trigger_timeout=trigger_timeout, + ) # TODO: Replace this with FastAPI's Custom Exception handling: # https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers