diff --git a/src/basecaller.py b/src/basecaller.py
index 208c681..e55ad35 100644
--- a/src/basecaller.py
+++ b/src/basecaller.py
@@ -121,7 +121,7 @@ def calibration(digitisation, range):
     return range / digitisation
 
 # region submit reads
-def submit_reads(args, client, batch):
+def submit_reads(args, client, sk, batch):
     '''
     Submit batch of reads to basecaller
     '''
@@ -164,7 +164,7 @@ def submit_reads(args, client, batch):
             if tries >= 1000:
                 if not result:
                     print("Skipped a read: {}".format(read_id))
-                    skipped.append(read_id)
+                    skipped.append([read_id, "stage-0", "timed out trying to submit read to client"])
                     break
         if result:
             read_counter += 1
@@ -185,12 +185,15 @@ def submit_reads(args, client, batch):
                 ))
             if result:
                 read_counter += 1
-
+    if len(skipped) > 0:
+        for i in skipped:
+            sk.put(i)
+
     return read_counter, read_store
 
 
 # region get reads
-def get_reads(args, client, read_counter, read_store):
+def get_reads(args, client, read_counter, sk, read_store):
     '''
     Get reads from the basecaller and process them
     '''
@@ -200,6 +203,7 @@ def get_reads(args, client, read_counter, read_store):
     qs_cutoff = float(args.qscore)
     done = 0
     bcalled_list = []
+    skipped_list = []
 
     while done < read_counter:
         bcalled = client.get_completed_reads()
@@ -241,10 +245,12 @@ def get_reads(args, client, read_counter, read_store):
                 except Exception as error:
                     # handle the exception
                     print("An exception occurred in stage 1:", type(error).__name__, "-", error)
-                    sys.exit(1)
+                    skipped_list.append([read_id, "stage-1", "Failed to get initial sequence data from read record"])
+                    continue
                 try:
                     if len(bcalled_read["sequence"]) == 0:
                         print("read_id: {} has a sequence length of zero, skipping".format(read_id))
+                        skipped_list.append([read_id, "stage-1", "Sequence length of zero"])
                         continue
                     bcalled_read["qscore"] = call['datasets']['qstring']
                     if args.moves_out:
@@ -257,8 +263,9 @@ def get_reads(args, client, read_counter, read_store):
                     # handle the exception
                     print("An exception occurred getting sam_record/alignment_sam_record for {}:", read_id, type(error).__name__, "-", error)
                     bcalled_read["sam_record"] = ""
+                    skipped_list.append([read_id, "stage-1", "Failed to get sam_record/alignment_sam_record"])
                     continue
-                if args.do_read_splitting:
+                if args.do_read_splitting or args.above_7310:
                     bcalled_read["num_samples"] = None
                     bcalled_read["trimmed_samples"] = None
                 else:
@@ -273,7 +280,8 @@ def get_reads(args, client, read_counter, read_store):
                 except Exception as error:
                     # handle the exception
                     print("An exception occurred in stage 2:", type(error).__name__, "-", error)
-                    sys.exit(1)
+                    skipped_list.append([read_id, "stage-2", "Error getting data related to sam output"])
+                    continue
                 try:
                     if SPLIT_PASS:
                         if bcalled_read["read_qscore"] >= qs_cutoff:
@@ -333,15 +341,19 @@ def get_reads(args, client, read_counter, read_store):
             except Exception as error:
                 # handle the exception
                 print("An exception occurred in stage 3:", type(error).__name__, "-", error)
-                sys.exit(1)
-
-
+                skipped_list.append([read_id, "stage-3", "Error splitting barcodes or sequencing summary"])
+                continue
+
+    if len(skipped_list) > 0:
+        for i in skipped_list:
+            sk.put(i)
+
     read_counter = 0
     done = 0
     return bcalled_list
 
 # region entry point
-def basecaller_proc(args, iq, rq, address, config, params, N):
+def basecaller_proc(args, iq, rq, sk, address, config, params, N):
     """
     submit a read to the basecall server
     """
@@ -359,10 +371,10 @@ def basecaller_proc(args, iq, rq, address, config, params, N):
             break
         # print("[BASECALLER] - submitting channel: {}".format(batch[0]["channel_number"]))
         # Submit to be basecalled
-        read_counter, read_store = submit_reads(args, client, batch)
+        read_counter, read_store = submit_reads(args, client, sk, batch)
        # now collect the basecalled reads
         # print("[BASECALLER] - getting basecalled channel: {}".format(batch[0]["channel_number"]))
-        bcalled_list = get_reads(args, client, read_counter, read_store)
+        bcalled_list = get_reads(args, client, read_counter, sk, read_store)
         # TODO: make a skipped queue to handle skipped reads
         # print("[BASECALLER] - writing channel: {}".format(batch[0]["channel_number"]))
         rq.put(bcalled_list)
diff --git a/src/buttery_eel.py b/src/buttery_eel.py
index 0178230..00e10bf 100755
--- a/src/buttery_eel.py
+++ b/src/buttery_eel.py
@@ -240,15 +240,21 @@ def main():
     if platform.system() == "Darwin":
         im = mp.Manager()
         rm = mp.Manager()
+        sm = mp.Manager()
         input_queue = im.JoinableQueue()
         result_queue = rm.JoinableQueue()
+        skip_queue = sm.JoinableQueue()
     else:
         input_queue = mp.JoinableQueue()
         result_queue = mp.JoinableQueue()
+        skip_queue = mp.JoinableQueue()
 
     processes = []
 
     if args.duplex:
+        if platform.system() == "Darwin":
+            print("MacOS not currently supported for duplex calling")
+            sys.exit(1)
         if args.single:
             print("Duplex mode active - a duplex model must be used to output duplex reads")
             print("Buttery-eel does not have checks for this, as the model names are in flux")
@@ -264,7 +270,7 @@ def main():
         out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
         out_writer.start()
         # set up each worker to have a unique queue, so it only processes 1 channel at a time
-        basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
+        basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queue, result_queue, skip_queue, address, config, params, 0), daemon=True, name='basecall_worker_{}'.format(0))
         basecall_worker.start()
         processes.append(basecall_worker)
 
@@ -282,7 +288,7 @@ def main():
             out_writer.start()
             # set up each worker to have a unique queue, so it only processes 1 channel at a time
             for name in queue_names:
-                basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
+                basecall_worker = mp.Process(target=basecaller_proc, args=(args, duplex_queues[name], result_queue, skip_queue, address, config, params, name), daemon=True, name='basecall_worker_{}'.format(name))
                 basecall_worker.start()
                 processes.append(basecall_worker)
     else:
@@ -291,7 +297,7 @@ def main():
         out_writer = mp.Process(target=write_worker, args=(args, result_queue, OUT, SAM_OUT), name='write_worker')
         out_writer.start()
         for i in range(args.procs):
-            basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
+            basecall_worker = mp.Process(target=basecaller_proc, args=(args, input_queue, result_queue, skip_queue, address, config, params, i), daemon=True, name='basecall_worker_{}'.format(i))
             basecall_worker.start()
             processes.append(basecall_worker)
 
@@ -300,9 +306,37 @@ def main():
         p.join()
     result_queue.put(None)
     out_writer.join()
+
+    if skip_queue.qsize() > 0:
+        print("1")
+        skipped = 0
+        skip_queue.put(None)
+        if "/" in args.output:
+            SKIPPED = open("{}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])), "w")
+            print("Skipped reads detected, writing details to file: {}/skipped_reads.txt".format("/".join(args.output.split("/")[:-1])))
+        else:
+            SKIPPED = open("./skipped_reads.txt", "w")
+            print("Skipped reads detected, writing details to file: ./skipped_reads.txt")
+
+        SKIPPED.write("read_id\tstage\terror\n")
+        print("2")
+
+        while True:
+            read = skip_queue.get()
+            if read is None:
+                break
+            read_id, stage, error = read
+            skipped += 1
+            SKIPPED.write("{}\t{}\t{}\n".format(read_id, stage, error))
+
+        print("3")
+        SKIPPED.close()
+        print("Skipped reads total: {}".format(skipped))
+
     print("\n")
     print("Basecalling complete!\n")
+
     # ==========================================================================
     # Finish up, close files, disconnect client and terminate server
     # ==========================================================================
diff --git a/src/cli.py b/src/cli.py
index 7e6c4e2..d2494f6 100644
--- a/src/cli.py
+++ b/src/cli.py
@@ -58,7 +58,7 @@ def get_args():
 
     # read splitting
     read_splitting.add_argument("--do_read_splitting", action="store_true",
-                                help="Perform read splitting based on mid-strand adapter detection")
+                                help="Perform read splitting based on mid-strand adapter detection - On by default for dorado-server >= v7.3.10")
     read_splitting.add_argument("--min_score_read_splitting", type=float, default=50.0,
                                 help="Minimum mid-strand adapter score for reads to be split")
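For reference, here is a minimal, standalone sketch (not part of the patch) of the skip-queue pattern the diff introduces: worker processes push [read_id, stage, error] records onto a shared queue, and the parent drains it after joining the workers, using a None sentinel and writing a tab-separated report like skipped_reads.txt above. The demo_worker function and the sample read records are illustrative only and do not exist in buttery-eel.

import multiprocessing as mp

def demo_worker(sk):
    # Stand-in for basecaller_proc: record why reads were skipped,
    # mirroring the sk.put(i) calls added in submit_reads()/get_reads().
    sk.put(["read-0001", "stage-1", "Sequence length of zero"])
    sk.put(["read-0002", "stage-2", "Error getting data related to sam output"])

if __name__ == "__main__":
    skip_queue = mp.JoinableQueue()
    worker = mp.Process(target=demo_worker, args=(skip_queue,), daemon=True)
    worker.start()
    worker.join()

    # qsize() mirrors the patch; note that mp.Queue.qsize() raises
    # NotImplementedError on macOS, where Manager-backed queues (as in the
    # Darwin branch of the patch) avoid that limitation.
    if skip_queue.qsize() > 0:
        skipped = 0
        skip_queue.put(None)  # sentinel marks the end of the queue
        with open("skipped_reads.txt", "w") as out:
            out.write("read_id\tstage\terror\n")
            while True:
                read = skip_queue.get()
                if read is None:
                    break
                read_id, stage, error = read
                skipped += 1
                out.write("{}\t{}\t{}\n".format(read_id, stage, error))
        print("Skipped reads total: {}".format(skipped))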