-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathOffTarget.py
194 lines (174 loc) · 7.22 KB
/
OffTarget.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
"""This file runs the off target analysis for CASPER. Use the CASPEROfflist.txt file to set up the information you want
to run with this program.
WARNING: Running this protocol on a large number of sequences is unwise and may take significant computing power/time."""
# -------------------------------------- USER CAN IGNORE CODE BELOW THIS LINE ---------------------------------------- #
import os, sys, math, datetime
from Bio import Seq
from Bio.Seq import Seq
from Algorithms import SeqTranslate
from Bio.Alphabet import IUPAC
class OffTargetAlgorithm:
def __init__(self, threshold, endo, base_org, csf_file, other_orgs, casperofflist, output_path):
self.ST = SeqTranslate()
self.rSequences = []
self.get_rseqs(casperofflist)
self.mypath = csf_file[:csf_file.find(base_org)]
self.ref_genomes = [base_org]
self.ref_genomes += other_orgs
self.endo = endo
self.threshold = threshold
self.dSequence = str() # global to class so that all scoring functions can use it
# This is for autofilling the HsuMatrix
self.matrixKeys = ["GT", "AC", "GG", "TG", "TT", "CA", "CT", "GA", "AA", "AG", "TC", "CC"]
self.matrix = {}
self.fill_matrix()
# This is where the data is stored before it is written
self.output_data = dict()
for myseq in self.rSequences:
self.output_data[myseq[0]] = list()
# BEGIN RUNNING THROUGH SEQUENCES
for sequence in self.rSequences:
print(sequence)
for genome in self.ref_genomes:
f = open(self.mypath + genome + self.endo + ".cspr", 'r')
while True:
line = f.readline()
if line.find("CHROMOSOME") != -1:
curchrom = line[line.find("#") + 1:-1]
print("Finished checking " + curchrom)
else:
if line[0:-1] == "REPEATS":
break
# Checks for a signifcant number of mismatches:
#locseq = line[:-1].split(",")
if self.critical_similarity(sequence[0], self.ST.decompress_csf_tuple(line)[1]):
# This is where the real fun begins: off target analysis
print('found a similarity')
seqscore = self.get_scores(sequence[1],self.ST.decompress_csf_tuple(line)[1])
if seqscore > self.threshold:
self.output_data[sequence[0]].append((str(curchrom),
self.ST.decompress_csf_tuple(line[:-1]),
int(seqscore*100), genome))
# END SEQUENCES RUN
# Output the data acquired:
out = open(output_path + "off_results" + str(datetime.datetime.now().time()) + '.txt', 'w')
out.write("Off-target sequences identified. Scores are between O and 1. A higher value indicates greater"
"probability of off-target activity at that location.\n")
for sequence in self.output_data:
out.write(sequence + "\n")
for off_target in self.output_data[sequence]:
outloc = off_target[0] + "," + str(off_target[1][0]) + "," + off_target[1][1]
out.write(off_target[3] + "," + outloc + "\t" + str(off_target[2]/100) + '\n')
out.close()
def get_rseqs(self, offlist):
targets = list()
cofile = open(offlist, 'r')
cofile.readline()
while True:
t = cofile.readline()[:-1]
if t == 'EN':
break
targets.append(t)
for tar in targets:
compseed = self.ST.compress(tar[:16], 64)
comptail = self.ST.compress(tar[16:], 64)
compressed = compseed + "." + comptail
rseq = ""
for nt in tar[0:-1]:
rseq = nt + rseq
self.rSequences.append([tar, rseq])
def get_scores(self,rseq, dseq):
self.dSequence = Seq(dseq, IUPAC.unambiguous_dna).reverse_complement()
hsu = self.get_hsu_score(rseq)
qual = self.get_qualt_score(rseq)
step = self.qualt_step_score(rseq)
output = ((math.sqrt(hsu) + step) + pow(qual, 6))
return output
def fill_matrix(self):
f = open('CASPERinfo', 'r')
l = " "
while True:
l = f.readline()
if l[0] == "H":
break
i = 0
l = f.readline()
while l[0] != '-':
values = l.split("\t")
self.matrix[self.matrixKeys[i]] = values
i += 1
l = f.readline()
for element in self.matrix:
self.matrix[element][18] = self.matrix[element][18][0:-1]
def get_hsu_score(self, rSequence):
score = 1.0
for i in range(0,19):
rnt = rSequence[i]
dnt = self.dSequence[i]
lookup = str(rnt) + str(dnt)
if lookup in self.matrixKeys:
hsu = self.matrix[lookup][18-i]
score *= float(hsu)
return score
def get_qualt_score(self, rSequence):
score = 3.5477
for i in range(0, 19):
lookup = rSequence[i] + self.dSequence[i]
if lookup in self.matrixKeys:
score -= 1.0/(i+1)
return score/3.5477
def qualt_step_score(self, rSequence):
score = 1.0
for i in range(0, 19):
lookup = rSequence[i] + self.dSequence[i]
if lookup in self.matrixKeys:
if i < 6:
score -= 0.1
elif i < 12:
score -= 0.05
elif i < 20:
score -= 0.0125
return score
def separation_score(self, rSequence):
misses = []
delta = 0
for i in range(0, 19):
lookup = rSequence[i] + self.dSequence[i]
if lookup in self.matrixKeys:
misses.append(i)
if len(misses) == 2:
delta = (misses[1] - misses[0])/2.0
if len(misses) == 3:
delta = ((misses[1] - misses[0]) + (misses[2] - misses[1]))/3.0
if len(misses) == 4:
delta = ((misses[1] - misses[0]) + (misses[2] - misses[1]))/3.0
retval = 1.0 - (delta/19.0)
return retval
# If there is more than four mismatches it returns false, else it will return true
def critical_similarity(self, cseq1, cseq2):
mismatches = 0
lim = min([len(cseq1), len(cseq2)])
check = True
for i in range(lim): # Doesn't matter whether you use cseq1 or cseq2 they are the same length
if cseq1[i] != cseq2[i]:
mismatches += 1
if mismatches == 5:
check = False
break
return check
def int_to_char(self, i):
switcher = {
0: 'A',
1: 'T',
2: 'C',
3: 'G'
}
return switcher[i]
def char_to_int(self, c):
switcher = {
'A': 0,
'T': 1,
'C': 2,
'G': 3
}
return switcher[c]