-
Notifications
You must be signed in to change notification settings - Fork 39
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: generate diplotype comparison data for clinical outcomes
- Loading branch information
Showing
7 changed files
with
1,340 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
colorama >= 0.4.6 | ||
pandas >= 1.5.1 | ||
pandas >= 2.1.3 | ||
scikit-allel >= 1.3.7 | ||
packaging~=21.3 |
174 changes: 174 additions & 0 deletions
174
src/scripts/diplotype_comparison/compare_diplotype_definition.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
#!/usr/bin/env python3 | ||
|
||
__author__ = 'BinglanLi' | ||
|
||
import sys | ||
from glob import glob | ||
from pathlib import Path | ||
from timeit import default_timer as timer | ||
|
||
import pandas as pd | ||
|
||
import utilities as util | ||
|
||
|
||
if __name__ == "__main__": | ||
import argparse | ||
|
||
# describe the tool | ||
parser = argparse.ArgumentParser(description='Identify overlapping diplotypes in unphased genetic data.') | ||
|
||
# input arguments | ||
parser.add_argument("-i", "--input", | ||
type=str, | ||
required=False, | ||
help="File name or the directory to the PharmCAT JSONs of allele definitions.") | ||
# input arguments | ||
parser.add_argument("-m", "--missing", | ||
type=bool, | ||
required=False, | ||
default=False, | ||
help="To evaluate diplotypes with missing positions, True or False.") | ||
|
||
# output args | ||
parser.add_argument("-o", "--output-dir", | ||
type=str, | ||
required=False, | ||
metavar='<dir>', | ||
help="(Optional) directory for outputs. Defaults to the current working directory.") | ||
parser.add_argument("-bf", "--base-filename", | ||
type=str, | ||
required=False, | ||
default='predicted_pharmcat_calls', | ||
help="(Optional) base filename of the output file.") | ||
|
||
parser.add_argument('-c', '--clinical-outcome', | ||
action='store_true', | ||
required=False, | ||
help='Generate data for clinical outcome') | ||
|
||
# parse arguments | ||
args = parser.parse_args() | ||
|
||
try: | ||
# define output file name | ||
m_output_basename: str = args.base_filename | ||
# define output file path | ||
m_output_dir: Path = Path(args.output_dir) if args.output_dir else Path.cwd() | ||
if not args.output_dir: | ||
print(f'Output directory is not specified.\n' | ||
f'\tUse the current working directory: {m_output_dir}') | ||
|
||
# define input | ||
m_input: Path | ||
if args.input: | ||
m_input = Path(args.input).absolute() | ||
else: | ||
script_dir: Path = Path(globals().get("__file__", "./_")).absolute().parent | ||
is_repo: bool = script_dir.absolute().as_posix().endswith("scripts/diplotype_comparison") | ||
if is_repo: | ||
m_input = script_dir.parent.parent / 'main/resources/org/pharmgkb/pharmcat/definition/alleles' | ||
print(f'Running in repo.') | ||
else: | ||
m_input = Path.cwd() | ||
print(f'No input provided. ' | ||
f'Looking for the allele definition JSON files under the current working directory: {m_input}') | ||
allele_definition_jsons: list = [] | ||
if m_input.is_file(): | ||
allele_definition_jsons = [m_input] | ||
elif m_input.is_dir(): | ||
print(f'Looking for the allele definition JSON files under {m_input}') | ||
# get the list allele definition files | ||
allele_definition_jsons = glob(str(m_input.joinpath('*_translation.json'))) | ||
# check whether there are any json files | ||
if len(allele_definition_jsons) == 0: | ||
print(f'No allele definition JSON files are found under {m_input}') | ||
sys.exit(1) | ||
|
||
if args.missing: | ||
m_output_file: Path = m_output_dir / (m_output_basename + '_missing.tsv') | ||
else: | ||
m_output_file: Path = m_output_dir / (m_output_basename + '.tsv') | ||
# delete file | ||
m_output_file.unlink(missing_ok=True) | ||
print() | ||
if args.clinical_outcome: | ||
print(f'Generating clinical outcome data') | ||
print(f'Saving to {m_output_file}\n') | ||
|
||
# process each gene | ||
for json_file in allele_definition_jsons: | ||
# read the json file | ||
start = timer() | ||
filename: str = json_file.split('/')[-1] | ||
# get the gene name | ||
gene: str = filename.split('_')[0] | ||
|
||
if args.clinical_outcome and (gene == 'CYP2D6' or gene == 'DPYD'): | ||
continue | ||
|
||
# read files | ||
print(f'Processing {filename}') | ||
json_data: dict = util.read_json(json_file) | ||
|
||
# get necessary information of the allele-defining positions, including hgvs names, positions, references | ||
allele_defining_variants: dict = util.get_allele_defining_variants(json_data) | ||
|
||
# read in the list of alleles, allele-defining positions, and defining genotypes | ||
allele_definitions = util.get_allele_definitions(json_data, allele_defining_variants) | ||
|
||
# find all possible outcomes of alternative calls for each diplotype | ||
dict_predicted_calls = dict() | ||
if args.missing: | ||
if gene == 'CYP2D6': | ||
print(f'\tSkipping - too complex') | ||
continue | ||
|
||
# identify the combinations of missing positions to evaluate | ||
missing_combinations = util.find_missingness_combinations(allele_defining_variants, allele_definitions) | ||
|
||
# if the gene does not have positions whose absence will cause ambiguous pharmcat calls, then skip | ||
if not missing_combinations: | ||
print(f'\tSkipping - no ambiguous calls') | ||
continue | ||
else: | ||
print(f'\tNumber of combinations = {len(missing_combinations)}') | ||
for m in missing_combinations: | ||
# find possible calls for each missing position | ||
dict_predicted_calls_m = util.predict_pharmcat_calls(allele_defining_variants, | ||
allele_definitions, m) | ||
|
||
# append to dict_predicted_calls | ||
for key in dict_predicted_calls_m: | ||
dict_predicted_calls[key] = dict_predicted_calls.get(key, []) + dict_predicted_calls_m[key] | ||
else: | ||
dict_predicted_calls = util.predict_pharmcat_calls(allele_defining_variants, allele_definitions) | ||
|
||
# convert the python dictionary to a pandas data frame for output | ||
df_predicted_calls = pd.DataFrame.from_dict(dict_predicted_calls) | ||
# add gene name to the data frame for readability in the output | ||
df_predicted_calls['gene'] = gene | ||
# write to an output | ||
if len(df_predicted_calls): | ||
if args.clinical_outcome: | ||
rows = df_predicted_calls[df_predicted_calls['actual'].str.contains(';')] | ||
cols = ['gene', 'actual'] | ||
else: | ||
rows = df_predicted_calls | ||
cols = rows.columns | ||
if m_output_file.is_file(): | ||
mode = 'a' | ||
header = False | ||
else: | ||
mode = 'w' | ||
header = True | ||
rows.to_csv(m_output_file.absolute(), mode=mode, sep="\t", columns=cols, header=header, index=False) | ||
else: | ||
print(f'\tNo alternative calls') | ||
|
||
# show processing time | ||
end = timer() | ||
print(f'\tFinished in %.2f seconds' % (end - start)) | ||
except Exception as e: | ||
print(e) | ||
sys.exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
#!/usr/bin/env python3 | ||
|
||
__author__ = 'BinglanLi' | ||
|
||
import re | ||
import pandas as pd | ||
from pathlib import Path | ||
from glob import glob | ||
|
||
import utilities as util | ||
|
||
# read reference predictions | ||
reference_file_dir: Path = Path(globals().get("__file__", "./_")).absolute().parent | ||
reference_file_pattern: Path = Path('predicted_pharmcat_calls_*tsv') | ||
reference_files: list[str] = glob(str(reference_file_dir.joinpath(reference_file_pattern))) | ||
reference_predictions = pd.DataFrame() | ||
# iteratively read predictions | ||
for file in reference_files: | ||
# read the file | ||
reference_prediction_i: pd.DataFrame = pd.read_csv(file, delimiter='\t') | ||
# if no content, continue to the next file | ||
if len(reference_prediction_i) == 0: | ||
continue | ||
|
||
# concatenate the reference predictions to the summary data frame | ||
reference_predictions = pd.concat([reference_predictions, reference_prediction_i], ignore_index=True) | ||
# replace "NaN" values with '' | ||
reference_predictions = reference_predictions.fillna('') | ||
|
||
|
||
# load alleles from pharmcat definition json files | ||
json_dir: str = '../../main/resources/org/pharmgkb/pharmcat/definition/alleles/' | ||
json_file_pattern: str = '*_translation.json' | ||
allele_definition_jsons: list[str] = glob(json_dir + json_file_pattern) | ||
# set up an empty dictionary | ||
allele_definitions = dict() | ||
for json_file in allele_definition_jsons: | ||
# get the gene name | ||
gene: str = json_file.split('/')[-1].split('_')[0] | ||
# read the json file | ||
print(f'Processing {json_file}') | ||
json_data: dict = util.read_json(Path(json_file)) | ||
|
||
# get allele list | ||
allele_list: list[str] = [entry['name'] for entry in json_data['namedAlleles']] | ||
|
||
# add to dictionary | ||
allele_definitions[gene] = allele_list | ||
|
||
|
||
# read in test files | ||
test_dir: Path = Path('~/Downloads/autogeneratedTestResults_without_missing/').expanduser() | ||
test_file_pattern: str = 'autogenerated_test_report.tsv' | ||
test_file: str = str(test_dir) + '/' + test_file_pattern | ||
autogenerated_tests: pd.DataFrame = pd.read_csv(test_file, delimiter='\t') | ||
# replace "NaN" values with '' | ||
autogenerated_tests = autogenerated_tests.fillna('') | ||
|
||
# format the autogenerated test data frame - fix the expected diplotype | ||
failed_tests = pd.DataFrame(columns=autogenerated_tests.columns) | ||
for i in range(len(autogenerated_tests)): | ||
# identify the gene | ||
gene: str = autogenerated_tests.loc[i, 'Gene'] | ||
|
||
# fix the Expected diplotype | ||
# get the list of diplotypes in a haplotype | ||
test_diplotype: str = autogenerated_tests.loc[i, 'Expected'] | ||
# order the haplotypes and diplotype | ||
ordered_test_haplotypes: list[str] = util.order_list(test_diplotype.split('/'), allele_definitions[gene]) | ||
ordered_test_diplotype: str = '/'.join(ordered_test_haplotypes) | ||
|
||
# get the diplotype(s) in the Actual column | ||
test_actual: set[str] = set(autogenerated_tests.loc[i, 'Actual'].split(', ')) | ||
# get the diplotype(s) in the Alt column | ||
test_alt_str: str = re.sub(r' \(\d+\)(, )?', ';', autogenerated_tests.loc[i, 'Alt']) | ||
test_alt: set[str] = set(test_alt_str.split(';')) | ||
test_alt.add('') # add '' so that accidental '' introduced by splitting will not affect comparison | ||
|
||
# identify predictions for the Expected diplotype | ||
conditions = (reference_predictions['gene'] == gene) & \ | ||
(reference_predictions['expected'] == ordered_test_diplotype) | ||
matches = reference_predictions.loc[conditions].reset_index() | ||
|
||
# check whether alternative diplotypes match with the predictions | ||
n_matches: int = len(matches) | ||
# initiate a variable to determine whether the alternative calls match with any of the predictions | ||
has_matching_alt: bool = False | ||
# iterate over predictions | ||
for j in range(n_matches): | ||
# get the list of actual calls based on diplotype definitions | ||
predicted_actual: set[str] = set(matches.loc[j, 'actual'].split(';')) | ||
# get the list of alternative calls based on diplotype definitions | ||
predicted_alt: set[str] = set(matches.loc[j, 'alternative'].split(';')) | ||
predicted_alt.add('') # add '' so that accidental '' introduced by splitting will not affect comparison | ||
|
||
# check whether the alternative calls are the same between the test and the prediction | ||
if predicted_actual == test_actual and test_alt == predicted_alt: | ||
has_matching_alt = True | ||
break | ||
# skip or add to failed tests | ||
if not has_matching_alt: | ||
failed_tests = pd.concat([failed_tests, autogenerated_tests.loc[i].to_frame().T], axis=0, ignore_index=True) | ||
|
||
|
||
# output failed tests | ||
output_dir: str = '/Users/binglan/Documents/GitHub/PharmCAT/src/scripts/diplotype_comparison/' | ||
output_prefix: str = 'failed_tests' | ||
failed_tests.to_csv(output_dir + output_prefix + '.tsv', sep="\t", index=False) |
Oops, something went wrong.