diff --git a/task_sdk/src/airflow/sdk/execution_time/task_runner.py b/task_sdk/src/airflow/sdk/execution_time/task_runner.py index 5d788cf3d73a3..86c51673fcb94 100644 --- a/task_sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task_sdk/src/airflow/sdk/execution_time/task_runner.py @@ -438,7 +438,12 @@ def run(ti: RuntimeTaskInstance, log: Logger): ) # TODO: Run task failure callbacks here except SystemExit: - ... + # SystemExit needs to be retried if they are eligible. + msg = TaskState( + state=TerminalTIState.FAILED, + end_date=datetime.now(tz=timezone.utc), + ) + # TODO: Run task failure callbacks here except BaseException: # TODO: Run task failure callbacks here msg = TaskState(state=TerminalTIState.FAILED, end_date=datetime.now(tz=timezone.utc)) diff --git a/task_sdk/tests/execution_time/test_task_runner.py b/task_sdk/tests/execution_time/test_task_runner.py index 582e5a19e1fb3..54a38c3948067 100644 --- a/task_sdk/tests/execution_time/test_task_runner.py +++ b/task_sdk/tests/execution_time/test_task_runner.py @@ -292,6 +292,44 @@ def test_run_raises_base_exception(time_machine, mocked_parse, make_ti_context, ) +def test_run_raises_system_exit(time_machine, mocked_parse, make_ti_context, mock_supervisor_comms): + """Test running a basic task that exits with SystemExit exception.""" + from airflow.providers.standard.operators.python import PythonOperator + + task = PythonOperator( + task_id="system_exit_task", + python_callable=lambda: exit(10), + ) + + what = StartupDetails( + ti=TaskInstance( + id=uuid7(), + task_id="system_exit_task", + dag_id="basic_dag_system_exit", + run_id="c", + try_number=1, + ), + file="", + requests_fd=0, + ti_context=make_ti_context(), + ) + + ti = mocked_parse(what, "basic_dag_system_exit", task) + + instant = timezone.datetime(2024, 12, 3, 10, 0) + time_machine.move_to(instant, tick=False) + + run(ti, log=mock.MagicMock()) + + mock_supervisor_comms.send_request.assert_called_once_with( + msg=TaskState( + state=TerminalTIState.FAILED, + end_date=instant, + ), + log=mock.ANY, + ) + + def test_startup_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comms): """Test running a DAG with templated task.""" from airflow.providers.standard.operators.bash import BashOperator