From fcc034adb0ac293362da8cc9334e5a9e97dc3fdd Mon Sep 17 00:00:00 2001
From: kedhammar
Date: Fri, 14 Jun 2024 08:02:05 +0000
Subject: [PATCH] objectify alignment

---
 anglerfish/anglerfish.py                      |  11 +-
 anglerfish/demux/demux.py                     | 202 ++++++++++--------
 anglerfish/explore/explore.py                 |  29 ++-
 .../test_anglerfish/test_demux/test_demux.py  | 140 +++++-------
 4 files changed, 178 insertions(+), 204 deletions(-)

diff --git a/anglerfish/anglerfish.py b/anglerfish/anglerfish.py
index 4dfa150..4b3450d 100755
--- a/anglerfish/anglerfish.py
+++ b/anglerfish/anglerfish.py
@@ -15,9 +15,10 @@
 from .demux.adaptor import Adaptor
 from .demux.demux import (
+    Alignment,
+    categorize_matches,
     cluster_matches,
-    layout_matches,
-    parse_paf_lines,
+    parse_reads_alns_from_paf,
     run_minimap2,
     write_demuxedfastq,
 )
@@ -138,12 +139,12 @@ def run_demux(args):
         for i in f:
             num_fq += 1
     num_fq = int(num_fq / 4)
-    paf_entries = parse_paf_lines(align_path)
+    reads_alns: dict[str, list[Alignment]] = parse_reads_alns_from_paf(align_path)
 
     # Make stats
     log.info(f" Searching for adaptor hits in {adaptor_bc_name}")
