Skip to content

Commit

Permalink
review comments from kaxil - part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh committed Nov 25, 2024
1 parent 38f483f commit 4d3140d
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions airflow/api_fastapi/execution_api/routes/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import logging
from datetime import datetime
from typing import TYPE_CHECKING, Annotated
from uuid import UUID

Expand Down Expand Up @@ -134,17 +135,13 @@ def ti_update_state(
session.flush()

ti = session.query(TI).filter(TI.id == ti_id_str).one_or_none()
ti.state = State.DEFERRED
ti.trigger_id = trigger_row.id
ti.next_method = ti_patch_payload.next_method
ti.next_kwargs = ti_patch_payload.kwargs or {}
timeout = ti_patch_payload.timeout

# Calculate timeout too if it was passed
if timeout is not None:
ti.trigger_timeout = timezone.utcnow() + timeout
trigger_timeout: datetime | None = None
if ti_patch_payload.timeout is not None:
trigger_timeout = timezone.utcnow() + ti_patch_payload.timeout
else:
ti.trigger_timeout = None
trigger_timeout = None

# If an execution_timeout is set, set the timeout to the minimum of
# it and the trigger timeout
Expand All @@ -154,14 +151,18 @@ def ti_update_state(
if TYPE_CHECKING:
assert ti.start_date
if ti.trigger_timeout:
ti.trigger_timeout = min(ti.start_date + execution_timeout, ti.trigger_timeout)
trigger_timeout = min(ti.start_date + execution_timeout, ti.trigger_timeout)
else:
ti.trigger_timeout = ti.start_date + execution_timeout
trigger_timeout = ti.start_date + execution_timeout

session.commit()

log.info("TI %s state updated to: deferred", ti_id_str)
return
query = update(TI).where(TI.id == ti_id_str)
query = query.values(
state=State.DEFERRED,
trigger_id=trigger_row.id,
next_method=ti_patch_payload.next_method,
next_kwargs=ti_patch_payload.kwargs or {},
trigger_timeout=trigger_timeout,
)

# TODO: Replace this with FastAPI's Custom Exception handling:
# https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
Expand Down

0 comments on commit 4d3140d

Please sign in to comment.