Handle omics processing without has_output
mbthornton-lbl committed Feb 12, 2024
1 parent 8ff939a commit 471fc9a
Showing 1 changed file with 42 additions and 9 deletions.
nmdc_automation/re_iding/scripts/re_id_tool.py (42 additions & 9 deletions)
@@ -115,20 +115,39 @@ def extract_records(ctx, study_id, api_base_url):
         omics_processing_has_outputs = omics_processing_record.get("has_output", [])
         # if no has_output, fail the record and its workflow activities and data objects
         if not omics_processing_has_outputs:
-            logging.error(f"No has_output for {omics_id}")
-            is_failed_data = True
-            is_omics_missing_has_output = True
-            omics_level_failure_count += 1
-            db_failed.omics_processing_set.append(omics_processing_record)
+            logging.warning(f"No has_output for {omics_id} searching for has_input from ReadQC")
+            # look for has_input from ReadQC
+            has_input_data_objects = _get_has_input_from_read_qc(api_client, legacy_id)
+            if has_input_data_objects:
+                logging.info(f"has_input_data_objects: {len(has_input_data_objects)}")
+                omics_processing_record["has_output"] = has_input_data_objects
+                db.omics_processing_set.append(omics_processing_record)
+                omics_processing_has_outputs = has_input_data_objects
+            else:
+                logging.error(f"OmicsMissingHasOutput: {omics_id} failing")
+                is_failed_data = True
+                is_omics_missing_has_output = True
+                omics_level_failure_count += 1
+                db_failed.omics_processing_set.append(omics_processing_record)
         else:
             logging.info(f"Adding OmicsProcessing: {omics_id}")
             db.omics_processing_set.append(omics_processing_record)
 
         for data_object_id in omics_processing_has_outputs:
             data_object_record = api_client.get_data_object(data_object_id)
+            # If the data object is an orphan, fail the omics processing record and its data objects
             if not data_object_record:
-                logging.error(f"Missing Data Object: {data_object_id} for {omics_id}")
-                continue
-            db.data_object_set.append(data_object_record)
+                logging.error(f"OmicsProcessingOrphanDataObject: {data_object_id} for {omics_id}")
+                is_failed_data = True
+                is_omics_missing_has_output = True
+                omics_level_failure_count += 1
+                db_failed.data_object_set.append(data_object_record)
+                db_failed.omics_processing_set.append(omics_processing_record)
+                db.omics_processing_set.remove(omics_processing_record)
+            else:
+                logging.info(f"Adding OmicsProcessing {data_object_record.get('data_object_type')} has_output "
+                             f"DataObject:{data_object_id}")
+                db.data_object_set.append(data_object_record)
 
         # downstream workflow activity sets
         (
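
In outline, the hunk above gives each omics processing record a second chance before failing it: if has_output is empty, the has_input IDs of the ReadQC activities informed by the record's legacy ID are backfilled as its outputs, and any has_output ID that resolves to no data object record fails the record as an orphan. A minimal sketch of that flow, with the db/db_failed bookkeeping omitted; resolve_outputs is a hypothetical wrapper for illustration, while _get_has_input_from_read_qc and api_client.get_data_object are the calls actually used in the diff:

def resolve_outputs(omics_processing_record, api_client, legacy_id):
    """Return (data_object_ids, failed) for one omics processing record."""
    has_outputs = omics_processing_record.get("has_output", [])
    if not has_outputs:
        # Fallback: recover data object IDs from ReadQC activity has_input.
        has_outputs = _get_has_input_from_read_qc(api_client, legacy_id)
        if not has_outputs:
            return [], True  # OmicsMissingHasOutput: nothing to recover
        omics_processing_record["has_output"] = has_outputs  # backfill
    for data_object_id in has_outputs:
        if not api_client.get_data_object(data_object_id):
            # A has_output ID with no data object record is an orphan.
            return [], True  # OmicsProcessingOrphanDataObject
    return has_outputs, False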
@@ -258,13 +277,15 @@ def extract_records(ctx, study_id, api_base_url):
logging.error(f"FailedRecords: {set_name}, {len(failing_records)}")
db_failed.__setattr__(set_name, failing_records)
if passing_records:
logging.info(f"PassingRecords: {set_name}, {len(passing_records)}")
db.__setattr__(set_name, passing_records)



if is_failed_data:
retrieved_failed_databases.append(db_failed)
else:
# db is empty if omics_processing has_output is missing
if not is_omics_missing_has_output:
retrieved_databases.append(db)

json_data = json.loads(json_dumper.dumps(retrieved_databases, inject_type=False))
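
One subtlety in this hunk: is_failed_data and is_omics_missing_has_output act independently, so a study whose omics records all have outputs but whose activity sets partially fail contributes to both retrieved_failed_databases and retrieved_databases, while an omics-level failure leaves db empty and suppressed. The dumps-then-loads on the last line converts the LinkML Database objects into plain dicts for writing out; a sketch, assuming retrieved_databases holds nmdc Database instances:

import json
from linkml_runtime.dumpers import json_dumper

# Serialize to JSON text without "@type" keys (inject_type=False), then
# parse back into plain dicts/lists that json.dump can write to a file.
json_text = json_dumper.dumps(retrieved_databases, inject_type=False)
json_data = json.loads(json_text)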
@@ -640,6 +661,18 @@ def _get_legacy_id(omics_processing_record: dict) -> str:
     legacy_id = legacy_ids[0]
     return legacy_id
 
+
+def _get_has_input_from_read_qc(api_client, legacy_id):
+    """
+    Get the has_input data objects for the given legacy ID
+    """
+    read_qc_records = api_client.get_workflow_activities_informed_by(
+        "read_qc_analysis_activity_set", legacy_id
+    )
+    has_input_data_objects = set()
+    for record in read_qc_records:
+        has_input_data_objects.update(record.get("has_input", []))
+    return list(has_input_data_objects)
 
 
 if __name__ == "__main__":
     cli(obj={})
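
A quick usage sketch of the new _get_has_input_from_read_qc helper, with a stubbed client in place of the real API wrapper (the stub class and its IDs are hypothetical); note the set union, which de-duplicates inputs shared across ReadQC records:

class _StubClient:
    def get_workflow_activities_informed_by(self, set_name, informed_by_id):
        # Two ReadQC activities that share one input data object.
        return [
            {"has_input": ["nmdc:do-1", "nmdc:do-2"]},
            {"has_input": ["nmdc:do-2"]},
        ]

ids = _get_has_input_from_read_qc(_StubClient(), "gold:Gp0123456")
assert sorted(ids) == ["nmdc:do-1", "nmdc:do-2"]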
