Only update a schedule's most_recent_task on the `reserved` event, and do not
request a schedule again while its most recent task has not completed.

Summary of the patch:
  * common/utils.py: `schedule.most_recent_task = task` now runs only when the
    event code is `TaskStatus.reserved`.
  * utils/scheduling.py: `request_tasks_using_schedule` skips a schedule whose
    most recent task status is not in `TaskStatus.complete()`.
  * tests: new test_periodic_scheduling.py and test_task_event_handler.py; the
    `make_task` / `tasks` / `task` fixtures move unchanged from
    routes/tasks/conftest.py to routes/conftest.py; a `temp_schedule` fixture is
    added and schedule documents now expose `_id`.

Review notes:
  * test_schedule.py compares against a copy of the schedule document instead
    of popping `_id` from the fixture dict in place.
  * `..._does_not_updates_...` test renamed to `..._does_not_update_...` and
    both task-event tests got a docstring.
  * NOTE(review): line breaks and indentation were reconstructed from a
    whitespace-collapsed copy of this patch; confirm context-line whitespace
    against the target tree before applying. `index` hashes are the ones from
    the original patch.

diff --git a/dispatcher/backend/src/common/utils.py b/dispatcher/backend/src/common/utils.py
index 7c0a417e..7527324b 100644
--- a/dispatcher/backend/src/common/utils.py
+++ b/dispatcher/backend/src/common/utils.py
@@ -153,7 +153,7 @@ def add_to_debug_if_present(
         flag_modified(task, "files")  # mark 'files' as modified
 
     session.flush()  # we have to flush first to avoid circular dependency
-    if schedule:
+    if schedule and code == TaskStatus.reserved:
         schedule.most_recent_task = task
 
     if code == TaskStatus.scraper_completed and schedule:
diff --git a/dispatcher/backend/src/tests/integration/routes/business_logic/test_periodic_scheduling.py b/dispatcher/backend/src/tests/integration/routes/business_logic/test_periodic_scheduling.py
new file mode 100644
index 00000000..5a91e126
--- /dev/null
+++ b/dispatcher/backend/src/tests/integration/routes/business_logic/test_periodic_scheduling.py
@@ -0,0 +1,101 @@
+import datetime
+from typing import Any, Dict
+
+import sqlalchemy as sa
+import sqlalchemy.orm as so
+
+import db.models as dbm
+from common import getnow
+from common.enum import TaskStatus
+from db import Session
+from utils.scheduling import request_tasks_using_schedule
+
+
+class TestPeriodicScheduling:
+    def get_requested_tasks_for_schedule(
+        self, session: so.Session, schedule: Dict[str, Any]
+    ):
+        """return the list of requested tasks for a given schedule"""
+        return session.execute(
+            sa.select(dbm.RequestedTask).where(
+                dbm.RequestedTask.schedule_id == schedule["_id"]
+            )
+        ).all()
+
+    def assert_number_of_requested_tasks_for_schedule(
+        self, schedule: Dict[str, Any], expected: int
+    ):
+        """assert that the number of requested tasks for a given schedule is expected"""
+        with Session.begin() as session:
+            req_tasks = self.get_requested_tasks_for_schedule(session, schedule)
+            assert len(req_tasks) == expected
+
+    def test_periodic_scheduling_simple(self, temp_schedule):
+        """simple tests of periodic scheduling"""
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
+        request_tasks_using_schedule()
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)
+        request_tasks_using_schedule()  # request again should not duplicate requests
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)
+
+    def test_periodic_scheduling_most_recent_recent(self, temp_schedule, make_task):
+        """most recent task is too recent => do not request it again"""
+        most_recent = make_task(temp_schedule["name"])
+        now = getnow()
+        with Session.begin() as session:
+            task = dbm.Task.get(session, most_recent["_id"])
+            task.timestamp = {
+                TaskStatus.requested: now - datetime.timedelta(hours=4),
+                TaskStatus.reserved: now - datetime.timedelta(hours=3, minutes=59),
+                TaskStatus.started: now - datetime.timedelta(hours=3, minutes=58),
+                TaskStatus.succeeded: now - datetime.timedelta(hours=1),
+            }
+            schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
+            schedule_obj.most_recent_task = task
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
+        request_tasks_using_schedule()
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
+
+    def test_periodic_scheduling_most_recent_overdue_complete(
+        self, temp_schedule, make_task
+    ):
+        """most recent task is overdue and not running => request a new execution"""
+        most_recent = make_task(temp_schedule["name"])
+        now = getnow()
+        with Session.begin() as session:
+            task = dbm.Task.get(session, most_recent["_id"])
+            task.timestamp = {
+                TaskStatus.requested: now - datetime.timedelta(days=40, hours=4),
+                TaskStatus.reserved: now
+                - datetime.timedelta(days=40, hours=3, minutes=59),
+                TaskStatus.started: now
+                - datetime.timedelta(days=40, hours=3, minutes=58),
+                TaskStatus.succeeded: now - datetime.timedelta(days=40, hours=1),
+            }
+            schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
+            schedule_obj.most_recent_task = task
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
+        request_tasks_using_schedule()
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 1)
+
+    def test_periodic_scheduling_most_recent_overdue_running(
+        self, temp_schedule, make_task
+    ):
+        """most recent task is overdue but still running => do not request it again"""
+        most_recent = make_task(temp_schedule["name"])
+        now = getnow()
+        with Session.begin() as session:
+            task = dbm.Task.get(session, most_recent["_id"])
+            task.timestamp = {
+                TaskStatus.requested: now - datetime.timedelta(days=40, hours=4),
+                TaskStatus.reserved: now
+                - datetime.timedelta(days=40, hours=3, minutes=59),
+                TaskStatus.started: now
+                - datetime.timedelta(days=40, hours=3, minutes=58),
+            }
+            task.status = TaskStatus.started
+            schedule_obj = dbm.Schedule.get(session, temp_schedule["name"])
+            schedule_obj.most_recent_task = task
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
+        request_tasks_using_schedule()
+        self.assert_number_of_requested_tasks_for_schedule(temp_schedule, 0)
diff --git a/dispatcher/backend/src/tests/integration/routes/business_logic/test_task_event_handler.py b/dispatcher/backend/src/tests/integration/routes/business_logic/test_task_event_handler.py
new file mode 100644
index 00000000..3e36ca15
--- /dev/null
+++ b/dispatcher/backend/src/tests/integration/routes/business_logic/test_task_event_handler.py
@@ -0,0 +1,30 @@
+import db.models as dbm
+from common.enum import TaskStatus
+from common.utils import task_event_handler
+from db import Session
+
+
+class TestTaskEvents:
+    def test_task_event_reserved_updates_most_recent_task(
+        self, temp_schedule, make_task, worker
+    ):
+        """reserved event => schedule's most_recent_task becomes this task"""
+        task = make_task(schedule_name=temp_schedule["name"])
+        with Session.begin() as session:
+            schedule = dbm.Schedule.get(session, temp_schedule["name"])
+            assert schedule.most_recent_task_id is None
+            task_event_handler(
+                session, task["_id"], TaskStatus.reserved, {"worker": worker["name"]}
+            )
+            assert schedule.most_recent_task_id == task["_id"]
+
+    def test_task_event_started_does_not_update_most_recent_task(
+        self, temp_schedule, make_task, worker
+    ):
+        """started event => schedule's most_recent_task is left unset"""
+        task = make_task(schedule_name=temp_schedule["name"])
+        with Session.begin() as session:
+            schedule = dbm.Schedule.get(session, temp_schedule["name"])
+            assert schedule.most_recent_task_id is None
+            task_event_handler(session, task["_id"], TaskStatus.started, {})
+            assert schedule.most_recent_task_id is None
diff --git a/dispatcher/backend/src/tests/integration/routes/conftest.py b/dispatcher/backend/src/tests/integration/routes/conftest.py
index f0a0eb93..99faff3e 100644
--- a/dispatcher/backend/src/tests/integration/routes/conftest.py
+++ b/dispatcher/backend/src/tests/integration/routes/conftest.py
@@ -1,6 +1,7 @@
 import datetime
 
 import pytest
