Skip to content

Commit

Permalink
update check status
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Nov 26, 2024
1 parent 93a4456 commit a061821
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion nmdc_automation/run_process/run_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def watcher(ctx, site_configuration_file):
level=logging_level, format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
logger.info(f"Config file: {site_configuration_file}")
logger.info(f"Initializing Watcher: config file: {site_configuration_file}")
ctx.obj = Watcher(site_configuration_file)


Expand Down
9 changes: 4 additions & 5 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, config: SiteConfig, state_file: Union[str, Path] = None):
self._state_file = None
# set state file
if state_file:
logger.info(f"Using state file: {state_file}")
logger.info(f"Initializing FileHandler with state file: {state_file}")
self._state_file = Path(state_file)
elif self.config.agent_state:
logger.info(f"Using state file from config: {self.config.agent_state}")
Expand Down Expand Up @@ -64,7 +64,6 @@ def state_file(self, value) -> None:

def read_state(self) -> Optional[Dict[str, Any]]:
""" Read the state file and return the data """
logging.info(f"Reading state from {self.state_file}")
with open(self.state_file, "r") as f:
state = loads(f.read())
return state
Expand Down Expand Up @@ -137,7 +136,7 @@ def restore_from_state(self) -> None:
""" Restore jobs from state data """
new_jobs = self.get_new_workflow_jobs_from_state()
if new_jobs:
logger.info(f"Restoring {len(new_jobs)} jobs from state.")
logger.info(f"Adding {len(new_jobs)} new jobs from state file.")
self.job_cache.extend(new_jobs)

def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
Expand All @@ -151,10 +150,10 @@ def get_new_workflow_jobs_from_state(self) -> List[WorkflowJob]:
# already in cache
continue
wf_job = WorkflowJob(self.config, workflow_state=job)
logger.debug(f"New workflow job: {wf_job.opid} from state.")
logger.info(f"New Job from State: {wf_job.workflow_execution_id}, {wf_job.workflow.nmdc_jobid}")
logger.info(f"Last Status: {wf_job.workflow.last_status}")
job_cache_ids.append(wf_job.opid)
wf_job_list.append(wf_job)
logging.info(f"Restored {len(wf_job_list)} jobs from state")
return wf_job_list

def find_job_by_opid(self, opid) -> Optional[WorkflowJob]:
Expand Down
22 changes: 15 additions & 7 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ def submit_job(self, force: bool = False) -> Optional[str]:

def get_job_status(self) -> str:
""" Get the status of a job from Cromwell """
if not self.job_id:
if not self.workflow.cromwell_jobid:
return "Unknown"
status_url = f"{self.service_url}/{self.job_id}/status"
status_url = f"{self.service_url}/{self.workflow.cromwell_jobid}/status"
# There can be a delay between submitting a job and it
# being available in Cromwell so handle 404 errors
logging.debug(f"Getting job status from {status_url}")
try:
response = requests.get(status_url)
response.raise_for_status()
Expand All @@ -200,11 +201,6 @@ def get_job_status(self) -> str:
return "Unknown"
raise e


response = requests.get(status_url)
response.raise_for_status()
return response.json().get("status", "Unknown")

def get_job_metadata(self) -> Dict[str, Any]:
""" Get metadata for a job from Cromwell """
metadata_url = f"{self.service_url}/{self.job_id}/metadata"
Expand Down Expand Up @@ -270,6 +266,18 @@ def config(self) -> Dict[str, Any]:
# for backward compatibility we need to check for both keys
return self.cached_state.get("conf", self.cached_state.get("config", {}))

@property
def last_status(self) -> Optional[str]:
return self.cached_state.get("last_status", None)

@property
def nmdc_jobid(self) -> Optional[str]:
return self.cached_state.get("nmdc_jobid", None)

@property
def cromwell_jobid(self) -> Optional[str]:
return self.cached_state.get("cromwell_jobid", None)

@property
def execution_template(self) -> Dict[str, str]:
# for backward compatibility we need to check for both keys
Expand Down

0 comments on commit a061821

Please sign in to comment.