Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: rename study --> study statement / statement #405

Merged
merged 14 commits into from
Nov 20, 2024
Merged
133 changes: 67 additions & 66 deletions src/metakb/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ def _add_mappings_and_exts_to_obj(obj: dict, obj_keys: list[str]) -> None:
obj_keys.append(f"{name}:${name}")


def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str]) -> None:
def _add_method(tx: ManagedTransaction, method: dict, ids_in_stmts: set[str]) -> None:
"""Add Method node and its relationships to DB

:param tx: Transaction object provided to transaction functions
:param method: CDM method object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
if method["id"] not in ids_in_studies:
if method["id"] not in ids_in_stmts:
return

query = """
Expand All @@ -80,7 +80,7 @@ def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str])
# Method's documents are unique and do not currently have IDs
# They also only have one document
document = is_reported_in[0]
_add_document(tx, document, ids_in_studies)
_add_document(tx, document, ids_in_stmts)
doc_doi = document["doi"]
query += f"""
MERGE (d:Document {{ doi:'{doc_doi}' }})
Expand All @@ -91,16 +91,16 @@ def _add_method(tx: ManagedTransaction, method: dict, ids_in_studies: set[str])


def _add_gene_or_disease(
tx: ManagedTransaction, obj_in: dict, ids_in_studies: set[str]
tx: ManagedTransaction, obj_in: dict, ids_in_stmts: set[str]
) -> None:
"""Add gene or disease node and its relationships to DB

:param tx: Transaction object provided to transaction functions
:param obj_in: CDM gene or disease object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
:raises TypeError: When `obj_in` is not a disease or gene
"""
if obj_in["id"] not in ids_in_studies:
if obj_in["id"] not in ids_in_stmts:
return

obj = obj_in.copy()
Expand Down Expand Up @@ -129,16 +129,16 @@ def _add_gene_or_disease(
def _add_therapeutic_procedure(
tx: ManagedTransaction,
therapeutic_procedure: dict,
ids_in_studies: set[str],
ids_in_stmts: set[str],
) -> None:
"""Add therapeutic procedure node and its relationships

:param tx: Transaction object provided to transaction functions
:param therapeutic_procedure: Therapeutic procedure CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
:raises TypeError: When therapeutic procedure type is invalid
"""
if therapeutic_procedure["id"] not in ids_in_studies:
if therapeutic_procedure["id"] not in ids_in_stmts:
return

tp = therapeutic_procedure.copy()
Expand Down Expand Up @@ -264,15 +264,15 @@ def _add_variation(tx: ManagedTransaction, variation_in: dict) -> None:
def _add_categorical_variant(
tx: ManagedTransaction,
categorical_variant_in: dict,
ids_in_studies: set[str],
ids_in_stmts: set[str],
) -> None:
"""Add categorical variant objects to DB.

:param tx: Transaction object provided to transaction functions
:param categorical_variant_in: Categorical variant CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
if categorical_variant_in["id"] not in ids_in_studies:
if categorical_variant_in["id"] not in ids_in_stmts:
return