-    fragments, singletons, concats, unknowns = layout_matches(
-        adaptor_name + "_i5", adaptor_name + "_i7", paf_entries
+    fragments, singletons, concats, unknowns = categorize_matches(
+        adaptor_name + "_i5", adaptor_name + "_i7", reads_alns
     )
     stats = AlignmentStat(adaptor_bc_name)
     stats.compute_pafstats(num_fq, fragments, singletons, concats, unknowns)
diff --git a/anglerfish/demux/demux.py b/anglerfish/demux/demux.py
index 4f3ddda..eeb7424 100644
--- a/anglerfish/demux/demux.py
+++ b/anglerfish/demux/demux.py
@@ -75,83 +75,103 @@ def run_minimap2(
     subprocess.run("sort", stdin=p1.stdout, stdout=ofile, check=True)
 
 
-def parse_paf_lines(
-    paf_path: str, min_qual: int = 1, complex_identifier: bool = False
-) -> dict[str, list[dict]]:
+class Alignment:
     """
-    Read and parse one paf alignment lines.
-    Returns a dict with the import values for later use.
+    Class instantiated from one paf alignment entry.
 
-    If complex_identifier is True (default False), the keys will be on the form
-    "{read}_{i5_or_i7}_{strand_str}".
+    Retains the important values for later use.
     """
-    entries: dict = {}
+
+    def __init__(self, paf_line: str):
+        paf_cols = paf_line.split()
+
+        # Unpack cols to vars for type annotation
+        self.read_name: str = paf_cols[0]
+        self.adapter_name: str = paf_cols[5]
+        self.read_len: int = int(paf_cols[1])  # read length
+        self.read_start: int = int(paf_cols[2])  # start alignment on read
+        self.read_end: int = int(paf_cols[3])  # end alignment on read
+        self.read_strand: str = paf_cols[4]
+        self.cg: str = paf_cols[-2]  # cigar string
+        self.cs: str = paf_cols[-1]  # cigar diff string
+        self.q: int = int(paf_cols[11])  # mapping quality
+        self.iseq: str | None = None
+        self.sample: str | None = None
+
+
+def parse_reads_alns_from_paf(
+    paf_path: str, min_qual: int = 1, complex_identifier: bool = False
+) -> dict[str, list[Alignment]]:
+    """
+    Parse a .paf file into a dict, mapping reads to their respective alignment objects.
+
+    Outputs:
+
+    reads_alns = {
+        "read1": [
+            aln_read1_adaptor1_i5,
+            aln_read1_adaptor1_i7,
+            aln_read1_adaptor2_i5,
+            ...
+        ],
+        "read2": [
+            aln_read2_adaptor1_i5,
+            aln_read2_adaptor1_i7,
+            aln_read2_adaptor2_i5,
+            ...
+        ],
+        ...
+    }
+
+    complex_identifier = False (default)
+    --> The keys will be of the form "{read}".
+
+    complex_identifier = True
+    --> The keys will be of the form "{read}_{i5_or_i7}_{strand_str}".
+    """
+    reads_alns: dict[str, list[Alignment]] = {}
     with open(paf_path) as paf:
         for paf_line in paf:
-            paf_cols = paf_line.split()
             try:
-                # TODO: objectify this
-
-                # Unpack cols to vars for type annotation
-                read: str = paf_cols[0]
-                adapter: str = paf_cols[5]
-                rlen: int = int(paf_cols[1])  # read length
-                rstart: int = int(paf_cols[2])  # start alignment on read
-                rend: int = int(paf_cols[3])  # end alignment on read
-                strand: str = paf_cols[4]
-                cg: str = paf_cols[-2]  # cigar string
-                cs: str = paf_cols[-1]  # cigar diff string
-                q: int = int(paf_cols[11])  # Q score
-                iseq: str | None = None
-                sample: str | None = None
+                # Parse paf line into Alignment object
+                aln = Alignment(paf_line)
 
                 # Determine identifier
-                if complex_identifier:
-                    i5_or_i7 = adapter.split("_")[-1]
-                    if strand == "+":
-                        strand_str = "positive"
-                    else:
-                        strand_str = "negative"
-                    key = f"{read}_{i5_or_i7}_{strand_str}"
-                else:
-                    key = read
+                i5_or_i7 = aln.adapter_name.split("_")[-1]
+                strand_str = "positive" if aln.read_strand == "+" else "negative"
+                key = (
+                    f"{aln.read_name}_{i5_or_i7}_{strand_str}"
+                    if complex_identifier
+                    else aln.read_name
+                )
 
             except IndexError:
-                log.debug(f"Could not find all paf columns: {read}")
+                # aln may be unbound if Alignment() raised, so log the raw line
+                log.debug(f"Could not find all paf columns: {paf_line.strip()}")
                 continue
 
-            if q < min_qual:
-                log.debug(f"Low quality alignment: {read}")
+            if aln.q < min_qual:
+                log.debug(f"Low quality alignment: {aln.read_name}")
                 continue
 
-            # Compile entry
-            entry = {
-                "read": read,
-                "adapter": adapter,
-                "rlen": rlen,
-                "rstart": rstart,
-                "rend": rend,
-                "strand": strand,
-                "cg": cg,
-                "cs": cs,
-                "q": q,
-                "iseq": iseq,
-                "sample": sample,
-            }
-
-            if key in entries.keys():
-                entries[key].append(entry)
+            if key in reads_alns:
+                reads_alns[key].append(aln)
             else:
-                entries[key] = [entry]
+                reads_alns[key] = [aln]
 
-    return entries
+    return reads_alns
 
 
-def layout_matches(
-    i5_name: str, i7_name: str, paf_entries: dict[str, list[dict]]
-) -> tuple[dict, dict, dict, dict]:
+def categorize_matches(
+    i5_name: str, i7_name: str, reads_alns: dict[str, list[Alignment]]
+) -> tuple[
+    dict[str, list[Alignment]],
+    dict[str, list[Alignment]],
+    dict[str, list[Alignment]],
+    dict[str, list[Alignment]],
+]:
     """
-    Search the parsed paf alignments and layout possible Illumina library fragments
+    Search the parsed paf alignments and lay out possible Illumina library fragments.
+
     Returns dicts:
     - fragments. Reads with one I7 and one I5
     - singletons. Reads that only match either the I5 or I7 adaptor
@@ -163,33 +183,33 @@
     singletons = {}
     concats = {}
     unknowns = {}
-    for read, entry_list in paf_entries.items():
-        sorted_entries = []
-        for k in range(len(entry_list) - 1):
-            entry_i = entry_list[k]
-            entry_j = entry_list[k + 1]
+    for read, alns in reads_alns.items():
+        sorted_alns = []
+        for i in range(len(alns) - 1):
+            aln_i: Alignment = alns[i]
+            aln_j: Alignment = alns[i + 1]
             if (
-                entry_i["adapter"] != entry_j["adapter"]
-                and (entry_i["adapter"] == i5_name and entry_j["adapter"] == i7_name)
-                or (entry_j["adapter"] == i5_name and entry_i["adapter"] == i7_name)
+                aln_i.adapter_name != aln_j.adapter_name
+                and (aln_i.adapter_name == i5_name and aln_j.adapter_name == i7_name)
+                or (aln_j.adapter_name == i5_name and aln_i.adapter_name == i7_name)
             ):
-                if entry_i in sorted_entries:
-                    sorted_entries.append(entry_j)
+                if aln_i in sorted_alns:
+                    sorted_alns.append(aln_j)
                 else:
-                    sorted_entries.extend([entry_i, entry_j])
-        if len(entry_list) == 1:
-            singletons[read] = entry_list
-        elif len(sorted_entries) == 2:
-            fragments[read] = sorted(sorted_entries, key=lambda l: l["rstart"])
-        elif len(sorted_entries) > 2:
-            concats[read] = sorted(sorted_entries, key=lambda l: l["rstart"])
+                    sorted_alns.extend([aln_i, aln_j])
+        if len(alns) == 1:
+            singletons[read] = alns
+        elif len(sorted_alns) == 2:
+            fragments[read] = sorted(sorted_alns, key=lambda aln: aln.read_start)
+        elif len(sorted_alns) > 2:
+            concats[read] = sorted(sorted_alns, key=lambda aln: aln.read_start)
             log.debug(
-                f"Concatenated fragment: {read}, found: {[(i['adapter'],i['rstart']) for i in sorted_entries]}"
+                f"Concatenated fragment: {read}, found: {[(aln.adapter_name, aln.read_start) for aln in sorted_alns]}"
             )
         else:
-            unknowns[read] = entry_list
+            unknowns[read] = alns
             log.debug(
-                f"Unknown fragment: {read}, found: {[(i['adapter'],i['rstart']) for i in entry_list]}"
+                f"Unknown fragment: {read}, found: {[(aln.adapter_name, aln.read_start) for aln in alns]}"
             )
     # TODO: add minimum insert size
     return (fragments, singletons, concats, unknowns)
@@ -208,19 +228,19 @@
     unmatched_bed = []
     for read, alignments in matches.items():
         if (
-            alignments[0]["adapter"][-2:] == "i5"
-            and alignments[1]["adapter"][-2:] == "i7"
+            alignments[0].adapter_name[-2:] == "i5"
+            and alignments[1].adapter_name[-2:] == "i7"
         ):
-            i5 = alignments[0]
-            i7 = alignments[1]
+            i5_aln = alignments[0]
+            i7_aln = alignments[1]
         elif (
-            alignments[1]["adapter"][-2:] == "i5"
-            and alignments[0]["adapter"][-2:] == "i7"
+            alignments[1].adapter_name[-2:] == "i5"
+            and alignments[0].adapter_name[-2:] == "i7"
         ):
-            i5 = alignments[1]
-            i7 = alignments[0]
+            i5_aln = alignments[1]
+            i7_aln = alignments[0]
         else:
-            log.debug(" {read} has no valid illumina fragment")
+            log.debug(f" {read} has no valid Illumina fragment")
             continue
 
         dists = []
@@ -232,7 +252,7 @@
         if i5_reversed and i5_seq is not None:
             i5_seq = str(Seq(i5_seq).reverse_complement())
         fi5, d1 = parse_cs(
-            i5["cs"],
+            i5_aln.cs,
            i5_seq,
            adaptor.i5.len_umi_before_index,
            adaptor.i5.len_umi_after_index,
@@ -245,7 +265,7 @@
         if i7_reversed and i7_seq is not None:
             i7_seq = str(Seq(i7_seq).reverse_complement())
         fi7, d2 = parse_cs(
-            i7["cs"],
+            i7_aln.cs,
            i7_seq,
            adaptor.i7.len_umi_before_index,
            adaptor.i7.len_umi_after_index,
@@ -259,8 +279,8 @@
         # Test if two samples in the sheet are equidistant to the i5/i7
         if len([i for i, j in enumerate(dists) if j == dists[index_min]]) > 1:
continue - start_insert = min(i5["rend"], i7["rend"]) - end_insert = max(i7["rstart"], i5["rstart"]) + start_insert = min(i5_aln.read_end, i7_aln.read_end) + end_insert = max(i7_aln.read_start, i5_aln.read_start) if end_insert - start_insert < 10: continue if dists[index_min] > max_distance: diff --git a/anglerfish/explore/explore.py b/anglerfish/explore/explore.py index 0579b61..99f3e2c 100644 --- a/anglerfish/explore/explore.py +++ b/anglerfish/explore/explore.py @@ -6,7 +6,7 @@ import pandas as pd from anglerfish.demux.adaptor import Adaptor, load_adaptors -from anglerfish.demux.demux import parse_paf_lines, run_minimap2 +from anglerfish.demux.demux import Alignment, parse_reads_alns_from_paf, run_minimap2 from anglerfish.explore.entropy import calculate_relative_entropy logging.basicConfig(level=logging.INFO) @@ -44,13 +44,13 @@ def run_explore( log.info(f"Run uuid {run_uuid}") adaptors = cast(list[Adaptor], load_adaptors()) - alignments: list[tuple[Adaptor, str]] = [] + adaptors_and_aln_paths: list[tuple[Adaptor, str]] = [] # Map all reads against all adaptors for adaptor in adaptors: # Align aln_path = os.path.join(outdir, f"{adaptor.name}.paf") - alignments.append((adaptor, aln_path)) + adaptors_and_aln_paths.append((adaptor, aln_path)) if os.path.exists(aln_path) and use_existing: log.info(f"Skipping {adaptor.name} as alignment already exists") continue @@ -70,21 +70,18 @@ def run_explore( # Parse alignments entries: dict = {} adaptors_included = [] - for adaptor, aln_path in alignments: + for adaptor, aln_path in adaptors_and_aln_paths: log.info(f"Parsing {adaptor.name}") - aln_dict_with_lists: dict[str, list[dict]] = parse_paf_lines( + reads_alns: dict[str, list[Alignment]] = parse_reads_alns_from_paf( aln_path, complex_identifier=True ) # Choose only the highest scoring alignment for each combination of read, adaptor end and strand - aln_dict = dict( - [ - (aln_name, max(aln_dicts, key=lambda x: x["q"])) - for aln_name, aln_dicts in aln_dict_with_lists.items() - ] - ) - - df = pd.DataFrame.from_dict(aln_dict, orient="index") + reads_to_highest_q_aln_dict: dict[str, dict] = {} + for aln_name, alns in reads_alns.items(): + highest_q_aln = max(alns, key=lambda aln: aln.q) + reads_to_highest_q_aln_dict[aln_name] = vars(highest_q_aln) + df = pd.DataFrame.from_dict(reads_to_highest_q_aln_dict, orient="index") nr_good_hits = {} # Match Insert Match = mim @@ -125,7 +122,7 @@ def run_explore( insert_thres_high = insert_thres_high requirements = ( - (df_mim["adapter"] == f"{adaptor.name}_{adaptor_end_name}") + (df_mim["adapter_name"] == f"{adaptor.name}_{adaptor_end_name}") & (df_mim["match_1_len"] >= (before_thres)) & (df_mim["insert_len"] >= insert_thres_low) & (df_mim["insert_len"] <= insert_thres_high) @@ -155,8 +152,8 @@ def run_explore( # Filter multiple hits per read and adaptor end df_good_hits = df_good_hits.sort_values( - by=["read", "match_1_len"], ascending=False - ).drop_duplicates(subset=["read", "adapter"], keep="first") + by=["read_name", "match_1_len"], ascending=False + ).drop_duplicates(subset=["read_name", "adapter_name"], keep="first") if adaptor.name not in entries.keys(): entries[adaptor.name] = {} diff --git a/tests/test_anglerfish/test_demux/test_demux.py b/tests/test_anglerfish/test_demux/test_demux.py index a910688..feb8d6b 100644 --- a/tests/test_anglerfish/test_demux/test_demux.py +++ b/tests/test_anglerfish/test_demux/test_demux.py @@ -111,108 +111,64 @@ def test_run_minimap2(fixture): ) -def test_parse_paf_lines_simple(fixture): - paf_lines_simple = 
to_test.parse_paf_lines(fixture["paf_single"]) - expected_simple = { - "0ad8bdb6-e009-43c5-95b1-d381e699f983": [ - { - "read": "0ad8bdb6-e009-43c5-95b1-d381e699f983", - "adapter": "truseq_i7", - "rlen": 418, - "rstart": 302, - "rend": 374, - "strand": "+", - "cg": "cg:Z:11M2D33M7I21M", - "cs": "cs:Z::11-ca:6*tg:13*nt*na*na*nc*nt*nt*ng*ng*nt*nc:1*ta:1+ctagaaa:2*gt*tg:17", - "q": 25, - "iseq": None, - "sample": None, - }, - { - "read": "0ad8bdb6-e009-43c5-95b1-d381e699f983", - "adapter": "truseq_i5", - "rlen": 418, - "rstart": 45, - "rend": 110, - "strand": "+", - "cg": "cg:Z:15M1D6M7I3M1I33M", - "cs": "cs:Z::15-a*cg:5+tcccgat:3+g:33", - "q": 38, - "iseq": None, - "sample": None, - }, - ] - } - assert paf_lines_simple == expected_simple - - -def test_parse_paf_lines_complex(fixture): - paf_lines_complex = to_test.parse_paf_lines(fixture["paf_single"]) - expected_complex = { - "0ad8bdb6-e009-43c5-95b1-d381e699f983": [ - { - "read": "0ad8bdb6-e009-43c5-95b1-d381e699f983", - "adapter": "truseq_i7", - "rlen": 418, - "rstart": 302, - "rend": 374, - "strand": "+", - "cg": "cg:Z:11M2D33M7I21M", - "cs": "cs:Z::11-ca:6*tg:13*nt*na*na*nc*nt*nt*ng*ng*nt*nc:1*ta:1+ctagaaa:2*gt*tg:17", - "q": 25, - "iseq": None, - "sample": None, - }, - { - "read": "0ad8bdb6-e009-43c5-95b1-d381e699f983", - "adapter": "truseq_i5", - "rlen": 418, - "rstart": 45, - "rend": 110, - "strand": "+", - "cg": "cg:Z:15M1D6M7I3M1I33M", - "cs": "cs:Z::15-a*cg:5+tcccgat:3+g:33", - "q": 38, - "iseq": None, - "sample": None, - }, - ] - } - assert paf_lines_complex == expected_complex +def test_parse_alns_from_path(fixture): + reads_alns = to_test.parse_reads_alns_from_paf(fixture["paf_single"]) + + for read_name, alns in reads_alns.items(): + assert read_name == "0ad8bdb6-e009-43c5-95b1-d381e699f983" + for aln in alns: + if aln.adapter_name == "truseq_i7": + assert aln.read_name == "0ad8bdb6-e009-43c5-95b1-d381e699f983" + assert aln.adapter_name == "truseq_i7" + assert aln.read_len == 418 + assert aln.read_start == 302 + assert aln.read_end == 374 + assert aln.read_strand == "+" + assert aln.cg == "cg:Z:11M2D33M7I21M" + assert ( + aln.cs + == "cs:Z::11-ca:6*tg:13*nt*na*na*nc*nt*nt*ng*ng*nt*nc:1*ta:1+ctagaaa:2*gt*tg:17" + ) + assert aln.q == 25 + assert aln.iseq is None + assert aln.sample is None + else: + assert aln.read_name == "0ad8bdb6-e009-43c5-95b1-d381e699f983" + assert aln.adapter_name == "truseq_i5" + assert aln.read_len == 418 + assert aln.read_start == 45 + assert aln.read_end == 110 + assert aln.read_strand == "+" + assert aln.cg == "cg:Z:15M1D6M7I3M1I33M" + assert aln.cs == "cs:Z::15-a*cg:5+tcccgat:3+g:33" + assert aln.q == 38 + assert aln.iseq is None + assert aln.sample is None def test_parse_cs(fixture): - paf_lines_simple = to_test.parse_paf_lines(fixture["paf_single"]) - - for read_name, alignments in paf_lines_simple.items(): - for alignment in alignments: - cs_string = alignment["cs"] - cs_parsed = to_test.parse_cs( - cs_string=cs_string, - index_seq=fixture["index"], - umi_before=0, - umi_after=0, - ) + test_cs_str = ( + "cs:Z::11-ca:6*tg:13*nt*na*na*nc*nt*nt*ng*ng*nt*nc:1*ta:1+ctagaaa:2*gt*tg:17" + ) + expected_cs_parsed = ("taacttggtc", 0) - if alignment["adapter"] == "truseq_i7": - # Perfect match to index - assert cs_parsed[0] == fixture["index"].lower() - assert cs_parsed[1] == 0 - elif alignment["adapter"] == "truseq_i5": - # No index, distance the length of all concatenated substitutions of the cs string - assert cs_parsed[0] == "" - assert cs_parsed[1] == 10 - else: - raise AssertionError("Case not 
covered.") + cs_parsed = to_test.parse_cs( + cs_string=test_cs_str, + index_seq=fixture["index"], + umi_before=0, + umi_after=0, + ) + + assert cs_parsed == expected_cs_parsed def test_layout_matches(fixture): i5_name = "truseq_i5" i7_name = "truseq_i7" - paf_entries = to_test.parse_paf_lines(fixture["paf_multiple"]) + reads_alns = to_test.parse_reads_alns_from_paf(fixture["paf_multiple"]) - layout = to_test.layout_matches( - i5_name=i5_name, i7_name=i7_name, paf_entries=paf_entries + layout = to_test.categorize_matches( + i5_name=i5_name, i7_name=i7_name, reads_alns=reads_alns ) fragments, singletons, concats, unknowns = layout