This repository has been archived by the owner on Mar 12, 2022. It is now read-only.
forked from rvolden/Mandalorion-Episode-II
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsensus.py
131 lines (120 loc) · 4.25 KB
/
consensus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# Roger Volden
'''
Take a FASTA file with aligned reads and a FASTQ file to make a consensus
based on quality as well as base frequency at certain positions.
FASTA file with aligned reads can also come as .fasta from EMBOSS needle.
Usage: python3 consensus.py aligned.fastq reads.fastq >consensus.fasta
'''
import sys
def consensus(sequences, qualityDict):
'''
Makes a consensus sequence based on base frequency and quality.
sequences: list of aligned sequences.
qualityDict: dictionary of sequences : quality scores.
Returns a consensus sequence.
'''
consensus = ''
seqA, seqB = sequences[0], sequences[1]
seqAq, seqBq = qualityDict[seqA.replace('-', '')], qualityDict[seqB.replace('-', '')]
seqAqual = normalizeLen(seqA, seqAq)
seqBqual = normalizeLen(seqB, seqBq)
i = 0
while i != len(seqA): # iterate by position
if seqA[i] == seqB[i]: # match
consensus += seqA[i]
if seqA[i] != seqB[i] and seqA[i] != '-' and seqB[i] != '-': # mismatch
if ord(seqAqual[i]) > ord(seqBqual[i]):
consensus += seqA[i]
else:
consensus += seqB[i]
if seqA[i] == '-' or seqB[i] == '-': # gap to bases
gapLen = 1 # where to start gap chunk
if seqA[i] == '-': # which seq to check
gapSeq = seqA
else:
gapSeq = seqB
try:
while gapSeq[i + gapLen] == '-': # extend to length of gap
gapLen += 1
except IndexError: # if gap at end
gapLen = 1
if avgQual(seqAqual, i, gapLen) > avgQual(seqBqual, i, gapLen):
consensus += seqA[i:i+gapLen]
else:
consensus += seqB[i:i+gapLen]
i += gapLen
continue
i += 1
print('>consensus')
print(consensus.replace('-', ''))
def avgQual(qual, i, gapLen):
'''Returns average quality of a segment.'''
return sum(ord(x) for x in list(qual[i:i+gapLen]))/gapLen
def normalizeLen(seq, quality):
'''
Inserts avg quality scores based on surrounding quality scores
where there are gaps in the sequence.
Returns a new quality string that's the same len as the sequence.
'''
seqIndex, qualIndex = 0, 0
newQuality = ''
while qualIndex + 1 != len(quality):
if seq[seqIndex] != '-':
newQuality += quality[qualIndex]
qualIndex += 1
seqIndex += 1
if seq[seqIndex] == '-':
newQuality += chr(int((ord(quality[qualIndex-1]) + ord(quality[qualIndex]))/2))
seqIndex += 1
newQuality += quality[-1]
if len(seq) != len(newQuality):
gapLen = 0
while seq[-1-gapLen] == '-':
newQuality += newQuality[-1]
gapLen += 1
return newQuality
def fastaReader(inFile):
'''Reads in FASTA files.'''
tempSeqs, headers, sequences = [], [], []
for line in inFile:
line = line.rstrip()
if not line:
continue
if line.startswith('>'):
headers.append(line.split()[0][1:])
# covers the case where the file ends while reading sequences
if line.startswith('>'):
sequences.append(''.join(tempSeqs).upper())
tempSeqs = []
else:
tempSeqs.append(line)
sequences.append(''.join(tempSeqs).upper())
return sequences[1:]
def fastqReader(inFile):
'''Reads in FASTQ files. Only returns sequence and quality lines.'''
sequences, quality, lineNum = [], [], 0
for line in inFile:
line = line.rstrip()
if not line:
continue
# sequences
if lineNum % 4 == 1:
sequences.append(line)
# quality lines
elif lineNum % 4 == 3:
quality.append(line)
lineNum += 1
return sequences, quality
def main():
'''
Reads in files and organizes information.
Calls consensus, which actually builds the consensus sequence.
'''
alignedSeqs = fastaReader(open(sys.argv[1]))
fastqSeqs, qualityScores = fastqReader(open(sys.argv[2]))
seqDict = {} # seq:quality
for i in range(len(fastqSeqs)):
seqDict[fastqSeqs[i]] = qualityScores[i]
consensus(alignedSeqs, seqDict)
main()