diff --git a/task_sdk/tests/execution_time/conftest.py b/task_sdk/tests/execution_time/conftest.py index 032e67ae343ca..9ff2f0378959b 100644 --- a/task_sdk/tests/execution_time/conftest.py +++ b/task_sdk/tests/execution_time/conftest.py @@ -22,9 +22,8 @@ from unittest import mock if TYPE_CHECKING: - from collections.abc import Callable + from datetime import datetime - from airflow.sdk.api.datamodels._generated import TIRunContext from airflow.sdk.definitions.baseoperator import BaseOperator from airflow.sdk.execution_time.comms import StartupDetails from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance @@ -83,7 +82,9 @@ def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) -> RuntimeTas dag = DAG(dag_id=dag_id, start_date=timezone.datetime(2024, 12, 3)) task.dag = dag t = dag.task_dict[task.task_id] - ti = RuntimeTaskInstance.model_construct(**what.ti.model_dump(exclude_unset=True), task=t) + ti = RuntimeTaskInstance.model_construct( + **what.ti.model_dump(exclude_unset=True), task=t, _ti_context_from_server=what.ti_context + ) spy_agency.spy_on(parse, call_fake=lambda _: ti) return ti @@ -91,15 +92,13 @@ def set_dag(what: StartupDetails, dag_id: str, task: BaseOperator) -> RuntimeTas @pytest.fixture -def create_runtime_ti( - mocked_parse: Callable[[StartupDetails, str, BaseOperator], RuntimeTaskInstance], - make_ti_context: Callable[..., TIRunContext], -) -> Callable[[BaseOperator, TIRunContext | None, StartupDetails | None], RuntimeTaskInstance]: +def create_runtime_ti(mocked_parse, make_ti_context): """ Fixture to create a Runtime TaskInstance for testing purposes without defining a dag file. - This fixture sets up a `RuntimeTaskInstance` with default or custom `TIRunContext` and `StartupDetails`, - making it easy to simulate task execution scenarios in tests. + It mimics the behavior of the `parse` function by creating a `RuntimeTaskInstance` based on the provided + `StartupDetails` (formed from arguments) and task. This allows you to test the logic of a task without + having to define a DAG file, parse it, get context from the server, etc. Example usage: :: @@ -118,26 +117,40 @@ def execute(self, context): from airflow.sdk.execution_time.comms import StartupDetails def _create_task_instance( - task, context_from_server: TIRunContext | None = None, startup_details: StartupDetails | None = None + task: BaseOperator, + dag_id: str = "test_dag", + run_id: str = "test_run", + logical_date: str | datetime = "2024-12-01T01:00:00Z", + data_interval_start: str | datetime = "2024-12-01T00:00:00Z", + data_interval_end: str | datetime = "2024-12-01T01:00:00Z", + start_date: str | datetime = "2024-12-01T01:00:00Z", + run_type: str = "manual", + try_number: int = 1, + ti_id=None, ) -> RuntimeTaskInstance: - if context_from_server is None: - context_from_server = make_ti_context() - - if not startup_details: - startup_details = StartupDetails( - ti=TaskInstance( - id=uuid7(), - task_id=task.task_id, - dag_id=context_from_server.dag_run.dag_id, - run_id=context_from_server.dag_run.run_id, - try_number=1, - ), - file="", - requests_fd=0, - ti_context=context_from_server, - ) - - ti = mocked_parse(startup_details, context_from_server.dag_run.dag_id, task) + if not ti_id: + ti_id = uuid7() + + ti_context = make_ti_context( + dag_id=dag_id, + run_id=run_id, + logical_date=logical_date, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + start_date=start_date, + run_type=run_type, + ) + + startup_details = StartupDetails( + ti=TaskInstance( + id=ti_id, task_id=task.task_id, dag_id=dag_id, run_id=run_id, try_number=try_number + ), + file="", + requests_fd=0, + ti_context=ti_context, + ) + + ti = mocked_parse(startup_details, dag_id, task) return ti return _create_task_instance diff --git a/task_sdk/tests/execution_time/test_task_runner.py b/task_sdk/tests/execution_time/test_task_runner.py index 6a7b5743209f5..ebe2e05ec95c5 100644 --- a/task_sdk/tests/execution_time/test_task_runner.py +++ b/task_sdk/tests/execution_time/test_task_runner.py @@ -124,19 +124,12 @@ def test_parse(test_dags_dir: Path, make_ti_context): assert isinstance(ti.task.dag, DAG) -def test_run_basic(time_machine, mocked_parse, make_ti_context, spy_agency, mock_supervisor_comms): +def test_run_basic(time_machine, create_runtime_ti, spy_agency, mock_supervisor_comms): """Test running a basic task.""" - what = StartupDetails( - ti=TaskInstance(id=uuid7(), task_id="hello", dag_id="super_basic_run", run_id="c", try_number=1), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) - ti = mocked_parse(what, "super_basic_run", CustomOperator(task_id="hello")) + ti = create_runtime_ti(dag_id="super_basic_run", task=CustomOperator(task_id="hello")) # Ensure that task is locked for execution spy_agency.spy_on(ti.task.prepare_for_execution) @@ -152,7 +145,7 @@ def test_run_basic(time_machine, mocked_parse, make_ti_context, spy_agency, mock ) -def test_run_deferred_basic(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_deferred_basic(time_machine, create_runtime_ti, mock_supervisor_comms): """Test that a task can transition to a deferred state.""" import datetime @@ -167,12 +160,6 @@ def test_run_deferred_basic(time_machine, mocked_parse, make_ti_context, mock_su timeout=600, ) time_machine.move_to(instant, tick=False) - what = StartupDetails( - ti=TaskInstance(id=uuid7(), task_id="async", dag_id="basic_deferred_run", run_id="c", try_number=1), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) # Expected DeferTask expected_defer_task = DeferTask( @@ -187,14 +174,14 @@ def test_run_deferred_basic(time_machine, mocked_parse, make_ti_context, mock_su ) # Run the task - ti = mocked_parse(what, "basic_deferred_run", task) + ti = create_runtime_ti(dag_id="basic_deferred_run", task=task) run(ti, log=mock.MagicMock()) # send_request will only be called when the TaskDeferred exception is raised mock_supervisor_comms.send_request.assert_called_once_with(msg=expected_defer_task, log=mock.ANY) -def test_run_basic_skipped(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_basic_skipped(time_machine, create_runtime_ti, mock_supervisor_comms): """Test running a basic task that marks itself skipped.""" from airflow.providers.standard.operators.python import PythonOperator @@ -205,14 +192,7 @@ def test_run_basic_skipped(time_machine, mocked_parse, make_ti_context, mock_sup ), ) - what = StartupDetails( - ti=TaskInstance(id=uuid7(), task_id="skip", dag_id="basic_skipped", run_id="c", try_number=1), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, "basic_skipped", task) + ti = create_runtime_ti(dag_id="basic_skipped", task=task) instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -224,7 +204,7 @@ def test_run_basic_skipped(time_machine, mocked_parse, make_ti_context, mock_sup ) -def test_run_raises_base_exception(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_raises_base_exception(time_machine, create_runtime_ti, mock_supervisor_comms): """Test running a basic task that raises a base exception which should send fail_with_retry state.""" from airflow.providers.standard.operators.python import PythonOperator @@ -233,20 +213,7 @@ def test_run_raises_base_exception(time_machine, mocked_parse, make_ti_context, python_callable=lambda: 1 / 0, ) - what = StartupDetails( - ti=TaskInstance( - id=uuid7(), - task_id="zero_division_error", - dag_id="basic_dag_base_exception", - run_id="c", - try_number=1, - ), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, "basic_dag_base_exception", task) + ti = create_runtime_ti(dag_id="basic_dag_base_exception", task=task) instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -262,7 +229,7 @@ def test_run_raises_base_exception(time_machine, mocked_parse, make_ti_context, ) -def test_run_raises_system_exit(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_raises_system_exit(time_machine, create_runtime_ti, mock_supervisor_comms): """Test running a basic task that exits with SystemExit exception.""" from airflow.providers.standard.operators.python import PythonOperator @@ -271,20 +238,7 @@ def test_run_raises_system_exit(time_machine, mocked_parse, make_ti_context, moc python_callable=lambda: exit(10), ) - what = StartupDetails( - ti=TaskInstance( - id=uuid7(), - task_id="system_exit_task", - dag_id="basic_dag_system_exit", - run_id="c", - try_number=1, - ), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, "basic_dag_system_exit", task) + ti = create_runtime_ti(task=task, dag_id="basic_dag_system_exit") instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -300,7 +254,7 @@ def test_run_raises_system_exit(time_machine, mocked_parse, make_ti_context, moc ) -def test_run_raises_airflow_exception(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_raises_airflow_exception(time_machine, create_runtime_ti, mock_supervisor_comms): """Test running a basic task that exits with AirflowException.""" from airflow.providers.standard.operators.python import PythonOperator @@ -311,20 +265,7 @@ def test_run_raises_airflow_exception(time_machine, mocked_parse, make_ti_contex ), ) - what = StartupDetails( - ti=TaskInstance( - id=uuid7(), - task_id="af_exception_task", - dag_id="basic_dag_af_exception", - run_id="c", - try_number=1, - ), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, "basic_dag_af_exception", task) + ti = create_runtime_ti(task=task, dag_id="basic_dag_af_exception") instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -340,7 +281,7 @@ def test_run_raises_airflow_exception(time_machine, mocked_parse, make_ti_contex ) -def test_run_task_timeout(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): +def test_run_task_timeout(time_machine, create_runtime_ti, mock_supervisor_comms): """Test running a basic task that times out.""" from time import sleep @@ -352,20 +293,7 @@ def test_run_task_timeout(time_machine, mocked_parse, make_ti_context, mock_supe python_callable=lambda: sleep(2), ) - what = StartupDetails( - ti=TaskInstance( - id=uuid7(), - task_id="sleep", - dag_id="basic_dag_time_out", - run_id="c", - try_number=1, - ), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, "basic_dag_time_out", task) + ti = create_runtime_ti(task=task, dag_id="basic_dag_time_out") instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -498,7 +426,7 @@ def execute(self, context): ], ) def test_startup_and_run_dag_with_templated_fields( - command, rendered_command, mocked_parse, make_ti_context, time_machine, mock_supervisor_comms + command, rendered_command, create_runtime_ti, time_machine, mock_supervisor_comms ): """Test startup of a DAG with various templated fields.""" @@ -506,16 +434,8 @@ def test_startup_and_run_dag_with_templated_fields( task = BashOperator(task_id="templated_task", bash_command=command) - what = StartupDetails( - ti=TaskInstance(id=uuid7(), task_id="templated_task", dag_id="basic_dag", run_id="c", try_number=1), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - ti = mocked_parse(what, "basic_dag", task) - ti._ti_context_from_server = make_ti_context( - logical_date="2024-12-01 01:00:00+00:00", - run_id="c", + ti = create_runtime_ti( + task=task, dag_id="basic_dag", logical_date="2024-12-01 01:00:00+00:00", run_id="c" ) instant = timezone.datetime(2024, 12, 3, 10, 0) @@ -570,7 +490,7 @@ def execute(self, context): ], ) def test_run_basic_failed( - time_machine, mocked_parse, dag_id, task_id, fail_with_exception, make_ti_context, mock_supervisor_comms + time_machine, create_runtime_ti, dag_id, task_id, fail_with_exception, mock_supervisor_comms ): """Test running a basic task that marks itself as failed by raising exception.""" @@ -585,14 +505,7 @@ def execute(self, context): task = CustomOperator(task_id=task_id, e=fail_with_exception) - what = StartupDetails( - ti=TaskInstance(id=uuid7(), task_id=task_id, dag_id=dag_id, run_id="c", try_number=1), - file="", - requests_fd=0, - ti_context=make_ti_context(), - ) - - ti = mocked_parse(what, dag_id, task) + ti = create_runtime_ti(task=task, dag_id=dag_id) instant = timezone.datetime(2024, 12, 3, 10, 0) time_machine.move_to(instant, tick=False) @@ -609,14 +522,18 @@ def test_get_context_without_ti_context_from_server(self, mocked_parse, make_ti_ """Test get_template_context without ti_context_from_server.""" task = BaseOperator(task_id="hello") + dag_id = "basic_task" + + # Assign task to DAG + get_inline_dag(dag_id=dag_id, task=task) ti_id = uuid7() - ti = TaskInstance( - id=ti_id, task_id=task.task_id, dag_id="basic_task", run_id="test_run", try_number=1 - ) + ti = TaskInstance(id=ti_id, task_id=task.task_id, dag_id=dag_id, run_id="test_run", try_number=1) - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) + # Keep the context empty + runtime_ti = RuntimeTaskInstance.model_construct( + **ti.model_dump(exclude_unset=True), task=task, _ti_context_from_server=None + ) context = runtime_ti.get_template_context() # Verify the context keys and values @@ -638,24 +555,18 @@ def test_get_context_without_ti_context_from_server(self, mocked_parse, make_ti_ "ti": runtime_ti, } - def test_get_context_with_ti_context_from_server(self, mocked_parse, make_ti_context): + def test_get_context_with_ti_context_from_server(self, create_runtime_ti): """Test the context keys are added when sent from API server (mocked)""" from airflow.utils import timezone - ti = TaskInstance(id=uuid7(), task_id="hello", dag_id="basic_task", run_id="test_run", try_number=1) - - task = BaseOperator(task_id=ti.task_id) - - ti_context = make_ti_context(dag_id=ti.dag_id, run_id=ti.run_id) - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=ti_context) - - runtime_ti = mocked_parse(what, ti.dag_id, task) + task = BaseOperator(task_id="hello") # Assume the context is sent from the API server # `task_sdk/tests/api/test_client.py::test_task_instance_start` checks the context is received # from the API server - runtime_ti._ti_context_from_server = ti_context - dr = ti_context.dag_run + runtime_ti = create_runtime_ti(task=task, dag_id="basic_task") + + dr = runtime_ti._ti_context_from_server.dag_run context = runtime_ti.get_template_context() @@ -687,15 +598,11 @@ def test_get_context_with_ti_context_from_server(self, mocked_parse, make_ti_con "ts_nodash_with_tz": "20241201T010000+0000", } - def test_get_connection_from_context(self, mocked_parse, make_ti_context, mock_supervisor_comms): + def test_get_connection_from_context(self, create_runtime_ti, mock_supervisor_comms): """Test that the connection is fetched from the API server via the Supervisor lazily when accessed""" task = BaseOperator(task_id="hello") - ti_id = uuid7() - ti = TaskInstance( - id=ti_id, task_id=task.task_id, dag_id="basic_task", run_id="test_run", try_number=1 - ) conn = ConnectionResult( conn_id="test_conn", conn_type="mysql", @@ -707,8 +614,7 @@ def test_get_connection_from_context(self, mocked_parse, make_ti_context, mock_s extra='{"extra_key": "extra_value"}', ) - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) + runtime_ti = create_runtime_ti(task=task, dag_id="test_get_connection_from_context") mock_supervisor_comms.get_message.return_value = conn context = runtime_ti.get_template_context() @@ -741,15 +647,10 @@ def test_get_connection_from_context(self, mocked_parse, make_ti_context, mock_s dejson_from_conn = conn_from_context.extra_dejson assert dejson_from_conn == {"extra_key": "extra_value"} - def test_template_render(self, mocked_parse, make_ti_context): + def test_template_render(self, create_runtime_ti): task = BaseOperator(task_id="test_template_render_task") - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id="test_template_render", run_id="test_run", try_number=1 - ) - - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) + runtime_ti = create_runtime_ti(task=task, dag_id="test_template_render") template_context = runtime_ti.get_template_context() result = runtime_ti.task.render_template( "Task: {{ dag.dag_id }} -> {{ task.task_id }}", template_context @@ -763,22 +664,21 @@ def test_template_render(self, mocked_parse, make_ti_context): ('{{ conn.get("a_connection", "unused_fallback").host }}', "hostvalue"), ("{{ conn.a_connection.host }}", "hostvalue"), ("{{ conn.a_connection.login }}", "loginvalue"), + ("{{ conn.a_connection.schema }}", "schemavalues"), ("{{ conn.a_connection.password }}", "passwordvalue"), ('{{ conn.a_connection.extra_dejson["extra__asana__workspace"] }}', "extra1"), ("{{ conn.a_connection.extra_dejson.extra__asana__workspace }}", "extra1"), ], ) def test_template_with_connection( - self, content, expected_output, make_ti_context, mocked_parse, mock_supervisor_comms + self, content, expected_output, create_runtime_ti, mock_supervisor_comms ): """ Test the availability of connections in templates """ task = BaseOperator(task_id="hello") + runtime_ti = create_runtime_ti(task=task, dag_id="test_template_with_connection") - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id="basic_task", run_id="test_run", try_number=1 - ) conn = ConnectionResult( conn_id="a_connection", conn_type="a_type", @@ -789,8 +689,6 @@ def test_template_with_connection( extra='{"extra__asana__workspace": "extra1"}', ) - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) mock_supervisor_comms.get_message.return_value = conn context = runtime_ti.get_template_context() @@ -809,20 +707,15 @@ def test_template_with_connection( ], ) def test_get_variable_from_context( - self, mocked_parse, make_ti_context, mock_supervisor_comms, accessor_type, var_value, expected_value + self, create_runtime_ti, mock_supervisor_comms, accessor_type, var_value: str, expected_value ): """Test that the variable is fetched from the API server via the Supervisor lazily when accessed""" task = BaseOperator(task_id="hello") + runtime_ti = create_runtime_ti(task=task) - ti_id = uuid7() - ti = TaskInstance( - id=ti_id, task_id=task.task_id, dag_id="basic_task", run_id="test_run", try_number=1 - ) var = VariableResult(key="test_key", value=var_value) - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) mock_supervisor_comms.get_message.return_value = var context = runtime_ti.get_template_context() @@ -853,8 +746,7 @@ class TestXComAfterTaskExecution: ) def test_xcom_push_flag( self, - mocked_parse, - make_ti_context, + create_runtime_ti, mock_supervisor_comms, spy_agency, do_xcom_push: bool, @@ -869,12 +761,7 @@ def execute(self, context): task = CustomOperator(task_id="hello", do_xcom_push=do_xcom_push) - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id="xcom_push_flag", run_id="test_run", try_number=1 - ) - - what = StartupDetails(ti=ti, file="", requests_fd=0, ti_context=make_ti_context()) - runtime_ti = mocked_parse(what, ti.dag_id, task) + runtime_ti = create_runtime_ti(task=task) spy_agency.spy_on(_push_xcom_if_needed, call_original=True) spy_agency.spy_on(runtime_ti.xcom_push, call_original=False) @@ -888,7 +775,7 @@ def execute(self, context): else: spy_agency.assert_spy_not_called(runtime_ti.xcom_push) - def test_xcom_with_multiple_outputs(self, mocked_parse, spy_agency): + def test_xcom_with_multiple_outputs(self, create_runtime_ti, spy_agency): """Test that the task pushes to XCom when multiple outputs are returned.""" result = {"key1": "value1", "key2": "value2"} @@ -899,12 +786,8 @@ def execute(self, context): task = CustomOperator( task_id="test_xcom_push_with_multiple_outputs", do_xcom_push=True, multiple_outputs=True ) - dag = get_inline_dag(dag_id="test_dag", task=task) - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id=dag.dag_id, run_id="test_run", try_number=1 - ) - runtime_ti = RuntimeTaskInstance.model_construct(**ti.model_dump(exclude_unset=True), task=task) + runtime_ti = create_runtime_ti(task=task) spy_agency.spy_on(runtime_ti.xcom_push, call_original=False) _push_xcom_if_needed(result=result, ti=runtime_ti) @@ -918,7 +801,7 @@ def execute(self, context): for key, value in expected_calls: spy_agency.assert_spy_called_with(runtime_ti.xcom_push, key, value) - def test_xcom_with_multiple_outputs_and_no_mapping_result(self, mocked_parse, spy_agency): + def test_xcom_with_multiple_outputs_and_no_mapping_result(self, create_runtime_ti, spy_agency): """Test that error is raised when multiple outputs are returned without mapping.""" result = "value1" @@ -929,12 +812,8 @@ def execute(self, context): task = CustomOperator( task_id="test_xcom_push_with_multiple_outputs", do_xcom_push=True, multiple_outputs=True ) - dag = get_inline_dag(dag_id="test_dag", task=task) - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id=dag.dag_id, run_id="test_run", try_number=1 - ) - runtime_ti = RuntimeTaskInstance.model_construct(**ti.model_dump(exclude_unset=True), task=task) + runtime_ti = create_runtime_ti(task=task) spy_agency.spy_on(runtime_ti.xcom_push, call_original=False) with pytest.raises( @@ -943,7 +822,7 @@ def execute(self, context): ): _push_xcom_if_needed(result=result, ti=runtime_ti) - def test_xcom_with_multiple_outputs_and_key_is_not_string(self, mocked_parse, spy_agency): + def test_xcom_with_multiple_outputs_and_key_is_not_string(self, create_runtime_ti, spy_agency): """Test that error is raised when multiple outputs are returned and key isn't string.""" result = {2: "value1", "key2": "value2"} @@ -954,12 +833,8 @@ def execute(self, context): task = CustomOperator( task_id="test_xcom_push_with_multiple_outputs", do_xcom_push=True, multiple_outputs=True ) - dag = get_inline_dag(dag_id="test_dag", task=task) - ti = TaskInstance( - id=uuid7(), task_id=task.task_id, dag_id=dag.dag_id, run_id="test_run", try_number=1 - ) - runtime_ti = RuntimeTaskInstance.model_construct(**ti.model_dump(exclude_unset=True), task=task) + runtime_ti = create_runtime_ti(task=task) spy_agency.spy_on(runtime_ti.xcom_push, call_original=False)