Skip to content

Commit

Permalink
try to improve readability and reduce redundancy
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed May 24, 2024
1 parent 110b4ae commit 792d03e
Showing 1 changed file with 123 additions and 120 deletions.
243 changes: 123 additions & 120 deletions anglerfish/demux/adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,100 +4,83 @@

import yaml

index_token = re.compile(r"\<N\>")
umi_token = re.compile(r"(\<U\d+\>)")
umi_length_token = re.compile(r"\<U(\d+)\>")
# These variables correspond to the tokens used in the adaptors.yaml file
# Not compiled with re.compile to enable pattern concatenation
INDEX_TOKEN = r"(<N>)"
UMI_TOKEN = r"(\<U\d+\>)"
UMI_LENGTH_TOKEN = r"\<U(\d+)\>"

# This pattern is used to validate the adaptor sequences in the config
VALID_SEQUENCE_TOKEN_PATTERN = re.compile(f"^({INDEX_TOKEN}|{UMI_TOKEN}|([ACTG]*))*$")


class Adaptor:
def __init__(self, adaptors, delim, adaptor_type, i7_index=None, i5_index=None):
# Adaptor
self.name = f"{adaptor_type}"
self.delim = delim
def __init__(
self,
name: str,
i5_token: str,
i7_token: str,
i7_index: str | None = None,
i5_index: str | None = None,
):
self.name = name
self.index_token: str = INDEX_TOKEN

# Indices
# i5 attributes
self.i5 = AdaptorPart(
adaptors[adaptor_type]["i5"], adaptor_type, delim, i5_index
sequence_token=i5_token,
name=name,
index=i5_index,
)
self.i5_index: str | None = i5_index
self.i5_umi = self.i5.umi_token
self.i5_umi_before = self.i5.len_umi_before_index
self.i5_umi_after = self.i5.len_umi_after_index

# i7 attributes
self.i7 = AdaptorPart(
adaptors[adaptor_type]["i7"], adaptor_type, delim, i7_index
sequence_token=i7_token,
name=name,
index=i7_index,
)
self.i5_index = i5_index
self.i7_index = i7_index

# UMIs
self.i5_umi = re.findall(umi_token, self.i5.sequence)
self.i5_umi_before = 0
self.i5_umi_after = 0

self.i7_umi = re.findall(umi_token, self.i7.sequence)
self.i7_umi_before = 0
self.i7_umi_after = 0

if len(self.i5_umi) > 1 or len(self.i7_umi) > 1:
raise UserWarning(
f"Adaptor {adaptor_type} has more than one UMI in either i5 or i7. This is not supported."
)
# Check if UMI is before or after i5 index
if len(self.i5_umi) > 0 and ">" + self.i5_umi[0] in self.i5.sequence:
self.i5_umi_after = int(
re.search(umi_length_token, self.i5_umi[0]).group(1)
)
elif len(self.i5_umi) > 0 and self.i5_umi[0] + "<" in self.i5.sequence:
self.i5_umi_before = int(
re.search(umi_length_token, self.i5_umi[0]).group(1)
)
elif len(self.i5_umi) > 0:
raise UserWarning(
f"Adaptor {adaptor_type} has UMI but it does not flank an index. This is not supported."
)
# Check if UMI is before or after i7 index
if len(self.i7_umi) > 0 and ">" + self.i7_umi[0] in self.i7.sequence:
self.i7_umi_after = int(
re.search(umi_length_token, self.i7_umi[0]).group(1)
)
elif len(self.i7_umi) > 0 and self.i7_umi[0] + "<" in self.i7.sequence:
self.i7_umi_before = int(
re.search(umi_length_token, self.i7_umi[0]).group(1)
)
elif len(self.i7_umi) > 0:
raise UserWarning(
f"Adaptor {adaptor_type} has UMI but it does not flank an index. This is not supported."
)
self.i7_index: str | None = i7_index
self.i7_umi = re.findall(UMI_TOKEN, self.i7.sequence_token)
self.i7_umi_before = self.i7.len_umi_before_index
self.i7_umi_after = self.i7.len_umi_after_index

