Skip to content

Commit

Permalink
AIP-72: Use create_runtime_ti fixture more widely (apache#45510)
Browse files Browse the repository at this point in the history
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
  • Loading branch information
kaxil authored Jan 9, 2025
1 parent b18fccb commit 0e114c2
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 203 deletions.
69 changes: 41 additions & 28 deletions task_sdk/tests/execution_time/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
from unittest import mock

if TYPE_CHECKING:
from collections.abc import Callable
from datetime import datetime

from airflow.sdk.api.datamodels._generated import TIRunContext
from airflow.sdk.definitions.baseoperator import BaseOperator
from airflow.sdk.execution_time.comms import StartupDetails
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
Expand Down Expand Up @@ -83,23 +82,23 @@ def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) -> RuntimeTas
dag = DAG(dag_id=dag_id, start_date=timezone.datetime(2024, 12, 3))
task.dag = dag
t = dag.task_dict[task.task_id]
ti = RuntimeTaskInstance.model_construct(**what.ti.model_dump(exclude_unset=True), task=t)
ti = RuntimeTaskInstance.model_construct(
**what.ti.model_dump(exclude_unset=True), task=t, _ti_context_from_server=what.ti_context
)
spy_agency.spy_on(parse, call_fake=lambda _: ti)
return ti

return set_dag


@pytest.fixture
def create_runtime_ti(
mocked_parse: Callable[[StartupDetails, str, BaseOperator], RuntimeTaskInstance],
make_ti_context: Callable[..., TIRunContext],
) -> Callable[[BaseOperator, TIRunContext | None, StartupDetails | None], RuntimeTaskInstance]:
def create_runtime_ti(mocked_parse, make_ti_context):
"""
Fixture to create a Runtime TaskInstance for testing purposes without defining a dag file.
This fixture sets up a `RuntimeTaskInstance` with default or custom `TIRunContext` and `StartupDetails`,
making it easy to simulate task execution scenarios in tests.
It mimics the behavior of the `parse` function by creating a `RuntimeTaskInstance` based on the provided
`StartupDetails` (formed from arguments) and task. This allows you to test the logic of a task without
having to define a DAG file, parse it, get context from the server, etc.
Example usage: ::
Expand All @@ -118,26 +117,40 @@ def execute(self, context):
from airflow.sdk.execution_time.comms import StartupDetails

def _create_task_instance(
task, context_from_server: TIRunContext | None = None, startup_details: StartupDetails | None = None
task: BaseOperator,
dag_id: str = "test_dag",
run_id: str = "test_run",
logical_date: str | datetime = "2024-12-01T01:00:00Z",
data_interval_start: str | datetime = "2024-12-01T00:00:00Z",
data_interval_end: str | datetime = "2024-12-01T01:00:00Z",
start_date: str | datetime = "2024-12-01T01:00:00Z",
run_type: str = "manual",
try_number: int = 1,
ti_id=None,
) -> RuntimeTaskInstance:
if context_from_server is None:
context_from_server = make_ti_context()

if not startup_details:
startup_details = StartupDetails(
ti=TaskInstance(
id=uuid7(),
task_id=task.task_id,
dag_id=context_from_server.dag_run.dag_id,
run_id=context_from_server.dag_run.run_id,
try_number=1,
),
file="",
requests_fd=0,
ti_context=context_from_server,
)

ti = mocked_parse(startup_details, context_from_server.dag_run.dag_id, task)
if not ti_id:
ti_id = uuid7()

ti_context = make_ti_context(
dag_id=dag_id,
run_id=run_id,
logical_date=logical_date,
data_interval_start=data_interval_start,
data_interval_end=data_interval_end,
start_date=start_date,
run_type=run_type,
)

startup_details = StartupDetails(
ti=TaskInstance(
id=ti_id, task_id=task.task_id, dag_id=dag_id, run_id=run_id, try_number=try_number
),
file="",
requests_fd=0,
ti_context=ti_context,
)

ti = mocked_parse(startup_details, dag_id, task)
return ti

return _create_task_instance
Loading

0 comments on commit 0e114c2

Please sign in to comment.