From edc332118a48f0b1da3d53992b50cb15c054dcdd Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 11:31:15 -0800 Subject: [PATCH 01/14] update get_finished_jobs to use last_status --- .../workflow_automation/watch_nmdc.py | 8 +- tests/conftest.py | 6 +- ..._state.json => agent_state_1_failure.json} | 4 +- ...job_state.json => failed_job_state_2.json} | 0 tests/test_watch_nmdc.py | 97 +++++++++++-------- 5 files changed, 65 insertions(+), 50 deletions(-) rename tests/fixtures/{initial_state.json => agent_state_1_failure.json} (99%) rename tests/fixtures/{failed_job_state.json => failed_job_state_2.json} (100%) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 439479bd..ce5bc06e 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -189,16 +189,16 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: failed_jobs = [] for job in self.job_cache: if not job.done: - status = job.job_status - if status == "Succeeded" and job.opid: + last_status = job.workflow.last_status + if last_status == "Succeeded" and job.opid: successful_jobs.append(job) - elif status == "Failed" and job.opid: + elif last_status == "Failed" and job.opid: failed_jobs.append(job) if successful_jobs: logger.info(f"Found {len(successful_jobs)} successful jobs.") if failed_jobs: logger.info(f"Found {len(failed_jobs)} failed jobs.") - return (successful_jobs, failed_jobs) + return successful_jobs, failed_jobs def process_successful_job(self, job: WorkflowJob) -> Database: """ Process a successful job and return a Database object """ diff --git a/tests/conftest.py b/tests/conftest.py index b14164d5..8a0187c4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -123,10 +123,10 @@ def site_config(site_config_file): return SiteConfig(site_config_file) @fixture -def initial_state_file(fixtures_dir, tmp_path): - state_file = fixtures_dir / "initial_state.json" +def initial_state_file_1_failure(fixtures_dir, tmp_path): + state_file = fixtures_dir / "agent_state_1_failure.json" # make a working copy in tmp_path - copied_state_file = tmp_path / "initial_state.json" + copied_state_file = tmp_path / "agent_state_1_failure.json" shutil.copy(state_file, copied_state_file) return copied_state_file diff --git a/tests/fixtures/initial_state.json b/tests/fixtures/agent_state_1_failure.json similarity index 99% rename from tests/fixtures/initial_state.json rename to tests/fixtures/agent_state_1_failure.json index aa7dfc62..a56e1cba 100644 --- a/tests/fixtures/initial_state.json +++ b/tests/fixtures/agent_state_1_failure.json @@ -1,12 +1,12 @@ { "jobs": [ { - "type": "MAGs: v1.3.10", + "type": "MAGs: v1.3.12", "cromwell_jobid": "9492a397-eb30-472b-9d3b-abc123456789", "nmdc_jobid": "nmdc:66cf64b6-7462-11ef-8b84-abc123456789", "conf": { "git_repo": "https://github.com/microbiomedata/metaMAGs", - "release": "v1.3.10", + "release": "v1.3.12", "wdl": "mbin_nmdc.wdl", "activity_id": "nmdc:wfmag-11-g7msr323.1", "activity_set": "mags_activity_set", diff --git a/tests/fixtures/failed_job_state.json b/tests/fixtures/failed_job_state_2.json similarity index 100% rename from tests/fixtures/failed_job_state.json rename to tests/fixtures/failed_job_state_2.json diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index 902bb2ee..438ba2ac 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -21,10 +21,10 @@ # FileHandler init tests -def 
test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_state_file(site_config, initial_state_file_1_failure, tmp_path): copy_state_file = tmp_path / "copy_state.json" - shutil.copy(initial_state_file, copy_state_file) - fh = FileHandler(site_config, initial_state_file) + shutil.copy(initial_state_file_1_failure, copy_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) assert fh assert fh.state_file assert isinstance(fh.state_file, PosixPath) @@ -35,7 +35,7 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert not fh.state_file # test setter - fh.state_file = initial_state_file + fh.state_file = initial_state_file_1_failure assert fh.state_file assert fh.state_file.exists() assert fh.state_file.is_file() @@ -48,9 +48,9 @@ def test_file_handler_init_from_state_file(site_config, initial_state_file, tmp_ assert fh.state_file.is_file() -def test_file_handler_init_from_config_agent_state(site_config, initial_state_file, tmp_path): +def test_file_handler_init_from_config_agent_state(site_config, initial_state_file_1_failure, tmp_path): with patch("nmdc_automation.config.siteconfig.SiteConfig.agent_state", new_callable=PropertyMock) as mock_agent_state: - mock_agent_state.return_value = initial_state_file + mock_agent_state.return_value = initial_state_file_1_failure fh = FileHandler(site_config) assert fh assert fh.state_file @@ -76,8 +76,8 @@ def test_file_handler_init_default_state(site_config): assert fh2.state_file.exists() -def test_file_handler_read_state(site_config, initial_state_file): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_read_state(site_config, initial_state_file_1_failure): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state assert isinstance(state, dict) @@ -86,8 +86,8 @@ def test_file_handler_read_state(site_config, initial_state_file): assert len(state.get("jobs")) == 1 -def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir): - fh = FileHandler(site_config, initial_state_file) +def test_file_handler_write_state(site_config, initial_state_file_1_failure, fixtures_dir): + fh = FileHandler(site_config, initial_state_file_1_failure) state = fh.read_state() assert state # add new job @@ -106,7 +106,7 @@ def test_file_handler_write_state(site_config, initial_state_file, fixtures_dir) fh.write_state(state) -def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_dir): +def test_file_handler_get_output_path(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ -116,7 +116,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_ expected_output_path = site_config.data_dir / Path(was_informed_by) / Path(workflow_execution_id) - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act output_path = fh.get_output_path(mock_job) @@ -127,7 +127,7 @@ def test_file_handler_get_output_path(site_config, initial_state_file, fixtures_ assert output_path == expected_output_path -def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file, fixtures_dir, tmp_path): +def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_file_1_failure, fixtures_dir, tmp_path): # Arrange was_informed_by = "nmdc:1234" workflow_execution_id = "nmdc:56789" @@ 
-141,7 +141,7 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # patch config.data_dir with patch("nmdc_automation.config.siteconfig.SiteConfig.data_dir", new_callable=PropertyMock) as mock_data_dir: mock_data_dir.return_value = tmp_path - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) # Act metadata_path = fh.write_metadata_if_not_exists(mock_job) @@ -153,18 +153,18 @@ def test_file_handler_write_metadata_if_not_exists(site_config, initial_state_fi # JobManager tests -def test_job_manager_init(site_config, initial_state_file): +def test_job_manager_init(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) assert jm assert jm.file_handler assert jm.file_handler.state_file -def test_job_manager_restore_from_state(site_config, initial_state_file): +def test_job_manager_restore_from_state(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh, init_cache=False) # Act jm.restore_from_state() @@ -174,10 +174,14 @@ def test_job_manager_restore_from_state(site_config, initial_state_file): assert len(jm.job_cache) == 1 assert isinstance(jm.job_cache[0], WorkflowJob) + # job has been cached - get new workflow jobs from state should not return any + new_jobs = jm.get_new_workflow_jobs_from_state() + assert not new_jobs -def test_job_manager_job_checkpoint(site_config, initial_state_file): + +def test_job_manager_job_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act data = jm.job_checkpoint() @@ -189,9 +193,9 @@ def test_job_manager_job_checkpoint(site_config, initial_state_file): assert len(data.get("jobs")) == 1 -def test_job_manager_save_checkpoint(site_config, initial_state_file): +def test_job_manager_save_checkpoint(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act jm.save_checkpoint() @@ -202,9 +206,9 @@ def test_job_manager_save_checkpoint(site_config, initial_state_file): # cleanup fh.state_file.unlink() -def test_job_manager_find_job_by_opid(site_config, initial_state_file): +def test_job_manager_find_job_by_opid(site_config, initial_state_file_1_failure): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Act job = jm.find_job_by_opid("nmdc:test-opid") @@ -215,9 +219,9 @@ def test_job_manager_find_job_by_opid(site_config, initial_state_file): assert not job.done -def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "new_state_job.json")) assert new_job_state @@ -234,9 +238,9 @@ def 
test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file, jm.job_cache = [] -def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file, fixtures_dir): +def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) #already has an opid new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) @@ -261,12 +265,10 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_ assert job2.opid == opid - - -def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir): +def test_job_manager_get_finished_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - initial state has 1 failure and is not done - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) # Add a finished job: finished job is not done, but has a last_status of Succeeded @@ -278,7 +280,7 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures assert len(jm.job_cache) == 2 # add a failed job - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job_state = json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) assert failed_job.job_status == "Failed" @@ -308,16 +310,14 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures jm.job_cache = [] -def test_job_manager_process_successful_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_process_successful_job(site_config, initial_state_file_1_failure, fixtures_dir): # mock job.job.get_job_metadata - use fixture cromwell/succeded_metadata.json job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) with patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.get_job_metadata") as mock_get_metadata: mock_get_metadata.return_value = job_metadata - - # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) new_job_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) assert new_job_state @@ -334,11 +334,25 @@ def test_job_manager_process_successful_job(site_config, initial_state_file, fix jm.job_cache = [] -def test_job_manager_process_failed_job(site_config, initial_state_file, fixtures_dir): +def test_job_manager_process_failed_job_1_failure(site_config, initial_state_file_1_failure, fixtures_dir): + # Arrange + fh = FileHandler(site_config, initial_state_file_1_failure) + jm = JobManager(site_config, fh) + # job handler should initialize the job_cache from the state file by default + assert jm.job_cache + assert isinstance(jm.job_cache, list) + assert len(jm.job_cache) == 1 + + successful_jobs, failed_jobs = jm.get_finished_jobs() + assert not successful_jobs + assert failed_jobs + + +def test_job_manager_process_failed_job_2_failures(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange - fh = FileHandler(site_config, initial_state_file) + fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) - failed_job_state = json.load(open(fixtures_dir / "failed_job_state.json")) + failed_job_state = 
json.load(open(fixtures_dir / "failed_job_state_2.json")) assert failed_job_state failed_job = WorkflowJob(site_config, failed_job_state) jm.job_cache.append(failed_job) @@ -346,6 +360,7 @@ def test_job_manager_process_failed_job(site_config, initial_state_file, fixture jm.process_failed_job(failed_job) # Assert assert failed_job.done + assert failed_job.job_status == "Failed" @fixture @@ -376,7 +391,7 @@ def test_claim_jobs(mock_submit, site_config_file, site_config, fixtures_dir): assert unclaimed_wfj.job_status -def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file, fixtures_dir): +def test_runtime_manager_get_unclaimed_jobs(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange rt = RuntimeApiHandler(site_config) # Act From 75128c21b5c24bc1b9f44d6de3b78201ff97355e Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 12:22:28 -0800 Subject: [PATCH 02/14] update submit_job to look at last status, and update no submit states --- nmdc_automation/workflow_automation/wfutils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 4f4d5bcc..6a01bd7c 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -70,11 +70,11 @@ class CromwellRunner(JobRunnerABC): """Job runner for Cromwell""" LABEL_SUBMITTER_VALUE = "nmdcda" LABEL_PARAMETERS = ["release", "wdl", "git_repo"] - NO_SUBMIT_STATES = ["Submitted", # job is already submitted but not running + # States that indicate a job is in some active state and does not need to be submitted + NO_SUBMIT_STATES = [ + "Submitted", # job is already submitted but not running "Running", # job is already running - "Failed", # job failed "Succeeded", # job succeeded - "Aborted", # job was aborted and did not finish "Aborting" # job is in the process of being aborted "On Hold", # job is on hold and not running. It can be manually resumed later ] @@ -152,7 +152,7 @@ def submit_job(self, force: bool = False) -> Optional[str]: :param force: if True, submit the job even if it is in a state that does not require submission :return: the job id """ - status = self.get_job_status() + status = self.workflow.last_status if status in self.NO_SUBMIT_STATES and not force: logging.info(f"Job {self.job_id} in state {status}, skipping submission") return From e32caba11d76acdb7aeec23dbf21d5744e4caf2d Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 12:22:54 -0800 Subject: [PATCH 03/14] update process failed job to return optional jobid --- nmdc_automation/workflow_automation/watch_nmdc.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index ce5bc06e..3046340e 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -242,7 +242,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database: self.save_checkpoint() return database - def process_failed_job(self, job: WorkflowJob) -> None: + def process_failed_job(self, job: WorkflowJob) -> Optional[str]: """ Process a failed job """ if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS: logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. 
Skipping.") @@ -253,7 +253,8 @@ def process_failed_job(self, job: WorkflowJob) -> None: job.workflow.state["last_status"] = job.job_status self.save_checkpoint() logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") - job.job.submit_job() + jobid = job.job.submit_job() + return jobid class RuntimeApiHandler: @@ -321,7 +322,8 @@ def cycle(self): logger.info(f"Checking for finished jobs.") successful_jobs, failed_jobs = self.job_manager.get_finished_jobs() - logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.") + if not successful_jobs and not failed_jobs: + logger.info("No finished jobs found.") for job in successful_jobs: job_database = self.job_manager.process_successful_job(job) # sanity checks From 78b3f5e33a428f176834a3dc6c4e49984ef5debc Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 12:23:09 -0800 Subject: [PATCH 04/14] update unit tests --- tests/test_watch_nmdc.py | 42 ++++++++++++++++++++++++++++++++-------- tests/test_wfutils.py | 3 ++- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index 438ba2ac..34f36261 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -334,18 +334,44 @@ def test_job_manager_process_successful_job(site_config, initial_state_file_1_fa jm.job_cache = [] -def test_job_manager_process_failed_job_1_failure(site_config, initial_state_file_1_failure, fixtures_dir): +def test_job_manager_get_finished_jobs_1_failure(site_config, initial_state_file_1_failure, fixtures_dir): # Arrange + with requests_mock.Mocker() as mocker: + # Mock the GET request for the workflow status + mocker.get( + "http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status", + json={"status": "Failed"} # Mocked response body + ) + fh = FileHandler(site_config, initial_state_file_1_failure) + jm = JobManager(site_config, fh) + # job handler should initialize the job_cache from the state file by default + assert jm.job_cache + assert isinstance(jm.job_cache, list) + assert len(jm.job_cache) == 1 + + successful_jobs, failed_jobs = jm.get_finished_jobs() + assert not successful_jobs + assert failed_jobs + failed_job = failed_jobs[0] + assert failed_job.job_status == "Failed" + +@mock.patch("nmdc_automation.workflow_automation.wfutils.CromwellRunner.generate_submission_files") +def test_job_manager_process_failed_job_1_failure( + mock_generate_submission_files, site_config, initial_state_file_1_failure, mock_cromwell_api): + # Arrange + mock_generate_submission_files.return_value = { + "workflowSource": "workflowSource", + "workflowDependencies": "workflowDependencies", + "workflowInputs": "workflowInputs", + "labels": "labels" + } fh = FileHandler(site_config, initial_state_file_1_failure) jm = JobManager(site_config, fh) - # job handler should initialize the job_cache from the state file by default - assert jm.job_cache - assert isinstance(jm.job_cache, list) - assert len(jm.job_cache) == 1 + failed_job = jm.job_cache[0] + # Act + jobid = jm.process_failed_job(failed_job) + assert jobid - successful_jobs, failed_jobs = jm.get_finished_jobs() - assert not successful_jobs - assert failed_jobs def test_job_manager_process_failed_job_2_failures(site_config, initial_state_file_1_failure, fixtures_dir): diff --git a/tests/test_wfutils.py b/tests/test_wfutils.py index 37ed404a..ca9509a3 100644 --- a/tests/test_wfutils.py +++ b/tests/test_wfutils.py @@ -271,7 +271,8 @@ def 
test_cromwell_job_runner_submit_job_new_job(mock_generate_submission_files, wf_state_manager = WorkflowStateManager(wf_state) job_runner = CromwellRunner(site_config, wf_state_manager) - job_runner.submit_job() + jobid = job_runner.submit_job() + assert jobid def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtures_dir, tmp_path): From 9201c534ab8e61a2f743e1ca241b013dd365d6b0 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 12:34:35 -0800 Subject: [PATCH 05/14] update test coverage to omit deprecated re_iding code --- .coveragerc | 11 +++++++++++ badges/coverage.svg | 8 ++++---- badges/tests.svg | 8 ++++---- pytest.xml | 2 +- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/.coveragerc b/.coveragerc index 398ff08a..d38fcc4a 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,13 @@ [run] branch = True +omit = + */site-packages/* + */distutils/* + */tests/* + */test/* + */__init__.py + */__main__.py + */setup.py + */re_iding/* + */examples/* + */nmdc_common/* diff --git a/badges/coverage.svg b/badges/coverage.svg index 24c797f7..07ecfe02 100644 --- a/badges/coverage.svg +++ b/badges/coverage.svg @@ -5,7 +5,7 @@ width="92.5" height="20" role="img" - aria-label="coverage: 63%" + aria-label="coverage: 67%" > - coverage: 63% + coverage: 67% @@ -42,8 +42,8 @@ - 63% - 63% + 67% + 67% diff --git a/badges/tests.svg b/badges/tests.svg index 1227c608..b85ca4de 100644 --- a/badges/tests.svg +++ b/badges/tests.svg @@ -5,7 +5,7 @@ width="62.5" height="20" role="img" - aria-label="tests: 26" + aria-label="tests: 95" > - tests: 26 + tests: 95 @@ -42,8 +42,8 @@ - 26 - 26 + 95 + 95 diff --git a/pytest.xml b/pytest.xml index d4d6c35d..aa997e1b 100644 --- a/pytest.xml +++ b/pytest.xml @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file From 61f1896228ad094637d7efe7948a23a224757bc7 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 12:51:54 -0800 Subject: [PATCH 06/14] more logging --- nmdc_automation/workflow_automation/watch_nmdc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 3046340e..bd2fa2e9 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -325,6 +325,7 @@ def cycle(self): if not successful_jobs and not failed_jobs: logger.info("No finished jobs found.") for job in successful_jobs: + logger.info(f"Processing successful job: {job.opid}, {job.workflow_execution_id}") job_database = self.job_manager.process_successful_job(job) # sanity checks if not job_database.data_object_set: @@ -354,6 +355,7 @@ def cycle(self): logging.info(f"Updated operation {job.opid} response id: {resp}") for job in failed_jobs: + logger.info(f"Processing failed job: {job.opid}, {job.workflow_execution_id}") self.job_manager.process_failed_job(job) def watch(self): From e5bf09b651b70991b390d4dd8c9e870c58b91d8b Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 14:52:33 -0800 Subject: [PATCH 07/14] fix logic in get_finished_jobs --- .../workflow_automation/watch_nmdc.py | 32 +++++++++++++++---- .../workflow_automation/wfutils.py | 12 +++++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index bd2fa2e9..644042e6 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ 
b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -189,11 +189,29 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: failed_jobs = [] for job in self.job_cache: if not job.done: - last_status = job.workflow.last_status - if last_status == "Succeeded" and job.opid: + if job.workflow.last_status == "Succeeded" and job.opid: + job.done = True successful_jobs.append(job) - elif last_status == "Failed" and job.opid: + continue + if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS: + job.done = True failed_jobs.append(job) + continue + # check status + status = job.job.get_job_status() + if status == "Succeded": + job.workflow.last_status = status + successful_jobs.append(job) + continue + elif status == "Failed": + job.workflow.last_status = status + job.workflow.failed_count += 1 + failed_jobs.append(job) + continue + else: + job.workflow.last_status = status + self.save_checkpoint() + if successful_jobs: logger.info(f"Found {len(successful_jobs)} successful jobs.") if failed_jobs: @@ -314,10 +332,10 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None: def cycle(self): """ Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """ self.restore_from_checkpoint() - if not self.should_skip_claim: - unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows) - logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") - self.claim_jobs(unclaimed_jobs) + # if not self.should_skip_claim: - is this actually used? + unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows) + logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") + self.claim_jobs(unclaimed_jobs) logger.info(f"Checking for finished jobs.") diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 6a01bd7c..c63b1847 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -270,6 +270,18 @@ def config(self) -> Dict[str, Any]: def last_status(self) -> Optional[str]: return self.cached_state.get("last_status", None) + @last_status.setter + def last_status(self, status: str): + self.cached_state["last_status"] = status + + @property + def failed_count(self) -> int: + return self.cached_state.get("failed_count", 0) + + @failed_count.setter + def failed_count(self, count: int): + self.cached_state["failed_count"] = count + @property def nmdc_jobid(self) -> Optional[str]: return self.cached_state.get("nmdc_jobid", None) From 4981e9b007bc5bcb8cbb2ed4cf8883a7673ced45 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 14:56:23 -0800 Subject: [PATCH 08/14] fix get_finished_jobs logic --- nmdc_automation/workflow_automation/watch_nmdc.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 644042e6..1b22f6c9 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -190,7 +190,6 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: for job in self.job_cache: if not job.done: if job.workflow.last_status == "Succeeded" and job.opid: - job.done = True successful_jobs.append(job) continue if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS: From fac4dcf22bb163bedda91533dc00c69fad4b77e5 Mon Sep 
17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 15:00:03 -0800 Subject: [PATCH 09/14] docstring for get_finished_jobs --- nmdc_automation/workflow_automation/watch_nmdc.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 1b22f6c9..a71472e6 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -184,7 +184,15 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False return new_job def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: - """ Get finished jobs """ + """ + Get finished jobs + Returns a tuple of successful jobs and failed jobs + Jobs are considered finished if they have a last status of "Succeeded" or "Failed" + or if they have reached the maximum number of failures + + Unfinished jobs are checked for status and updated if needed. + A checkpoint is saved after checking for finished jobs. + """ successful_jobs = [] failed_jobs = [] for job in self.job_cache: @@ -193,7 +201,6 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: successful_jobs.append(job) continue if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS: - job.done = True failed_jobs.append(job) continue # check status From a6ec7e0b5b6bd5a6430906b5a588d4945997c91e Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 26 Nov 2024 16:42:48 -0800 Subject: [PATCH 10/14] add more debug logging --- nmdc_automation/workflow_automation/watch_nmdc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index a71472e6..8273b264 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -205,6 +205,7 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: continue # check status status = job.job.get_job_status() + if status == "Succeded": job.workflow.last_status = status successful_jobs.append(job) @@ -216,6 +217,7 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]: continue else: job.workflow.last_status = status + logger.debug(f"Job {job.opid} status: {status}") self.save_checkpoint() if successful_jobs: From f5657a4b4642afae7f5979dd11ae22a39bad8764 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 2 Dec 2024 09:56:03 -0800 Subject: [PATCH 11/14] update polling interval to 60 seconds and make logging less verbose --- nmdc_automation/workflow_automation/watch_nmdc.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 8273b264..c97837a8 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -319,7 +319,7 @@ def update_operation(self, opid, done, meta): class Watcher: """ Watcher class for monitoring and managing jobs """ def __init__(self, site_configuration_file: Union[str, Path], state_file: Union[str, Path] = None): - self._POLL = 20 + self._POLL = 60 self._MAX_FAILS = 2 self.should_skip_claim = False self.config = SiteConfig(site_configuration_file) @@ -342,14 +342,15 @@ def cycle(self): self.restore_from_checkpoint() # if not self.should_skip_claim: - is this actually used? 
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows) - logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") + if unclaimed_jobs: + logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") self.claim_jobs(unclaimed_jobs) - logger.info(f"Checking for finished jobs.") + logger.debug(f"Checking for finished jobs.") successful_jobs, failed_jobs = self.job_manager.get_finished_jobs() if not successful_jobs and not failed_jobs: - logger.info("No finished jobs found.") + logger.debug("No finished jobs found.") for job in successful_jobs: logger.info(f"Processing successful job: {job.opid}, {job.workflow_execution_id}") job_database = self.job_manager.process_successful_job(job) @@ -389,6 +390,7 @@ def watch(self): logger.info("Entering polling loop") while True: try: + print(".") self.cycle() except (IOError, ValueError, TypeError, AttributeError) as e: logger.exception(f"Error occurred during cycle: {e}", exc_info=True) From 7e0780c92df7f526fa2f1feeb9831c3565b9f5ba Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 2 Dec 2024 12:18:16 -0800 Subject: [PATCH 12/14] reduce verbosity of log message --- nmdc_automation/workflow_automation/watch_nmdc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index c97837a8..51d5b3da 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -379,7 +379,7 @@ def cycle(self): resp = self.runtime_api_handler.update_operation( job.opid, done=True, meta=job.job.metadata ) - logging.info(f"Updated operation {job.opid} response id: {resp}") + logging.info(f"Updated operation {job.opid} response id: {resp['id']}") for job in failed_jobs: logger.info(f"Processing failed job: {job.opid}, {job.workflow_execution_id}") From a48eb0551a4da5e1b864efb2ba13384832cfd59b Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 2 Dec 2024 13:08:26 -0800 Subject: [PATCH 13/14] update logging --- nmdc_automation/workflow_automation/watch_nmdc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 51d5b3da..68bfd3a3 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -278,7 +278,7 @@ def process_failed_job(self, job: WorkflowJob) -> Optional[str]: job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1 job.workflow.state["last_status"] = job.job_status self.save_checkpoint() - logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") + logger.info(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. 
Retrying.") jobid = job.job.submit_job() return jobid From f7cb51de8fac674401bef8422be322b174c3052c Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Tue, 3 Dec 2024 11:20:49 -0800 Subject: [PATCH 14/14] Address review comments --- nmdc_automation/workflow_automation/watch_nmdc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 68bfd3a3..f4469801 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -278,7 +278,7 @@ def process_failed_job(self, job: WorkflowJob) -> Optional[str]: job.workflow.state["failed_count"] = job.workflow.state.get("failed_count", 0) + 1 job.workflow.state["last_status"] = job.job_status self.save_checkpoint() - logger.info(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") + logger.warning(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.") jobid = job.job.submit_job() return jobid @@ -319,7 +319,7 @@ def update_operation(self, opid, done, meta): class Watcher: """ Watcher class for monitoring and managing jobs """ def __init__(self, site_configuration_file: Union[str, Path], state_file: Union[str, Path] = None): - self._POLL = 60 + self._POLL_INTERVAL_SEC = 60 self._MAX_FAILS = 2 self.should_skip_claim = False self.config = SiteConfig(site_configuration_file) @@ -394,7 +394,7 @@ def watch(self): self.cycle() except (IOError, ValueError, TypeError, AttributeError) as e: logger.exception(f"Error occurred during cycle: {e}", exc_info=True) - sleep(self._POLL) + sleep(self._POLL_INTERVAL_SEC) def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None: """ Claim unclaimed jobs, prepare them, and submit them. Write a checkpoint after claiming jobs. """