cv = categorical_variant_in.copy()
Expand Down Expand Up @@ -311,19 +311,19 @@ def _add_categorical_variant(


def _add_document(
tx: ManagedTransaction, document_in: dict, ids_in_studies: set[str]
tx: ManagedTransaction, document_in: dict, ids_in_stmts: set[str]
) -> None:
"""Add Document object to DB.

:param tx: Transaction object provided to transaction functions
:param document: Document CDM object
:param ids_in_studies: IDs found in studies
:param ids_in_stmts: IDs found in statements
"""
# Not all document's have IDs. These are the fields that can uniquely identify
# a document
if "id" in document_in:
query = "MATCH (n:Document {id:$id}) RETURN n"
if document_in["id"] not in ids_in_studies:
if document_in["id"] not in ids_in_stmts:
return
elif "doi" in document_in:
query = "MATCH (n:Document {doi:$doi}) RETURN n"
Expand Down Expand Up @@ -351,81 +351,81 @@ def _add_document(
tx.run(query, **document)


def _get_ids_from_studies(studies: list[dict]) -> set[str]:
"""Get unique IDs from studies
def _get_ids_from_stmts(statements: list[dict]) -> set[str]:
"""Get unique IDs from statements

:param studies: List of studies
:return: Set of IDs found in studies
:param statements: List of statements
:return: Set of IDs found in statements
"""

def _add_obj_id_to_set(obj: dict, ids_set: set[str]) -> None:
"""Add object id to set of IDs

:param obj: Object to get ID for
:param ids_set: IDs found in studies. This will be mutated.
:param ids_set: IDs found in statements. This will be mutated.
"""
obj_id = obj.get("id")
if obj_id:
ids_set.add(obj_id)

ids_in_studies = set()
ids_in_stmts = set()

for study in studies:
for statement in statements:
for obj in [
study.get("specifiedBy"), # method
study.get("reportedIn"),
study.get("subjectVariant"),
study.get("objectTherapeutic"),
study.get("conditionQualifier"),
study.get("geneContextQualifier"),
statement.get("specifiedBy"), # method
statement.get("reportedIn"),
statement.get("subjectVariant"),
statement.get("objectTherapeutic"),
statement.get("conditionQualifier"),
statement.get("geneContextQualifier"),
]:
if obj:
if isinstance(obj, list):
for item in obj:
_add_obj_id_to_set(item, ids_in_studies)
_add_obj_id_to_set(item, ids_in_stmts)
else: # This is a dictionary
_add_obj_id_to_set(obj, ids_in_studies)
_add_obj_id_to_set(obj, ids_in_stmts)

return ids_in_studies
return ids_in_stmts


def _add_study(tx: ManagedTransaction, study_in: dict) -> None:
"""Add study node and its relationships
def _add_statement(tx: ManagedTransaction, statement_in: dict) -> None:
"""Add statement node and its relationships

:param tx: Transaction object provided to transaction functions
:param study_in: Statement CDM object
:param statement_in: Statement CDM object
"""
study = study_in.copy()
study_type = study["type"]
study_keys = _create_parameterized_query(
study, ("id", "description", "direction", "predicate", "type")
statement = statement_in.copy()
statement_type = statement["type"]
statement_keys = _create_parameterized_query(
statement, ("id", "description", "direction", "predicate", "type")
)

match_line = ""
rel_line = ""

is_reported_in_docs = study.get("reportedIn", [])
is_reported_in_docs = statement.get("reportedIn", [])
for ri_doc in is_reported_in_docs:
ri_doc_id = ri_doc["id"]
name = f"doc_{ri_doc_id.split(':')[-1]}"
match_line += f"MERGE ({name} {{ id: '{ri_doc_id}'}})\n"
rel_line += f"MERGE (s) -[:IS_REPORTED_IN] -> ({name})\n"

allele_origin = study.get("alleleOriginQualifier")
allele_origin = statement.get("alleleOriginQualifier")
if allele_origin:
study["alleleOriginQualifier"] = allele_origin
statement["alleleOriginQualifier"] = allele_origin
match_line += "SET s.alleleOriginQualifier=$alleleOriginQualifier\n"

gene_context_id = study.get("geneContextQualifier", {}).get("id")
gene_context_id = statement.get("geneContextQualifier", {}).get("id")
if gene_context_id:
match_line += f"MERGE (g:Gene {{id: '{gene_context_id}'}})\n"
rel_line += "MERGE (s) -[:HAS_GENE_CONTEXT] -> (g)\n"

method_id = study["specifiedBy"]["id"]
method_id = statement["specifiedBy"]["id"]
match_line += f"MERGE (m {{ id: '{method_id}' }})\n"
rel_line += "MERGE (s) -[:IS_SPECIFIED_BY] -> (m)\n"

coding = study.get("strength")
coding = statement.get("strength")
if coding:
coding_key_fields = ("code", "label", "system")

Expand All @@ -435,75 +435,76 @@ def _add_study(tx: ManagedTransaction, study_in: dict) -> None:
for k in coding_key_fields:
v = coding.get(k)
if v:
study[f"coding_{k}"] = v
statement[f"coding_{k}"] = v

match_line += f"MERGE (c:Coding {{ {coding_keys} }})\n"
rel_line += "MERGE (s) -[:HAS_STRENGTH] -> (c)\n"

variant_id = study["subjectVariant"]["id"]
variant_id = statement["subjectVariant"]["id"]
match_line += f"MERGE (v:Variation {{ id: '{variant_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_VARIANT] -> (v)\n"

therapeutic_id = study["objectTherapeutic"]["id"]
therapeutic_id = statement["objectTherapeutic"]["id"]
match_line += f"MERGE (t:TherapeuticProcedure {{ id: '{therapeutic_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_THERAPEUTIC] -> (t)\n"

tumor_type_id = study["conditionQualifier"]["id"]
tumor_type_id = statement["conditionQualifier"]["id"]
match_line += f"MERGE (tt:Condition {{ id: '{tumor_type_id}' }})\n"
rel_line += "MERGE (s) -[:HAS_TUMOR_TYPE] -> (tt)\n"

query = f"""
MERGE (s:{study_type}:StudyStatement:Statement {{ {study_keys} }})
MERGE (s:{statement_type}:StudyStatement:Statement {{ {statement_keys} }})
{match_line}
{rel_line}
"""

tx.run(query, **study)
tx.run(query, **statement)


def add_transformed_data(driver: Driver, data: dict) -> None:
"""Add set of data formatted per Common Data Model to DB.

:param data: contains key/value pairs for data objects to add to DB, including
studies, variation, therapeutic procedures, conditions, genes, methods,
statements, variation, therapeutic procedures, conditions, genes, methods,
documents, etc.
"""
# Used to keep track of IDs that are in studies. This is used to prevent adding
# nodes that aren't associated to studies
ids_in_studies = _get_ids_from_studies(data.get("studies", []))
# Used to keep track of IDs that are in statements. This is used to prevent adding
# nodes that aren't associated to statements
statements = data.get("statements", [])
ids_in_stmts = _get_ids_from_stmts(statements)

with driver.session() as session:
loaded_study_count = 0
loaded_stmt_count = 0

for cv in data.get("categorical_variants", []):
session.execute_write(_add_categorical_variant, cv, ids_in_studies)
session.execute_write(_add_categorical_variant, cv, ids_in_stmts)

for doc in data.get("documents", []):
session.execute_write(_add_document, doc, ids_in_studies)
session.execute_write(_add_document, doc, ids_in_stmts)

for method in data.get("methods", []):
session.execute_write(_add_method, method, ids_in_studies)
session.execute_write(_add_method, method, ids_in_stmts)

for obj_type in {"genes", "conditions"}:
for obj in data.get(obj_type, []):
session.execute_write(_add_gene_or_disease, obj, ids_in_studies)
session.execute_write(_add_gene_or_disease, obj, ids_in_stmts)

for tp in data.get("therapeutic_procedures", []):
session.execute_write(_add_therapeutic_procedure, tp, ids_in_studies)
session.execute_write(_add_therapeutic_procedure, tp, ids_in_stmts)

# This should always be done last
for study in data.get("studies", []):
session.execute_write(_add_study, study)
loaded_study_count += 1
for statement in statements:
session.execute_write(_add_statement, statement)
loaded_stmt_count += 1

_logger.info("Successfully loaded %s studies.", loaded_study_count)
_logger.info("Successfully loaded %s statements.", loaded_stmt_count)


def load_from_json(src_transformed_cdm: Path, driver: Driver | None = None) -> None:
"""Load evidence into DB from given CDM JSON file.

:param src_transformed_cdm: path to file for a source's transformed data to
common data model containing studies, variation, therapeutic procedures,
common data model containing statements, variation, therapeutic procedures,
conditions, genes, methods, documents, etc.
:param driver: Neo4j graph driver, if available
"""
Expand Down
Loading