def get_i5_mask(self, insert_Ns=True):
ilen = len(self.i5_index) if self.i5_index is not None and insert_Ns else 0
ulen = max(self.i5_umi_after, self.i5_umi_before) if insert_Ns else 0
# Test if the index is specified in the adaptor sequence when it shouldn't be
if (
has_match(index_token, self.i5.sequence)
has_match(INDEX_TOKEN, self.i5.sequence_token)
and self.i5_index is None
and insert_Ns
):
raise UserWarning("Adaptor has i5 but no sequence was specified")
if self.i5_index is not None or not insert_Ns:
new_i5 = re.sub(index_token, "N" * ilen, self.i5.sequence)
new_i5 = re.sub(umi_token, "N" * ulen, new_i5)
new_i5 = re.sub(INDEX_TOKEN, "N" * ilen, self.i5.sequence_token)
new_i5 = re.sub(UMI_TOKEN, "N" * ulen, new_i5)
return new_i5
else:
return self.i5.sequence
return self.i5.sequence_token

def get_i7_mask(self, insert_Ns=True):
ilen = len(self.i7_index) if self.i7_index is not None and insert_Ns else 0
ulen = max(self.i7_umi_after, self.i7_umi_before) if insert_Ns else 0
# Test if the index is specified in the adaptor sequence when it shouldn't be
if (
has_match(index_token, self.i7.sequence)
has_match(INDEX_TOKEN, self.i7.sequence_token)
and self.i7_index is None
and insert_Ns
):
raise UserWarning("Adaptor has i7 but no sequence was specified")
if self.i7_index is not None or not insert_Ns:
new_i7 = re.sub(index_token, "N" * ilen, self.i7.sequence)
new_i7 = re.sub(umi_token, "N" * ulen, new_i7)
new_i7 = re.sub(INDEX_TOKEN, "N" * ilen, self.i7.sequence_token)
new_i7 = re.sub(UMI_TOKEN, "N" * ulen, new_i7)
return new_i7
else:
return self.i7.sequence
return self.i7.sequence_token

def get_fastastring(self, insert_Ns=True):
fasta_i5 = f">{self.name}_i5\n{self.get_i5_mask(insert_Ns)}\n"
Expand All @@ -106,85 +89,105 @@ def get_fastastring(self, insert_Ns=True):


class AdaptorPart:
"""This class is used for the i5 or i7 adaptor.
"""

def __init__(self, sequence, name, delim, index):
self.sequence = sequence
self.name = name
self.delim = delim
self.index = index
self.umi_after = 0
self.umi_before = 0
self.len_after_index = 0
self.len_before_index = 0

# Dynamically assign attributes
self.umi = re.findall(umi_token, self.sequence)

# TODO Duplicated from Adaptor class, will be merged later
# Check if UMI is before or after index
if len(self.umi) > 0 and ">" + self.umi[0] in self.sequence:
# The index region is INDEX+UMI
self.umi_after = int(re.search(umi_length_token, self.umi[0]).group(1))
self.len_before_index = len(index_token.split(self.sequence)[0])
self.len_after_index = len(umi_token.split(self.sequence)[-1])
elif len(self.umi) > 0 and self.umi[0] + "<" in self.sequence:
# The index region is UMI+INDEX
self.umi_before = int(re.search(umi_length_token, self.umi[0]).group(1))
self.len_before_index = len(umi_token.split(self.sequence)[0])
self.len_after_index = len(index_token.split(self.sequence)[-1])
elif len(self.umi) > 0:
# TODO give details which adaptor has the problem
raise UserWarning(
"Found adaptor with UMI but it does not flank an index. This is not supported."
)
# Non UMI cases
elif has_match(index_token, self.sequence):
self.len_before_index = len(index_token.split(self.sequence)[0])
self.len_after_index = len(index_token.split(self.sequence)[-1])
"""This class is used for the i5 or i7 adaptor."""

def __init__(self, sequence_token: str, name: str, index: str | None):
# Assign attributes from args
self.sequence_token: str = sequence_token
self.name: str = name
self.index: str = index

