Ensure read_qc workflow records are processed first for each omics record
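The fix replaces the `downstream_workflow_activity_sets` dict with a tuple of `(set_name, records)` pairs, so the sets are always walked in their declared order and `read_qc_analysis_activity_set` is examined before any downstream set. A minimal sketch of the pattern, with hypothetical records and a stand-in `check_data_objects` helper (neither is part of the actual script):

```python
# Sketch of the ordered-iteration pattern, assuming each entry pairs a
# workflow set name with the records retrieved for it. check_data_objects
# is a hypothetical stand-in for the script's real data-object checks.

def check_data_objects(record):
    # Hypothetical check: a record passes if its outputs resolve.
    return bool(record.get("has_output"))

read_qc_records = [{"id": "wf-1", "has_output": []}]         # fails the check
assembly_records = [{"id": "wf-2", "has_output": ["do-1"]}]  # would pass on its own

# A tuple of pairs is iterated in its literal order, so read_qc is
# always examined first.
downstream_workflow_activity_sets = (
    ("read_qc_analysis_activity_set", read_qc_records),
    ("metagenome_assembly_set", assembly_records),
)

is_reads_qc_missing_data_objects = False
for set_name, workflow_records in downstream_workflow_activity_sets:
    for record in workflow_records:
        if set_name == "read_qc_analysis_activity_set" and not check_data_objects(record):
            # Settled on the first iteration; later sets can rely on it.
            is_reads_qc_missing_data_objects = True
        if is_reads_qc_missing_data_objects:
            # Downstream records inherit the read_qc failure.
            print(f"failing {record['id']}: read_qc is incomplete")
```

Plain dicts do preserve insertion order on Python 3.7+, but the tuple makes the ordering an explicit contract rather than a side effect, which is exactly what the read_qc-first failure cascade relies on.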
mbthornton-lbl committed Feb 7, 2024
1 parent 33b294e commit c99cd3e
Showing 1 changed file with 33 additions and 65 deletions.
98 changes: 33 additions & 65 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -90,9 +90,7 @@ def extract_records(ctx, study_id, api_base_url):
for omics_processing_record in omics_processing_records:
db = nmdc.Database()
db_failed = nmdc.Database()
db_no_type = nmdc.Database()
is_failed_data = False
is_no_type_data = False

logging.info(f"omics_processing_record: " f"{omics_processing_record['id']}")
legacy_id = _get_legacy_id(omics_processing_record)
@@ -125,23 +123,22 @@ def extract_records(ctx, study_id, api_base_url):
metatranscriptome_activity_records,
) = ([], [], [], [], [], [])

downstream_workflow_activity_sets = {
"read_qc_analysis_activity_set": read_qc_records,
"read_based_taxonomy_analysis_activity_set": readbased_records,
"metagenome_assembly_set": metagenome_assembly_records,
"metagenome_annotation_activity_set": metagenome_annotation_records,
"mags_activity_set": mags_records,
"metatranscriptome_activity_set": metatranscriptome_activity_records,
}
downstream_workflow_activity_sets = (
("read_qc_analysis_activity_set", read_qc_records),
("read_based_taxonomy_analysis_activity_set", readbased_records),
("metagenome_assembly_set", metagenome_assembly_records),
("metagenome_annotation_activity_set", metagenome_annotation_records),
("mags_activity_set", mags_records),
("metatranscriptome_activity_set", metatranscriptome_activity_records),
)
is_reads_qc_missing_data_objects = False
for set_name, workflow_records in downstream_workflow_activity_sets.items():
for set_name, workflow_records in downstream_workflow_activity_sets:
logging.info(f"set_name: {set_name} for {legacy_id}")
workflow_records = api_client.get_workflow_activities_informed_by(set_name,
legacy_id)
logging.info(f"found {len(workflow_records)} records")
passing_records = []
failing_records = []
no_type_records = []

# Get workflow record(s) for each activity set - generally only one but could be more
for workflow_record in workflow_records:
@@ -153,15 +150,12 @@ def extract_records(ctx, study_id, api_base_url):
input_output_data_object_ids.update(workflow_record["has_output"])

is_missing_data_objects = False
is_no_type_data_objects = False
passing_data_objects = set()
failing_data_objects = set()
no_type_data_objects = set()
for data_object_id in input_output_data_object_ids:
data_object_record = api_client.get_data_object(
data_object_id
)

# Check for orphaned data objects
if not data_object_record:
logging.error(f"DataObjectNotFound {data_object_id} for {workflow_record['type']}"
@@ -170,74 +164,56 @@ def extract_records(ctx, study_id, api_base_url):
is_failed_data = True
if set_name == "read_qc_analysis_activity_set":
is_reads_qc_missing_data_objects = True
logging.error(f"ReadsQCMissingDataObjects: {workflow_record['id']}, {workflow_record['type']}, {workflow_record['name']}")
continue
logging.error(f"ReadsQCMissingDataObjects: {workflow_record['id']}, "
f"{workflow_record['name']} failing all data objects")
# All other data objects fail if one is missing
failing_data_objects.update(passing_data_objects)
passing_data_objects.clear()

continue
# If ReadsQC failed for missing Data Objects, every data object is failed
if is_reads_qc_missing_data_objects:
if data_object_record not in failing_data_objects:
logging.error(f"ReadsQCFailingDataObjects: {data_object_id}")
failing_data_objects.add(data_object_record)

logging.error(f"FailedDataObject: ReadsQCFailed: {data_object_id},")
failing_data_objects.add(data_object_record)
continue
# If we found a missing data object for this workflow record, we fail all its data objects
if is_missing_data_objects:
if data_object_record not in failing_data_objects:
failing_data_objects.add(data_object_record)
logging.error(f"FailedDataObject: {data_object_id},")

# Some legacy Data Objects cannot be typed
# Some legacy Data Objects cannot be readily typed - warn and skip
data_object_type = data_object_record.get("data_object_type")
data_object_url = data_object_record.get("url")
if not data_object_type and not data_object_url:
is_no_type_data_objects = True
is_no_type_data = True
logging.warning(f"DataObjectNoType: {data_object_id}")
no_type_records.append(data_object_record)

passing_data_objects.add(data_object_record)
continue
else:
passing_data_objects.add(data_object_record)
continue
if passing_data_objects:
db.data_object_set.extend(passing_data_objects)
if failing_data_objects:
db_failed.data_object_set.extend(failing_data_objects)
if no_type_data_objects:
db_no_type.data_object_set.extend(no_type_data_objects)

# add the workflow record if not missing data objects or data object type is missing
# if ReadsQC failed for missing Data Objects, every other workflow record is failed as well
if is_reads_qc_missing_data_objects:
logging.warning(f"ReadsQCMissingDataObjects: {workflow_record['id']}, {workflow_record['type']}, {workflow_record['name']}")
logging.error(f"ReadsQCMissingDataObjects: {workflow_record['id']}, {workflow_record['name']}")
failing_records.append(workflow_record)
# if this workflow had missing data objects, fail it
elif is_missing_data_objects:
logging.warning(f"MissingDataObjects: {workflow_record['id']}, {workflow_record['type']}, {workflow_record['name']}")
failing_records.append(workflow_record)

elif is_no_type_data_objects:
logging.warning(f"NoTypeDataObjects: {workflow_record['id']}, {workflow_record['type']}, {workflow_record['name']}")
no_type_records.append(workflow_record)
else:
passing_records.append(workflow_record)

if failing_records:
db_failed.__setattr__(set_name, failing_records)
if no_type_records:
db_no_type.__setattr__(set_name, no_type_records)
db.__setattr__(set_name, passing_records)

# Search for orphaned data objects with the legacy ID in the description
orphaned_data_objects = api_client.get_data_objects_by_description(legacy_id)
# check that we don't already have the data object in the set
for data_object in orphaned_data_objects:
if data_object not in db.data_object_set:
db.data_object_set.append(data_object)
logging.info(
f"Added orphaned data object: "
f"{data_object['id']}, {data_object['description']}"
)
if passing_records:
db.__setattr__(set_name, passing_records)

retrieved_databases.append(db)
if is_failed_data:
retrieved_missing_data_objects.append(db_failed)
if is_no_type_data:
retrieved_no_type_data_objects.append(db_no_type)

json_data = json.loads(json_dumper.dumps(retrieved_databases, inject_type=False))
db_outfile = DATA_DIR.joinpath(f"{study_id}_associated_record_dump.json")
@@ -254,14 +230,6 @@ def extract_records(ctx, study_id, api_base_url):
json_data = json.loads(json_dumper.dumps(retrieved_missing_data_objects, inject_type=False))
f.write(json.dumps(json_data, indent=4))

# write no type records to a separate file if they exist
if retrieved_no_type_data_objects:
db_no_type_outfile = DATA_DIR.joinpath(f"{study_id}_no_type_record_dump.json")
logging.info(f"Writing {len(retrieved_no_type_data_objects)} no type records to {db_no_type_outfile}")
with open(db_no_type_outfile, "w") as f:
json_data = json.loads(json_dumper.dumps(retrieved_no_type_data_objects, inject_type=False))
f.write(json.dumps(json_data, indent=4))


@cli.command()
@click.option(
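A side note on the hunks above: the per-set record lists are attached to the `nmdc.Database` instances dynamically, keyed by the set name, and (after this commit) only when non-empty. A minimal sketch of that pattern with a hypothetical stand-in class, not the real `nmdc` schema class:

```python
class Database:
    # Hypothetical stand-in for nmdc.Database: one list attribute per record set.
    def __init__(self):
        self.data_object_set = []

db = Database()
db_failed = Database()
set_name = "read_qc_analysis_activity_set"
passing_records = [{"id": "wf-1"}]
failing_records = []

# setattr(obj, name, value) is the idiomatic spelling of obj.__setattr__(...).
# Guarding on non-empty lists keeps empty sets out of the dumped JSON.
if failing_records:
    setattr(db_failed, set_name, failing_records)
if passing_records:
    setattr(db, set_name, passing_records)

print(getattr(db, set_name))  # [{'id': 'wf-1'}]
```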
