Skip to content

Commit

Permalink
Fixed the endless reschedule (apache#45224)
Browse files Browse the repository at this point in the history
  • Loading branch information
morooshka authored Dec 27, 2024
1 parent af130c0 commit 7f2b8ef
Show file tree
Hide file tree
Showing 2 changed files with 290 additions and 31 deletions.
36 changes: 7 additions & 29 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.session import create_session

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.typing_compat import Self
from airflow.utils.context import Context

Expand Down Expand Up @@ -84,30 +82,6 @@ def __bool__(self) -> bool:
return self.is_done


@provide_session
def _orig_start_date(
dag_id: str, task_id: str, run_id: str, map_index: int, try_number: int, session: Session = NEW_SESSION
):
"""
Get the original start_date for a rescheduled task.
:meta private:
"""
return session.scalar(
select(TaskReschedule)
.where(
TaskReschedule.dag_id == dag_id,
TaskReschedule.task_id == task_id,
TaskReschedule.run_id == run_id,
TaskReschedule.map_index == map_index,
TaskReschedule.try_number == try_number,
)
.order_by(TaskReschedule.id.asc())
.with_only_columns(TaskReschedule.start_date)
.limit(1)
)


class BaseSensorOperator(BaseOperator, SkipMixin):
"""
Sensor operators are derived from this class and inherit these attributes.
Expand Down Expand Up @@ -246,8 +220,12 @@ def execute(self, context: Context) -> Any:
ti = context["ti"]
max_tries: int = ti.max_tries or 0
retries: int = self.retries or 0

# If reschedule, use the start date of the first try (first try can be either the very
# first execution of the task, or the first execution after the task was cleared.)
# first execution of the task, or the first execution after the task was cleared).
# If the first try's record was not saved due to the Exception occurred and the following
# transaction rollback, the next available attempt should be taken
# to prevent falling in the endless rescheduling
first_try_number = max_tries - retries + 1
with create_session() as session:
start_date = session.scalar(
Expand All @@ -257,7 +235,7 @@ def execute(self, context: Context) -> Any:
TaskReschedule.task_id == ti.task_id,
TaskReschedule.run_id == ti.run_id,
TaskReschedule.map_index == ti.map_index,
TaskReschedule.try_number == first_try_number,
TaskReschedule.try_number >= first_try_number,
)
.order_by(TaskReschedule.id.asc())
.with_only_columns(TaskReschedule.start_date)
Expand Down
285 changes: 283 additions & 2 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def wrapper(ti):


class DummySensor(BaseSensorOperator):
def __init__(self, return_value=False, **kwargs):
def __init__(self, return_value: bool | None = False, **kwargs):
super().__init__(**kwargs)
self.return_value = return_value

Expand Down Expand Up @@ -422,7 +422,7 @@ def _get_tis():
assert task_reschedules[0].try_number == 1
assert dummy_ti.state == State.NONE

# second poke timesout and task instance is failed
# second poke times out and task instance is failed
time_machine.coordinates.shift(sensor.poke_interval)
with pytest.raises(AirflowSensorTimeout):
self._run(sensor)
Expand Down Expand Up @@ -878,6 +878,287 @@ def _increment_try_number():
assert sensor_ti.max_tries == 4
assert sensor_ti.state == State.FAILED

def test_reschedule_set_retries_no_errors_timeout_fail(
self, make_sensor, time_machine, session, task_reschedules_for_ti
):
"""
Mode "reschedule", retries set, but no errors occurred, time gone out.
Retries and timeout configurations interact correctly.
Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
If no errors occurred, no retries should be executed.
This is how it is expected to behave:
1. 00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
2. 00:05 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
3. 00:10 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
4. 00:15 Raises AirflowSensorTimeout: try_number=1 max_tries=2 state=FAILED
"""
sensor, dr = make_sensor(
return_value=None,
poke_interval=5,
timeout=10,
retries=2,
retry_delay=timedelta(seconds=3),
mode="reschedule",
silent_fail=False,
)

def _get_sensor_ti():
return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)

sensor.poke = Mock(side_effect=[False, False, False, False])

# Scheduler does this before the first run
sensor_ti = _get_sensor_ti()
sensor_ti.state = State.SCHEDULED
sensor_ti.try_number += 1
session.commit()

# 1-st poke
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 1

# 2-nd poke
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 2

# 3-rd poke
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 3

# 4-th poke causes timeout
time_machine.coordinates.shift(sensor.poke_interval)

with pytest.raises(AirflowSensorTimeout):
self._run(sensor)

sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.FAILED

# On failed by timeout poke reschedule attempt is not saved
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 3

def test_reschedule_set_retries_1st_poke_runtime_error_timeout_fail(
self, make_sensor, time_machine, session, task_reschedules_for_ti
):
"""
Mode "reschedule", retries set, fist poke causes RuntimeError, time gone out.
RuntimeError on first iteration results in no reschedule attempt recorded in DB.
In that case sensor should not result in endless loop and should calculate running
timeout from:
1. The current system date on the second poke
2. From the second attempt on subsequent attempts
See issue #45050 https://github.com/apache/airflow/issues/45050
Retries and timeout configurations interact correctly.
Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
Number of retries incremented on error, but run timeout should not be extended.
This is how it is expected to behave:
1. 00:00 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
2. 00:07 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
This is a first saved reschedule attempt, and it is used to calculate the run duration
3. 00:12 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
4. 00:17 Returns False: try_number=2 max_tries=2 state=UP_FOR_RESCHEDULE
5. 00:23 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
"""
sensor, dr = make_sensor(
return_value=None,
poke_interval=5,
timeout=10,
retries=2,
retry_delay=timedelta(seconds=7),
mode="reschedule",
silent_fail=False,
)

def _get_sensor_ti():
return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)

sensor.poke = Mock(side_effect=[RuntimeError, False, False, False, False])

# Scheduler does this before the first run
sensor_ti = _get_sensor_ti()
sensor_ti.state = State.SCHEDULED
sensor_ti.try_number += 1
session.commit()

# 1-st poke
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)

with pytest.raises(RuntimeError):
self._run(sensor)

sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RETRY

# On runtime error no reschedule attempt saved
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 0

# Scheduler does this before retry
sensor_ti = _get_sensor_ti()
sensor_ti.try_number += 1
session.commit()

# 2-nd poke
time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 1

# 3-rd poke
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 2

# 4-th poke
time_machine.coordinates.shift(sensor.poke_interval)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 3

# 5-th poke causes timeout
time_machine.coordinates.shift(sensor.poke_interval)

with pytest.raises(AirflowSensorTimeout):
self._run(sensor)

sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.FAILED

# On failed by timeout poke reschedule attempt is not saved
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 3

def test_reschedule_set_retries_2nd_poke_runtime_error_timeout_fail(
self, make_sensor, time_machine, session, task_reschedules_for_ti
):
"""
Mode "reschedule", retries set, fist poke causes RuntimeError, time gone out.
RuntimeError on second iteration results in no reschedule for the second attempt
recorded in DB.
In that case running timeout calculation should not be affected because sensor
should use the first attempt as the start of execution.
Retries and timeout configurations interact correctly.
Given a sensor configured: poke_interval=5 timeout=10 retries=2 retry_delay=timedelta(seconds=7)
Number of retries incremented on error, but run timeout should not be extended.
This is how it is expected to behave:
00:00 Returns False: try_number=1 max_tries=2 state=UP_FOR_RESCHEDULE
00:05 Raises RuntimeError: try_number=1 max_tries=2 state=UP_FOR_RETRY
00:12 Raises AirflowSensorTimeout: try_number=2 max_tries=2 state=FAILED
"""
sensor, dr = make_sensor(
return_value=None,
poke_interval=5,
timeout=10,
retries=2,
retry_delay=timedelta(seconds=7),
mode="reschedule",
silent_fail=False,
)

def _get_sensor_ti():
return next(x for x in dr.get_task_instances(session=session) if x.task_id == SENSOR_OP)

sensor.poke = Mock(side_effect=[False, RuntimeError, False])

# Scheduler does this before the first run
sensor_ti = _get_sensor_ti()
sensor_ti.state = State.SCHEDULED
sensor_ti.try_number += 1
session.commit()

# 1-st poke
print("**1" * 40)
date1 = timezone.utcnow()
time_machine.move_to(date1, tick=False)
self._run(sensor)
sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RESCHEDULE
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 1

# 2-nd poke
print("**2" * 40)
time_machine.coordinates.shift(sensor.poke_interval)

with pytest.raises(RuntimeError):
self._run(sensor)

sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 1
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.UP_FOR_RETRY

# On runtime error no reschedule attempt saved
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 1

# Scheduler does this before retry
sensor_ti = _get_sensor_ti()
sensor_ti.try_number += 1
session.commit()

# 3-rd poke causes timeout
print("*3*" * 40)
time_machine.coordinates.shift(sensor.retry_delay + timedelta(seconds=1))

with pytest.raises(AirflowSensorTimeout):
self._run(sensor)

sensor_ti = _get_sensor_ti()
assert sensor_ti.try_number == 2
assert sensor_ti.max_tries == 2
assert sensor_ti.state == State.FAILED

# On failed by timeout poke reschedule attempt is not saved
task_reschedules = task_reschedules_for_ti(sensor_ti)
assert len(task_reschedules) == 0

def test_reschedule_and_retry_timeout_and_silent_fail(self, make_sensor, time_machine, session):
"""
Test mode="reschedule", silent_fail=True then retries and timeout configurations interact correctly.
Expand Down

0 comments on commit 7f2b8ef

Please sign in to comment.