fix mis-handled claim_jobs #305

Merged
merged 4 commits on Nov 26, 2024
2 changes: 1 addition & 1 deletion nmdc_automation/run_process/run_workflows.py
@@ -26,7 +26,7 @@ def watcher(ctx, site_configuration_file):
level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
logger.info(f"Config file: {site_configuration_file}")
logger.info(f"Initializing Watcher: config file: {site_configuration_file}")
ctx.obj = Watcher(site_configuration_file)


14 changes: 8 additions & 6 deletions nmdc_automation/workflow_automation/watch_nmdc.py
@@ -35,7 +35,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None):
self._state_file = None
# set state file
if state_file:
logger.info(f"Using state file: {state_file}")
logger.info(f"Initializing FileHandler with state file: {state_file}")
self._state_file = Path(state_file)
elif self.config.agent_state:
logger.info(f"Using state file from config: {self.config.agent_state}")
@@ -64,7 +64,6 @@ def state_file(self, value) -> None:

def read_state(self) -> Optional[Dict[str, Any]]:
""" Read the state file and return the data """
logging.info(f"Reading state from {self.state_file}")
with open(self.state_file, "r") as f:
state = loads(f.read())
return state
@@ -137,7 +136,7 @@ def restore_from_state(self) -> None:
""" Restore jobs from state data """
new_jobs = self.get_new_workflow_jobs_from_state()
if new_jobs:
logger.info(f"Restoring {len(new_jobs)} jobs from state.")
logger.info(f"Adding {len(new_jobs)} new jobs from state file.")
self.job_cache.extend(new_jobs)

def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
@@ -151,10 +150,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
# already in cache
continue
wf_job = WorkflowJob(self.config, workflow_state=job)
logger.debug(f"New workflow job: {wf_job.opid} from state.")
logger.info(f"New Job from State: {wf_job.workflow_execution_id}, {wf_job.workflow.nmdc_jobid}")
logger.info(f"Last Status: {wf_job.workflow.last_status}")
job_cache_ids.append(wf_job.opid)
wf_job_list.append(wf_job)
logging.info(f"Restored {len(wf_job_list)} jobs from state")
return wf_job_list

def find_job_by_opid(self, opid) -> Optional[WorkflowJob]:
@@ -319,7 +318,10 @@ def cycle(self):
logger.info(f"Found {len(unclaimed_jobs)} unclaimed jobs.")
self.claim_jobs(unclaimed_jobs)


logger.info(f"Checking for finished jobs.")
successful_jobs, failed_jobs = self.job_manager.get_finished_jobs()
logger.debug(f"Found {len(successful_jobs)} successful jobs and {len(failed_jobs)} failed jobs.")
for job in successful_jobs:
job_database = self.job_manager.process_successful_job(job)
# sanity checks
@@ -367,7 +369,7 @@ def claim_jobs(self, unclaimed_jobs: List[WorkflowJob] = None) -> None:
for job in unclaimed_jobs:
logger.info(f"Claiming job {job.workflow.nmdc_jobid}")
claim = self.runtime_api_handler.claim_job(job.workflow.nmdc_jobid)
opid = claim["detail"]["id"]
opid = claim["id"]
new_job = self.job_manager.prepare_and_cache_new_job(job, opid)
if new_job:
new_job.job.submit_job()
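The heart of the fix is the hunk above: the runtime API's claim response carries the operation id at the top level rather than nested under a "detail" key. A minimal sketch of that shape change, assuming dict-shaped responses; the helper and the example payloads below are illustrative, not part of the PR:

from typing import Any, Dict, Optional


def extract_opid(claim: Dict[str, Any]) -> Optional[str]:
    """Illustrative helper: pull the operation id out of a claim response."""
    # New shape handled by this PR: {"id": "..."}
    if "id" in claim:
        return claim["id"]
    # Old shape the previous code assumed: {"detail": {"id": "..."}}
    return claim.get("detail", {}).get("id")


# Hypothetical payloads, for illustration only.
print(extract_opid({"id": "nmdc:op-123"}))              # nmdc:op-123
print(extract_opid({"detail": {"id": "nmdc:op-123"}}))  # nmdc:op-123
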
30 changes: 25 additions & 5 deletions nmdc_automation/workflow_automation/wfutils.py
@@ -186,12 +186,20 @@ def submit_job(self, force: bool = False) -> Optional[str]:

def get_job_status(self) -> str:
""" Get the status of a job from Cromwell """
if not self.job_id:
if not self.workflow.cromwell_jobid:
return "Unknown"
status_url = f"{self.service_url}/{self.job_id}/status"
response = requests.get(status_url)
response.raise_for_status()
return response.json().get("status", "Unknown")
status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status"
# There can be a delay between submitting a job and it
# being available in Cromwell so handle 404 errors
logging.debug(f"Getting job status from {status_url}")
try:
response = requests.get(status_url)
response.raise_for_status()
return response.json().get("status", "Unknown")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return "Unknown"
raise e

def get_job_metadata(self) -> Dict[str, Any]:
""" Get metadata for a job from Cromwell """
@@ -258,6 +266,18 @@ def config(self) -> Dict[str, Any]:
# for backward compatibility we need to check for both keys
return self.cached_state.get("conf", self.cached_state.get("config", {}))

@property
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)

@property
def cromwell_jobid(self) -> Optional[str]:
return self.cached_state.get("cromwell_jobid", None)

@property
def execution_template(self) -> Dict[str, str]:
# for backward compatibility we need to check for both keys
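The new last_status, nmdc_jobid, and cromwell_jobid properties are thin .get() lookups on the cached state record, sitting alongside the existing conf/config backward-compatibility fallback. A small worked example of those lookups; the state records below are hypothetical and only mirror the key names used in this diff:

old_record = {"conf": {"release": "v1.0.0"}, "last_status": "Submitted"}
new_record = {"config": {"release": "v1.0.0"}, "cromwell_jobid": "1234-abcd", "nmdc_jobid": "nmdc:xyz"}

for record in (old_record, new_record):
    # Mirrors cached_state.get("conf", cached_state.get("config", {})) in the diff.
    conf = record.get("conf", record.get("config", {}))
    # Each new property falls back to None when its key is absent.
    print(conf.get("release"), record.get("last_status"), record.get("cromwell_jobid"))
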
24 changes: 18 additions & 6 deletions tests/test_watch_nmdc.py
@@ -264,7 +264,6 @@ def test_job_manager_prepare_and_cache_new_job_force(site_config, initial_state_


def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir):
# Mock the URL and response

# Arrange - initial state has 1 failure and is not done
fh = FileHandler(site_config, initial_state_file)
@@ -287,11 +286,24 @@ def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures
# sanity check
assert len(jm.job_cache) == 3

# Act
successful_jobs, failed_jobs = jm.get_finished_jobs()
# Assert
assert successful_jobs
assert failed_jobs
# Mock requests for job status
with requests_mock.Mocker() as m:
# Mock the successful job status
m.get(
"http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status",
json={"status": "Succeeded"}
)
# Mock the failed job status
m.get(
"http://localhost:8088/api/workflows/v1/12345678-abcd-efgh-ijkl-9876543210/status",
json={"status": "Failed"}
)

# Act
successful_jobs, failed_jobs = jm.get_finished_jobs()
# Assert
assert successful_jobs
assert failed_jobs
# cleanup
jm.job_cache = []
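
The mocked URLs in this test have to line up with the status URL the Cromwell client builds from its service URL and each job's cromwell_jobid. A minimal standalone illustration of how requests_mock intercepts such a call; the URL and payload simply mirror the test above:

import requests
import requests_mock

status_url = "http://localhost:8088/api/workflows/v1/9492a397-eb30-472b-9d3b-abc123456789/status"

with requests_mock.Mocker() as m:
    # Any GET to this exact URL now returns the canned JSON payload.
    m.get(status_url, json={"status": "Succeeded"})
    response = requests.get(status_url)
    print(response.json()["status"])  # Succeeded
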
