
Commit

Merge pull request #51 from microbiomedata/50-implement-tool-to-find-all-missing-data-objects-from-legacy-projects

50 implement tool to find all missing data objects from legacy projects
mbthornton-lbl authored Feb 7, 2024
2 parents 0fe29da + 6ed76e3 commit 72f2a81
Showing 4 changed files with 401,459 additions and 27 deletions.
2 changes: 1 addition & 1 deletion nmdc_automation/nmdc_common/client.py
@@ -77,7 +77,7 @@ def get_workflow_activities_informed_by(self, workflow_activity_set: str,
}
url = self.base_url + "nmdcschema/" + workflow_activity_set
response = requests.get(url, params=params, headers=self.headers)
logger.info(response.url)
logger.debug(response.url)
response.raise_for_status()
workflow_activity_record = response.json()["resources"]
return workflow_activity_record
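The one-line change in this file simply drops the request URL from INFO to DEBUG logging. For orientation, a minimal usage sketch of the touched method follows; it assumes the class defined in nmdc_common/client.py is the NmdcApi that re_id_tool.py instantiates elsewhere in this commit, and the base URL and identifiers are placeholders rather than values taken from the diff.

import logging

from nmdc_automation.nmdc_common.client import NmdcApi  # assumed import path

logging.basicConfig(level=logging.DEBUG)  # DEBUG is now required to see the request URL

api_client = NmdcApi("https://napa-api.example.invalid/")  # placeholder base URL
records = api_client.get_workflow_activities_informed_by(
    "metagenome_assembly_set",  # workflow activity collection name
    "gold:Gp0123456",           # hypothetical legacy identifier (was_informed_by)
)
print(f"retrieved {len(records)} metagenome_assembly_set records")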
20 changes: 20 additions & 0 deletions nmdc_automation/re_iding/db_utils.py
@@ -12,8 +12,28 @@
OMICS_PROCESSING_SET = "omics_processing_set"
DATA_OBJECT_SET = "data_object_set"
READS_QC_SET = "read_qc_analysis_activity_set"
READS_BASED_TAXONOMY_ANALYSIS_ACTIVITY_SET = "read_based_taxonomy_analysis_activity_set"
METAGENOME_ASSEMBLY_SET = "metagenome_assembly_set"
METAGENOME_ANNOTATION_ACTIVITY_SET = "metagenome_annotation_activity_set"
METAGENOME_SEQUENCING_ACTIVITY_SET = "metagenome_sequencing_activity_set"
MAGS_ACTIVITY_SET = "mags_activity_set"
METATRANSCRIPTOME_ACTIVITY_SET = "metatranscriptome_activity_set"
METAPROTEOMICS_ANALYSIS_ACTIVITY_SET = "metaproteomics_analysis_activity_set"
METABOLOMICS_ANALYSIS_ACTIVITY_SET = "metabolomics_analysis_activity_set"
NOM_ANALYSIS_ACTIVITY_SET = "nom_analysis_activity_set"

ANALYSIS_ACTIVITIES = [
READS_QC_SET,
READS_BASED_TAXONOMY_ANALYSIS_ACTIVITY_SET,
METAGENOME_ANNOTATION_ACTIVITY_SET,
METAGENOME_SEQUENCING_ACTIVITY_SET,
METAGENOME_ASSEMBLY_SET,
MAGS_ACTIVITY_SET,
METATRANSCRIPTOME_ACTIVITY_SET,
METAPROTEOMICS_ANALYSIS_ACTIVITY_SET,
METABOLOMICS_ANALYSIS_ACTIVITY_SET,
NOM_ANALYSIS_ACTIVITY_SET
]
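The new ANALYSIS_ACTIVITIES constant gathers every workflow activity collection name so callers can sweep all of them in one pass, which is what the new orphan-data-objects command in re_id_tool.py does below. A hedged sketch of that pattern, assuming an api_client exposing the get_workflow_activities_informed_by method shown above:

from nmdc_automation.re_iding.db_utils import ANALYSIS_ACTIVITIES

def collect_data_object_ids(api_client, informed_by_id: str) -> set:
    # Gather every has_input / has_output data object ID across all activity sets.
    data_object_ids = set()
    for activity_set_name in ANALYSIS_ACTIVITIES:
        workflow_records = api_client.get_workflow_activities_informed_by(
            activity_set_name, informed_by_id
        )
        for workflow_record in workflow_records:
            data_object_ids.update(workflow_record.get("has_input", []))
            data_object_ids.update(workflow_record.get("has_output", []))
    return data_object_ids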



134 changes: 108 additions & 26 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -17,7 +17,7 @@
import nmdc_schema.nmdc as nmdc
from nmdc_automation.re_iding.base import ReIdTool
from nmdc_automation.re_iding.changesheets import Changesheet, ChangesheetLineItem
from nmdc_automation.re_iding.db_utils import get_omics_processing_id
from nmdc_automation.re_iding.db_utils import get_omics_processing_id, ANALYSIS_ACTIVITIES

# Defaults
GOLD_STUDY_ID = "gold:Gs0114663"
@@ -33,14 +33,11 @@
LOG_PATH = DATA_DIR.joinpath("re_id_tool.log")

logging.basicConfig(
filename="re_id.log",
filemode="w",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())



@click.group()
@@ -76,8 +73,8 @@ def extract_records(ctx, study_id, api_base_url):
Write the results, as a list of nmdc-schema Database instances to a JSON file.
"""
start_time = time.time()
logger.info(f"Extracting workflow records for study_id: {study_id}")
logger.info(f"study_id: {study_id}")
logging.info(f"Extracting workflow records for study_id: {study_id}")
logging.info(f"study_id: {study_id}")

config = ctx.obj["site_config"]
# api_client = NmdcRuntimeUserApi(config)
@@ -88,22 +85,22 @@ def extract_records(ctx, study_id, api_base_url):
api_client.get_omics_processing_records_part_of_study(
study_id
))
logger.info(
logging.info(
f"Retrieved {len(omics_processing_records)} OmicsProcessing records for study {study_id}"
)

retrieved_databases = []
# 2. For each OmicsProcessing record, find the legacy identifier:
for omics_processing_record in omics_processing_records:
db = nmdc.Database()
logger.info(f"omics_processing_record: " f"{omics_processing_record['id']}")
logging.info(f"omics_processing_record: " f"{omics_processing_record['id']}")
legacy_id = _get_legacy_id(omics_processing_record)
logger.info(f"legacy_id: {legacy_id}")
logging.info(f"legacy_id: {legacy_id}")

omics_type = omics_processing_record["omics_type"]["has_raw_value"]
omics_id = omics_processing_record["id"]
if omics_type not in ["Metagenome", "Metatranscriptome"]:
logger.info(
logging.info(
f"omics_processing_record {omics_id}: {omics_type}] "
f"is not a Metagenome or Metatranscriptome, skipping"
)
@@ -112,11 +109,11 @@ def extract_records(ctx, study_id, api_base_url):
for data_object_id in omics_processing_record["has_output"]:
data_object_record = api_client.get_data_object(data_object_id)
if not data_object_record:
logger.warning(f"no data object found for {data_object_id}")
logging.warning(f"no data object found for {data_object_id}")
continue
data_object_type = data_object_record.get("data_object_type")
data_object_description = data_object_record.get("description")
logger.info(
logging.info(
f"has_output: "
f"{data_object_record['id']}, "
f"Type: {data_object_type}, "
@@ -143,13 +140,13 @@ def extract_records(ctx, study_id, api_base_url):
"metatranscriptome_activity_set": metatranscriptome_activity_records,
}
for set_name, workflow_records in downstream_workflow_activity_sets.items():
logger.info(f"set_name: {set_name} for {legacy_id}")
logging.info(f"set_name: {set_name} for {legacy_id}")
workflow_records = api_client.get_workflow_activities_informed_by(set_name,
legacy_id)
logger.info(f"found {len(workflow_records)} records")
logging.info(f"found {len(workflow_records)} records")
db.__setattr__(set_name, workflow_records)
for workflow_record in workflow_records:
logger.info(f"record: {workflow_record['id']}, {workflow_record['name']}")
logging.info(f"record: {workflow_record['id']}, {workflow_record['name']}")
input_output_data_object_ids = []
if "has_input" in workflow_record:
input_output_data_object_ids.extend(workflow_record["has_input"])
@@ -161,11 +158,11 @@ def extract_records(ctx, study_id, api_base_url):
data_object_id
)
if not data_object_record:
logger.warning(f"no data object found for {data_object_id}")
logging.warning(f"no data object found for {data_object_id}")
continue
data_object_type = data_object_record.get("data_object_type")
data_object_description = data_object_record.get("description")
logger.info(
logging.info(
f"has_output: "
f"{data_object_record['id']}, "
f"Type: {data_object_type}, "
@@ -180,7 +177,7 @@ def extract_records(ctx, study_id, api_base_url):
for data_object in orphaned_data_objects:
if data_object not in db.data_object_set:
db.data_object_set.append(data_object)
logger.info(
logging.info(
f"Added orphaned data object: "
f"{data_object['id']}, {data_object['description']}"
)
@@ -189,8 +186,8 @@ def extract_records(ctx, study_id, api_base_url):

json_data = json.loads(json_dumper.dumps(retrieved_databases, inject_type=False))
db_outfile = DATA_DIR.joinpath(f"{study_id}_associated_record_dump.json")
logger.info(f"Writing {len(retrieved_databases)} records to {db_outfile}")
logger.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Writing {len(retrieved_databases)} records to {db_outfile}")
logging.info(f"Elapsed time: {time.time() - start_time}")
with open(db_outfile, "w") as f:
f.write(json.dumps(json_data, indent=4))

@@ -267,8 +264,8 @@ def process_records(ctx, dryrun, study_id, data_dir):

re_ided_db_records.append(new_db)

logger.info(f"Writing {len(re_ided_db_records)} records to {db_outfile}")
logger.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Writing {len(re_ided_db_records)} records to {db_outfile}")
logging.info(f"Elapsed time: {time.time() - start_time}")
json_data = json.loads(json_dumper.dumps(re_ided_db_records, inject_type=False))
with open(db_outfile, "w") as f:
f.write(json.dumps(json_data, indent=4))
@@ -340,9 +337,9 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
# submit the record to the workflows endpoint
if not changesheet_only:
resp = api_client.post_objects(record)
logger.info(f"{record} posted, got response: {resp}")
logging.info(f"{record} posted, got response: {resp}")
else:
logger.info(f"changesheet_only is True, skipping ingest")
logging.info(f"changesheet_only is True, skipping ingest")

changesheet.write_changesheet()
logging.info(f"changesheet written to {changesheet.output_filepath}")
@@ -427,6 +424,91 @@ def delete_old_records(ctx, old_records_file):
)


@cli.command()
@click.option("--study-id", default=STUDY_ID, help="NMDC study ID")
@click.option("--api-base-url", default=NAPA_BASE_URL,
help=f"Optional base URL for the NMDC API. Default: {NAPA_BASE_URL}")
@click.pass_context
def orphan_data_objects(ctx, study_id, api_base_url):
"""
Scan project data directories, read in the data object records from 'data_objects.json'
and find data objects that:
- are associated with one or more workflow activities has_input or has_output
- are not present in the data_objects collection in the NMDC database
Write the results to a JSON file of nmdc DataObject instances.
"""
start_time = time.time()
logging.info(f"Scanning for orphaned data objects for {study_id}")


api_client = NmdcApi(api_base_url)
with open("unique_data_objects.json", "r") as f:
data_objects = json.load(f)
# index the data objects by ID
data_objects_by_id = {data_object["id"]: data_object for data_object in data_objects}


# 1. Retrieve all OmicsProcessing records for the updated NMDC study ID
omics_processing_records = (
api_client.get_omics_processing_records_part_of_study(
study_id
))
orphan_data_object_ids = set()
# 2. For each OmicsProcessing record, find the legacy identifier:
for omics_processing_record in omics_processing_records:
informed_by_id = _get_legacy_id(omics_processing_record)
for activity_set_name in ANALYSIS_ACTIVITIES:
workflow_records = api_client.get_workflow_activities_informed_by(activity_set_name, informed_by_id)
for workflow_record in workflow_records:
data_object_ids = set()
data_object_ids.update(workflow_record["has_input"])
data_object_ids.update(workflow_record["has_output"])

# Search the data object IDs
for data_object_id in data_object_ids:
data_object_record = api_client.get_data_object(data_object_id)
if not data_object_record:
logging.warning(f"{informed_by_id} : {workflow_record['id']} "
f"{workflow_record['name']} missing: {data_object_id}")
orphan_data_object_ids.add(data_object_id)
continue
logging.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Found {len(orphan_data_object_ids)} orphaned data objects")
# get orphaned data objects from the data_objects_by_id if present
orphaned_data_objects = []
for data_object_id in orphan_data_object_ids:
if data_object_id in data_objects_by_id:
orphaned_data_objects.append(data_objects_by_id[data_object_id])
else:
logging.warning(f"orphaned data object {data_object_id} not found in data_objects.json")
logging.info(f"Writing {len(orphaned_data_objects)} orphaned data objects to orphaned_data_objects.json")
if orphaned_data_objects:
with open(f"{study_id}_orphaned_data_objects.json", "w") as f:
f.write(json.dumps(orphaned_data_objects, indent=4))


@cli.command()
@click.argument("data-objects-file", type=click.Path(exists=True))
def get_unique_data_objects(data_objects_file):
"""
Read in a raw json dump of data objects and return a list of unique data objects
as a json dump.
"""
with open(data_objects_file, "r") as f:
data_objects = json.load(f)

unique_data_objects = []
unique_data_object_ids = set()
for data_object in data_objects:
if data_object["id"] not in unique_data_object_ids:
unique_data_objects.append(data_object)
unique_data_object_ids.add(data_object["id"])
with open("unique_data_objects.json", "w") as f:
f.write(json.dumps(unique_data_objects, indent=4))



def _get_data_dir(data_dir, dryrun):
"""
Return the path to the data object files
@@ -466,7 +548,7 @@ def _get_legacy_id(omics_processing_record: dict) -> str:
alternative_ids = omics_processing_record.get("alternative_identifiers", [])
legacy_ids.extend(alternative_ids)
if len(legacy_ids) == 0:
logging.warning(
logging.debug(
f"No legacy IDs found for: {omics_processing_record['id']} using ID instead"
)
return omics_processing_record["id"]
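The two new commands are intended to be run in sequence: get-unique-data-objects first deduplicates a raw data object dump into unique_data_objects.json, which orphan-data-objects then consults while scanning a study for data objects that workflow activities reference but the data_object_set collection lacks. A hedged sketch of driving both through click's test runner; the dashed command names assume click's default underscore-to-dash naming, the input file path and study ID are placeholders, and it is assumed the top-level group needs no extra options for these two commands.

from click.testing import CliRunner

from nmdc_automation.re_iding.scripts.re_id_tool import cli

runner = CliRunner()

# 1. Deduplicate a raw data object dump into unique_data_objects.json.
result = runner.invoke(cli, ["get-unique-data-objects", "data_objects_dump.json"])
print(result.exit_code, result.output)

# 2. Scan a study for data objects referenced by workflow activities but
#    missing from the database; writes <study_id>_orphaned_data_objects.json.
result = runner.invoke(
    cli, ["orphan-data-objects", "--study-id", "nmdc:sty-00-000000"]
)
print(result.exit_code, result.output)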