diff --git a/nmdc_automation/workflow_automation/watch_nmdc.py b/nmdc_automation/workflow_automation/watch_nmdc.py index f4469801..01cd2232 100644 --- a/nmdc_automation/workflow_automation/watch_nmdc.py +++ b/nmdc_automation/workflow_automation/watch_nmdc.py @@ -264,6 +264,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database: logger.info(f"Created workflow execution record for job {job.opid}") job.done = True + job.workflow.state["end"] = workflow_execution.ended_at_time self.file_handler.write_metadata_if_not_exists(job) self.save_checkpoint() return database diff --git a/nmdc_automation/workflow_automation/wfutils.py b/nmdc_automation/workflow_automation/wfutils.py index c63b1847..c9da153c 100755 --- a/nmdc_automation/workflow_automation/wfutils.py +++ b/nmdc_automation/workflow_automation/wfutils.py @@ -551,6 +551,7 @@ def make_workflow_execution(self, data_objects: List[DataObject]) -> WorkflowExe """ wf_dict = self.as_workflow_execution_dict wf_dict["has_output"] = [dobj.id for dobj in data_objects] + wf_dict["ended_at_time"] = self.job.metadata.get("end") # workflow-specific keys logical_names = set() diff --git a/tests/test_wfutils.py b/tests/test_wfutils.py index ca9509a3..f6a75300 100644 --- a/tests/test_wfutils.py +++ b/tests/test_wfutils.py @@ -284,6 +284,8 @@ def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtur for data_object in data_objects: assert isinstance(data_object, DataObject) wfe = job.make_workflow_execution(data_objects) + assert wfe.started_at_time + assert wfe.ended_at_time assert isinstance(wfe, MagsAnalysis) # attributes from final_stats_json assert wfe.mags_list @@ -305,6 +307,18 @@ def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtur assert isinstance(wfe.binned_contig_num, int) +def test_workflow_execution_record_from_workflow_job(site_config, fixtures_dir, tmp_path): + job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json")) + workflow_state = json.load(open(fixtures_dir / "mags_workflow_state.json")) + # remove 'end' from the workflow state to simulate a job that is still running + workflow_state.pop('end') + job = WorkflowJob(site_config, workflow_state, job_metadata) + data_objects = job.make_data_objects(output_dir=tmp_path) + + wfe = job.make_workflow_execution(data_objects) + assert wfe.started_at_time + assert wfe.ended_at_time + def test_workflow_job_from_database_job_record(site_config, fixtures_dir):