Skip to content

Commit

Permalink
rename, annotate, explicate
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed May 13, 2024
1 parent 249360c commit 86bb6cc
Showing 1 changed file with 63 additions and 35 deletions.
98 changes: 63 additions & 35 deletions anglerfish/demux/adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

import yaml

idelim = re.compile(r"\<N\>")
udelim = re.compile(r"(\<U\d+\>)")
ulen = re.compile(r"\<U(\d+)\>")
index_token = re.compile(r"\<N\>")
umi_token = re.compile(r"(\<U\d+\>)")
umi_length_token = re.compile(r"\<U(\d+)\>")


class Adaptor:
def __init__(self, adaptors, delim, adaptor_type, i7_index=None, i5_index=None):
# Adaptor
self.name = f"{adaptor_type}"
self.delim = delim

# Indices
self.i5 = AdaptorPart(
adaptors[adaptor_type]["i5"], adaptor_type, delim, i5_index
)
Expand All @@ -19,33 +24,42 @@ def __init__(self, adaptors, delim, adaptor_type, i7_index=None, i5_index=None):
)
self.i5_index = i5_index
self.i7_index = i7_index
self.i5_umi = re.findall(udelim, self.i5.sequence)

# UMIs
self.i5_umi = re.findall(umi_token, self.i5.sequence)
self.i5_umi_before = 0
self.i5_umi_after = 0
self.i7_umi = re.findall(udelim, self.i7.sequence)

self.i7_umi = re.findall(umi_token, self.i7.sequence)
self.i7_umi_before = 0
self.i7_umi_after = 0
self.name = f"{adaptor_type}"
self.delim = delim

