Merge pull request #41 from microbiomedata/40-re-id-logic-for-metatranscriptomes

40 re id logic for metatranscriptomes
mbthornton-lbl authored Jan 23, 2024
2 parents 6bc6154 + 428e78d commit 80c8aa7
Showing 21 changed files with 763 additions and 466 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -7,3 +7,5 @@ htmlcov/
attic
.idea/

configs/.local_napa_config.toml
nmdc_automation/re_iding/scripts/data/dryrun_data/
35 changes: 34 additions & 1 deletion configs/re_iding_worklfows.yaml
@@ -184,4 +184,37 @@ Workflows:
data_object_type: Read Based Analysis Info File
description: Read based analysis info for {id}
name: File containing reads based analysis information
suffix: profiler.info

- Name: Metatranscriptome Activity
Type: nmdc:MetatranscriptomeActivity
Enabled: False
Git_repo: https://github.com/microbiomedata/MetatranscriptomeActivity
Version: 0.0.0
Collection: metatranscriptome_activity_set
ActivityRange: MetatranscriptomeActivity
Predecessors:
- Reads QC
- Reads QC Interleave
Inputs:
input_file: do:Filtered Sequencing Reads
proj: "{activity_id}"
Activity:
name: "Metatranscriptome Activity for {id}"
type: nmdc:MetatranscriptomeActivity
Outputs:
- output: read_count_and_rpkm
name: Read count and RPKM
suffix: ".json"
data_object_type: Read Count and RPKM
description: "Read count and RPKM for {id}"
- output: qc_non_rRNA_R1
name: Non-rRNA reads R1
suffix: "filtered_R1.fastq"
data_object_type: QC non-rRNA R1
description: "R1 reads without the ribosomal sequences for {id}"
- output: qc_non_rRNA_R2
name: Non-rRNA reads R2
suffix: "filtered_R2.fastq"
data_object_type: QC non-rRNA R2
description: "R2 reads without the ribosomal sequences for {id}"
60 changes: 0 additions & 60 deletions nmdc_automation/api/nmdcapi.py
@@ -374,66 +374,6 @@ def request(self, method, url_path, params_or_json_data=None):
rv.raise_for_status()
return rv

def get_omics_processing_records_for_nmdc_study(self, nmdc_study_id: str):
"""
Retrieve all OmicsProcessing records for the given NMDC study ID.
"""
url = "queries:run"
params = {"find": "omics_processing_set",
"filter": {"part_of": {"$elemMatch": {"$eq": nmdc_study_id}}}}
response = self.request("POST", url, params_or_json_data=params)
if response.status_code != 200:
raise Exception(
f"Error retrieving OmicsProcessing records for study {nmdc_study_id}"
)
omics_processing_records = response.json()["cursor"]["firstBatch"]
return omics_processing_records

def get_workflow_activity_informed_by(self, workflow_activity_set: str,
informed_by_id: str):
"""
Retrieve a workflow activity record for the given workflow activity set
and informed by a given OmicsProcessing ID.
"""
url = "queries:run"
params = {"find": workflow_activity_set,
"filter": {"was_informed_by": informed_by_id}}
response = self.request("POST", url, params_or_json_data=params)
if response.status_code != 200:
raise Exception(
f"Error retrieving {workflow_activity_set} record informed by {informed_by_id}"
)
workflow_activity_record = response.json()["cursor"]["firstBatch"]
return workflow_activity_record

def get_data_objects_by_description(self, description: str):
"""
Retrieve data object records whose description matches the given string.
"""
response = self.request(
"POST",
"queries:run",
params_or_json_data={
"find": "data_object_set",
"filter": {"description": {"$regex": description, "$options": "i"}},
},
)
response.raise_for_status()
return response.json()["cursor"]["firstBatch"]

def get_data_object_by_id(self, data_object_id: str):
"""
Retrieve a data object record for the given data object ID.
"""
url = f"data_objects/{data_object_id}"
response = self.request("GET", url)
if response.status_code != 200:
raise Exception(
f"Error retrieving data object record for {data_object_id}"
)
data_object_record = response.json()
return data_object_record

def run_query(self, query: dict):
"""
Function to run a query using the Microbiome Data API.
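The helpers removed above all wrapped the queries:run endpoint; GET-based replacements appear in the new nmdc_automation/nmdc_common/client.py below. A hedged sketch of the same OmicsProcessing lookup through the retained run_query method (the study ID is a placeholder, and api is assumed to be an instance of this nmdcapi client):

# Sketch: equivalent lookup via the retained run_query method.
# "nmdc:sty-xx-example" is a placeholder study ID, not a real record.
records = api.run_query({
    "find": "omics_processing_set",
    "filter": {"part_of": {"$elemMatch": {"$eq": "nmdc:sty-xx-example"}}},
})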
Empty file.
112 changes: 112 additions & 0 deletions nmdc_automation/nmdc_common/client.py
@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
"""Client for the NMDC API."""
# TODO: move all of this to a separate project nmdc-common. But for now, just
# copy it here.

import logging
import requests
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NmdcApi:
"""
Basic API Client for GET requests.
"""

def __init__(self, api_base_url):
if not api_base_url.endswith("/"):
api_base_url += "/"
self.base_url = api_base_url
self.headers = {'accept': 'application/json', 'Content-Type': 'application/json'}


def get_biosamples_part_of_study(self, study_id: str) -> list[dict]:
"""
Get the biosamples that are part of a study.
"""
biosample_records = []
params = {
'filter': '{"part_of": "'+study_id+'"}',
'max_page_size': '1000',
}
url = self.base_url + "nmdcschema/biosample_set"
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
biosample_records.extend(response.json()["resources"])
# Get the next page of results, if any
while response.json().get("next_page_token") is not None:
params['page_token'] = response.json()["next_page_token"]
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
biosample_records.extend(response.json()["resources"])


return biosample_records

def get_omics_processing_records_part_of_study(self, study_id: str) -> list[dict]:
"""
Get the OmicsProcessing records that are part of a study.
"""
omics_processing_records = []
params = {
'filter': '{"part_of": "'+study_id+'"}',
'max_page_size': '1000',
}
url = self.base_url + "nmdcschema/omics_processing_set"
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
omics_processing_records.extend(response.json()["resources"])
# Get the next page of results, if any
while response.json().get("next_page_token") is not None:
params['page_token'] = response.json()["next_page_token"]
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
omics_processing_records.extend(response.json()["resources"])
return omics_processing_records

def get_workflow_activities_informed_by(self, workflow_activity_set: str,
informed_by_id: str) -> list[dict]:
"""
Retrieve workflow activity record(s) for the given workflow
activity set and informed by a given OmicsProcessing ID.
"""
params = {
'filter': '{"was_informed_by": "'+informed_by_id+'"}',
}
url = self.base_url + "nmdcschema/" + workflow_activity_set
response = requests.get(url, params=params, headers=self.headers)
logger.info(response.url)
response.raise_for_status()
workflow_activity_record = response.json()["resources"]
return workflow_activity_record

def get_data_object(self, data_object_id: str) -> Optional[dict]:
"""
Retrieve a data object record by ID.
"""
url = self.base_url + "nmdcschema/data_object_set/" + data_object_id
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
data_object_record = response.json()
except requests.exceptions.HTTPError as err:
if err.response.status_code == 404:
return None
else:
raise
return data_object_record

def get_data_objects_by_description(self, description: str):
"""
Retrieve data object records by description.
"""
params = {
'filter': '{"description.search": "'+description+'"}',
}
url = self.base_url + "data_objects"
response = requests.get(url, params=params, headers=self.headers)
response.raise_for_status()
data_object_records = response.json()["results"]
return data_object_records
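A short usage sketch for the client above; the base URL points at the public NMDC API and the study ID is a placeholder:

# Usage sketch for NmdcApi; URL and study ID are illustrative.
api = NmdcApi("https://api.microbiomedata.org")
for omics in api.get_omics_processing_records_part_of_study("nmdc:sty-xx-example"):
    for activity in api.get_workflow_activities_informed_by(
            "metatranscriptome_activity_set", omics["id"]):
        for do_id in activity.get("has_output", []):
            data_object = api.get_data_object(do_id)  # None if the ID is unknown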
92 changes: 85 additions & 7 deletions nmdc_automation/re_iding/base.py
@@ -18,13 +18,13 @@
from nmdc_automation.re_iding.db_utils import (OMICS_PROCESSING_SET,
READS_QC_SET,
METAGENOME_ASSEMBLY_SET,
METATRANSCRIPTOME_ACTIVITY_SET,
check_for_single_omics_processing_record,
get_data_object_record_by_id,
get_omics_processing_id)
from nmdc_automation.re_iding.file_utils import (find_data_object_type,
compute_new_paths,
get_new_paths,
assembly_file_operations)
compute_new_paths_and_link,
assembly_file_operations)

NAPA_TEMPLATE = "../../../configs/re_iding_worklfows.yaml"
DATA_BASE_URL = "https://data.microbiomedata.org/data"
@@ -60,6 +60,8 @@ def _workflow_template_for_type(self, workflow_type: str) -> Dict:
workflow_type = workflow_type.replace("QC", "Qc")
if workflow_type == "nmdc:ReadbasedAnalysis":
workflow_type = "nmdc:ReadBasedTaxonomyAnalysisActivity"
if workflow_type == "nmdc:MetaT":
workflow_type = "nmdc:MetatranscriptomeActivity"

for t in self.workflow_template:
type = t["Type"]
@@ -163,7 +165,7 @@ def update_reads_qc_analysis_activity_set(self, db_record: Dict,
logger.info(f"old_do_id: {old_do_id}")
old_do_rec = get_data_object_record_by_id(db_record, old_do_id)
data_object_type = find_data_object_type(old_do_rec)
new_file_path = compute_new_paths(
new_file_path = compute_new_paths_and_link(
old_do_rec["url"], new_readsqc_base_dir, new_activity_id, self.data_dir
)
logging.info(f"New file path computed for {data_object_type}: {new_file_path}")
@@ -221,7 +223,7 @@ def update_metagenome_assembly_set(self, db_record: Dict,
data_object_type = find_data_object_type(old_do_rec)
if not data_object_type:
continue
new_file_path = get_new_paths(old_do_rec["url"],new_assembly_base_dir, new_activity_id)
new_file_path = compute_new_paths_and_link(old_do_rec["url"], new_assembly_base_dir, new_activity_id)
updated_md5, updated_file_size = assembly_file_operations(
old_do_rec, data_object_type, new_file_path, new_activity_id,
self.data_dir)
@@ -284,7 +286,7 @@ def update_read_based_taxonomy_analysis_activity_set(self, db_record: Dict,
data_object_type = find_data_object_type(old_do_rec)
if not data_object_type:
continue
new_file_path = compute_new_paths(
new_file_path = compute_new_paths_and_link(
old_do_rec["url"], new_readbased_base_dir, new_activity_id, self.data_dir
)
logging.info(f"New file path computed for {data_object_type}: {new_file_path}")
@@ -314,7 +316,81 @@ def update_read_based_taxonomy_analysis_activity_set(self, db_record: Dict,
new_db.read_based_taxonomy_analysis_activity_set.append(new_read_based)

return new_db


def update_metatranscriptome_activity_set(self, db_record: Dict,
new_db: NmdcDatabase) -> (NmdcDatabase):
"""
Return a new Database instance with the metatranscriptome_activity_set
and its data objects updated to new IDs.
"""
logger.info(f"Updating metatranscriptome_activity_set for "
f"{db_record[OMICS_PROCESSING_SET][0]['id']}")
new_omics_processing = new_db.omics_processing_set[0]

for metatranscriptome_rec in db_record[METATRANSCRIPTOME_ACTIVITY_SET]:
# old records have non-conforming type e.g. nmdc:MetaT,
# nmdc:metaT etc. - fix it
activity_type = "nmdc:MetatranscriptomeActivity"
metatranscriptome_rec["type"] = activity_type
omics_processing_id = new_omics_processing.id
has_input = [self._get_input_do_id(new_db, "Filtered Sequencing Reads")]



new_activity_id = self.api_client.minter(activity_type) + "." + self.workflow_iteration
logging.info(f"New activity id created for {omics_processing_id} activity type {activity_type}: {new_activity_id}")
new_metatranscriptome_base_dir = os.path.join(self.data_dir, omics_processing_id,
new_activity_id)
logging.info(f"New metatranscriptome base dir: {new_metatranscriptome_base_dir}")
os.makedirs(new_metatranscriptome_base_dir, exist_ok=True)

updated_has_output = []
# Get Metatranscriptome data objects and update IDs
for old_do_id in metatranscriptome_rec["has_output"]:
logger.info(f"old_do_id: {old_do_id}")
old_do_rec = get_data_object_record_by_id(db_record, old_do_id)
# there are some data objects that are not in the database
if not old_do_rec:
logger.warning(f"Data object record not found for {old_do_id}")
continue

data_object_type = find_data_object_type(old_do_rec)
logging.info(f"data_object_type: {data_object_type}")
# TODO: how do we handle data objects w/o type?
if not data_object_type:
logger.warning(f"Data object type not found for {old_do_id}")
# continue
# link data object to new location
new_file_path = compute_new_paths_and_link(
old_do_rec["url"], new_metatranscriptome_base_dir, new_activity_id, self.data_dir)
logging.info(f"New file path computed for {data_object_type}: {new_file_path}")

new_do = self.make_new_data_object(
omics_processing_id, activity_type, new_activity_id, old_do_rec, data_object_type
)
# add new data object to new database and update has_output
new_db.data_object_set.append(new_do)
updated_has_output.append(new_do.id)

# Get new Metatranscriptome activity set
new_metatranscriptome = self._make_new_activity_set_object(
omics_processing_id, new_activity_id, metatranscriptome_rec, has_input,
updated_has_output
)
# update activity-specific properties
# get new_metatranscriptome properties with no set value
unset_properties = [
p for p in new_metatranscriptome.__dict__ if not new_metatranscriptome.__dict__[p]
]
# check for that value in old record
for p in unset_properties:
if p in metatranscriptome_rec:
setattr(new_metatranscriptome, p, metatranscriptome_rec[p])

new_db.metatranscriptome_activity_set.append(new_metatranscriptome)
return new_db


def _get_input_do_id(self, new_db, data_object_type: str):
"""Returns the string representation of a data object id given data object type"""

@@ -334,6 +410,8 @@ def _make_new_activity_set_object(self, omics_processing_id: str, new_activity_i
activity_type = activity_set_rec["type"].replace("QC", "Qc")
if activity_type == "nmdc:ReadbasedAnalysis":
activity_type = "nmdc:ReadBasedTaxonomyAnalysisActivity"
if activity_type == "nmdc:MetaT":
activity_type = "nmdc:MetatranscriptomeActivity"
template = self._workflow_template_for_type(activity_type)
activity_class = getattr(nmdc, template["ActivityRange"])
