From c7797a4b6842ec8ef223575b18c1923f66d16eff Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 25 Nov 2024 14:49:15 -0800 Subject: [PATCH 1/4] fix mis-handled claim_jobs --- nmdc_automation/workflow_automation/watch_nmdc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 0522b4c9..5a76d7eb 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -367,7 +367,7 @@ def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None: for job in unclaimed_jobs: logger.info(f"Claiming job {job.workflow.nmdc_jobid}") claim = self.runtime_api_handler.claim_job(job.workflow.nmdc_jobid) - opid = claim["detail"]["id"] + opid = claim["id"] new_job = self.job_manager.prepare_and_cache_new_job(job, opid) if new_job: new_job.job.submit_job() From 93a445654d427bcd9830a2661e696f66652299af Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 25 Nov 2024 19:04:01 -0800 Subject: [PATCH 2/4] add 404 error handling to get_job_status --- nmdc_automation/workflow_automation/watch_nmdc.py | 3 +++ nmdc_automation/workflow_automation/wfutils.py | 12 ++++++++++++ 2 files changed, 15 insertions(+) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index 5a76d7eb..b11308cc 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -319,7 +319,10 @@ def cycle(self): logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.") self.claim_jobs(unclaimed_jobs) + + logger.info(f"Checking for finished jobs.") successful_jobs, failed_jobs = self.job_manager.get_finished_jobs() + logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.") for job in successful_jobs: job_database = self.job_manager.process_successful_job(job) # sanity checks diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 3f609a90..56ed0786 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -189,6 +189,18 @@ def get_job_status(self) -> str: if not self.job_id: return "Unknown" status_url = f"{self.service_url}/{self.job_id}/status" + # There can be a delay between submitting a job and it + # being available in Cromwell so handle 404 errors + try: + response = requests.get(status_url) + response.raise_for_status() + return response.json().get("status", "Unknown") + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + return "Unknown" + raise e + + response = requests.get(status_url) response.raise_for_status() return response.json().get("status", "Unknown") From a0618215bbfbcbab8b4919faf08b9b1d87d3e718 Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 25 Nov 2024 19:55:39 -0800 Subject: [PATCH 3/4] update check status --- nmdc_automation/run_process/run_workflows.py | 2 +- .../workflow_automation/watch_nmdc.py | 9 ++++---- .../workflow_automation/wfutils.py | 22 +++++++++++++------ 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/nmdc_automation/run_process/run_workflows.py b/nmdc_automation/run_process/run_workflows.py index e0e771d0..bed06251 100644 --- a/nmdc_automation/run_process/run_workflows.py +++ b/nmdc_automation/run_process/run_workflows.py @@ -26,7 +26,7 @@ def watcher(ctx, site_configuration_file): 
level=logging_level, format="%(asctime)s %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) - logger.info(f"Config file: {site_configuration_file}") + logger.info(f"Initializing Watcher: config file: {site_configuration_file}") ctx.obj = Watcher(site_configuration_file) diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index b11308cc..439479bd 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -35,7 +35,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None): self._state_file = None # set state file if state_file: - logger.info(f"Using state file: {state_file}") + logger.info(f"Initializing FileHandler with state file: {state_file}") self._state_file = Path(state_file) elif self.config.agent_state: logger.info(f"Using state file from config: {self.config.agent_state}") @@ -64,7 +64,6 @@ def state_file(self, value) -> None: def read_state(self) -> Optional[Dict[str, Any]]: """ Read the state file and return the data """ - logging.info(f"Reading state from {self.state_file}") with open(self.state_file, "r") as f: state = loads(f.read()) return state @@ -137,7 +136,7 @@ def restore_from_state(self) -> None: """ Restore jobs from state data """ new_jobs = self.get_new_workflow_jobs_from_state() if new_jobs: - logger.info(f"Restoring {len(new_jobs)} jobs from state.") + logger.info(f"Adding {len(new_jobs)} new jobs from state file.") self.job_cache.extend(new_jobs) def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]: @@ -151,10 +150,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]: # already in cache continue wf_job = WorkflowJob(self.config, workflow_state=job) - logger.debug(f"New workflow job: {wf_job.opid} from state.") + logger.info(f"New Job from State: {wf_job.workflow_execution_id}, {wf_job.workflow.nmdc_jobid}") + logger.info(f"Last Status: {wf_job.workflow.last_status}") job_cache_ids.append(wf_job.opid) wf_job_list.append(wf_job) - logging.info(f"Restored {len(wf_job_list)} jobs from state") return wf_job_list def find_job_by_opid(self, opid) -> Optional[WorkflowJob]: diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index 56ed0786..4f4d5bcc 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -186,11 +186,12 @@ def submit_job(self, force: bool = False) -> Optional[str]: def get_job_status(self) -> str: """ Get the status of a job from Cromwell """ - if not self.job_id: + if not self.workflow.cromwell_jobid: return "Unknown" - status_url = f"{self.service_url}/{self.job_id}/status" + status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status" # There can be a delay between submitting a job and it # being available in Cromwell so handle 404 errors + logging.debug(f"Getting job status from {status_url}") try: response = requests.get(status_url) response.raise_for_status() @@ -200,11 +201,6 @@ def get_job_status(self) -> str: return "Unknown" raise e - - response = requests.get(status_url) - response.raise_for_status() - return response.json().get("status", "Unknown") - def get_job_metadata(self) -> Dict[str, Any]: """ Get metadata for a job from Cromwell """ metadata_url = f"{self.service_url}/{self.job_id}/metadata" @@ -270,6 +266,18 @@ def config(self) -> Dict[str, Any]: # for backward compatibility we need to check for both keys return 
self.cached_state.get("conf", self.cached_state.get("config", {})) + @property + def last_status(self) -> Optional[str]: + return self.cached_state.get("last_status", None) + + @property + def nmdc_jobid(self) -> Optional[str]: + return self.cached_state.get("nmdc_jobid", None) + + @property + def cromwell_jobid(self) -> Optional[str]: + return self.cached_state.get("cromwell_jobid", None) + @property def execution_template(self) -> Dict[str, str]: # for backward compatibility we need to check for both keys From 13257a9217ad41c80d23d59ded835d45efe2db6c Mon Sep 17 00:00:00 2001 From: Michael Thornton Date: Mon, 25 Nov 2024 20:06:31 -0800 Subject: [PATCH 4/4] add mock kapi responses to unit test --- tests/test_watch_nmdc.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/tests/test_watch_nmdc.py b/tests/test_watch_nmdc.py index c4bd7106..902bb2ee 100644 --- a/tests/test_watch_nmdc.py +++ b/tests/test_watch_nmdc.py @@ -264,7 +264,6 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir): - # Mock the URL and response # Arrange - initial state has 1 failure and is not done fh = FileHandler(site_config, initial_state_file) @@ -287,11 +286,24 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures # sanity check assert len(jm.job_cache) == 3 - # Act - successful_jobs, failed_jobs = jm.get_finished_jobs() - # Assert - assert successful_jobs - assert failed_jobs + # Mock requests for job status + with requests_mock.Mocker() as m: + # Mock the successful job status + m.get( + "http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status", + json={"status": "Succeeded"} + ) + # Mock the failed job status + m.get( + "http://localhost:8088/api/workflows/v1/12345678-abcd-efgh-ijkl-9876543210/status", + json={"status": "Failed"} + ) + + # Act + successful_jobs, failed_jobs = jm.get_finished_jobs() + # Assert + assert successful_jobs + assert failed_jobs # cleanup jm.job_cache = []