diff --git a/src/basecaller.py b/src/basecaller.py index 754df1f..fa92add 100644 --- a/src/basecaller.py +++ b/src/basecaller.py @@ -237,14 +237,14 @@ def get_reads(args, client, read_counter, sk, read_store): # ending = True continue # print("Channel: {} - read {}/{}".format(call['metadata']['channel'], done, read_counter)) - # if call['metadata']['read_id'] == "d209a644-9086-4296-9a6f-7d037dcb959f": - # for i in call: - # if isinstance(call[i], dict): - # for j in call[i]: - # print("{}: {}".format(j, call[i][j])) - # else: - # print("{}: {}".format(i, call[i])) - # sys.exit(1) + # if call['metadata']['read_id'] == "6f57d0a2-cfe4-4678-b6bc-299527eec9cc": + # for i in call: + # if isinstance(call[i], dict): + # for j in call[i]: + # print("{}: {}".format(j, call[i][j])) + # else: + # print("{}: {}".format(i, call[i])) + # sys.exit(1) try: bcalled_read = {} bcalled_read["sam_record"] = "" @@ -289,7 +289,7 @@ def get_reads(args, client, read_counter, sk, read_store): bcalled_read["sam_record"] = "" skipped_list.append([read_id, "stage-1", "Failed to get sam_record/alignment_sam_record"]) continue - if args.do_read_splitting or args.above_7310: + if args.do_read_splitting and not args.above_7310: bcalled_read["num_samples"] = None bcalled_read["trimmed_samples"] = None else: @@ -325,31 +325,39 @@ def get_reads(args, client, read_counter, sk, read_store): bcalled_read["barcode_arrangement"] = call['metadata']["barcode_arrangement"] - # create summary data - if args.seq_sum: - minknow_events = call['metadata'].get('num_minknow_events', ".") - sample_rate = float(read_store[read_id]["sampling_rate"]) - duration = round(float(call['metadata']['duration'] / sample_rate), 6) - num_events = call['metadata']['num_events'] - median = round(call['metadata']['median'], 6) - med_abs_dev = round(call['metadata']['med_abs_dev'], 6) - # pore_type = read_store[read_id]["header_array"].get('pore_type', 'not_set') - experiment_id = read_store[read_id]["header_array"]['protocol_group_id'] - run_id = read_store[read_id]["header_array"]["run_id"] - sample_id = read_store[read_id]["header_array"]["sample_id"] - strand_score_template = round(call['metadata']['call_score'], 6) - sequence_length = call['metadata']['sequence_length'] - channel = read_store[read_id]["aux_data"]['channel_number'] - mux = read_store[read_id]["aux_data"]['start_mux'] - start_time = round(float(read_store[read_id]["aux_data"]['start_time']) / sample_rate, 6) - end_reason_val = read_store[read_id]["aux_data"]['end_reason'] - end_reason = read_store[read_id]["aux_data"]['end_reason_labels'][end_reason_val] - output_name = "" - sum_out = "\t".join([str(i) for i in [read_store[read_id]["slow5_filename"], bcalled_read["parent_read_id"], bcalled_read["read_id"], run_id, channel, mux, minknow_events, - start_time, duration, passes_filtering, ".", num_events, ".", - sequence_length, round(bcalled_read["read_qscore"], 6), strand_score_template, median, med_abs_dev, - experiment_id, sample_id, end_reason]]) - bcalled_read["sum_out"] = sum_out + # create summary data + # do it for every read now, but only write it if the flag is on + # if args.seq_sum: + minknow_events = call['metadata'].get('num_minknow_events', ".") + sample_rate = float(read_store[read_id]["sampling_rate"]) + duration = round(float(call['metadata']['duration'] / sample_rate), 6) + bcalled_read["duration"] = duration + num_events = call['metadata']['num_events'] + median = round(call['metadata']['median'], 6) + med_abs_dev = round(call['metadata']['med_abs_dev'], 6) + bcalled_read["scaling_median"] = round(float(call['metadata']['scaling_median']), 3) + bcalled_read["scaling_med_abs_dev"] = round(float(call['metadata']['scaling_med_abs_dev']), 8) + bcalled_read["scaling_version"] = call['metadata']['scaling_version'] + # pore_type = read_store[read_id]["header_array"].get('pore_type', 'not_set') + experiment_id = read_store[read_id]["header_array"]['protocol_group_id'] + run_id = read_store[read_id]["header_array"]["run_id"] + sample_id = read_store[read_id]["header_array"]["sample_id"] + strand_score_template = round(call['metadata'].get('call_score', 0.0), 6) + sequence_length = call['metadata']['sequence_length'] + bcalled_read["sequence_length"] = sequence_length + channel = read_store[read_id]["aux_data"]['channel_number'] + bcalled_read["channel"] = channel + mux = read_store[read_id]["aux_data"]['start_mux'] + bcalled_read["mux"] = int(mux) + start_time = round(float(read_store[read_id]["aux_data"]['start_time']) / sample_rate, 6) + end_reason_val = read_store[read_id]["aux_data"]['end_reason'] + end_reason = read_store[read_id]["aux_data"]['end_reason_labels'][end_reason_val] + output_name = "" + sum_out = "\t".join([str(i) for i in [read_store[read_id]["slow5_filename"], bcalled_read["parent_read_id"], bcalled_read["read_id"], run_id, channel, mux, minknow_events, + start_time, duration, passes_filtering, ".", num_events, ".", + sequence_length, round(bcalled_read["read_qscore"], 6), strand_score_template, median, med_abs_dev, + experiment_id, sample_id, end_reason]]) + bcalled_read["sum_out"] = sum_out # create barcode summary data if args.barcode_kits: @@ -424,7 +432,6 @@ def basecaller_proc(args, iq, rq, sk, address, config, params, N): # sys.exit(1) # increase counter by 1 to get the 1 fake read but not the other 10 bcalled_list = get_reads(args, client, read_counter, sk, read_store) - # TODO: make a skipped queue to handle skipped reads print("[BASECALLER] - writing channel: {}".format(ch)) rq.put(bcalled_list) read_counter = 0 @@ -451,7 +458,6 @@ def basecaller_proc(args, iq, rq, sk, address, config, params, N): # now collect the basecalled reads # print("[BASECALLER] - getting basecalled channel: {}".format(batch[0]["channel_number"])) bcalled_list = get_reads(args, client, read_counter, sk, read_store) - # TODO: make a skipped queue to handle skipped reads # print("[BASECALLER] - writing channel: {}".format(batch[0]["channel_number"])) rq.put(bcalled_list) iq.task_done() diff --git a/src/writer.py b/src/writer.py index 51f6a32..43f5dcd 100644 --- a/src/writer.py +++ b/src/writer.py @@ -208,26 +208,27 @@ def write_worker(args, q, files, SAM_OUT, model_version_id): # write the barcode split sam/fastq bc_writer = bc_files[barcode_name] if SAM_OUT: + # TODO: Add duplex calling to the barcoded output if args.above_7412: + sam_tags = "MN:i:{}\tqs:i:{}\tmx:i:{}\tch:i:{}\tns:i:{}\tts:i:{}\tsm:f:{}\tsd:f:{}\tsv:Z:{}\tdu:f:{}".format(read["sequence_length"], + read["int_read_qscore"], + read["mux"], + read["channel"], + read["num_samples"], + read["trimmed_samples"], + read["scaling_median"], + read["scaling_med_abs_dev"], + read["scaling_version"], + read["duration"]) if args.call_mods: - if args.do_read_splitting: - bc_writer.write("{}\tpi:Z:{}\tBC:Z:{}\n".format(read["sam_record"], read["parent_read_id"], barcode)) - else: - bc_writer.write("{}\tBC:Z:{}\n".format(read["sam_record"], barcode)) + bc_writer.write("{}\tpi:Z:{}\t{}\tBC:Z:{}\n".format(read["sam_record"], read["parent_read_id"], sam_tags, barcode)) elif args.moves_out: m = read["move_table"].tolist() move_str = ','.join(map(str, m)) - if args.do_read_splitting: - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["parent_read_id"], barcode)) - else: - # do ns and ts tags - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\t{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, sam_tags, read["parent_read_id"], barcode)) else: - if args.do_read_splitting: - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["parent_read_id"], barcode)) - else: - # do ns and ts tags - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\t{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], sam_tags, read["parent_read_id"], barcode)) + else: if args.call_mods: if args.do_read_splitting: @@ -238,16 +239,16 @@ def write_worker(args, q, files, SAM_OUT, model_version_id): m = read["move_table"].tolist() move_str = ','.join(map(str, m)) if args.do_read_splitting: - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["parent_read_id"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["parent_read_id"], barcode)) else: # do ns and ts tags - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) else: if args.do_read_splitting: - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["parent_read_id"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tqs:i:{}\tpi:Z:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["parent_read_id"], barcode)) else: # do ns and ts tags - bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) + bc_writer.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tqs:i:{}\tns:i:{}\tts:i:{}\tBC:Z:{}\n".format(read["read_id"], read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"], barcode)) else: bc_writer.write("{} barcode={}\n".format(read["header"], barcode)) @@ -285,6 +286,16 @@ def write_output(args, read, OUT, SAM_OUT): read_id = read["read_id"] if SAM_OUT: if args.above_7412: + sam_tags = "MN:i:{}\tqs:i:{}\tmx:i:{}\tch:i:{}\tns:i:{}\tts:i:{}\tsm:f:{}\tsd:f:{}\tsv:Z:{}\tdu:f:{}".format(read["sequence_length"], + read["int_read_qscore"], + read["mux"], + read["channel"], + read["num_samples"], + read["trimmed_samples"], + read["scaling_median"], + read["scaling_med_abs_dev"], + read["scaling_version"], + read["duration"]) if args.duplex: duplex_tag = "0" if read["duplex_strand_1"] is not None: @@ -292,44 +303,22 @@ def write_output(args, read, OUT, SAM_OUT): if read["duplex_parent"]: duplex_tag = "-1" if args.call_mods: - if args.do_read_splitting: - OUT.write("{}\tpi:Z:{}\tdx:i:{}\n".format(read["sam_record"], read["parent_read_id"], duplex_tag)) - else: - OUT.write("{}\n".format(read["sam_record"])) + OUT.write("{}\tpi:Z:{}\tdx:i:{}\n".format(read["sam_record"], read["parent_read_id"], duplex_tag)) elif args.moves_out: m = read["move_table"].tolist() move_str = ','.join(map(str, m)) - if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\tdx:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], read["int_read_qscore"], duplex_tag)) - else: - # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tpi:Z:{}\t{}\tdx:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], sam_tags, duplex_tag)) else: - if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\tdx:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], read["int_read_qscore"], duplex_tag)) - else: - # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tpi:Z:{}\tqs:i:{}\tdx:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], read["int_read_qscore"], duplex_tag)) else: if args.call_mods: - if args.do_read_splitting: - OUT.write("{}\tpi:Z:{}\n".format(read["sam_record"], read["parent_read_id"])) - else: - OUT.write("{}\n".format(read["sam_record"])) + OUT.write("{}\tpi:Z:{}\n".format(read["sam_record"], read["parent_read_id"])) elif args.moves_out: m = read["move_table"].tolist() move_str = ','.join(map(str, m)) - if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], read["int_read_qscore"])) - else: - # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tpi:Z:{}\t{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], sam_tags)) else: - if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], read["int_read_qscore"])) - else: - # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tpi:Z:{}\t{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], sam_tags)) else: if args.call_mods: if args.do_read_splitting: @@ -340,16 +329,16 @@ def write_output(args, read, OUT, SAM_OUT): m = read["move_table"].tolist() move_str = ','.join(map(str, m)) if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], read["int_read_qscore"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["parent_read_id"], read["int_read_qscore"])) else: # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tmv:B:c,{},{}\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["model_stride"], move_str, read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) else: if args.do_read_splitting: - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], read["int_read_qscore"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tpi:Z:{}\tqs:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["parent_read_id"], read["int_read_qscore"])) else: # do ns and ts tags - OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tNM:i:0\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) + OUT.write("{}\t4\t*\t0\t0\t*\t*\t0\t0\t{}\t{}\tqs:i:{}\tns:i:{}\tts:i:{}\n".format(read_id, read["sequence"], read["qscore"], read["int_read_qscore"], read["num_samples"], read["trimmed_samples"])) else: # write fastq OUT.write("{}\n".format(read["header"]))