#50: Implement tool to find all missing data objects from legacy projects (PR #51)

Merged
2 changes: 1 addition & 1 deletion nmdc_automation/nmdc_common/client.py
@@ -77,7 +77,7 @@ def get_workflow_activities_informed_by(self, workflow_activity_set: str,
}
url = self.base_url + "nmdcschema/" + workflow_activity_set
response = requests.get(url, params=params, headers=self.headers)
logger.info(response.url)
logger.debug(response.url)
response.raise_for_status()
workflow_activity_record = response.json()["resources"]
return workflow_activity_record
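This is the whole of the client.py change: the request URL drops from INFO to DEBUG, so routine runs stop echoing every API call. A minimal sketch of opting back in while troubleshooting, assuming client.py builds its logger with `logging.getLogger(__name__)` (the logger definition is not shown in this hunk):

```python
import logging

# Assumption: client.py names its logger after the module path via
# logging.getLogger(__name__). Raising both the root configuration and
# that logger to DEBUG restores the per-request URL lines.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("nmdc_automation.nmdc_common.client").setLevel(logging.DEBUG)
```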
20 changes: 20 additions & 0 deletions nmdc_automation/re_iding/db_utils.py
@@ -12,8 +12,28 @@
OMICS_PROCESSING_SET = "omics_processing_set"
DATA_OBJECT_SET = "data_object_set"
READS_QC_SET = "read_qc_analysis_activity_set"
READS_BASED_TAXONOMY_ANALYSIS_ACTIVITY_SET = "read_based_taxonomy_analysis_activity_set"
METAGENOME_ASSEMBLY_SET = "metagenome_assembly_set"
METAGENOME_ANNOTATION_ACTIVITY_SET = "metagenome_annotation_activity_set"
METAGENOME_SEQUENCING_ACTIVITY_SET = "metagenome_sequencing_activity_set"
MAGS_ACTIVITY_SET = "mags_activity_set"
METATRANSCRIPTOME_ACTIVITY_SET = "metatranscriptome_activity_set"
METAPROTEOMICS_ANALYSIS_ACTIVITY_SET = "metaproteomics_analysis_activity_set"
METABOLOMICS_ANALYSIS_ACTIVITY_SET = "metabolomics_analysis_activity_set"
NOM_ANALYSIS_ACTIVITY_SET = "nom_analysis_activity_set"

ANALYSIS_ACTIVITIES = [
READS_QC_SET,
READS_BASED_TAXONOMY_ANALYSIS_ACTIVITY_SET,
METAGENOME_ANNOTATION_ACTIVITY_SET,
METAGENOME_SEQUENCING_ACTIVITY_SET,
METAGENOME_ASSEMBLY_SET,
MAGS_ACTIVITY_SET,
METATRANSCRIPTOME_ACTIVITY_SET,
METAPROTEOMICS_ANALYSIS_ACTIVITY_SET,
METABOLOMICS_ANALYSIS_ACTIVITY_SET,
NOM_ANALYSIS_ACTIVITY_SET
]



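The new ANALYSIS_ACTIVITIES constant gives downstream tooling a single canonical list of workflow activity collections to walk. A minimal sketch of the intended traversal, reusing the client method patched above; the base URL and legacy ID are placeholders, and the NmdcApi import path is assumed from how re_id_tool.py uses it:

```python
from nmdc_automation.nmdc_common.client import NmdcApi
from nmdc_automation.re_iding.db_utils import ANALYSIS_ACTIVITIES

api_client = NmdcApi("https://example-api.invalid/")  # placeholder base URL
legacy_id = "gold:Gp0000000"                          # placeholder legacy ID

for activity_set_name in ANALYSIS_ACTIVITIES:
    # Each constant names a schema collection, e.g. "mags_activity_set".
    records = api_client.get_workflow_activities_informed_by(
        activity_set_name, legacy_id
    )
    print(f"{activity_set_name}: {len(records)} records")
```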
134 changes: 108 additions & 26 deletions nmdc_automation/re_iding/scripts/re_id_tool.py
@@ -17,7 +17,7 @@
import nmdc_schema.nmdc as nmdc
from nmdc_automation.re_iding.base import ReIdTool
from nmdc_automation.re_iding.changesheets import Changesheet, ChangesheetLineItem
from nmdc_automation.re_iding.db_utils import get_omics_processing_id
from nmdc_automation.re_iding.db_utils import get_omics_processing_id, ANALYSIS_ACTIVITIES

# Defaults
GOLD_STUDY_ID = "gold:Gs0114663"
@@ -33,14 +33,11 @@
LOG_PATH = DATA_DIR.joinpath("re_id_tool.log")

logging.basicConfig(
filename="re_id.log",
filemode="w",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())



@click.group()
@@ -76,8 +73,8 @@ def extract_records(ctx, study_id, api_base_url):
Write the results, as a list of nmdc-schema Database instances, to a JSON file.
"""
start_time = time.time()
logger.info(f"Extracting workflow records for study_id: {study_id}")
logger.info(f"study_id: {study_id}")
logging.info(f"Extracting workflow records for study_id: {study_id}")
logging.info(f"study_id: {study_id}")

config = ctx.obj["site_config"]
# api_client = NmdcRuntimeUserApi(config)
@@ -88,22 +85,22 @@ def extract_records(ctx, study_id, api_base_url):
api_client.get_omics_processing_records_part_of_study(
study_id
))
logger.info(
logging.info(
f"Retrieved {len(omics_processing_records)} OmicsProcessing records for study {study_id}"
)

retrieved_databases = []
# 2. For each OmicsProcessing record, find the legacy identifier:
for omics_processing_record in omics_processing_records:
db = nmdc.Database()
logger.info(f"omics_processing_record: " f"{omics_processing_record['id']}")
logging.info(f"omics_processing_record: " f"{omics_processing_record['id']}")
legacy_id = _get_legacy_id(omics_processing_record)
logger.info(f"legacy_id: {legacy_id}")
logging.info(f"legacy_id: {legacy_id}")

omics_type = omics_processing_record["omics_type"]["has_raw_value"]
omics_id = omics_processing_record["id"]
if omics_type not in ["Metagenome", "Metatranscriptome"]:
logger.info(
logging.info(
f"omics_processing_record {omics_id}: {omics_type}] "
f"is not a Metagenome or Metatranscriptome, skipping"
)
@@ -112,11 +109,11 @@ def extract_records(ctx, study_id, api_base_url):
for data_object_id in omics_processing_record["has_output"]:
data_object_record = api_client.get_data_object(data_object_id)
if not data_object_record:
logger.warning(f"no data object found for {data_object_id}")
logging.warning(f"no data object found for {data_object_id}")
continue
data_object_type = data_object_record.get("data_object_type")
data_object_description = data_object_record.get("description")
logger.info(
logging.info(
f"has_output: "
f"{data_object_record['id']}, "
f"Type: {data_object_type}, "
@@ -143,13 +140,13 @@ def extract_records(ctx, study_id, api_base_url):
"metatranscriptome_activity_set": metatranscriptome_activity_records,
}
for set_name, workflow_records in downstream_workflow_activity_sets.items():
logger.info(f"set_name: {set_name} for {legacy_id}")
logging.info(f"set_name: {set_name} for {legacy_id}")
workflow_records = api_client.get_workflow_activities_informed_by(set_name,
legacy_id)
logger.info(f"found {len(workflow_records)} records")
logging.info(f"found {len(workflow_records)} records")
db.__setattr__(set_name, workflow_records)
for workflow_record in workflow_records:
logger.info(f"record: {workflow_record['id']}, {workflow_record['name']}")
logging.info(f"record: {workflow_record['id']}, {workflow_record['name']}")
input_output_data_object_ids = []
if "has_input" in workflow_record:
input_output_data_object_ids.extend(workflow_record["has_input"])
@@ -161,11 +158,11 @@ def extract_records(ctx, study_id, api_base_url):
data_object_id
)
if not data_object_record:
logger.warning(f"no data object found for {data_object_id}")
logging.warning(f"no data object found for {data_object_id}")
continue
data_object_type = data_object_record.get("data_object_type")
data_object_description = data_object_record.get("description")
logger.info(
logging.info(
f"has_output: "
f"{data_object_record['id']}, "
f"Type: {data_object_type}, "
@@ -180,7 +177,7 @@ def extract_records(ctx, study_id, api_base_url):
for data_object in orphaned_data_objects:
if data_object not in db.data_object_set:
db.data_object_set.append(data_object)
logger.info(
logging.info(
f"Added orphaned data object: "
f"{data_object['id']}, {data_object['description']}"
)
@@ -189,8 +186,8 @@ def extract_records(ctx, study_id, api_base_url):

json_data = json.loads(json_dumper.dumps(retrieved_databases, inject_type=False))
db_outfile = DATA_DIR.joinpath(f"{study_id}_associated_record_dump.json")
logger.info(f"Writing {len(retrieved_databases)} records to {db_outfile}")
logger.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Writing {len(retrieved_databases)} records to {db_outfile}")
logging.info(f"Elapsed time: {time.time() - start_time}")
with open(db_outfile, "w") as f:
f.write(json.dumps(json_data, indent=4))
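The dump written here is plain JSON: a list with one Database object per OmicsProcessing record, keyed by set name. A quick sketch of inspecting it; the study ID in the filename is a placeholder:

```python
import json
from pathlib import Path

# extract_records names the file "<study_id>_associated_record_dump.json".
dump_file = Path("nmdc:sty-00-000000_associated_record_dump.json")  # placeholder
with dump_file.open() as f:
    databases = json.load(f)

for db in databases:
    for set_name, records in db.items():
        print(f"{set_name}: {len(records)} records")
```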

@@ -267,8 +264,8 @@ def process_records(ctx, dryrun, study_id, data_dir):

re_ided_db_records.append(new_db)

logger.info(f"Writing {len(re_ided_db_records)} records to {db_outfile}")
logger.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Writing {len(re_ided_db_records)} records to {db_outfile}")
logging.info(f"Elapsed time: {time.time() - start_time}")
json_data = json.loads(json_dumper.dumps(re_ided_db_records, inject_type=False))
with open(db_outfile, "w") as f:
f.write(json.dumps(json_data, indent=4))
@@ -340,9 +337,9 @@ def ingest_records(ctx, reid_records_file, changesheet_only):
# submit the record to the workflows endpoint
if not changesheet_only:
resp = api_client.post_objects(record)
logger.info(f"{record} posted, got response: {resp}")
logging.info(f"{record} posted, got response: {resp}")
else:
logger.info(f"changesheet_only is True, skipping ingest")
logging.info(f"changesheet_only is True, skipping ingest")

changesheet.write_changesheet()
logging.info(f"changesheet written to {changesheet.output_filepath}")
@@ -427,6 +424,91 @@ def delete_old_records(ctx, old_records_file):
)


@cli.command()
@click.option("--study-id", default=STUDY_ID, help="NMDC study ID")
@click.option("--api-base-url", default=NAPA_BASE_URL,
help=f"Optional base URL for the NMDC API. Default: {NAPA_BASE_URL}")
@click.pass_context
def orphan_data_objects(ctx, study_id, api_base_url):
"""
Read the data object records from 'unique_data_objects.json' and find data
objects that:
- are referenced by one or more workflow activities via has_input or has_output
- are not present in the data_object_set collection in the NMDC database

Write the results to a JSON file of nmdc DataObject instances.
"""
start_time = time.time()
logging.info(f"Scanning for orphaned data objects for {study_id}")


api_client = NmdcApi(api_base_url)
with open("unique_data_objects.json", "r") as f:
data_objects = json.load(f)
# index the data objects by ID
data_objects_by_id = {data_object["id"]: data_object for data_object in data_objects}


# 1. Retrieve all OmicsProcessing records for the updated NMDC study ID
omics_processing_records = (
api_client.get_omics_processing_records_part_of_study(
study_id
))
orphan_data_object_ids = set()
# 2. For each OmicsProcessing record, find the legacy identifier:
for omics_processing_record in omics_processing_records:
informed_by_id = _get_legacy_id(omics_processing_record)
for activity_set_name in ANALYSIS_ACTIVITIES:
workflow_records = api_client.get_workflow_activities_informed_by(activity_set_name, informed_by_id)
for workflow_record in workflow_records:
data_object_ids = set()
data_object_ids.update(workflow_record["has_input"])
data_object_ids.update(workflow_record["has_output"])

# Search the data object IDs
for data_object_id in data_object_ids:
data_object_record = api_client.get_data_object(data_object_id)
if not data_object_record:
logging.warning(f"{informed_by_id} : {workflow_record['id']} "
f"{workflow_record['name']} missing: {data_object_id}")
orphan_data_object_ids.add(data_object_id)
continue
logging.info(f"Elapsed time: {time.time() - start_time}")
logging.info(f"Found {len(orphan_data_object_ids)} orphaned data objects")
# get orphaned data objects from the data_objects_by_id if present
orphaned_data_objects = []
for data_object_id in orphan_data_object_ids:
if data_object_id in data_objects_by_id:
orphaned_data_objects.append(data_objects_by_id[data_object_id])
else:
logging.warning(f"orphaned data object {data_object_id} not found in data_objects.json")
logging.info(f"Writing {len(orphaned_data_objects)} orphaned data objects to orphaned_data_objects.json")
if orphaned_data_objects:
with open(f"{study_id}_orphaned_data_objects.json", "w") as f:
f.write(json.dumps(orphaned_data_objects, indent=4))
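Note the implicit ordering between the two new commands: orphan_data_objects reads unique_data_objects.json from the current working directory, so get_unique_data_objects (below) must be run first against a raw data-object dump. With click's default underscore-to-hyphen command naming that is `python re_id_tool.py get-unique-data-objects <raw_dump.json>` followed by `python re_id_tool.py orphan-data-objects --study-id <study_id>` (command spellings assumed; they are not shown in this diff).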


@cli.command()
@click.argument("data-objects-file", type=click.Path(exists=True))
def get_unique_data_objects(data_objects_file):
"""
Read in a raw JSON dump of data objects and write a de-duplicated list to
'unique_data_objects.json'.
"""
with open(data_objects_file, "r") as f:
data_objects = json.load(f)

unique_data_objects = []
unique_data_object_ids = set()
for data_object in data_objects:
if data_object["id"] not in unique_data_object_ids:
unique_data_objects.append(data_object)
unique_data_object_ids.add(data_object["id"])
with open("unique_data_objects.json", "w") as f:
f.write(json.dumps(unique_data_objects, indent=4))
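The de-duplication above keeps the first occurrence of each ID, so the output file preserves the ordering of the raw dump.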



def _get_data_dir(data_dir, dryrun):
"""
Return the path to the data object files
@@ -466,7 +548,7 @@ def _get_legacy_id(omics_processing_record: dict) -> str:
alternative_ids = omics_processing_record.get("alternative_identifiers", [])
legacy_ids.extend(alternative_ids)
if len(legacy_ids) == 0:
logging.warning(
logging.debug(
f"No legacy IDs found for: {omics_processing_record['id']} using ID instead"
)
return omics_processing_record["id"]