Skip to content

Commit

Permalink
Make use of dataclasses
Browse files Browse the repository at this point in the history
    * Requires python 3.7+
  • Loading branch information
sposadac authored and DrYak committed May 8, 2021
1 parent 17705bd commit fa7be52
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 51 deletions.
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,5 @@ def run_tests(self):
# 'Programming Language :: Python :: 3',
# 'Programming Language :: Python :: 3.2',
# 'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6']
'Programming Language :: Python :: 3.7']
)
122 changes: 73 additions & 49 deletions src/shorah/shorah_snv.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import warnings
import shlex
from collections import namedtuple
from dataclasses import dataclass

import logging

Expand All @@ -64,6 +65,18 @@
logging.error('Executable fil not found, compile first.')
sys.exit('Executable fil not found, compile first.')

SNP_id = namedtuple('SNP_id', ['pos', 'var'])


@dataclass
class SNV:
chrom: str
pos: int
ref: str
var: str
freq: float = 0.0
support: float = 0.0


def segments(incr):
"""How many times is a window segment covered?
Expand Down Expand Up @@ -92,7 +105,8 @@ def segments(incr):


def deletion_length(seq):
"""Determines the length of the deletion
"""Determines the length of the deletion. Note that a sequence migth have
more than one deletion
seq: substring of the reconstructed haplotype
"""
count = 0
Expand All @@ -116,7 +130,6 @@ def parseWindow(line, ref1, threshold=0.9):
from re import search

snp = {}
SNP_id = namedtuple('SNP_id', ['pos', 'var'])
reads = 0.0
winFile, chrom, beg, end, cov = line.rstrip().split('\t')
del([winFile, cov])
Expand Down Expand Up @@ -175,30 +188,30 @@ def parseWindow(line, ref1, threshold=0.9):
if snp_id in snp:
# Aggregate counts for long deletions which
# are observed in multiple haplotypes
snp[snp_id][4] += av
snp[snp_id][5] += post * av
snp[snp_id].freq += av
snp[snp_id].support += post * av
else:
snp[snp_id] = [
snp[snp_id] = SNV(
chrom, pos, v, seq[idx:aux_del], av,
post * av]
post * av)
else:
tot_snv += 1
snp_id = SNP_id(pos=pos, var=seq[idx])
if snp_id in snp:
snp[snp_id][4] += av
snp[snp_id][5] += post * av
snp[snp_id].freq += av
snp[snp_id].support += post * av
else:
snp[snp_id] = [
chrom, pos, v, seq[idx], av, post * av]
snp[snp_id] = SNV(
chrom, pos, v, seq[idx], av, post * av)
pos += 1
if tot_snv > max_snv:
max_snv = tot_snv

logging.info('max number of snvs per sequence found: %d', max_snv)
# normalize
for k, v in snp.items():
v[5] /= v[4]
v[4] /= reads
v.support /= v.freq
v.freq /= reads

return snp

Expand All @@ -217,65 +230,75 @@ def getSNV(ref, segCov, incr, window_thresh=0.9):

# cycle over all windows reported in coverage.txt
for f in cov_file:
snp = parseWindow(f, ref, window_thresh) # snvs found on corresponding support file
# snvs found on corresponding support file
snp = parseWindow(f, ref, window_thresh)
beg = int(f.split('\t')[2])
end = int(f.split('\t')[3])
if incr == 1:
incr = end - beg + 1
single_window = True
logging.info('working on single window as invoked by amplian')

for k in sorted(snp.keys()):
for k, val in sorted(snp.items()):
# reference name, position, reference_base, mutated base,
# average number of reads, posterior times average n of reads
chrom, p, rf, var, av, post = snp[k]
if k in snpD:
if p < (beg + incr):
snpD[k][4][2] = av
snpD[k][5][2] = post
elif p < (beg + incr * 2):
snpD[k][4][1] = av
snpD[k][5][1] = post
if val.pos < (beg + incr):
snpD[k][4][2] = val.freq
snpD[k][5][2] = val.support
elif val.pos < (beg + incr * 2):
snpD[k][4][1] = val.freq
snpD[k][5][1] = val.support
else:
snpD[k][4][0] = av
snpD[k][5][0] = post
snpD[k][4][0] = val.freq
snpD[k][5][0] = val.support
else:
if p < (beg + incr):
cov = segCov[chrom + str(beg)]
if val.pos < (beg + incr):
cov = segCov[val.chrom + str(beg)]
if cov == [1, 1, 1]:
snpD[k] = [chrom, p, rf, var, ['-', '-', av],
['-', '-', post]]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['-', '-', val.freq], ['-', '-', val.support]]
elif cov == [1, 0, 0]:
snpD[k] = [chrom, p, rf, var, ['*', '*', av],
['*', '*', post]]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['*', '*', val.freq], ['*', '*', val.support]]
elif cov == [1, 1, 0]:
snpD[k] = [chrom, p, rf, var, ['*', '-', av],
['*', '-', post]]
elif p < (beg + incr * 2):
cov = segCov[chrom + str(beg + incr)]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['*', '-', val.freq], ['*', '-', val.support]]
elif val.pos < (beg + incr * 2):
cov = segCov[val.chrom + str(beg + incr)]
if cov == [1, 1, 1]:
snpD[k] = [chrom, p, rf, var, ['-', av, '-'],
['-', post, '-']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['-', val.freq, '-'], ['-', val.support, '-']]
elif cov == [1, 1, 0]:
snpD[k] = [chrom, p, rf, var, ['*', av, '-'],
['*', post, '-']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['*', val.freq, '-'], ['*', val.support, '-']]
elif cov == [0, 1, 1]:
snpD[k] = [chrom, p, rf, var, ['-', av, '*'],
['-', post, '*']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['-', val.freq, '*'], ['-', val.support, '*']]
elif cov == [0, 1, 0]:
snpD[k] = [chrom, p, rf, var, ['*', av, '*'],
['*', post, '*']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
['*', val.freq, '*'], ['*', val.support, '*']]
else:
cov = segCov[chrom + str(beg + incr * 2)]
cov = segCov[val.chrom + str(beg + incr * 2)]
if cov == [1, 1, 1]:
snpD[k] = [chrom, p, rf, var, [av, '-', '-'],
[post, '-', '-']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
[val.freq, '-', '-'], [val.support, '-', '-']]
elif cov == [0, 1, 1]:
snpD[k] = [chrom, p, rf, var, [av, '-', '*'],
[post, '-', '*']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
[val.freq, '-', '*'], [val.support, '-', '*']]
elif cov == [0, 0, 1]:
snpD[k] = [chrom, p, rf, var, [av, '*', '*'],
[post, '*', '*']]
snpD[k] = [
val.chrom, val.pos, val.ref, val.var,
[val.freq, '*', '*'], [val.support, '*', '*']]

if single_window:
break
Expand Down Expand Up @@ -356,7 +379,8 @@ def printRaw(snpD2, incr):
out1.close()


def sb_filter(in_bam, sigma, amplimode="", drop_indels="", max_coverage=100000):
def sb_filter(in_bam, sigma, amplimode="", drop_indels="",
max_coverage=100000):
"""run strand bias filter calling the external program 'fil'
"""
import subprocess
Expand Down

0 comments on commit fa7be52

Please sign in to comment.