# Parse index, if any
self.has_index: bool = has_match(INDEX_TOKEN, self.sequence_token)

# Parse UMI, if any
umi_token_matches = re.findall(UMI_TOKEN, self.sequence_token)
if umi_token_matches > 0:
assert (
umi_token_matches == 1
), f"Multiple UMIs found in {self.name}, not supported."
self.umi_token = umi_token_matches[0]

# Check if UMI is before or after index
if INDEX_TOKEN + UMI_TOKEN in self.sequence_token:
# The index region is INDEX+UMI
self.len_umi_after_index = int(
re.search(UMI_LENGTH_TOKEN, self.umi_token).group(1)
)
self.len_before_index = len(INDEX_TOKEN.split(self.sequence_token)[0])
self.len_after_index = len(UMI_TOKEN.split(self.sequence_token)[-1])

def has_index(self):
return self.sequence.find(self.delim) > -1
elif INDEX_TOKEN + UMI_TOKEN in self.sequence_token:
# The index region is UMI+INDEX
self.len_umi_before_index = int(
re.search(UMI_LENGTH_TOKEN, self.umi_token[0]).group(1)
)
self.len_before_index = len(UMI_TOKEN.split(self.sequence_token)[0])
self.len_after_index = len(INDEX_TOKEN.split(self.sequence_token)[-1])

def len_before_index_region(self):
return self.len_before_index
else:
raise UserWarning(
f"Found adaptor {self.name} with UMI but it does not flank an index. This is not supported."
)

def len_after_index_region(self):
return self.len_after_index
else:
self.umi_token = None
self.len_before_index = len(INDEX_TOKEN.split(self.sequence_token)[0])
self.len_after_index = len(INDEX_TOKEN.split(self.sequence_token)[-1])


def has_match(delim, seq):
"""General function to check if a string contains a pattern.
"""
match = re.search(delim, seq)
def has_match(pattern: re.Pattern, query: str) -> bool:
"""General function to check if a string contains a pattern."""
match = re.search(pattern, query)
if match is None:
return False
return True


def load_adaptors(raw=False) -> list[Adaptor] | list[dict]:
def validate_adaptors(adaptors_dict: dict):
"""Validate that the adaptor config sequences only consist of expected patterns."""

for adaptor_name in adaptors_dict:
for i in ["i5", "i7"]:
sequence_token = adaptors_dict[adaptor_name][i]
match = re.match(VALID_SEQUENCE_TOKEN_PATTERN, sequence_token)
if not match:
raise UserWarning(
f"Adaptor {adaptor_name} has an invalid sequence for {i}: {sequence_token}. Does not conform to the pattern {VALID_SEQUENCE_TOKEN_PATTERN}."
)


def load_adaptors(raw: bool = False) -> list[Adaptor] | dict:
"""Fetch all adaptors.
Return them as a list of adaptor objects or optionally as a raw yaml dict.
"""
p = importlib.resources.files("anglerfish.config").joinpath("adaptors.yaml")
assert isinstance(p, os.PathLike)

adaptors_raw = []
with open(p) as f:
adaptors_raw = yaml.safe_load(f)
# Load adaptors from config file
adaptors_config_path = importlib.resources.files("anglerfish.config").joinpath(
"adaptors.yaml"
)
assert isinstance(adaptors_config_path, os.PathLike)

with open(adaptors_config_path) as f:
adaptors_dict = yaml.safe_load(f)

# Validate input
validate_adaptors(adaptors_dict)

# Optionally, return raw dict
if raw:
return adaptors_raw
return adaptors_dict

# By default, return list of Adaptor objects
else:
adaptors = []
for adaptor in adaptors_raw:
for adaptor_name in adaptors_dict:
adaptors.append(
Adaptor(
adaptors=adaptors_raw,
delim="N",
adaptor_type=adaptor,
i7_index=None,
i5_index=None,
name=adaptor_name,
i5_token=adaptors_dict[adaptor_name]["i5"],
i7_token=adaptors_dict[adaptor_name]["i7"],
)
)
return adaptors

0 comments on commit 792d03e

Please sign in to comment.