+import sqlalchemy as sa
 from werkzeug.security import generate_password_hash
 
 import db.models as dbm
@@ -8,6 +9,7 @@
 from common.enum import TaskStatus
 from common.roles import ROLES
 from db import Session
+from utils.offliners import expanded_config
 
 
 @pytest.fixture(scope="module")
@@ -70,6 +72,7 @@ def _make_schedule(
             session.flush()
             garbage_collector.add_schedule_id(schedule.id)
             document = {
+                "_id": schedule.id,
                 "name": schedule.name,
                 "category": schedule.category,
                 "enabled": schedule.enabled,
@@ -93,6 +96,31 @@ def schedule(make_schedule):
     return make_schedule()
 
 
+@pytest.fixture
+def temp_schedule(make_schedule):
+    """build a temporary schedule which will be deleted at the end of the test
+
+    NB: associated tasks and requested tasks are also deleted
+    """
+    schedule = make_schedule(name="periodic_sched_test")
+    yield schedule
+    with Session.begin() as session:
+        schedule_obj = dbm.Schedule.get(session, schedule["name"])
+        schedule_obj.most_recent_task = None
+        session.flush()
+        session.execute(
+            sa.delete(dbm.RequestedTask).where(
+                dbm.RequestedTask.schedule_id == schedule["_id"]
+            )
+        )
+        session.execute(
+            sa.delete(dbm.Task).where(dbm.Task.schedule_id == schedule["_id"])
+        )
+        session.execute(
+            sa.delete(dbm.Schedule).where(dbm.Schedule.id == schedule["_id"])
+        )
+
+
 @pytest.fixture(scope="module")
 def schedules(make_schedule, make_config, make_language):
     schedules = []
@@ -403,3 +431,111 @@ def workers(make_worker, make_user, make_config, make_language, deleted_user):
         workers.append(worker)
 
     return workers
+
+
+@pytest.fixture(scope="module")
+def make_task(make_event, make_schedule, make_config, worker, garbage_collector):
+    def _make_task(
+        schedule_name="schedule_name",
+        status=TaskStatus.succeeded,
+    ):
+        now = getnow()
+        if status == TaskStatus.requested:
+            events = [TaskStatus.requested]
+        elif status == TaskStatus.reserved:
+            events = [TaskStatus.requested, TaskStatus.reserved]
+        elif status == TaskStatus.started:
+            events = [TaskStatus.requested, TaskStatus.reserved, TaskStatus.started]
+        elif status == TaskStatus.succeeded:
+            events = [
+                TaskStatus.requested,
+                TaskStatus.reserved,
+                TaskStatus.started,
+                TaskStatus.succeeded,
+            ]
+        else:
+            events = [
+                TaskStatus.requested,
+                TaskStatus.reserved,
+                TaskStatus.started,
+                TaskStatus.failed,
+            ]
+
+        timestamp = {event: now for event in events}
+        events = [make_event(event, timestamp[event]) for event in events]
+        container = {
+            "command": "mwoffliner --mwUrl=https://example.com",
+            "image": {"name": "mwoffliner", "tag": "1.8.0"},
+            "exit_code": 0,
+            "stderr": "example_stderr",
+            "stdout": "example_stdout",
+        }
+        debug = {"args": [], "kwargs": {}}
+
+        if status == TaskStatus.failed:
+            debug["exception"] = "example_exception"
+            debug["traceback"] = "example_traceback"
+            files = {}
+        else:
+            files = {"mwoffliner_1.zim": {"name": "mwoffliner_1.zim", "size": 1000}}
+        config = expanded_config(make_config())
+        with Session.begin() as session:
+            worker_obj = dbm.Worker.get(session, worker["name"])
+            schedule = dbm.Schedule.get(session, schedule_name, run_checks=False)
+            if schedule is None:
+                make_schedule(schedule_name)
+                schedule = dbm.Schedule.get(session, schedule_name)
+            task = dbm.Task(
+                updated_at=now,
+                events=events,
+                debug=debug,
+                status=status,
+                timestamp=timestamp,
+                requested_by="bob",
+                canceled_by=None,
+                container=container,
+                priority=1,
+                config=config,
+                notification={},
+                files=files,
+                upload={},
+                original_schedule_name=schedule_name,
+            )
+            task.schedule_id = schedule.id
+            task.worker_id = worker_obj.id
+            session.add(task)
+            session.flush()
+            garbage_collector.add_task_id(task.id)
+
+            return {
+                "_id": task.id,
+                "status": task.status,
+                "worker": worker_obj.name,
+                "schedule_name": schedule.name,
+                "timestamp": task.timestamp,
+                "events": task.events,
+                "container": task.container,
+                "debug": task.debug,
+                "files": task.files,
+            }
+
+    yield _make_task
+
+
+@pytest.fixture(scope="module")
+def tasks(make_task):
+    tasks = []
+    for i in range(5):
+        tasks += [
+            make_task(status=TaskStatus.requested),
+            make_task(status=TaskStatus.reserved),
+            make_task(status=TaskStatus.started),
+            make_task(status=TaskStatus.succeeded),
+            make_task(status=TaskStatus.failed),
+        ]
+    return tasks
+
+
+@pytest.fixture(scope="module")
+def task(make_task):
+    return make_task(status=TaskStatus.succeeded)
diff --git a/dispatcher/backend/src/tests/integration/routes/schedules/test_schedule.py b/dispatcher/backend/src/tests/integration/routes/schedules/test_schedule.py
index f414a0e6..963e520d 100644
--- a/dispatcher/backend/src/tests/integration/routes/schedules/test_schedule.py
+++ b/dispatcher/backend/src/tests/integration/routes/schedules/test_schedule.py
@@ -353,6 +353,8 @@ def test_get_schedule_with_name(self, client, schedule):
         assert "most_recent_task" in response_json
         response_json.pop("duration", None)
         response_json.pop("most_recent_task", None)
+        # drop "_id" on a copy: the fixture dict may be shared with other tests
+        schedule = {k: v for k, v in schedule.items() if k != "_id"}
         assert response_json == schedule
 
 
diff --git a/dispatcher/backend/src/tests/integration/routes/tasks/conftest.py b/dispatcher/backend/src/tests/integration/routes/tasks/conftest.py
deleted file mode 100644
index 9abc1e67..00000000
--- a/dispatcher/backend/src/tests/integration/routes/tasks/conftest.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import pytest
-
-import db.models as dbm
-from common import getnow
-from common.enum import TaskStatus
-from db import Session
-from utils.offliners import expanded_config
-
-
-@pytest.fixture(scope="module")
-def make_task(make_event, make_schedule, make_config, worker, garbage_collector):
-    def _make_task(
-        schedule_name="schedule_name",
-        status=TaskStatus.succeeded,
-    ):
-        now = getnow()
-        if status == TaskStatus.requested:
-            events = [TaskStatus.requested]
-        elif status == TaskStatus.reserved:
-            events = [TaskStatus.requested, TaskStatus.reserved]
-        elif status == TaskStatus.started:
-            events = [TaskStatus.requested, TaskStatus.reserved, TaskStatus.started]
-        elif status == TaskStatus.succeeded:
-            events = [
-                TaskStatus.requested,
-                TaskStatus.reserved,
-                TaskStatus.started,
-                TaskStatus.succeeded,
-            ]
-        else:
-            events = [
-                TaskStatus.requested,
-                TaskStatus.reserved,
-                TaskStatus.started,
-                TaskStatus.failed,
-            ]
-
-        timestamp = {event: now for event in events}
-        events = [make_event(event, timestamp[event]) for event in events]
-        container = {
-            "command": "mwoffliner --mwUrl=https://example.com",
-            "image": {"name": "mwoffliner", "tag": "1.8.0"},
-            "exit_code": 0,
-            "stderr": "example_stderr",
-            "stdout": "example_stdout",
-        }
-        debug = {"args": [], "kwargs": {}}
-
-        if status == TaskStatus.failed:
-            debug["exception"] = "example_exception"
-            debug["traceback"] = "example_traceback"
-            files = {}
-        else:
-            files = {"mwoffliner_1.zim": {"name": "mwoffliner_1.zim", "size": 1000}}
-        config = expanded_config(make_config())
-        with Session.begin() as session:
-            worker_obj = dbm.Worker.get(session, worker["name"])
-            schedule = dbm.Schedule.get(session, schedule_name, run_checks=False)
-            if schedule is None:
-                make_schedule(schedule_name)
-                schedule = dbm.Schedule.get(session, schedule_name)
-            task = dbm.Task(
-                updated_at=now,
-                events=events,
-                debug=debug,
-                status=status,
-                timestamp=timestamp,
-                requested_by="bob",
-                canceled_by=None,
-                container=container,
-                priority=1,
-                config=config,
-                notification={},
-                files=files,
-                upload={},
-                original_schedule_name=schedule_name,
-            )
-            task.schedule_id = schedule.id
-            task.worker_id = worker_obj.id
-            session.add(task)
-            session.flush()
-            garbage_collector.add_task_id(task.id)
-
-            return {
-                "_id": task.id,
-                "status": task.status,
-                "worker": worker_obj.name,
-                "schedule_name": schedule.name,
-                "timestamp": task.timestamp,
-                "events": task.events,
-                "container": task.container,
-                "debug": task.debug,
-                "files": task.files,
-            }
-
-    yield _make_task
-
-
-@pytest.fixture(scope="module")
-def tasks(make_task):
-    tasks = []
-    for i in range(5):
-        tasks += [
-            make_task(status=TaskStatus.requested),
-            make_task(status=TaskStatus.reserved),
-            make_task(status=TaskStatus.started),
-            make_task(status=TaskStatus.succeeded),
-            make_task(status=TaskStatus.failed),
-        ]
-    return tasks
-
-
-@pytest.fixture(scope="module")
-def task(make_task):
-    return make_task(status=TaskStatus.succeeded)
diff --git a/dispatcher/backend/src/utils/scheduling.py b/dispatcher/backend/src/utils/scheduling.py
index d22a43cb..cf878ea7 100644
--- a/dispatcher/backend/src/utils/scheduling.py
+++ b/dispatcher/backend/src/utils/scheduling.py
@@ -220,6 +220,13 @@ def request_tasks_using_schedule(session: so.Session):
                     > period_start
                 ):
                     continue
+                # don't request a task if the most_recent_task is still running
+                if last_run.status not in TaskStatus.complete():
+                    logger.debug(
+                        f"{schedule.name} not requested because most_recent_task "
+                        f"{last_run.id} did not complete"
+                    )
+                    continue
 
             if request_a_schedule(
                 session=session,