if len(self.i5_umi) > 1 or len(self.i7_umi) > 1:
raise UserWarning(
f"Adaptor {adaptor_type} has more than one UMI in either i5 or i7. This is not supported."
)
# Check if UMI is before or after i5 index
if len(self.i5_umi) > 0 and ">" + self.i5_umi[0] in self.i5.sequence:
self.i5_umi_after = int(re.search(ulen, self.i5_umi[0]).group(1))
self.i5_umi_after = int(
re.search(umi_length_token, self.i5_umi[0]).group(1)
)
elif len(self.i5_umi) > 0 and self.i5_umi[0] + "<" in self.i5.sequence:
self.i5_umi_before = int(re.search(ulen, self.i5_umi[0]).group(1))
self.i5_umi_before = int(
re.search(umi_length_token, self.i5_umi[0]).group(1)
)
elif len(self.i5_umi) > 0:
raise UserWarning(
f"Adaptor {adaptor_type} has UMI but it does not flank an index. This is not supported."
)
# Check if UMI is before or after i7 index
if len(self.i7_umi) > 0 and ">" + self.i7_umi[0] in self.i7.sequence:
self.i7_umi_after = int(re.search(ulen, self.i7_umi[0]).group(1))
self.i7_umi_after = int(
re.search(umi_length_token, self.i7_umi[0]).group(1)
)
elif len(self.i7_umi) > 0 and self.i7_umi[0] + "<" in self.i7.sequence:
self.i7_umi_before = int(re.search(ulen, self.i7_umi[0]).group(1))
self.i7_umi_before = int(
re.search(umi_length_token, self.i7_umi[0]).group(1)
)
elif len(self.i7_umi) > 0:
raise UserWarning(
f"Adaptor {adaptor_type} has UMI but it does not flank an index. This is not supported."
Expand All @@ -55,11 +69,15 @@ def get_i5_mask(self, insert_Ns=True):
ilen = len(self.i5_index) if self.i5_index is not None and insert_Ns else 0
ulen = max(self.i5_umi_after, self.i5_umi_before) if insert_Ns else 0
# Test if the index is specified in the adaptor sequence when it shouldn't be
if has_match(idelim, self.i5.sequence) and self.i5_index is None and insert_Ns:
if (
has_match(index_token, self.i5.sequence)
and self.i5_index is None
and insert_Ns
):
raise UserWarning("Adaptor has i5 but no sequence was specified")
if self.i5_index is not None or not insert_Ns:
new_i5 = re.sub(idelim, "N" * ilen, self.i5.sequence)
new_i5 = re.sub(udelim, "N" * ulen, new_i5)
new_i5 = re.sub(index_token, "N" * ilen, self.i5.sequence)
new_i5 = re.sub(umi_token, "N" * ulen, new_i5)
return new_i5
else:
return self.i5.sequence
Expand All @@ -68,11 +86,15 @@ def get_i7_mask(self, insert_Ns=True):
ilen = len(self.i7_index) if self.i7_index is not None and insert_Ns else 0
ulen = max(self.i7_umi_after, self.i7_umi_before) if insert_Ns else 0
# Test if the index is specified in the adaptor sequence when it shouldn't be
if has_match(idelim, self.i7.sequence) and self.i7_index is None and insert_Ns:
if (
has_match(index_token, self.i7.sequence)
and self.i7_index is None
and insert_Ns
):
raise UserWarning("Adaptor has i7 but no sequence was specified")
if self.i7_index is not None or not insert_Ns:
new_i7 = re.sub(idelim, "N" * ilen, self.i7.sequence)
new_i7 = re.sub(udelim, "N" * ulen, new_i7)
new_i7 = re.sub(index_token, "N" * ilen, self.i7.sequence)
new_i7 = re.sub(umi_token, "N" * ulen, new_i7)
return new_i7
else:
return self.i7.sequence
Expand All @@ -96,29 +118,29 @@ def __init__(self, sequence, name, delim, index):
self.len_before_index = 0

# Dynamically assign attributes
self.umi = re.findall(udelim, self.sequence)
self.umi = re.findall(umi_token, self.sequence)

# TODO Duplicated from Adaptor class, will be merged later
# Check if UMI is before or after index
if len(self.umi) > 0 and ">" + self.umi[0] in self.sequence:
# The index region is INDEX+UMI
self.umi_after = int(re.search(ulen, self.umi[0]).group(1))
self.len_before_index = len(idelim.split(self.sequence)[0])
self.len_after_index = len(udelim.split(self.sequence)[-1])
self.umi_after = int(re.search(umi_length_token, self.umi[0]).group(1))
self.len_before_index = len(index_token.split(self.sequence)[0])
self.len_after_index = len(umi_token.split(self.sequence)[-1])
elif len(self.umi) > 0 and self.umi[0] + "<" in self.sequence:
# The index region is UMI+INDEX
self.umi_before = int(re.search(ulen, self.umi[0]).group(1))
self.len_before_index = len(udelim.split(self.sequence)[0])
self.len_after_index = len(idelim.split(self.sequence)[-1])
self.umi_before = int(re.search(umi_length_token, self.umi[0]).group(1))
self.len_before_index = len(umi_token.split(self.sequence)[0])
self.len_after_index = len(index_token.split(self.sequence)[-1])
elif len(self.umi) > 0:
# TODO give details which adaptor has the problem
raise UserWarning(
"Found adaptor with UMI but it does not flank an index. This is not supported."
)
# Non UMI cases
elif has_match(idelim, self.sequence):
self.len_before_index = len(idelim.split(self.sequence)[0])
self.len_after_index = len(idelim.split(self.sequence)[-1])
elif has_match(index_token, self.sequence):
self.len_before_index = len(index_token.split(self.sequence)[0])
self.len_after_index = len(index_token.split(self.sequence)[-1])

def has_index(self):
return self.sequence.find(self.delim) > -1
Expand All @@ -139,7 +161,7 @@ def has_match(delim, seq):


# Fetch all adaptors
def load_adaptors(raw=False):
def load_adaptors(raw=False) -> list[Adaptor] | list[dict]:
p = importlib.resources.files("anglerfish.config").joinpath("adaptors.yaml")
assert isinstance(p, os.PathLike)

Expand All @@ -149,10 +171,16 @@ def load_adaptors(raw=False):

if raw:
return adaptors_raw
adaptors = []
for adaptor in adaptors_raw:
adaptors.append(
Adaptor(adaptors_raw, "N", adaptor, i7_index=None, i5_index=None)
)

return adaptors
else:
adaptors = []
for adaptor in adaptors_raw:
adaptors.append(
Adaptor(
adaptors=adaptors_raw,
delim="N",
adaptor_type=adaptor,
i7_index=None,
i5_index=None,
)
)
return adaptors

0 comments on commit 86bb6cc

Please sign in to comment.