From 03a8cc6a8a1e090c80fd472c426970250c86124b Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 4 Sep 2024 00:08:59 +0200 Subject: [PATCH] PR #596 finetune tests for cancel feature of job manager Call public `run_jobs` instead of internal `_track_statuses`. Use a fake backend that interacts with standard requests, instead of mocking internals. Parameterize `cancel_after_seconds` instead of separate test functions refs #590, #596 --- openeo/extra/job_management.py | 1 + setup.py | 2 +- tests/extra/test_job_management.py | 153 ++++++++++++++++------------- 3 files changed, 84 insertions(+), 72 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index acdf90827..c49af6165 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -345,6 +345,7 @@ def run_jobs( while ( df[ + # TODO: risk on infinite loop if a backend reports a (non-standard) terminal status that is not covered here (df.status != "finished") & (df.status != "skipped") & (df.status != "start_failed") diff --git a/setup.py b/setup.py index f1ddaf2ac..f4d7c2f7b 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ "matplotlib", "geopandas", "flake8>=5.0.0", - "time_machine", + "time_machine>=2.13.0", "pyproj>=3.2.0", # Pyproj is an optional, best-effort runtime dependency "dirty_equals>=0.6.0", # (#578) On Python 3.7: avoid dirty_equals 0.7.1 which wrongly claims to be Python 3.7 compatible diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index d2266fcc1..07edafe92 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,10 +1,13 @@ import datetime import json +import re import textwrap import threading import time +from typing import Callable, Union from unittest import mock +import dirty_equals import geopandas # TODO: can we avoid using httpretty? @@ -22,6 +25,7 @@ import time_machine import openeo +import openeo.extra.job_management from openeo import BatchJob from openeo.extra.job_management import ( MAX_RETRIES, @@ -33,8 +37,47 @@ from openeo.util import rfc3339 +class FakeBackend: + """ + Fake openEO backend with some basic job management functionality for testing job manager logic. + """ + def __init__(self, *, backend_root_url: str = "http://openeo.test", requests_mock): + self.url = backend_root_url.rstrip("/") + requests_mock.get(f"{self.url}/", json={"api_version": "1.1.0"}) + self.job_db = {} + self.get_job_metadata_mock = requests_mock.get( + re.compile(rf"^{self.url}/jobs/[\w_-]*$"), + json=self._handle_get_job_metadata, + ) + self.cancel_job_mock = requests_mock.delete( + re.compile(rf"^{self.url}/jobs/[\w_-]*/results$"), + json=self._handle_cancel_job, + ) + requests_mock.get(re.compile(rf"^{self.url}/jobs/[\w_-]*/results"), json={"links": []}) + + def set_job_status(self, job_id: str, status: Union[str, Callable[[], str]]): + self.job_db.setdefault(job_id, {})["status"] = status + + def get_job_status(self, job_id: str): + status = self.job_db[job_id]["status"] + if callable(status): + status = status() + return status + + def _handle_get_job_metadata(self, request, context): + job_id = request.path.split("/")[-1] + return {"id": job_id, "status": self.get_job_status(job_id)} + + def _handle_cancel_job(self, request, context): + job_id = request.path.split("/")[-2] + assert self.get_job_status(job_id) == "running" + self.set_job_status(job_id, "canceled") + context.status_code = 204 + + class TestMultiBackendJobManager: + @pytest.fixture def sleep_mock(self): with mock.patch("time.sleep") as sleep: @@ -448,81 +491,49 @@ def start_job(row, connection_provider, connection, **kwargs): assert set(result.status) == {"running"} assert set(result.backend_name) == {"foo"} - def test_cancel_prolonged_job_exceeds_duration(self): - # Set up a sample DataFrame with job data - df = pd.DataFrame( - { - "id": ["job_1"], - "backend_name": ["foo"], - "status": ["running"], - "running_start_time": ["2020-01-01T00:00:00Z"], - } - ) - - # Initialize the manager with the cancel_running_job_after parameter - cancel_after_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) - - # Mock the connection and job retrieval - mock_connection = mock.MagicMock() - mock_job = mock.MagicMock() - mock_job.describe.return_value = {"status": "running"} - manager._get_connection = mock.MagicMock(return_value=mock_connection) - mock_connection.job.return_value = mock_job - - # Set up the running start time and future time - job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") - future_time = ( - job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) + datetime.timedelta(seconds=1) - ) - - # Replace _cancel_prolonged_job with a mock to track its calls - manager._cancel_prolonged_job = mock.MagicMock() - - # Travel to the future where the job has exceeded its allowed running time - with time_machine.travel(future_time, tick=False): - manager._track_statuses(df) - - # Verify that the _cancel_prolonged_job method was called with the correct job and row - manager._cancel_prolonged_job.assert_called_once - - def test_cancel_prolonged_job_within_duration(self): - # Set up a sample DataFrame with job data - df = pd.DataFrame( - { - "id": ["job_1"], - "backend_name": ["foo"], - "status": ["running"], - "running_start_time": ["2020-01-01T00:00:00Z"], - } - ) - - # Initialize the manager with the cancel_running_job_after parameter - cancel_after_seconds = 12 * 60 * 60 # 12 hours - manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds) - - # Mock the connection and job retrieval - mock_connection = mock.MagicMock() - mock_job = mock.MagicMock() - mock_job.describe.return_value = {"status": "running"} - manager._get_connection = mock.MagicMock(return_value=mock_connection) - mock_connection.job.return_value = mock_job - - # Set up the running start time and future time - job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ") - future_time = ( - job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) - datetime.timedelta(seconds=1) + @pytest.mark.parametrize( + ["start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"], + [ + ("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 6 * 60 * 60, "canceled"), + ("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 12 * 60 * 60, "finished"), + ], + ) + def test_automatic_cancel_of_too_long_running_jobs( + self, + requests_mock, + tmp_path, + time_machine, + start_time, + end_time, + end_status, + cancel_after_seconds, + expected_status, + ): + fake_backend = FakeBackend(requests_mock=requests_mock) + + # For simplicity, set up pre-existing job with status "running" (instead of job manager creating+starting it) + job_id = "job-123" + fake_backend.set_job_status(job_id, lambda: "running" if rfc3339.utcnow() < end_time else end_status) + + manager = MultiBackendJobManager(root_dir=tmp_path, cancel_running_job_after=cancel_after_seconds) + manager.add_backend("foo", connection=openeo.connect(fake_backend.url)) + + # Initialize data frame with status "created" (to make sure the start of "running" state is recorded) + df = pd.DataFrame({"id": [job_id], "backend_name": ["foo"], "status": ["created"]}) + + time_machine.move_to(start_time) + # Mock sleep() to not actually sleep, but skip one hour at a time + with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)): + manager.run_jobs(df=df, start_job=lambda **kwargs: None, job_db=tmp_path / "jobs.csv") + + final_df = CsvJobDatabase(tmp_path / "jobs.csv").read() + assert final_df.iloc[0].to_dict() == dirty_equals.IsPartialDict( + id="job-123", status=expected_status, running_start_time="2024-09-01T10:00:00Z" ) - # Replace _cancel_prolonged_job with a mock to track its calls - manager._cancel_prolonged_job = mock.MagicMock() + assert fake_backend.cancel_job_mock.called == (expected_status == "canceled") - # Travel to the future where the job has exceeded its allowed running time - with time_machine.travel(future_time, tick=False): - manager._track_statuses(df) - # Verify that the _cancel_prolonged_job method was called with the correct job and row - manager._cancel_prolonged_job.assert_not_called JOB_DB_DF_BASICS = pd.DataFrame(