Skip to content

Commit

Permalink
try normalizing the workflow dict after creation
Browse files Browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Nov 15, 2024
1 parent e9d9fe6 commit a2a7e75
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
3 changes: 3 additions & 0 deletions nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def process_successful_job(self, job: WorkflowJob) -> Database:
# exit early if there is an error
sys.exit(1)
database.workflow_execution_set = [workflow_execution_record]
logger.info(f"Created workflow execution record for job {job.opid}")

self.file_handler.write_metadata_if_not_exists(job)
return database
Expand Down Expand Up @@ -328,6 +329,8 @@ def cycle(self):
if validation_report.results:
logger.error(f"Validation error: {validation_report.results[0].message}")
continue
else:
logger.info(f"Database object validated for job {job.opid}")

# post workflow execution and data objects to the runtime api
resp = self.runtime_api_handler.post_objects(job_dict)
Expand Down
24 changes: 18 additions & 6 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,16 +548,28 @@ def make_workflow_execution_record(self, data_objects: List[DataObject]) -> Dict
else:
logging.warning(f"Field {field_name} not found in {data_path}")

wf_dict = _normalize_workflow_dict(wf_dict)
return wf_dict


def _normalize_value(field_name,value: Any) -> Any:
""" Normalize values and fix common issues """
# completeness and contamination need to be converted from string to float
if field_name in ["completeness", "contamination"]:
logging.info(f"Converting {field_name} to float")
return float(value)
def _normalize_workflow_dict(workflow_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Traverse the workflow dict and normalize incorrectly formatted values e.g. "16.37" -> 16.37
"""
for key, value in workflow_dict.items():
if isinstance(value, dict):
workflow_dict[key] = _normalize_workflow_dict(value)
else:
workflow_dict[key] = _normalize_value(key, value)
return workflow_dict


def _normalize_value(key: str, value: Any) -> Any:
"""
Normalize a value based on the key.
"""
if key in ["completeness", "contamination"]:
return float(value)
return value

def _json_tmp(data):
Expand Down

0 comments on commit a2a7e75

Please sign in to comment.