Skip to content

Commit

Permalink
Merge pull request #313 from microbiomedata/311-workflow-execution-re…
Browse files Browse the repository at this point in the history
…cord-in-database-is-missing-ended_at_time

311 workflow execution record in database is missing ended at time
  • Loading branch information
mbthornton-lbl authored Dec 5, 2024
2 parents 270afce + 6d2e06e commit fc9af85
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
1 change: 1 addition & 0 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
logger.info(f"Created workflow execution record for job {job.opid}")

job.done = True
job.workflow.state["end"] = workflow_execution.ended_at_time
self.file_handler.write_metadata_if_not_exists(job)
self.save_checkpoint()
return database
Expand Down
1 change: 1 addition & 0 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ def make_workflow_execution(self, data_objects: List[DataObject]) -> WorkflowExe
"""
wf_dict = self.as_workflow_execution_dict
wf_dict["has_output"] = [dobj.id for dobj in data_objects]
wf_dict["ended_at_time"] = self.job.metadata.get("end")

# workflow-specific keys
logical_names = set()
Expand Down
14 changes: 14 additions & 0 deletions tests/test_wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtur
for data_object in data_objects:
assert isinstance(data_object, DataObject)
wfe = job.make_workflow_execution(data_objects)
assert wfe.started_at_time
assert wfe.ended_at_time
assert isinstance(wfe, MagsAnalysis)
# attributes from final_stats_json
assert wfe.mags_list
Expand All @@ -305,6 +307,18 @@ def test_workflow_job_data_objects_and_execution_record_mags(site_config, fixtur
assert isinstance(wfe.binned_contig_num, int)


def test_workflow_execution_record_from_workflow_job(site_config, fixtures_dir, tmp_path):
job_metadata = json.load(open(fixtures_dir / "mags_job_metadata.json"))
workflow_state = json.load(open(fixtures_dir / "mags_workflow_state.json"))
# remove 'end' from the workflow state to simulate a job that is still running
workflow_state.pop('end')
job = WorkflowJob(site_config, workflow_state, job_metadata)
data_objects = job.make_data_objects(output_dir=tmp_path)

wfe = job.make_workflow_execution(data_objects)
assert wfe.started_at_time
assert wfe.ended_at_time



def test_workflow_job_from_database_job_record(site_config, fixtures_dir):
Expand Down

0 comments on commit fc9af85

Please sign in to comment.