skipped reads and readsplitting for newer versions
Psy-Fer committed Aug 19, 2024
1 parent b4a5c99 commit cc7507d
Showing 3 changed files with 63 additions and 17 deletions.
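The change threads a skip queue through both basecall stages: instead of calling sys.exit(1) on a per-read failure, each worker records a [read_id, stage, reason] entry, and main() drains the queue into a skipped_reads.txt report after the workers have joined. A minimal sketch of that pattern, using stand-in names rather than the project's exact internals:

import multiprocessing as mp

def worker(skip_queue):
    # On a per-read failure, record the read instead of aborting the run.
    for read_id in ["read_a", "read_b"]:
        try:
            raise RuntimeError("simulated basecall failure")
        except Exception as error:
            skip_queue.put([read_id, "stage-1", str(error)])

if __name__ == "__main__":
    skip_queue = mp.JoinableQueue()
    p = mp.Process(target=worker, args=(skip_queue,))
    p.start()
    p.join()
    skip_queue.put(None)  # sentinel so the drain loop below terminates
    with open("skipped_reads.txt", "w") as out:
        out.write("read_id\tstage\terror\n")
        while True:
            entry = skip_queue.get()
            if entry is None:
                break
            out.write("{}\t{}\t{}\n".format(*entry))
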
38 changes: 25 additions & 13 deletions src/basecaller.py
@@ -121,7 +121,7 @@ def calibration(digitisation, range):
return range / digitisation

# region submit reads
def submit_reads(args, client, batch):
def submit_reads(args, client, sk, batch):
'''
Submit batch of reads to basecaller
'''
@@ -164,7 +164,7 @@ def submit_reads(args, client, batch):
if tries >= 1000:
if not result:
print("Skipped a read: {}".format(read_id))
skipped.append(read_id)
skipped.append([read_id, "stage-0", "timed out trying to submit read to client"])
break
if result:
read_counter += 1
@@ -185,12 +185,15 @@ def submit_reads(args, client, batch):
))
if result:
read_counter += 1

if len(skipped) > 0:
for i in skipped:
sk.put(i)

return read_counter, read_store


# region get reads
def get_reads(args, client, read_counter, read_store):
def get_reads(args, client, read_counter, sk, read_store):
'''
Get reads from the basecaller and process them
'''
@@ -200,6 +203,7 @@ def get_reads(args, client, read_counter, read_store):
qs_cutoff = float(args.qscore)
done = 0
bcalled_list = []
skipped_list = []

while done < read_counter:
bcalled = client.get_completed_reads()
@@ -241,10 +245,12 @@ def get_reads(args, client, read_counter, read_store):
except Exception as error:
# handle the exception
print("An exception occurred in stage 1:", type(error).__name__, "-", error)
sys.exit(1)
skipped_list.append([read_id, "stage-1", "Failed to get initial sequence data from read record"])
continue
try:
if len(bcalled_read["sequence"]) == 0:
print("read_id: {} has a sequence length of zero, skipping".format(read_id))
skipped_list.append([read_id, "stage-1", "Sequence length of zero"])
continue
bcalled_read["qscore"] = call['datasets']['qstring']
if args.moves_out:
@@ -257,8 +263,9 @@ def get_reads(args, client, read_counter, read_store):
# handle the exception
print("An exception occurred getting sam_record/alignment_sam_record for {}:", read_id, type(error).__name__, "-", error)
bcalled_read["sam_record"] = ""
skipped_list.append([read_id, "stage-1", "Failed to get sam_record/alignment_sam_record"])
continue
if args.do_read_splitting:
if args.do_read_splitting or args.above_7310:
bcalled_read["num_samples"] = None
bcalled_read["trimmed_samples"] = None
else:
@@ -273,7 +280,8 @@ def get_reads(args, client, read_counter, read_store):
except Exception as error:
# handle the exception
print("An exception occurred in stage 2:", type(error).__name__, "-", error)
sys.exit(1)
skipped_list.append([read_id, "stage-2", "Error getting data related to sam output"])
continue
try:
if SPLIT_PASS:
if bcalled_read["read_qscore"] >= qs_cutoff:
@@ -333,15 +341,19 @@ def get_reads(args, client, read_counter, read_store):
except Exception as error:
# handle the exception
print("An exception occurred in stage 3:", type(error).__name__, "-", error)
sys.exit(1)


skipped_list.append([read_id, "stage-3", "Error splitting barcodes or sequencing summary"])
continue

if len(skipped_list) > 0:
for i in skipped_list:
sk.put(i)

read_counter = 0
done = 0
return bcalled_list

# region entry point
def basecaller_proc(args, iq, rq, address, config, params, N):
def basecaller_proc(args, iq, rq, sk, address, config, params, N):
"""
submit a read to the basecall server
"""
@@ -359,10 +371,10 @@ def basecaller_proc(args, iq, rq, address, config, params, N):
break
# print("[BASECALLER] - submitting channel: {}".format(batch[0]["channel_number"]))
# Submit to be basecalled
read_counter, read_store = submit_reads(args, client, batch)
read_counter, read_store = submit_reads(args, client, sk, batch)
# now collect the basecalled reads
# print("[BASECALLER] - getting basecalled channel: {}".format(batch[0]["channel_number"]))
bcalled_list = get_reads(args, client, read_counter, read_store)
bcalled_list = get_reads(args, client, read_counter, sk, read_store)
# TODO: make a skipped queue to handle skipped reads
# print("[BASECALLER] - writing channel: {}".format(batch[0]["channel_number"]))
rq.put(bcalled_list)
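Note the new condition "if args.do_read_splitting or args.above_7310:" above: for dorado-server 7.3.10 and later, read splitting is on by default, so the per-read num_samples/trimmed_samples accounting is skipped exactly as when --do_read_splitting is passed. This diff does not show how above_7310 is set; purely as an illustration, a flag like it could be derived from the server's version string (the helper name and version source below are assumptions, not the project's code):

# Illustrative only: derive an "above_7310"-style flag from a dotted version string.
def version_at_least(version_str, minimum=(7, 3, 10)):
    # Compare the first three numeric components against the minimum version.
    parts = tuple(int(p) for p in version_str.strip().lstrip("v").split(".")[:3])
    return parts >= minimum

above_7310 = version_at_least("7.4.12")          # True: splitting on by default
print(above_7310, version_at_least("7.2.13"))    # True False
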
40 changes: 37 additions & 3 deletions src/buttery_eel.py
@@ -240,15 +240,21 @@ def main():
if platform.system() == "Darwin":
im = mp.Manager()
rm = mp.Manager()
sm = mp.Manager()
input_queue = im.JoinableQueue()
result_queue = rm.JoinableQueue()
skip_queue = sm.JoinableQueue()
else:
input_queue = mp.JoinableQueue()
result_queue = mp.JoinableQueue()
skip_queue = mp.JoinableQueue()

processes = []

if args.duplex:
if platform.system() == "Darwin":
print("MacOS not currently supported for duplex calling")
sys.exit(1)
if args.single:
print("Duplex mode active - a duplex model must be used to output duplex reads")
print("Buttery-eel does not have checks for this, as the model names are in flux")
@@ -264,7 +270,7 @@ def main():
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
out_writer.start()
# set up each worker to have a unique queue, so it only processes 1 channel at a time
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, skip_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
basecall_worker.start()
processes.append(basecall_worker)

@@ -282,7 +288,7 @@ def main():
out_writer.start()
# set up each worker to have a unique queue, so it only processes 1 channel at a time
for name in queue_names:
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, skip_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
basecall_worker.start()
processes.append(basecall_worker)
else:
@@ -291,7 +297,7 @@ def main():
out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
out_writer.start()
for i in range(args.procs):
basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, skip_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
basecall_worker.start()
processes.append(basecall_worker)

@@ -300,9 +306,37 @@ def main():
p.join()
result_queue.put(None)
out_writer.join()

if skip_queue.qsize() > 0:
skipped = 0
skip_queue.put(None)
if "/" in args.output:
SKIPPED = open("{}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])), "w")
print("Skipped reads detected, writing details to file: {}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])))
else:
SKIPPED = open("./skipped_reads.txt", "w")
print("Skipped reads detected, writing details to file: ./skipped_reads.txt")

SKIPPED.write("read_id\tstage\terror\n")

while True:
read = skip_queue.get()
if read is None:
break
read_id, stage, error = read
skipped += 1
SKIPPED.write("{}\t{}\t{}\n".format(read_id, stage, error))

SKIPPED.close()
print("Skipped reads total: {}".format(skipped))

print("\n")
print("Basecalling complete!\n")


# ==========================================================================
# Finish up, close files, disconnect client and terminate server
# ==========================================================================
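The skipped-read report written above is a plain tab-separated file with a read_id/stage/error header, placed alongside the output file (or in the current directory when the output path has no directory component; splitting on "/" here is equivalent to os.path.dirname for POSIX paths). A small sketch of consuming that report afterwards, assuming the default location:

# Sketch: tally skipped reads by failure stage from skipped_reads.txt.
import csv
from collections import Counter

with open("skipped_reads.txt") as fh:
    stages = Counter(row["stage"] for row in csv.DictReader(fh, delimiter="\t"))

for stage, count in stages.most_common():
    print("{}\t{}".format(stage, count))
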
2 changes: 1 addition & 1 deletion src/cli.py
@@ -58,7 +58,7 @@ def get_args():

# read splitting
read_splitting.add_argument("--do_read_splitting", action="store_true",
help="Perform read splitting based on mid-strand adapter detection")
help="Perform read splitting based on mid-strand adapter detection - on by default for dorado-server >= v7.3.10")
read_splitting.add_argument("--min_score_read_splitting", type=float, default=50.0,
help="Minimum mid-strand adapter score for reads to be split")

