diff --git a/src/cutadapt/adapters.py b/src/cutadapt/adapters.py index a97295d2..deba7739 100644 --- a/src/cutadapt/adapters.py +++ b/src/cutadapt/adapters.py @@ -496,10 +496,6 @@ class Matchable(ABC): def __init__(self, name: Optional[str], *args, **kwargs): self.name = name - @abstractmethod - def enable_debug(self): - pass - @abstractmethod def match_to(self, sequence: str): pass @@ -520,6 +516,10 @@ def create_statistics(self) -> AdapterStatistics: def descriptive_identifier(self) -> str: pass + @abstractmethod + def enable_debug(self) -> None: + pass + class SingleAdapter(Adapter, ABC): """ @@ -1222,11 +1222,12 @@ def match_to(self, sequence: str) -> Optional[SingleMatch]: return best_match -class IndexedAdapters(Matchable, ABC): +class AdapterIndex: """ + Index of multiple adapters + Represent multiple adapters of the same type at once and use an index data structure - to speed up matching. This acts like a "normal" Adapter as it provides a match_to - method, but is faster with lots of adapters. + to speed up matching. This is faster than iterating over multiple adapters. There are quite a few restrictions: - the error rate allows at most 2 mismatches @@ -1236,15 +1237,14 @@ class IndexedAdapters(Matchable, ABC): Use the is_acceptable() method to check individual adapters. """ - AdapterIndex = Dict[str, Tuple[SingleAdapter, int, int]] + AdapterIndexDict = Dict[str, Tuple[SingleAdapter, int, int]] - def __init__(self, adapters): + def __init__(self, adapters, prefix: bool): """All given adapters must be of the same type""" - super().__init__(name="indexed_adapters") if not adapters: raise ValueError("Adapter list is empty") for adapter in adapters: - self._accept(adapter) + self._accept(adapter, prefix) self._adapters = adapters self._lengths, self._index = self._make_index() logger.debug( @@ -1255,25 +1255,57 @@ def __init__(self, adapters): self.match_to = self._match_to_one_length else: self.match_to = self._match_to_multiple_lengths - self._make_affix = self._get_make_affix() + if prefix: + self._make_affix = self._make_prefix + self._make_match = self._make_prefix_match + else: + self._make_affix = self._make_suffix + self._make_match = self._make_suffix_match def __repr__(self): return f"{self.__class__.__name__}(adapters={self._adapters!r})" - def match_to(self, sequence: str): - """Never called because it gets overwritten in __init__""" + @staticmethod + def _make_suffix(s, n): + return s[-n:] - @abstractmethod - def _get_make_affix(self): - pass + @staticmethod + def _make_prefix(s, n): + return s[:n] - @abstractmethod - def _make_match(self, adapter, length, matches, errors, sequence) -> SingleMatch: - pass + @staticmethod + def _make_prefix_match(adapter, length, score, errors, sequence): + return RemoveBeforeMatch( + astart=0, + astop=len(adapter.sequence), + rstart=0, + rstop=length, + score=score, + errors=errors, + adapter=adapter, + sequence=sequence, + ) + + @staticmethod + def _make_suffix_match(adapter, length, score, errors, sequence): + return RemoveAfterMatch( + astart=0, + astop=len(adapter.sequence), + rstart=len(sequence) - length, + rstop=len(sequence), + score=score, + errors=errors, + adapter=adapter, + sequence=sequence, + ) @classmethod - def _accept(cls, adapter): + def _accept(cls, adapter: SingleAdapter, prefix: bool): """Raise a ValueError if the adapter is not acceptable""" + if prefix and not isinstance(adapter, PrefixAdapter): + raise ValueError("Only 5' anchored adapters are allowed") + elif not prefix and not isinstance(adapter, SuffixAdapter): + raise ValueError("Only 3' anchored adapters are allowed") if adapter.read_wildcards: raise ValueError("Wildcards in the read not supported") if adapter.adapter_wildcards: @@ -1283,7 +1315,7 @@ def _accept(cls, adapter): raise ValueError("Error rate too high") @classmethod - def is_acceptable(cls, adapter): + def is_acceptable(cls, adapter: SingleAdapter, prefix: bool): """ Return whether this adapter is acceptable for being used in an index @@ -1291,12 +1323,12 @@ def is_acceptable(cls, adapter): or would lead to a very large index. """ try: - cls._accept(adapter) + cls._accept(adapter, prefix) except ValueError: return False return True - def _make_index(self) -> Tuple[List[int], "AdapterIndex"]: + def _make_index(self) -> Tuple[List[int], "AdapterIndexDict"]: start_time = time.time() logger.info("Building index of %s adapters ...", len(self._adapters)) index: Dict[str, Tuple[SingleAdapter, int, int]] = dict() @@ -1434,62 +1466,25 @@ def _lookup_with_n(self, affix): return None return adapter, match.errors, match.score - def enable_debug(self): - pass - - -class IndexedPrefixAdapters(IndexedAdapters): - @classmethod - def _accept(cls, adapter): - if not isinstance(adapter, PrefixAdapter): - raise ValueError("Only 5' anchored adapters are allowed") - return super()._accept(adapter) - - def _make_match(self, adapter, length, score, errors, sequence): - return RemoveBeforeMatch( - astart=0, - astop=len(adapter.sequence), - rstart=0, - rstop=length, - score=score, - errors=errors, - adapter=adapter, - sequence=sequence, - ) - - def _get_make_affix(self): - return self._make_prefix - - @staticmethod - def _make_prefix(s, n): - return s[:n] +class IndexedPrefixAdapters(Matchable): + def __init__(self, adapters): + super().__init__(name="indexed_prefix_adapters") + self._index = AdapterIndex(adapters, prefix=True) + self.match_to = self._index.match_to -class IndexedSuffixAdapters(IndexedAdapters): - @classmethod - def _accept(cls, adapter): - if not isinstance(adapter, SuffixAdapter): - raise ValueError("Only anchored 3' adapters are allowed") - return super()._accept(adapter) + def match_to(self, sequence: str): + pass - def _make_match(self, adapter, length, score, errors, sequence): - return RemoveAfterMatch( - astart=0, - astop=len(adapter.sequence), - rstart=len(sequence) - length, - rstop=len(sequence), - score=score, - errors=errors, - adapter=adapter, - sequence=sequence, - ) - def _get_make_affix(self): - return self._make_suffix +class IndexedSuffixAdapters(Matchable): + def __init__(self, adapters): + super().__init__(name="indexed_suffix_adapters") + self._index = AdapterIndex(adapters, prefix=False) + self.match_to = self._index.match_to - @staticmethod - def _make_suffix(s, n): - return s[-n:] + def match_to(self, sequence: str): + pass def warn_duplicate_adapters(adapters): diff --git a/src/cutadapt/modifiers.py b/src/cutadapt/modifiers.py index e45ea118..fc6f660e 100644 --- a/src/cutadapt/modifiers.py +++ b/src/cutadapt/modifiers.py @@ -21,6 +21,7 @@ Match, remainder, Adapter, + AdapterIndex, ) from .tokenizer import tokenize_braces, TokenizeError, Token, BraceToken from .info import ModificationInfo @@ -162,9 +163,9 @@ def _split_adapters( suffix: List[SingleAdapter] = [] other: List[SingleAdapter] = [] for a in adapters: - if IndexedPrefixAdapters.is_acceptable(a): + if AdapterIndex.is_acceptable(a, prefix=True): prefix.append(a) - elif IndexedSuffixAdapters.is_acceptable(a): + elif AdapterIndex.is_acceptable(a, prefix=False): suffix.append(a) else: other.append(a)