Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

307 watcher incorrect handling of failed job #308

Merged
merged 15 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
[run]
branch = True
omit =
*/site-packages/*
*/distutils/*
*/tests/*
*/test/*
*/__init__.py
*/__main__.py
*/setup.py
*/re_iding/*
*/examples/*
*/nmdc_common/*
8 changes: 4 additions & 4 deletions badges/coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 4 additions & 4 deletions badges/tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
54 changes: 42 additions & 12 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,47 @@ def prepare_and_cache_new_job(self, new_job: WorkflowJob, opid: str, force=False
return new_job

def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
""" Get finished jobs """
"""
Get finished jobs
Returns a tuple of successful jobs and failed jobs
Jobs are considered finished if they have a last status of "Succeeded" or "Failed"
or if they have reached the maximum number of failures

Unfinished jobs are checked for status and updated if needed.
A checkpoint is saved after checking for finished jobs.
"""
successful_jobs = []
failed_jobs = []
for job in self.job_cache:
if not job.done:
status = job.job_status
if status == "Succeeded" and job.opid:
if job.workflow.last_status == "Succeeded" and job.opid:
successful_jobs.append(job)
continue
if job.workflow.last_status == "Failed" and job.workflow.failed_count >= self._MAX_FAILS:
failed_jobs.append(job)
continue
# check status
status = job.job.get_job_status()

if status == "Succeded":
job.workflow.last_status = status
successful_jobs.append(job)
elif status == "Failed" and job.opid:
continue
elif status == "Failed":
job.workflow.last_status = status
job.workflow.failed_count += 1
failed_jobs.append(job)
continue
else:
job.workflow.last_status = status
logger.debug(f"Job {job.opid} status: {status}")
self.save_checkpoint()

if successful_jobs:
logger.info(f"Found {len(successful_jobs)} successful jobs.")
if failed_jobs:
logger.info(f"Found {len(failed_jobs)} failed jobs.")
return (successful_jobs, failed_jobs)
return successful_jobs, failed_jobs

def process_successful_job(self, job: WorkflowJob) -> Database:
""" Process a successful job and return a Database object """
Expand Down Expand Up @@ -242,7 +268,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
self.save_checkpoint()
return database

def process_failed_job(self, job: WorkflowJob) -> None:
def process_failed_job(self, job: WorkflowJob) -> Optional[str]:
""" Process a failed job """
if job.workflow.state.get("failed_count", 0) >= self._MAX_FAILS:
logger.error(f"Job {job.opid} failed {self._MAX_FAILS} times. Skipping.")
Expand All @@ -253,7 +279,8 @@ def process_failed_job(self, job: WorkflowJob) -> None:
job.workflow.state["last_status"] = job.job_status
self.save_checkpoint()
logger.error(f"Job {job.opid} failed {job.workflow.state['failed_count']} times. Retrying.")
job.job.submit_job()
jobid = job.job.submit_job()
return jobid


class RuntimeApiHandler:
Expand Down Expand Up @@ -313,16 +340,18 @@ def restore_from_checkpoint(self, state_data: Dict[str, Any] = None)-> None:
def cycle(self):
""" Perform a cycle of watching for unclaimed jobs, claiming jobs, and processing finished jobs """
self.restore_from_checkpoint()
if not self.should_skip_claim:
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)
# if not self.should_skip_claim: - is this actually used?
unclaimed_jobs = self.runtime_api_handler.get_unclaimed_jobs(self.config.allowed_workflows)
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)


logger.info(f"Checking for finished jobs.")
successful_jobs, failed_jobs = self.job_manager.get_finished_jobs()
logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.")
if not successful_jobs and not failed_jobs:
logger.info("No finished jobs found.")
for job in successful_jobs:
logger.info(f"Processing successful job: {job.opid}, {job.workflow_execution_id}")
job_database = self.job_manager.process_successful_job(job)
# sanity checks
if not job_database.data_object_set:
Expand Down Expand Up @@ -352,6 +381,7 @@ def cycle(self):
logging.info(f"Updated operation {job.opid} response id: {resp}")

for job in failed_jobs:
logger.info(f"Processing failed job: {job.opid}, {job.workflow_execution_id}")
self.job_manager.process_failed_job(job)

def watch(self):
Expand Down
20 changes: 16 additions & 4 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class CromwellRunner(JobRunnerABC):
"""Job runner for Cromwell"""
LABEL_SUBMITTER_VALUE = "nmdcda"
LABEL_PARAMETERS = ["release", "wdl", "git_repo"]
NO_SUBMIT_STATES = ["Submitted", # job is already submitted but not running
# States that indicate a job is in some active state and does not need to be submitted
NO_SUBMIT_STATES = [
"Submitted", # job is already submitted but not running
"Running", # job is already running
"Failed", # job failed
"Succeeded", # job succeeded
"Aborted", # job was aborted and did not finish
"Aborting" # job is in the process of being aborted
"On Hold", # job is on hold and not running. It can be manually resumed later
]
Expand Down Expand Up @@ -152,7 +152,7 @@ def submit_job(self, force: bool = False) -> Optional[str]:
:param force: if True, submit the job even if it is in a state that does not require submission
:return: the job id
"""
status = self.get_job_status()
status = self.workflow.last_status
if status in self.NO_SUBMIT_STATES and not force:
logging.info(f"Job {self.job_id} in state {status}, skipping submission")
return
Expand Down Expand Up @@ -270,6 +270,18 @@ def config(self) -> Dict[str, Any]:
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@last_status.setter
def last_status(self, status: str):
self.cached_state["last_status"] = status

@property
def failed_count(self) -> int:
return self.cached_state.get("failed_count", 0)

@failed_count.setter
def failed_count(self, count: int):
self.cached_state["failed_count"] = count

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)
Expand Down
Loading