Skip to content

Commit

Permalink
Merge pull request #163 from BjornFJohansson/issue_157
Browse files Browse the repository at this point in the history
Issue 157
  • Loading branch information
BjornFJohansson authored Dec 8, 2023
2 parents 97183a8 + 31f36de commit ffc55af
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 152 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,6 @@ docs/_build/
target/

.eggs

# mac files
.DS_Store
195 changes: 91 additions & 104 deletions src/pydna/dseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import math as _math

from pydna.seq import Seq as _Seq
from Bio.Restriction import FormattedSeq as _FormattedSeq
from Bio.Seq import _translate_str

from pydna._pretty import pretty_str as _pretty_str
Expand All @@ -32,7 +31,6 @@
from pydna.utils import flatten as _flatten
from pydna.common_sub_strings import common_sub_strings as _common_sub_strings

from operator import itemgetter as _itemgetter
from Bio.Restriction import RestrictionBatch as _RestrictionBatch
from Bio.Restriction import CommOnly

Expand Down Expand Up @@ -1443,114 +1441,103 @@ def cut(self, *enzymes):
"""

pad = "n" * 50
cutsites = self.get_cutsites(*enzymes)
cutsite_pairs = self.get_cutsite_pairs(cutsites)
return tuple(self.apply_cut(*cs) for cs in cutsite_pairs)

if self.circular:
dsseq = Dseq.from_string(
self._data.decode("ASCII"),
# linear=True,
circular=False,
)
else:
dsseq = self.mung()

if len(enzymes) == 1 and hasattr(enzymes[0], "intersection"):
# argument is probably a RestrictionBatch
enzymecuts = []
for e in enzymes[0]:
cuts = e.search(_Seq(pad + dsseq.watson + dsseq.watson[: e.size - 1] + pad) if self.circular else dsseq)
enzymecuts.append((cuts, e))
enzymecuts.sort()
enzymes = [e for (c, e) in enzymecuts if c]
else:
# argument is probably a list of restriction enzymes
enzymes = [
e
for e in list(dict.fromkeys(_flatten(enzymes)))
if e.search(_Seq(pad + dsseq.watson + dsseq.watson[: e.size - 1] + pad) if self.circular else dsseq)
] # flatten
def get_cutsites(self, *enzymes):
"""Returns a list of cutsites, represented by tuples ((cut_watson, cut_crick), enzyme), sorted by where they cut on the 5' strand.
Parameters
----------
enzymes : Union[_RestrictionBatch,list[_RestrictionType]]
Returns
-------
list[tuple[tuple[int,int], _RestrictionType]]
Examples
--------
>>> from Bio.Restriction import EcoRI
>>> from pydna.dseq import Dseq
>>> seq = Dseq('AAGAATTCAAGAATTC')
>>> seq.get_cutsites(EcoRI)
[((3, 7), EcoRI), ((11, 15), EcoRI)]
TODO: check that the cutsite does not fall on the ovhg and that cutsites don't crash
"""

if len(enzymes) == 1 and isinstance(enzymes[0], _RestrictionBatch):
# argument is probably a RestrictionBatch
enzymes = [e for e in enzymes[0]]

enzymes = _flatten(enzymes)
out = list()
for e in enzymes:
# Positions are 1-based, so we subtract 1 to get 0-based positions
cuts_watson = [c - 1 for c in e.search(self, linear=(not self.circular))]
cuts_crick = [(c - e.ovhg) % len(self) for c in cuts_watson]

if not enzymes:
return ()
out += [((w, c), e) for w, c in zip(cuts_watson, cuts_crick)]

return sorted(out)

def apply_cut(self, left_cut, right_cut):

left_watson, left_crick = left_cut[0] if left_cut is not None else ((self.ovhg, 0) if self.ovhg > 0 else (0, -self.ovhg))
ovhg = self.ovhg if left_cut is None else left_cut[1].ovhg
right_watson, right_crick = right_cut[0] if right_cut is not None else (len(self.watson), len(self.crick))
return Dseq(
str(self[left_watson:right_watson]),
# The line below could be easier to understand as _rc(str(self[left_crick:right_crick])), but it does not preserve the case
str(self.reverse_complement()[len(self) - right_crick:len(self) - left_crick]),
ovhg=ovhg,
)

def get_cutsite_pairs(self, cutsites):
""" Pairs the cutsites 2 by 2 to render the edges of the resulting fragments.
Special cases:
- Single cutsite on circular sequence: returns a pair where both cutsites are the same
- Linear sequence:
- creates a new left_cut on the first pair equal to `None` to represent the left edge of the sequence as it is.
- creates a new right_cut on the last pair equal to `None` to represent the right edge of the sequence as it is.
- In both new cuts, the enzyme is set to None to indicate that the cut is not made by an enzyme.
Parameters
----------
cutsites : list[tuple[tuple[int,int], _RestrictionType]]
Returns
-------
list[tuple[tuple[tuple[int,int], _RestrictionType]|None],tuple[tuple[int,int], _RestrictionType]|None]
Examples
--------
>>> from Bio.Restriction import EcoRI
>>> from pydna.dseq import Dseq
>>> seq = Dseq('AAGAATTCAAGAATTC')
>>> seq.get_cutsite_pairs(seq.get_cutsites(EcoRI))
[(None, ((3, 7), EcoRI)), (((3, 7), EcoRI), ((11, 15), EcoRI)), (((11, 15), EcoRI), None)]
>>> seq = Dseq('AAGAATTCAAGAATTC', circular=True)
>>> seq.get_cutsite_pairs(seq.get_cutsites(EcoRI))
[(((3, 7), EcoRI), ((11, 15), EcoRI)), (((11, 15), EcoRI), ((3, 7), EcoRI))]
>>> seq = Dseq('AAGAATTCAA', circular=True)
>>> seq.get_cutsite_pairs(seq.get_cutsites(EcoRI))
[(((3, 7), EcoRI), ((3, 7), EcoRI))]
"""
if len(cutsites) == 0:
return []
if not self.circular:
frags = [self]
cutsites = [None, *cutsites, None]
else:
ln = len(self)
for e in enzymes:
wpos = [x - len(pad) - 1 for x in e.search(_Seq(pad + self.watson + self.watson[: e.size - 1]) + pad)][
::-1
]
cpos = [x - len(pad) - 1 for x in e.search(_Seq(pad + self.crick + self.crick[: e.size - 1]) + pad)][
::-1
]

for w, c in _itertools.product(wpos, cpos):
if w % len(self) == (self.length - c + e.ovhg) % len(self):
frags = [
Dseq(
self.watson[w % ln :] + self.watson[: w % ln],
self.crick[c % ln :] + self.crick[: c % ln],
ovhg=e.ovhg,
pos=min(w, len(dsseq) - c),
)
]
# breakpoint()
break
else:
continue
break
# Add the first cutsite at the end, for circular cuts
cutsites.append(cutsites[0])

newfrags = []

# print(repr(frags[0]))
# print(frags[0].pos)

for enz in enzymes:
for frag in frags:
ws = [x - 1 for x in enz.search(_Seq(frag.watson + "n"))]
cs = [x - 1 for x in enz.search(_Seq(frag.crick + "n"))]

sitepairs = [
(sw, sc)
for sw, sc in _itertools.product(ws, cs[::-1])
if (
sw + max(0, frag.ovhg) - max(0, enz.ovhg)
== len(frag.crick) - sc - min(0, frag.ovhg) + min(0, enz.ovhg)
)
]

sitepairs.append((self.length, 0))

w2, c1 = sitepairs[0]
newfrags.append(Dseq(frag.watson[:w2], frag.crick[c1:], ovhg=frag.ovhg, pos=frag.pos))

for (w1, c2), (w2, c1) in zip(sitepairs[:-1], sitepairs[1:]):
newfrags.append(
Dseq(
frag.watson[w1:w2],
frag.crick[c1:c2],
ovhg=enz.ovhg,
pos=frag.pos + w1 - max(0, enz.ovhg) + max(0, frag.ovhg),
)
)
frags = newfrags
newfrags = []

return tuple(frags)

def _firstcut(self, *enzymes):
rb = _RestrictionBatch(_flatten(enzymes))
watson = _FormattedSeq(_Seq(self.watson), linear=False)
crick = _FormattedSeq(_Seq(self.crick), linear=False)
enzdict = dict(sorted(rb.search(watson).items(), key=_itemgetter(1)))
ln = self.length
for enzyme, wposlist in enzdict.items():
for cpos in enzyme.search(crick)[::-1]:
for wpos in wposlist:
if cpos == (ln - wpos + enzyme.ovhg + 2) or ln:
return (wpos - 1, cpos - 1, enzyme.ovhg)
return ()
return list(zip(cutsites, cutsites[1:]))


if __name__ == "__main__":
Expand Down
67 changes: 29 additions & 38 deletions src/pydna/dseqrecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,10 +890,19 @@ def __getitem__(self, sl):
sl_stop = sl.stop or len(self.seq) # 1

if not self.circular or sl_start < sl_stop:
# TODO: special case for sl_end == 0 in circular sequences
# related to https://github.com/BjornFJohansson/pydna/issues/161
if self.circular and sl.stop == 0:
sl = slice(sl.start, len(self.seq), sl.step)
answer.features = super().__getitem__(sl).features
elif self.circular and sl_start > sl_stop:
answer.features = self.shifted(sl_start).features
answer.features = [f for f in answer.features if f.location.parts[-1].end <= answer.seq.length]
# origin-spanning features should only be included after shifting
# in cases where the slice comprises the entire sequence, but then
# sl_start == sl_stop and the second condition is not met
answer.features = [f for f in answer.features if (
f.location.parts[-1].end <= answer.seq.length and
f.location.parts[0].start <= f.location.parts[-1].end)]
else:
answer = Dseqrecord("")
identifier = "part_{id}".format(id=self.id)
Expand Down Expand Up @@ -1321,47 +1330,29 @@ def cut(self, *enzymes):
"""
from pydna.utils import shift_location

