-
Notifications
You must be signed in to change notification settings - Fork 1
/
fastq.py
162 lines (121 loc) · 3.31 KB
/
fastq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import os,gzip,random,sys
def write_fastq(fq,f):
'''
write fastq record
'''
f.write('@' + fq['header'] + '\n')
f.write(fq['seq'] + '\n')
f.write('+\n')
f.write(fq['qual'] + '\n')
def random_fastq(fname):
'''
return a (more or less) random record
'''
size = os.stat(fname).st_size
while True:
f = open(fname,'rb')
f.seek(random.randint(0,size))
find_record(f)
fq = read_fastq(f)
if fq != None:
return fq
def read_fastq(f):
'''
read next fastq record
file must be at start of record or end of file
returns record or None
'''
header = f.readline()
if header == '': return None #eof
assert header[0] == '@'
header = header[1:].strip()
seq = f.readline()
assert seq != ''
seq = seq.strip()
redundant = f.readline()
assert redundant[0] == '+'
qual = f.readline()
assert qual != ''
qual = qual.strip()
rec = {}
rec['header'] = header
rec['seq'] = seq
rec['qual'] = qual
return rec
def next_fastq(fname):
'''
generator to yield next fastq sequence
'''
if fname == sys.stdin:
f = sys.stdin
elif fname.endswith('.gz'):
#from subprocess import Popen, PIPE
#f = Popen(['zcat', fname], stdout=PIPE).stdout #not significantly faster
f = gzip.open(fname,'rb')
else:
f = open(fname,'rb')
while True:
rec = read_fastq(f)
#end of file
if rec == None: break
yield rec
f.close()
def next_fastqs(fname1,fname2):
'''
iterate through a pair of matched fastqs
'''
gen1 = next_fastq(fname1)
gen2 = next_fastq(fname2)
while True:
try:
fq1 = gen1.next()
except:
break
fq2 = gen2.next()
yield fq1,fq2
def next_fastq_split(fname,offset,chunks):
'''
generator to yield next fastq sequence from a file
split records into groups
eg offset=0, chunks=10 means yield first of every ten records
seeking in a gzip file and determining the original file size
are not straightforward, therefore I'm not using my original
design for file spliting which involved seeking to a point in the
file and returning consecutive records
'''
if fname.endswith('.gz'):
f = gzip.open(fname,'rb')
else:
f = open(fname,'rb')
ct = -1
while True:
ct += 1
rec = read_fastq(f)
#end of file
if rec == None: break
if ct%chunks == offset:
yield rec
f.close()
def find_record(f):
'''
find first complete record starting at current position
'''
f.readline()
while True:
posn = f.tell()
header = f.readline()
posn2 = f.tell()
seq = f.readline()
redundant = f.readline()
qual = f.readline()
if qual == '': break #eof, leave file at end
if header[0] == '@' and redundant[0] == '+':
f.seek(posn)
break
f.seek(posn2)
def load_fastq(fname):
'''
load all fastq records into a list
each sequence record has: header, seq, qual
'''
return [x for x in next_fastq(fname)]