Skip to content

Commit

Permalink
fix bugs and put channels in order
Browse files Browse the repository at this point in the history
  • Loading branch information
Psy-Fer committed Jun 26, 2024
1 parent a1bc68e commit 34ee6b0
Showing 1 changed file with 21 additions and 16 deletions.
37 changes: 21 additions & 16 deletions src/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ def get_data_by_channel(args, dq):
duplex[channel].append([readID, read_num])
else:
duplex[channel] = [[readID, read_num]]

print("Number of channels:", len(list(duplex.keys())))
# now sort based on read_num so it's in time order per channel
for ch in duplex.keys():
for ch in sorted(duplex.keys()):
duplex[ch].sort(key = lambda x: x[1])
print(duplex[ch])
# print(duplex[ch])
# send channel to the queue
dq.put([ch, duplex[ch]])
# to break the reader worker
Expand Down Expand Up @@ -199,12 +199,10 @@ def duplex_read_worker(args, dq, pre_dq):
pr = cProfile.Profile()
pr.enable()


dq_names = dq.keys()
free_names = dq_names
free_names = [i for i in dq_names]
taken_names = []


# read the file once, and get all the channels into the pre_dq queue
get_data_by_channel(args, pre_dq)

Expand All @@ -222,6 +220,8 @@ def duplex_read_worker(args, dq, pre_dq):
# pull from the pre_dq
while True:
if len(free_names) == 0 or ending:
if ending and len(taken_names) == 0:
break
# push a batch for each generator till queues are full
for qname in readers.keys():
reads = readers[qname]
Expand All @@ -232,31 +232,36 @@ def duplex_read_worker(args, dq, pre_dq):
free_names.append(qname)
taken_names.remove(qname)
else:
dq[qname].put(next())
dq[qname].put(batch)
else:
continue
# remove free items from readers so we don't double up in ending state
for q in free_names:
readers.pop(q)
# if no more channels and all queues done, break
if ending and taken_names == 0:
break

# remove free items from readers so we don't double up in ending state
for qn in free_names:
if len(readers.keys()) > 0:
if qn in readers.keys():
readers.pop(qn)
elif not ending:
# populate the read generators with matched queues
ch = pre_dq.get()
if ch is None:
ending = True
continue
channel = ch[0]
print("processing channel: {}".format(channel))
data = ch[1]
read_list = [i for i, _ in data]
qname = free_names[0]
q = free_names[0]
free_names.pop(0)
taken_names.append(qname)
taken_names.append(q)
reads = s5.get_read_list_multi(read_list, threads=args.slow5_threads, batchsize=args.slow5_batchsize, aux='all')
batches = _get_slow5_batch(args, s5, reads, size=args.slow5_batchsize, slow5_filename=filename_slow5, header_array=header_array)
readers[qname] = batches
readers[q] = batches
else:
print("Some end state not known!")

for qname in dq_names:
dq[qname].put(None)

# if profiling, dump info into log files in current dir
if args.profile:
Expand Down

0 comments on commit 34ee6b0

Please sign in to comment.