features = _copy.deepcopy(self.features)
cutsites = self.seq.get_cutsites(*enzymes)
cutsite_pairs = self.seq.get_cutsite_pairs(cutsites)
return tuple(self.apply_cut(*cs) for cs in cutsite_pairs)

if self.circular:
try:
x, y, oh = self.seq._firstcut(*enzymes)
except ValueError:
return ()
dsr = _Dseq(
self.seq.watson[x:] + self.seq.watson[:x],
self.seq.crick[y:] + self.seq.crick[:y],
oh,
)
newstart = min(x, (self.seq.length - y))
for f in features:
f.location = shift_location(f.location, -newstart, self.seq.length)
f.location, *rest = f.location.parts
for part in rest:
if 0 in part:
f.location._end = part.end + self.seq.length
else:
f.location += part
frags = dsr.cut(enzymes) or [dsr]
def apply_cut(self, left_cut, right_cut):
dseq = self.seq.apply_cut(left_cut, right_cut)
# TODO: maybe remove depending on https://github.com/BjornFJohansson/pydna/issues/161

if left_cut == right_cut:
# Not really a cut, but to handle the general case
if left_cut is None:
features = self.features
features = self.shifted(min(left_cut[0])).features
else:
frags = self.seq.cut(enzymes)
if not frags:
return ()
dsfs = []
for fr in frags:
dsf = Dseqrecord(fr, n=self.n)
start = fr.pos
end = fr.pos + fr.length
dsf.features = [
_copy.deepcopy(fe) for fe in features if start <= fe.location.start and end >= fe.location.end
]
for feature in dsf.features:
feature.location += -start
dsfs.append(dsf)
return tuple(dsfs)
left_watson, left_crick = left_cut[0] if left_cut is not None else (0, 0)
right_watson, right_crick = right_cut[0] if right_cut is not None else (None, None)

left_edge = left_crick if dseq.ovhg > 0 else left_watson
right_edge = right_watson if dseq.watson_ovhg() > 0 else right_crick
features = self[left_edge:right_edge].features

return Dseqrecord(dseq, features=features)

if __name__ == "__main__":
cache = _os.getenv("pydna_cache")
Expand Down
21 changes: 12 additions & 9 deletions tests/test_module_dseq.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ def test_cut1():

assert first + second + third + fourth == lds

assert (first.pos, second.pos, third.pos, fourth.pos) == (0, 4, 13, 22)
# TODO: remove
# assert (first.pos, second.pos, third.pos, fourth.pos) == (0, 4, 13, 22)

frags2 = lds.cut((KpnI, ApaI, TaiI))

first2, second2, third2, fourth2 = frags2

assert first2 + second2 + third2 + fourth2 == lds

assert (first2.pos, second2.pos, third2.pos, fourth2.pos) == (0, 4, 13, 21)
# TODO: remove
# assert (first2.pos, second2.pos, third2.pos, fourth2.pos) == (0, 4, 13, 21)


def test_cas9():
Expand Down Expand Up @@ -164,6 +166,7 @@ def cut_and_religate_Dseq(seq_string, enz, top):
if not frags:
return
a = frags.pop(0)

for f in frags:
a += f
if not top:
Expand Down Expand Up @@ -541,12 +544,12 @@ def test_dseq():
assert obj.cut(rb) == obj.cut(BamHI, BglII) == obj.cut(BglII, BamHI)

obj = Dseq("ggatccAGATCT", circular=True)

assert obj.cut(rb) == obj.cut(BamHI, BglII) != obj.cut(BglII, BamHI)
# TODO: address this test change Related to https://github.com/BjornFJohansson/pydna/issues/78
assert obj.cut(rb) == obj.cut(BamHI, BglII) == obj.cut(BglII, BamHI)

obj = Dseq("AGATCTggatcc", circular=True)

assert obj.cut(rb) == obj.cut(BglII, BamHI) != obj.cut(BamHI, BglII)
assert obj.cut(rb) == obj.cut(BglII, BamHI) == obj.cut(BamHI, BglII)


def test_Dseq_slicing():
Expand Down Expand Up @@ -575,7 +578,7 @@ def test_Dseq_slicing2():
from Bio.Restriction import BamHI, EcoRI, KpnI

a = Dseq("aaGGATCCnnnnnnnnnGAATTCccc", circular=True)

# TODO: address this test change Related to https://github.com/BjornFJohansson/pydna/issues/78
assert (
a.cut(
EcoRI,
Expand All @@ -586,7 +589,7 @@ def test_Dseq_slicing2():
BamHI,
EcoRI,
KpnI,
)[::-1]
)
)


Expand Down Expand Up @@ -718,8 +721,8 @@ def test_misc():
a, b = x.cut(NotI)

z = (a + b).looped()

assert z.shifted(5) == x
# TODO: address this test change Related to https://github.com/BjornFJohansson/pydna/issues/78
assert z.shifted(-6) == x


def test_cut_missing_enzyme():
Expand Down
Loading

0 comments on commit ffc55af

Please sign in to comment.