diff --git a/.gitmodules b/.gitmodules
index 51c058a..12ddf0a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,6 @@
[submodule "miqScoreNGSReadCountPublic"]
path = miqScoreNGSReadCountPublic
url = https://github.com/Zymo-Research/miqScoreNGSReadCountPublic.git
+[submodule "miqScoreShotgunPublicSupport"]
+ path = miqScoreShotgunPublicSupport
+ url = https://github.com/Zymo-Research/miqScoreShotgunPublicSupport.git
diff --git a/miqScoreShotgunPublicSupport b/miqScoreShotgunPublicSupport
new file mode 160000
index 0000000..236e250
--- /dev/null
+++ b/miqScoreShotgunPublicSupport
@@ -0,0 +1 @@
+Subproject commit 236e250b33824a52c7f3b833ce3241eff1a101b7
diff --git a/miqScoreShotgunPublicSupport/__init__.py b/miqScoreShotgunPublicSupport/__init__.py
deleted file mode 100644
index f4a6318..0000000
--- a/miqScoreShotgunPublicSupport/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-__all__ = ["parameters",
- "projectData",
- "formatReaders",
- "reporting",
- "alignmentAnalysis"]
-
-from . import parameters
-from . import projectData
-from . import formatReaders
-from . import reporting
-from . import alignmentAnalysis
diff --git a/miqScoreShotgunPublicSupport/alignmentAnalysis/__init__.py b/miqScoreShotgunPublicSupport/alignmentAnalysis/__init__.py
deleted file mode 100644
index 065eb84..0000000
--- a/miqScoreShotgunPublicSupport/alignmentAnalysis/__init__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-__all__ = ["alignmentAnalysisPE",
- "alignmentAnalysisSE",
- "bwaHandler",
- "minimap2"]
-
-from . import alignmentAnalysisPE
-from . import alignmentAnalysisSE
-from . import bwaHandler
-from . import minimap2
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisPE.py b/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisPE.py
deleted file mode 100644
index ebe02cb..0000000
--- a/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisPE.py
+++ /dev/null
@@ -1,301 +0,0 @@
-import os
-import pysam
-import typing
-
-'''
-Read species calls:
-Chimera_like: Both paired ends disagree, no good single-species explanation
-Poor_quality: Depending on if call was being made on single or both ends, quality of read(s) for making call was insufficient
-Unmapped_reads: No usable alignment data
-Ambiguous_reads: Both reads support two or more species without a big enough delta in confidence to be decisive
-If a species is given, that was a successful call
-Under certain parameter settings, a tuple of multiple species may be given indicating reads that support multiple species
-'''
-
-
-class ReadAlignmentData(object):
- __slots__ = ["qname",
- "mapq",
- "secondaryAlignment",
- "isRead2",
- "species"]
-
- def __init__(self, contig:str, mapq:int, qname:str, secondaryAlignment:bool, isRead2:bool):
- self.species = extractSpecies(contig)
- self.mapq = mapq
- self.qname = qname
- self.secondaryAlignment = secondaryAlignment
- self.isRead2 = isRead2
-
- def __str__(self):
- secondary = ""
- read = "R1"
- if self.isRead2:
- read = "R2"
- if self.secondaryAlignment:
- secondary = ", Secondary"
- return "%s, %s, %s%s, %s " %(self.species, self.mapq, read, secondary, self.qname)
-
-
-class ReadPairData(object):
- __slots__ = ["forwardRead",
- "reverseRead",
- "speciesConflict",
- "secondaryAlignment",
- "mapped",
- "bothMapped",
- "calledSpecies",
- "poorQualityDrop"]
-
- def __init__(self, read1:ReadAlignmentData, read2:ReadAlignmentData):
- forwardRead = None
- reverseRead = None
- for read in [read1, read2]:
- if read1.isRead2:
- reverseRead = read1
- else:
- forwardRead = read1
- if read2.isRead2:
- reverseRead = read2
- else:
- forwardRead = read2
- if not forwardRead and reverseRead:
- raise MispairedReadError("Two reads of the same direction were given: %s and %s" %(read1, read2))
- self.forwardRead = forwardRead
- self.reverseRead = reverseRead
- self.speciesConflict = self.checkSpeciesConflict()
- self.secondaryAlignment = self.checkSecondaryAlignment()
-
- def checkSpeciesConflict(self, readPairDisputeDecisionThresholdQDelta:int=40, minimumMapqForSingleReadConfidence:int=40, minimumMapqForPairedReadConfidence:int=30):
- if bool(self.forwardRead.species) != bool(self.reverseRead.species): #Functionally a logical XOR, handles cases where one read was unmappable
- self.mapped = True
- self.bothMapped = False
- if self.forwardRead.species:
- if self.forwardRead.mapq < minimumMapqForSingleReadConfidence:
- self.mapped = False
- self.bothMapped = False
- self.calledSpecies = None
- self.poorQualityDrop = True
- return False
- self.calledSpecies = (self.forwardRead.species)
- else:
- if self.reverseRead.mapq < minimumMapqForSingleReadConfidence:
- self.mapped = False
- self.bothMapped = False
- self.calledSpecies = None
- self.poorQualityDrop = True
- return False
- self.calledSpecies = (self.reverseRead.species)
- return False
- if not self.forwardRead.species and not self.reverseRead.species:
- self.mapped = False
- self.bothMapped = False
- else:
- self.mapped = True
- self.bothMapped = True
- if self.forwardRead.species == self.reverseRead.species:
- if self.forwardRead.mapq < minimumMapqForPairedReadConfidence or self.reverseRead.mapq < minimumMapqForPairedReadConfidence:
- self.calledSpecies = None
- self.poorQualityDrop = True
- else:
- self.poorQualityDrop = False
- self.calledSpecies = (self.forwardRead.species)
- return False
- else:
- mapqs = (self.forwardRead.mapq, self.reverseRead.mapq)
- mapqDelta = abs(mapqs[0] - mapqs[1])
- if max(mapqs) < minimumMapqForSingleReadConfidence:
- self.calledSpecies = None
- self.poorQualityDrop = True
- if mapqDelta >= readPairDisputeDecisionThresholdQDelta:
- if mapqs[0] > mapqs[1]:
- self.calledSpecies = self.forwardRead.species
- else:
- self.calledSpecies = self.reverseRead.species
- self.poorQualityDrop = False
- return False
- self.calledSpecies = (self.forwardRead.species, self.reverseRead.species)
- self.poorQualityDrop = False
- return True
-
- def checkSecondaryAlignment(self):
- return self.forwardRead.secondaryAlignment or self.reverseRead.secondaryAlignment
-
-
-class MispairedReadError(Exception):
- pass
-
-
-def extractSpecies(referenceName:str):
- if referenceName is None:
- return None
- return "_".join(referenceName.split("_")[:2])
-
-
-def readParallelProcessor(read:pysam.AlignedRead):
- return ReadAlignmentData(read.reference_name, read.mapping_quality, read.query_name, read.is_secondary, read.is_read2)
-
-
-def listBamFiles(folder:str):
- folderFiles = os.listdir(folder)
- bamFiles = [os.path.join(folder, file) for file in folderFiles if file.endswith(".bam")]
- return bamFiles
-
-
-def generateAnalyzedReadList(bamFilePath:str):
- import datetime
- startTime = datetime.datetime.now()
- bamFile = pysam.AlignmentFile(bamFilePath, "rb")
- analyzedReads = []
- readCount = 0
- for read in bamFile:
- analyzedReads.append(ReadAlignmentData(read.reference_name, read.mapping_quality, read.query_name, read.is_secondary, read.is_read2))
- readCount += 1
- if readCount % 500000 == 0:
- analysisTime = datetime.datetime.now() - startTime
- print("Analyzed %s reads in %s" %(readCount, analysisTime), flush=True)
- bamFile.close()
- analysisTime = datetime.datetime.now() - startTime
- print("Analyzed %s reads in %s" % (readCount, analysisTime))
- return analyzedReads
-
-
-def readSorter(readList:typing.List[ReadAlignmentData]):
- sortedReads = {}
- readList.insert(0, None)
- read = readList.pop()
- while read is not None:
- if not read.qname in sortedReads:
- sortedReads[read.qname] = []
- sortedReads[read.qname].append(read)
- read = readList.pop()
- return sortedReads
-
-
-def sortedReadsDictToList(readDict:dict):
- readList = []
- qnames = list(readDict.keys())
- qnames.insert(0, None)
- qname = qnames.pop()
- while qname:
- readList.append(readDict[qname])
- del readDict[qname]
- qname = qnames.pop()
- return readList
-
-
-def getSpeciesCallCounts(readSetList:typing.List[typing.List[ReadAlignmentData]]):
- import collections
- speciesCalls = [callSpeciesFromReadSet(readSet) for readSet in readSetList]
- speciesCounts = collections.Counter(speciesCalls)
- return speciesCounts
-
-
-def callSpeciesFromReadSet(readList:typing.List[ReadAlignmentData], reportSpeciesConflictListAsChimeraLike:bool=True):
- if len(readList) == 2:
- readPair = ReadPairData(*readList)
- if readPair.speciesConflict:
- if reportSpeciesConflictListAsChimeraLike:
- return "Chimera_like"
- else:
- return readPair.calledSpecies
- if not readPair.mapped:
- return "Unaligned_reads"
- if readPair.poorQualityDrop:
- return "Poor_quality"
- if not readPair.calledSpecies:
- return "Unaligned_reads"
- return readPair.calledSpecies
- else:
- return multimapDisputeResolution(readList)
-
-
-def splitForwardAndReverse(readList:typing.List[ReadAlignmentData], removeUnaligned:bool=True):
- forward = []
- reverse = []
- if removeUnaligned:
- forward = [read for read in readList if not read.isRead2 and read.species]
- reverse = [read for read in readList if read.isRead2 and read.species]
- else:
- forward = [read for read in readList if not read.isRead2]
- reverse = [read for read in readList if read.isRead2]
- return forward, reverse
-
-
-def getConfidentReads(readList:typing.List[ReadAlignmentData], multimapDisputeResolutionQDelta:int=20):
- if len(readList) == 1:
- if readList[0].species:
- return set([readList[0]])
- else: #Handles a case where one of the sets came back as unaligned
- return set()
- else:
- mapqs = [read.mapq for read in readList]
- topMapq = max(mapqs)
- confidentReads = [read for read in readList if topMapq - read.mapq <= multimapDisputeResolutionQDelta]
- return set(confidentReads)
-
-
-def getConfidentSpecies(readList:typing.List[ReadAlignmentData], multimapDisputeResolutionQDelta:int=20):
- if len(readList) == 1:
- if readList[0].species:
- return set([readList[0]])
- else: #Handles a case where one of the sets came back as unaligned
- return set()
- else:
- mapqs = [read.mapq for read in readList]
- topMapq = max(mapqs)
- confidentSpecies = [read.species for read in readList if topMapq - read.mapq <= multimapDisputeResolutionQDelta]
- return set(confidentSpecies)
-
-
-def getMaxMapqFromReadList(readList:typing.List[ReadAlignmentData]):
- return max([read.mapq for read in readList])
-
-
-def multimapDisputeResolution(readList:typing.List[ReadAlignmentData], minimumMapqForSingleReadConfidence:int=40, minimumMapqForPairedReadConfidence:int=30):
- forwardReads, reverseReads = splitForwardAndReverse(readList, removeUnaligned=True)
- if not reverseReads:
- forwardMapqMax = getMaxMapqFromReadList(forwardReads)
- if not forwardMapqMax >= minimumMapqForSingleReadConfidence:
- return "Poor_quality"
- confidentForwards = getConfidentReads(forwardReads)
- if len(confidentForwards) == 1:
- confidentForwards = list(confidentForwards)
- return confidentForwards[0].species
- else:
- return "Ambiguous_reads"
- if not forwardReads:
- reverseMapqMax = getMaxMapqFromReadList(reverseReads)
- if not reverseMapqMax >= minimumMapqForSingleReadConfidence:
- return "Poor_quality"
- confidentReverses = getConfidentReads(reverseReads)
- if len(confidentReverses) == 1:
- confidentReverses = list(confidentReverses)
- return confidentReverses[0].species
- else:
- return "Ambiguous_reads"
- forwardMapqMax = getMaxMapqFromReadList(forwardReads)
- reverseMapqMax = getMaxMapqFromReadList(reverseReads)
- if not forwardMapqMax >= minimumMapqForPairedReadConfidence or reverseMapqMax >= minimumMapqForPairedReadConfidence:
- return "Poor_quality"
- forwardConfidentSpecies = getConfidentSpecies(forwardReads)
- reverseConfidentSpecies = getConfidentSpecies(reverseReads)
- speciesIntersection = forwardConfidentSpecies.intersection(reverseConfidentSpecies)
- if len(speciesIntersection) == 0:
- return "Chimera_like"
- elif len(speciesIntersection) == 1:
- speciesIntersection = list(speciesIntersection)
- return speciesIntersection[0]
- else:
- return "Ambiguous_reads"
-
-
-def bamFileProcessor(bamFile:str):
- print("Starting analysis of %s" %bamFile, flush=True)
- readList = generateAnalyzedReadList(bamFile)
- sortedReads = readSorter(readList)
- del readList
- readSets = sortedReadsDictToList(sortedReads)
- del sortedReads
- speciesCallCounts = getSpeciesCallCounts(readSets)
- return speciesCallCounts
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisSE.py b/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisSE.py
deleted file mode 100644
index f8682be..0000000
--- a/miqScoreShotgunPublicSupport/alignmentAnalysis/alignmentAnalysisSE.py
+++ /dev/null
@@ -1,173 +0,0 @@
-import os
-import pysam
-import typing
-
-'''
-Read species calls:
-Chimera_like: Both paired ends disagree, no good single-species explanation
-Poor_quality: Depending on if call was being made on single or both ends, quality of read(s) for making call was insufficient
-Unmapped_reads: No usable alignment data
-Ambiguous_reads: Both reads support two or more species without a big enough delta in confidence to be decisive
-If a species is given, that was a successful call
-Under certain parameter settings, a tuple of multiple species may be given indicating reads that support multiple species
-'''
-
-
-class ReadAlignmentData(object):
- __slots__ = ["qname",
- "mapq",
- "secondaryAlignment",
- "isRead2",
- "species"]
-
- def __init__(self, contig:str, mapq:int, qname:str, secondaryAlignment:bool, isRead2:bool):
- self.species = extractSpecies(contig)
- self.mapq = mapq
- self.qname = qname
- self.secondaryAlignment = secondaryAlignment
- self.isRead2 = isRead2
-
- def __str__(self):
- secondary = ""
- read = "R1"
- if self.isRead2:
- read = "R2"
- if self.secondaryAlignment:
- secondary = ", Secondary"
- return "%s, %s, %s%s, %s " %(self.species, self.mapq, read, secondary, self.qname)
-
-
-
-class MispairedReadError(Exception):
- pass
-
-
-def extractSpecies(referenceName:str):
- if referenceName is None:
- return None
- return "_".join(referenceName.split("_")[:2])
-
-
-def readParallelProcessor(read:pysam.AlignedRead):
- return ReadAlignmentData(read.reference_name, read.mapping_quality, read.query_name, read.is_secondary, read.is_read2)
-
-
-def listBamFiles(folder:str):
- folderFiles = os.listdir(folder)
- bamFiles = [os.path.join(folder, file) for file in folderFiles if file.endswith(".bam")]
- return bamFiles
-
-
-def generateAnalyzedReadList(bamFilePath:str):
- import datetime
- startTime = datetime.datetime.now()
- bamFile = pysam.AlignmentFile(bamFilePath, "rb")
- analyzedReads = []
- readCount = 0
- for read in bamFile:
- analyzedReads.append(ReadAlignmentData(read.reference_name, read.mapping_quality, read.query_name, read.is_secondary, read.is_read2))
- readCount += 1
- if readCount % 500000 == 0:
- analysisTime = datetime.datetime.now() - startTime
- print("Analyzed %s reads in %s" %(readCount, analysisTime), flush=True)
- bamFile.close()
- analysisTime = datetime.datetime.now() - startTime
- print("Analyzed %s reads in %s" % (readCount, analysisTime))
- return analyzedReads
-
-
-def readSorter(readList:typing.List[ReadAlignmentData]):
- sortedReads = {}
- readList.insert(0, None)
- read = readList.pop()
- while read is not None:
- if not read.qname in sortedReads:
- sortedReads[read.qname] = []
- sortedReads[read.qname].append(read)
- read = readList.pop()
- return sortedReads
-
-
-def sortedReadsDictToList(readDict:dict):
- readList = []
- qnames = list(readDict.keys())
- qnames.insert(0, None)
- qname = qnames.pop()
- while qname:
- readList.append(readDict[qname])
- del readDict[qname]
- qname = qnames.pop()
- return readList
-
-
-def getSpeciesCallCounts(readSetList:typing.List[typing.List[ReadAlignmentData]]):
- import collections
- speciesCalls = [callSpeciesFromReadSet(readSet) for readSet in readSetList]
- speciesCounts = collections.Counter(speciesCalls)
- return speciesCounts
-
-
-def callSpeciesFromReadSet(readList:typing.List[ReadAlignmentData], minimumMapqForSingleReadConfidence:int=40):
- if len(readList) == 1:
- if readList[0].species:
- if readList[0].mapq >= minimumMapqForSingleReadConfidence:
- return readList[0].species
- else:
- return "Poor_quality"
- else:
- return "Unaligned_reads"
- else:
- return multimapDisputeResolution(readList, minimumMapqForSingleReadConfidence)
-
-
-def getConfidentReads(readList:typing.List[ReadAlignmentData], multimapDisputeResolutionQDelta:int=20):
- if len(readList) == 1:
- if readList[0].species:
- return set([readList[0]])
- else: #Handles a case where one of the sets came back as unaligned
- return set()
- else:
- mapqs = [read.mapq for read in readList]
- topMapq = max(mapqs)
- confidentReads = [read for read in readList if topMapq - read.mapq <= multimapDisputeResolutionQDelta]
- return set(confidentReads)
-
-
-def getConfidentSpecies(readList:typing.List[ReadAlignmentData], multimapDisputeResolutionQDelta:int=20):
- if len(readList) == 1:
- if readList[0].species:
- return set([readList[0]])
- else: #Handles a case where one of the sets came back as unaligned
- return set()
- else:
- mapqs = [read.mapq for read in readList]
- topMapq = max(mapqs)
- confidentSpecies = [read.species for read in readList if topMapq - read.mapq <= multimapDisputeResolutionQDelta]
- return set(confidentSpecies)
-
-
-def getMaxMapqFromReadList(readList:typing.List[ReadAlignmentData]):
- return max([read.mapq for read in readList])
-
-
-def multimapDisputeResolution(readList:typing.List[ReadAlignmentData], minimumMapqForSingleReadConfidence:int=40):
- mapqMax = getMaxMapqFromReadList(readList)
- if not mapqMax >= minimumMapqForSingleReadConfidence:
- return "Poor_quality"
- confidentReads = getConfidentReads(readList)
- if len(confidentReads) == 1:
- confidentReads = list(confidentReads)
- return confidentReads[0].species
- else:
- return "Ambiguous_reads"
-
-
-def bamFileProcessor(bamFile:str):
- print("Starting analysis of %s" %bamFile, flush=True)
- readList = generateAnalyzedReadList(bamFile)
- sortedReads = readSorter(readList)
- del readList
- readSets = sortedReadsDictToList(sortedReads)
- del sortedReads
- speciesCallCounts = getSpeciesCallCounts(readSets)
- return speciesCallCounts
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/alignmentAnalysis/bwaHandler.py b/miqScoreShotgunPublicSupport/alignmentAnalysis/bwaHandler.py
deleted file mode 100644
index cd6ba8d..0000000
--- a/miqScoreShotgunPublicSupport/alignmentAnalysis/bwaHandler.py
+++ /dev/null
@@ -1,86 +0,0 @@
-def bwaAlignPE(forwardReads:str, reverseReads:str, workingFolder:str, outputBAM:str, refGenome:str, coreLimit:int=None, compressionCoresPercentage:float=0.15, mock:bool=False):
- import multiprocessing
- import os
- availableCores = multiprocessing.cpu_count() - 1
- if coreLimit and coreLimit > 0:
- availableCores = max([availableCores, coreLimit])
- compressionCores = round(availableCores * compressionCoresPercentage)
- compressionCores = max([compressionCores, 1])
- alignmentCores = availableCores - compressionCores
- if alignmentCores < 1:
- streaming = False
- else:
- streaming = True
- if streaming:
- bwaCommand = "bwa mem -t %s %s %s %s" %(alignmentCores, refGenome, forwardReads, reverseReads)
- samtoolsCommand = "samtools view -b -@ %s -o %s" %(compressionCores, outputBAM)
- combinedCommand = "%s | %s" %(bwaCommand, samtoolsCommand)
- print("RUN: %s" %combinedCommand, flush=True)
- if "MOCK" in os.environ:
- mock = True
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" %exitCode, flush=True)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
- else:
- tempSAM = os.path.join(workingFolder, "temp.sam")
- bwaCommand = "bwa mem %s %s %s > %s" % (refGenome, forwardReads, reverseReads, tempSAM)
- samtoolsCommand = "samtools view -b -@ %s -o %s %s" % (compressionCores, outputBAM, tempSAM)
- combinedCommand = "%s && %s && rm %s" % (bwaCommand, samtoolsCommand, tempSAM)
- print("RUN: %s" % combinedCommand)
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" % exitCode)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
-
-
-def bwaAlignSE(forwardReads:str, workingFolder:str, outputBAM:str, refGenome:str, coreLimit:int=None, compressionCoresPercentage:float=0.15, mock:bool=False):
- import multiprocessing
- import os
- availableCores = multiprocessing.cpu_count() - 1
- if coreLimit and coreLimit > 0:
- availableCores = max([availableCores, coreLimit])
- compressionCores = round(availableCores * compressionCoresPercentage)
- compressionCores = max([compressionCores, 1])
- alignmentCores = availableCores - compressionCores
- if alignmentCores < 1:
- streaming = False
- else:
- streaming = True
- if streaming:
- bwaCommand = "bwa mem -t %s %s %s" %(alignmentCores, refGenome, forwardReads)
- samtoolsCommand = "samtools view -b -@ %s -o %s" %(compressionCores, outputBAM)
- combinedCommand = "%s | %s" %(bwaCommand, samtoolsCommand)
- print("RUN: %s" %combinedCommand, flush=True)
- if "MOCK" in os.environ:
- mock = True
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" %exitCode, flush=True)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
- else:
- tempSAM = os.path.join(workingFolder, "temp.sam")
- bwaCommand = "bwa mem %s %s > %s" % (refGenome, forwardReads, tempSAM)
- samtoolsCommand = "samtools view -b -@ %s -o %s %s" % (compressionCores, outputBAM, tempSAM)
- combinedCommand = "%s && %s && rm %s" % (bwaCommand, samtoolsCommand, tempSAM)
- print("RUN: %s" % combinedCommand)
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" % exitCode)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/alignmentAnalysis/minimap2.py b/miqScoreShotgunPublicSupport/alignmentAnalysis/minimap2.py
deleted file mode 100644
index 63fd8a3..0000000
--- a/miqScoreShotgunPublicSupport/alignmentAnalysis/minimap2.py
+++ /dev/null
@@ -1,42 +0,0 @@
-def minimapAlign(forwardReads:str, workingFolder:str, outputBAM:str, refGenome:str, coreLimit:int=None, compressionCoresPercentage:float=0.15, mock:bool=False):
- import multiprocessing
- import os
- availableCores = multiprocessing.cpu_count() - 1
- if coreLimit and coreLimit > 0:
- availableCores = max([availableCores, coreLimit])
- compressionCores = round(availableCores * compressionCoresPercentage)
- compressionCores = max([compressionCores, 1])
- alignmentCores = availableCores - compressionCores
- if alignmentCores < 1:
- streaming = False
- else:
- streaming = True
- if streaming:
- minimapCommand = "minimap2 -L -ax map-ont -t %s %s %s" %(alignmentCores, refGenome, forwardReads) #the -L command clips CIGARs that are too long for BAM standards
- samtoolsCommand = "samtools view -b -@ %s -o %s" %(compressionCores, outputBAM)
- combinedCommand = "%s | %s" %(minimapCommand, samtoolsCommand)
- print("RUN: %s" %combinedCommand, flush=True)
- if "MOCK" in os.environ:
- mock = True
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" %exitCode, flush=True)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
- else:
- tempSAM = os.path.join(workingFolder, "temp.sam")
- minimapCommand = "minimap2 -L -ax map-ont %s %s > %s" % (refGenome, forwardReads, tempSAM)
- samtoolsCommand = "samtools view -b -@ %s -o %s %s" % (compressionCores, outputBAM, tempSAM)
- combinedCommand = "%s && %s && rm %s" % (minimapCommand, samtoolsCommand, tempSAM)
- print("RUN: %s" % combinedCommand)
- if not mock:
- exitCode = os.system(combinedCommand)
- else:
- exitCode = 0
- print("MOCK RUN: %s" %combinedCommand)
- print("Completed with status %s" % exitCode)
- if exitCode:
- raise RuntimeError("Running alignment and compression returned a non-zero exit status")
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/__init__.py b/miqScoreShotgunPublicSupport/formatReaders/__init__.py
deleted file mode 100644
index 0d9712b..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/__init__.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from . import fastq
-from . import qualityScore
-from . import gzipIdentifier
-
-__all__ = ["fastq",
- "qualityScore",
- "gzipIdentifier"]
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/fastq/__init__.py b/miqScoreShotgunPublicSupport/formatReaders/fastq/__init__.py
deleted file mode 100644
index 21f4fa4..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/fastq/__init__.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from . import fastqHandler
-from . import fastqAnalysis
-from . import fileNamingStandards
-
-__all__ = ["fastqHandler",
- "fastqAnalysis",
- "fileNamingStandards"]
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqAnalysis.py b/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqAnalysis.py
deleted file mode 100644
index dcb217f..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqAnalysis.py
+++ /dev/null
@@ -1,314 +0,0 @@
-import logging
-logger = logging.getLogger(__name__)
-from . import fileNamingStandards
-
-def buildQualityMatrix(path:str):
- import numpy
- from .fastqHandler import FastqFile
- fastq = FastqFile(path, depth=1)
- qualityMatrix = []
- for read in fastq:
- qualityMatrix.append(read.quality.phredScores)
- fastq.close()
- return numpy.matrix(qualityMatrix, dtype='uint8') #Memory efficient, but if someone feeds in a phred score > 255, this will break. PacBio, I'm looking at you.
-
-
-def buildQualityMatrixPaired(forward:str, reverse:str):
- return buildQualityMatrix(forward), buildQualityMatrix(reverse)
-
-
-def buildExpectedErrorMatrix(path:str, superLean:bool = False, startPosition:int = 0, subsample:int=0):
- import numpy
- from .. import qualityScore
- from .fastqHandler import FastqFile
- fastq = FastqFile(path, depth = 0, subsample = subsample)
- expectedErrorMatrix = []
- dataType = 'float16'
- if superLean:
- dataType = 'uint8'
- for line in fastq:
- expectedErrorLineList = qualityScore.qualityScoreHandler.cumulativeExpectedErrorArray(line.quality, fastq.qualityScoreScheme)[startPosition:]
- expectedErrorMatrix.append(expectedErrorLineList) #low precision floating point. Usually users are looking for whole numbers anyway
- fastq.close()
- return numpy.array(expectedErrorMatrix, dataType, order='F')
-
-
-def buildExpectedErrorMatrixPaired(forward:str, reverse:str, superLean:bool = False, startPositions:tuple = (0, 0), subsample:int=0):
- return buildExpectedErrorMatrix(forward, superLean, startPositions[0]), buildExpectedErrorMatrix(reverse, superLean, startPositions[1])
-
-
-def findCutoffByPercentile(path:str, phredScore:int, percentile:int):
- '''
- This will analyze a fastq file to find where the given percentile of reads is at or below the given phred score (such as finding the read where the 10th percentile of reads is phred=10.
- Value returned is the position *INDEXED TO ZERO*
- :param path: path of the Fastq to analyze
- :param phredScore: score to use in cutoff
- :param percentile: percentile to use in cutoff
- :return:base position (integer)
- '''
- import numpy
- qualityMatrix = buildQualityMatrix(path).transpose() #faster calclation of percentiles if we have positions as rows and reads as columns
- for position, row in enumerate(qualityMatrix):
- nthPercentile = numpy.percentile(row, percentile)
- if nthPercentile < percentile:
- return position
- return numpy.size(qualityMatrix, 0)
-
-
-def makeQualityMatrix(path:str):
- import numpy
- from . import fastqHandler
- readLength, variance = fastqHandler.estimateReadLength(path, getVariance=True)
- if variance != 0:
- readLength = fastqHandler.getLongestReadInFile(path)
- fastq = fastqHandler.FastqFile(path, depth=1)
- qualityRange = fastq.qualityScoreScheme.range
- readLengthMatrix = [0] * readLength
- qualityCountMatrix = []
- for i in range(qualityRange + 1):
- qualityCountMatrix.append(readLengthMatrix.copy())
- '''
- Building a matrix here where the correspond to all possibly quality scores and columns represent each base position of each read (indexed to zero)
- Calling a specific value is done by qualityMatrix[qualityScore][readPosition]
- '''
- for read in fastq:
- for position, phred in enumerate(read.quality.phredScores):
- qualityCountMatrix[phred][position] = qualityCountMatrix[phred][position] + 1
- fastq.close()
- qualityCountMatrix = numpy.matrix(qualityCountMatrix)
- return qualityCountMatrix
- # plt.imshow(qualityCountMatrix, origin='lower', aspect='auto')
- # plt.xlabel("Position")
- # plt.ylabel("Quality (Phred)")
- # plt.title("Read quality for %s" %path)
- # if not testingOnly:
- # if outputFile:
- # plt.savefig(outputFile)
- # else:
- # plt.show()
- # return qualityCountMatrix
-
-
-def makeAverageExpectedErrorLine(path:str):
- import numpy
- expectedErrorMatrix = buildExpectedErrorMatrix(path)
- expectedErrorMatrix = expectedErrorMatrix.transpose()
- means = []
- for line in expectedErrorMatrix:
- means.append(numpy.mean(line))
- return means
- # plt.plot(means, 'k-')
- # plt.xlabel("Position")
- # plt.ylabel("Average Expected Error")
- # plt.show()
-
-
-def getDataForFastqPlots(forwardFastq:fileNamingStandards.NamingStandard, reverseFastq:fileNamingStandards.NamingStandard = None):
- forwardQualityMatrix = makeQualityMatrix(forwardFastq.filePath)
- forwardExpectedErrorLine = makeAverageExpectedErrorLine(forwardFastq.filePath)
- if reverseFastq is None:
- reverseQualityMatrix = None
- reverseExpectedErrorLine = None
- else:
- reverseQualityMatrix = makeQualityMatrix(reverseFastq.filePath)
- reverseExpectedErrorLine = makeAverageExpectedErrorLine(reverseFastq.filePath)
- return forwardQualityMatrix, reverseQualityMatrix, forwardExpectedErrorLine, reverseExpectedErrorLine
-
-
-def generateFastqPlotPaired(forwardFastq:fileNamingStandards.NamingStandard, reverseFastq:fileNamingStandards.NamingStandard, sampleTitle:str = None, outputFile:str = None, base64Format:str = None):
- import matplotlib.pyplot as plt
- if base64Format:
- import base64
- if outputFile and base64Format:
- outputFileFormat = outputFile.split(".")[-1]
- if not outputFileFormat == base64Format:
- logger.error(
- "Cannot save plot in one format and return base64 in a different format. Returning file save format. Save in %s. Return base64 %s" % (
- outputFileFormat, base64Format))
- if sampleTitle is None:
- sampleTitle = " ".join([str(item) for item in forwardFastq.sampleID])
- else:
- sampleTitle = str(sampleTitle)
- forwardQualityMatrix, reverseQualityMatrix, forwardExpectedErrorLine, reverseExpectedErrorLine = getDataForFastqPlots(forwardFastq, reverseFastq)
- plt.suptitle("Analysis of %s" % sampleTitle, horizontalalignment="center", fontsize=18, fontweight="bold")
-
- #make plots for forward reads
- plt.subplot(221)
- plt.imshow(forwardQualityMatrix, origin='lower', aspect='auto')
- plt.xlabel("Read 1 Position")
- plt.ylabel("Quality (Phred)")
- plt.title(" ", fontsize = 16) #making a whitespace buffer
- plt.subplot(222)
- plt.plot(forwardExpectedErrorLine, 'k-')
- plt.xlabel("Read 1 Position")
- plt.ylabel("Average Expected Error")
- plt.title(" ", fontsize = 16) #making a whitespace buffer
-
- #make plots for reverse reads
- plt.subplot(223)
- plt.imshow(reverseQualityMatrix, origin='lower', aspect='auto')
- plt.xlabel("Read 2 Position")
- plt.ylabel("Quality (Phred)")
- #plt.title("Read quality for %s" %reverseFastq.fileName)
- plt.subplot(224)
- plt.plot(reverseExpectedErrorLine, 'k-')
- plt.xlabel("Read 2 Position")
- plt.ylabel("Average Expected Error")
- #plt.title("Expected error for %s" % reverseFastq.fileName)
-
- plt.tight_layout()
- if outputFile:
- plt.savefig(outputFile)
- if base64Format:
- imageFile = open(outputFile)
- encodedFile = base64.b64encode(imageFile.read())
- imageFile.close()
- return encodedFile
- elif base64Format:
- import io
- byteStream = io.BytesIO()
- plt.savefig(byteStream, format=base64Format)
- byteStream.seek(0)
- encodedFile = base64.b64encode(byteStream.read())
- return encodedFile
- else:
- plt.show()
-
-
-def generateFastqPlotSingle(forwardFastq: fileNamingStandards.NamingStandard, sampleTitle: str = None, outputFile: str = None, base64Format:str = None):
- import matplotlib.pyplot as plt
- if base64Format:
- import base64
- if outputFile and base64Format:
- outputFileFormat = outputFile.split(".")[-1]
- if not outputFileFormat == base64Format:
- logger.error("Cannot save plot in one format and return base64 in a different format. Returning file save format. Save in %s. Return base64 %s" %(outputFileFormat, base64Format))
- if sampleTitle is None:
- sampleTitle = " ".join([str(item) for item in forwardFastq.sampleID])
- else:
- sampleTitle = str(sampleTitle)
- forwardQualityMatrix, reverseQualityMatrix, forwardExpectedErrorLine, reverseExpectedErrorLine = getDataForFastqPlots(forwardFastq)
- plt.suptitle(sampleTitle, horizontalalignment="center", fontsize = 18, fontweight = "bold")
-
- # make plots for reads
- plt.subplot(211)
- plt.imshow(forwardQualityMatrix, origin='lower', aspect='auto')
- plt.xlabel("Position")
- plt.ylabel("Quality (Phred)")
- plt.title(" ", fontsize = 16) #making a whitespace buffer
- plt.subplot(212)
- plt.plot(forwardExpectedErrorLine, 'k-')
- plt.xlabel("Position")
- plt.ylabel("Average Expected Error")
-
- plt.tight_layout()
- if outputFile:
- plt.savefig(outputFile)
- if base64Format:
- imageFile = open(outputFile)
- encodedFile = base64.b64encode(imageFile.read())
- imageFile.close()
- return encodedFile
- elif base64Format:
- import io
- byteStream = io.BytesIO()
- plt.savefig(byteStream, format=base64Format)
- byteStream.seek(0)
- encodedFile = base64.b64encode(byteStream.read())
- return encodedFile
- else:
- plt.show()
-
-
-class ParallelPlotAgent(object):
-
- def __init__(self, outputDirectory:str = None, base64Output:bool = False, outputFormat:str = None):
- self.outputDirectory = outputDirectory
- self.outputFormat = outputFormat
- self.base64Output = base64Output
- if outputDirectory or base64Output:
- if not outputFormat:
- raise ValueError("If output to file (directory) or base64 is set, an output format must be provided, but none was.")
-
- def parallelPlotter(self, fastq:[tuple, fileNamingStandards.NamingStandard]):
- import os
- if type(fastq) == tuple:
- sampleName = "_".join([str(item) for item in fastq[0].sampleID])
- returnFastq = fastq[0]
- else:
- sampleName = "_".join([str(item) for item in fastq.sampleID])
- returnFastq = fastq
- if self.outputDirectory:
- outputFileName = os.path.join(self.outputDirectory, sampleName + ".%s" %self.outputFormat)
- else:
- outputFileName = None
- if self.base64Output:
- base64Format = self.outputFormat
- else:
- base64Format = None
- if type(fastq) == tuple:
- base64EncodedPlot = generateFastqPlotPaired(fastq[0], fastq[1], outputFile=outputFileName, base64Format=base64Format)
- else:
- base64EncodedPlot = generateFastqPlotSingle(fastq, outputFile=outputFileName, base64Format=base64Format)
- return returnFastq, outputFileName, base64EncodedPlot #returnValue will be None unless a base64 encoded image was returned
-
-
-def plotFastqFilesInFolder(directory:str, namingStandard:fileNamingStandards.NamingStandard, outputDirectory:str = None, base64Output:bool = False, outputFormat:str = None):
- import os
- from . import fastqHandler
- from ... import easyMultiprocessing
- if outputDirectory and not os.path.isdir(directory):
- raise NotADirectoryError("Unable to find a directory at %s" %directory)
- if outputDirectory and not os.path.isdir(outputDirectory):
- raise NotADirectoryError("Unable to find a directory at %s" %outputDirectory)
- if outputDirectory or base64Output:
- if not outputFormat:
- raise ValueError(
- "If output to file (directory) or base64 is set, an output format must be provided, but none was.")
- fastqTable = fastqHandler.getSamplePairTableFromFolder(directory, namingStandard)
- fastqSetList = []
- for key in fastqTable:
- if key == "unpaired":
- for fastq in fastqTable["unpaired"]:
- fastqSetList.append(fastq)
- else:
- fastqSetList.append(fastqTable[key])
- parallelPlotAgent = ParallelPlotAgent(outputDirectory=outputDirectory, base64Output=base64Output, outputFormat=outputFormat)
- if outputDirectory or base64Output:
- plotReturnValues = easyMultiprocessing.parallelProcessRunner(parallelPlotAgent.parallelPlotter, fastqSetList)
- else:
- plotReturnValues = [parallelPlotAgent.parallelPlotter(fastq) for fastq in fastqSetList] #can't do parallel plotting if plotting to a display window
- returnTable = {}
- if outputDirectory and base64Output:
- for fastq, outputFile, base64EncodedPlot in plotReturnValues:
- returnTable[fastq] = (outputFile, base64EncodedPlot)
- elif base64Output:
- for fastq, outputFile, base64EncodedPlot in plotReturnValues:
- returnTable[fastq] = base64EncodedPlot
- elif outputDirectory:
- for fastq, outputFile, base64EncodedPlot in plotReturnValues:
- returnTable[fastq] = base64EncodedPlot
- return returnTable
-
-
-def getEstimatedFastqFileSizeSumFromList(fastqList:list):
- import os
- from .. import gzipIdentifier
- sum = 0
- for fastq in fastqList:
- fileSize = os.path.getsize(fastq.filePath)
- if gzipIdentifier.isGzipped(fastq.filePath):
- fileSize = round(fileSize * 3.5)
- sum += fileSize
- return sum
-
-def getEstimatedFastqSizeSumFromDirectory(path:str):
- import os
- from . import fastqHandler
- if not os.path.isdir(path):
- raise NotADirectoryError("Unable to find a directory at %s" %path)
- fastqList = fastqHandler.findSamplesInFolder(path)
- return getEstimatedFastqFileSizeSumFromList(fastqList)
-
-
-
diff --git a/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqHandler.py b/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqHandler.py
deleted file mode 100644
index af75c9c..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/fastq/fastqHandler.py
+++ /dev/null
@@ -1,553 +0,0 @@
-import os
-import logging
-import typing
-logger = logging.getLogger(__name__)
-from .. import qualityScore
-from . import fileNamingStandards
-
-class ReadMetadataLine(object):
-
- def __init__(self, rawMetadata):
- self.rawMetadata = rawMetadata
- if not rawMetadata.startswith("@"):
- logger.warning("Got a metadata line that did not start with an @ symobol. This goes against the fastq standard and may suggest a corrupt file. Line: %s" %rawMetadata)
- metadataSplit = rawMetadata.strip().split(" ")
- if not len(metadataSplit) == 2:
- errorMessage = "Got a metadata line that appears to have more than two elements divided by space. %s" %rawMetadata
- logger.critical(errorMessage)
- raise FastqFormatError(errorMessage)
- equipmentInfo, readInfo = metadataSplit
- self.validEquipmentInfo = self.processEquipmentInfo(equipmentInfo, rawMetadata)
- self.validReadInfo = self.processReadInfo(readInfo, rawMetadata)
- self.allValidInfo = self.validEquipmentInfo and self.validReadInfo
-
- def processReadInfo(self, readInfo:str, rawMetadata:str=""):
- validFields = True
- readInfo = readInfo.split(":")
- if not len(readInfo) == 4:
- errorMessage = "Got a read info section of metadata that did not have 4 elements. Line: %s" %rawMetadata
- logger.critical(errorMessage)
- raise FastqFormatError(errorMessage)
- self.direction, self.filtered, self.controlBits, self.index = readInfo
- try:
- self.direction = int(self.direction)
- if self.direction not in [1, 2]:
- validFields = False
- logger.error("Read direction found that was not 1 or 2. Line: %s" %rawMetadata)
- except ValueError:
- validFields = False
- logger.error("Read direction could not be cast to integer. Line: %s" %rawMetadata)
- if self.filtered.upper() == "Y":
- self.filtered = True
- self.passedFilter = False
- elif self.filtered.upper() == "N":
- self.filtered = False
- self.passedFilter = True
- else:
- self.passedFilter = None
- validFields = False
- logger.error("Got a value for filtered that was not Y or N. Line: %s" %rawMetadata)
- try:
- self.controlBits = int(self.controlBits)
- if not self.controlBits % 2 == 0:
- validFields = False
- logger.error("Got a control bits value of %s. Control bits should be an even number. Line: %s " %(self.controlBits, rawMetadata))
- except ValueError:
- validFields = False
- logger.error("Unable to cast control bits to an integer. Line: %s " %rawMetadata)
- return validFields
-
- def processEquipmentInfo(self, equipmentInfo:str, rawMetadata:str=""):
- validFields = True
- equipmentInfo = equipmentInfo.replace("@", "")
- equipmentInfo = equipmentInfo.split(":")
- if not len(equipmentInfo) == 7:
- logger.critical("Equipment info section of metadata did not have 7 elements. Line: %s" %rawMetadata)
- raise FastqFormatError("Equipment info section of metadata did not have 7 elements. Line: %s" %rawMetadata)
- self.instrumentName, self.runID, self.flowcellID, self.tileNumber, self.laneNumber, self.xCoordinate, self.yCoordinate = equipmentInfo
- try:
- self.runID = int(self.runID)
- except ValueError:
- validFields = False
- logger.error("Run ID number could not be cast to integer. Metadata line: %s" %rawMetadata)
- try:
- self.laneNumber = int(self.laneNumber)
- except ValueError:
- validFields = False
- logger.error("Lane number could not be cast to integer. Metadata line: %s" %rawMetadata)
- try:
- self.tileNumber = int(self.tileNumber)
- except ValueError:
- validFields = False
- logger.error("Tile number could not be cast to integer. Metadata line: %s" %rawMetadata)
- try:
- self.xCoordinate = int(self.xCoordinate)
- except ValueError:
- validFields = False
- logger.error("X-coordinate could not be cast to integer. Metadata line: %s" %rawMetadata)
- try:
- self.yCoordinate = int(self.yCoordinate)
- except ValueError:
- validFields = False
- logger.error("Y-coordinate could not be cast to integer. Metadata line: %s" %rawMetadata)
- return validFields
-
- def __str__(self):
- return self.rawMetadata
-
-
-class QualityScoreLine(object):
-
- def __init__(self, rawQualityLine:str, base:int = 33):
- self.qualityString = rawQualityLine
- self.phredScores = self.calculatePhredScores(base)
-
- def calculatePhredScores(self, base:int = 33):
- from .. import qualityScore
- return qualityScore.qualityScoreHandler.convertToNumericArray(self.qualityString, base)
-
- def __str__(self):
- return self.qualityString
-
- def __getitem__(self, item):
- return self.phredScores[item]
-
- def __iter__(self):
- for value in self.phredScores:
- yield value
-
-
-class SequenceLine(object):
-
- def __init__(self, rawSequence, runAnalysis:bool=False):
- self.sequence = rawSequence.strip().upper().replace(".", "N")
- self.length = len(self.sequence)
- if runAnalysis:
- self.baseFrequency = self.getBaseFrequencyTable()
- self.gcContent = self.calculateGCContent()
-
- def getBaseFrequencyTable(self):
- freq = {"A" : 0,
- "G" : 0,
- "C" : 0,
- "T" : 0,
- "N" : 0}
- for base in self.sequence:
- try:
- freq[base] += 1
- except KeyError:
- logger.error("Found a sequence with an invalid character. Character: %s Sequence: %s" %(base, self.sequence))
- return freq
-
- def calculateGCContent(self):
- totalReadBases = 0
- gcBases = 0
- for base in "ATGC":
- totalReadBases += self.baseFrequency[base]
- if base in "GC":
- gcBases += self.baseFrequency[base]
- if totalReadBases == 0:
- return 0
- return gcBases/totalReadBases
-
- def __len__(self):
- return self.length
-
- def __str__(self):
- return self.sequence
-
- def __eq__(self, other):
- if type(other) == SequenceLine:
- return self.sequence == other.sequence
- elif type(other) == str:
- return self.sequence == SequenceLine(other).sequence
- else:
- logger.critical("Attempted to compare a sequence to something that is not a sequence line type or string. Value in question was type %s: %s" %(type(other), other))
-
-
-class FastqLineSet(object):
-
- def __init__(self, metadata:str, sequence:str, spacer:str, quality:str, depth:int=0, analyzeMetadata:bool=False, analyzeSequence:bool=False, analyzeSequenceInDepth:bool=False, analyzeQuality:bool=False, qualityBase:int=33):
- self.metadata = metadata.strip()
- self.sequence = sequence.strip()
- self.spacer = spacer.strip()
- self.quality = quality.strip()
- if depth >= 1 or analyzeQuality:
- self.quality = QualityScoreLine(quality, qualityBase)
- if depth >= 2 or analyzeSequence or analyzeSequenceInDepth:
- if depth >= 4 or analyzeSequenceInDepth:
- self.sequence = SequenceLine(self.sequence, runAnalysis=True)
- else:
- self.sequence = SequenceLine(self.sequence)
- if depth >= 3 or analyzeMetadata:
- self.metadata = ReadMetadataLine(self.metadata)
-
- def __str__(self):
- return "%s\n%s\n%s\n%s" %(self.metadata, self.sequence, self.spacer, self.quality)
-
-def reanalyzeFastqLineSet(fastqLineSet:FastqLineSet, depth:int=0, analyzeMetadata:bool=False, analyzeSequence:bool=False, analyzeSequenceInDepth:bool=False, analyzeQuality:bool=False, qualityBase:int=33):
- return FastqLineSet(str(fastqLineSet.metadata),
- str(fastqLineSet.sequence),
- str(fastqLineSet.spacer),
- str(fastqLineSet.quality),
- depth, analyzeMetadata, analyzeSequence, analyzeSequenceInDepth, analyzeQuality, qualityBase)
-
-class FastqFile(object):
-
- def __init__(self, path:str, depth:int=0, analyzeMetadata:bool=False, analyzeSequence:bool=False, analyzeSequenceInDepth:bool=False, analyzeQuality:bool=False, fullValidation:bool=False, qualityScoreScheme:[qualityScore.qualityScoreHandler.EncodingScheme, None]=None, subsample:int = 0):
- self.path = path
- if not os.path.isfile(path):
- logger.critical("Unable to find fastq file at %s" %path)
- raise FileNotFoundError("Unable to find fastq file at %s" %path)
- follower = qualityScoreScheme
- if not qualityScoreScheme:
- qualityScoreScheme = findQualityScoreEncoding(path)
- if type(qualityScoreScheme) == qualityScore.qualityScoreHandler.EncodingScheme:
- self.qualityScoreScheme = qualityScoreScheme
- else:
- raise TypeError("Quality score scheme must be of qualityScoreHandler.EncodingScheme type. Passed: %s of type %s." %(qualityScoreScheme, type(qualityScoreScheme)))
- self.depth = depth
- self.analyzeMetadata = analyzeMetadata
- self.analyzeSequence = analyzeSequence
- self.analyzeSequenceInDepth = analyzeSequenceInDepth
- self.analyzeQuality = analyzeQuality
- self.fullValidation = fullValidation
- self.reachedEnd = False
- self.gzipped = self.checkGzip(path)
- if self.gzipped:
- import gzip
- self.filehandle = gzip.open(path, "rt")
- else:
- self.filehandle = open(path, "r")
- self.open = True
- subsample = int(subsample)
- if subsample == 0:
- subsample = 1
- self.subsample = subsample
- self.currentLine = 0
-
- def checkGzip(self, path):
- from .. import gzipIdentifier
- return gzipIdentifier.isGzipped(path)
-
- def getNextRead(self):
-
- def read4Lines():
- readBuffer = []
- for i in range(4):
- nextLine = self.filehandle.readline()
- if not nextLine:
- self.reachedEnd = True
- break
- nextLine = nextLine.strip()
- if nextLine:
- readBuffer.append(nextLine)
- if self.reachedEnd:
- if readBuffer:
- logger.error(
- "Fastq file at %s appears to me missing lines (found something not a multiple of 4." % self.path)
- for i in range(4 - len(readBuffer)):
- readBuffer.append("")
- return readBuffer
-
- if not self.open:
- logger.critical("Attempting to read from a closed fastq file at %s" %self.path)
- raise ValueError("I/O operation on a closed file")
- readBuffer = None
- includedLine = False
- while not includedLine:
- readBuffer = read4Lines()
- self.currentLine += 1
- includedLine = (self.currentLine - 1) % self.subsample == 0 or self.reachedEnd
- if not readBuffer:
- return readBuffer
- else:
- fastqLineSet = FastqLineSet(*readBuffer, depth=self.depth, analyzeMetadata=self.analyzeMetadata, analyzeSequence=self.analyzeSequence, analyzeSequenceInDepth=self.analyzeSequenceInDepth, analyzeQuality=self.analyzeQuality, qualityBase=self.qualityScoreScheme.base)
- if self.fullValidation:
- if not len(readBuffer[1]) == len(readBuffer[3]):
- raise FastqValidationError("Got mismatched sequence and quality line lengths for line %s" %readBuffer)
- if type(fastqLineSet.metadata) == str:
- metadata = ReadMetadataLine(str(fastqLineSet.metadata))
- else:
- metadata = fastqLineSet.metadata
- if not metadata.allValidInfo:
- raise FastqValidationError("Got some invalid metadata for line %s" %readBuffer)
- return fastqLineSet
-
- def close(self):
- if not self.filehandle.closed:
- self.filehandle.close()
-
- def __iter__(self):
- return self
-
- def __next__(self):
- returnValue = self.getNextRead()
- if self.reachedEnd:
- self.close()
- raise StopIteration
- else:
- return returnValue
-
- def __str__(self):
- return "Fastq file object at %s" %self.path
-
-
-class FastqFilePair(object):
-
- def __init__(self, pe1Path:str, pe2Path:str, depth:int=0, analyzeMetadata:bool=False, analyzeSequence:bool=False, analyzeSequenceInDepth:bool=False, analyzeQuality:bool=False, fullValidation:bool=False, qualityScoreScheme:qualityScore.qualityScoreHandler=None, subsample:int=0):
- self.pe1Path = pe1Path
- if not os.path.isfile(pe1Path):
- logger.critical("Unable to find fastq file at %s" %pe1Path)
- raise FileNotFoundError("Unable to find paired-end 1 fastq file at %s" %pe1Path)
- self.pe2Path = pe2Path
- if not os.path.isfile(pe2Path):
- logger.critical("Unable to find fastq file at %s" %pe2Path)
- raise FileNotFoundError("Unable to find paired-end 1 fastq file at %s" %pe2Path)
- self.depth = depth
- self.analyzeMetadata = analyzeMetadata
- self.analyzeSequence = analyzeSequence
- self.analyzeSequenceInDepth = analyzeSequenceInDepth
- self.analyzeQuality = analyzeQuality
- self.fullValidation = fullValidation
- self.reachedEnd = False
- if subsample == 0:
- subsample = 1
- self.subsample = subsample
- self.pe1FileHandle = FastqFile(pe1Path, depth=depth, analyzeMetadata=analyzeMetadata, analyzeSequence=analyzeSequence, analyzeSequenceInDepth=analyzeSequenceInDepth, analyzeQuality=analyzeQuality, fullValidation=fullValidation, qualityScoreScheme=qualityScoreScheme, subsample=subsample)
- self.pe2FileHandle = FastqFile(pe2Path, depth=depth, analyzeMetadata=analyzeMetadata, analyzeSequence=analyzeSequence, analyzeSequenceInDepth=analyzeSequenceInDepth, analyzeQuality=analyzeQuality, fullValidation=fullValidation, qualityScoreScheme=qualityScoreScheme, subsample=subsample)
- if not self.pe1FileHandle.qualityScoreScheme == self.pe2FileHandle.qualityScoreScheme:
- logger.warning("Paired end files appear to have different quality score encodings. Pe1: %s:%s. Pe2: %s%s" %(self.pe1FileHandle.qualityScoreScheme, self.pe1FileHandle.path, self.pe2FileHandle.qualityScoreScheme, self.pe2FileHandle.path))
- self.open = True
- self.reportedReadMismatch = False
-
- def getNextReadPair(self):
- if not self.open:
- logger.critical("Attempting to read from a closed fastq files at %s and %s" %(self.pe1Path, self.pe2Path))
- raise ValueError("I/O operation on a closed file")
- nextPe1 = self.pe1FileHandle.getNextRead()
- nextPe2 = self.pe2FileHandle.getNextRead()
- if (nextPe1 and not nextPe2) or (not nextPe1 and nextPe2):
- if nextPe1:
- logger.error("Ran out of paired-end 2 reads with remaining paired-end 1 reads for file pair %s and %s" %(self.pe1Path, self.pe2Path))
- else:
- logger.error("Ran out of paired-end 1 reads with remaining paired-end 2 reads for file pair %s and %s" %(self.pe1Path, self.pe2Path))
- if self.fullValidation:
- raise FastqValidationError("Reached end of one paired-end file before the other. Files: %s and %s" %(self.pe1Path, self.pe2Path))
- if not nextPe1 and not nextPe2:
- self.reachedEnd = True
- return None
- if nextPe1 and nextPe2 and self.fullValidation:
- self.runValidation(nextPe1, nextPe2)
- return nextPe1, nextPe2
-
- def runValidation(self, pe1:FastqLineSet, pe2:FastqLineSet):
- if type(pe1.metadata) == str:
- pe1Metadata = ReadMetadataLine(str(pe1.metadata))
- elif type(pe1.metadata) == ReadMetadataLine:
- pe1Metadata = pe1.metadata
- else:
- raise TypeError("Only able to compare metadata as string or metadata objects")
- if type(pe2.metadata) == str:
- pe2Metadata = ReadMetadataLine(str(pe2.metadata))
- elif type(pe1.metadata) == ReadMetadataLine:
- pe2Metadata = pe2.metadata
- else:
- raise TypeError("Only able to compare metadata as string or metadata objects")
- if not pe1Metadata.allValidInfo or not pe2Metadata.allValidInfo:
- raise FastqValidationError("Got invalid metadata field for at least one read in paired end mates:\n%s\n%s" %(pe1, pe2))
- if not validPairedEndMetadata(pe1Metadata, pe2Metadata):
- raise FastqValidationError("Got invalid metadata match for paired end mates:\n%s\n%s" %(pe1, pe2))
-
- def close(self):
- self.pe1FileHandle.close()
- self.pe2FileHandle.close()
- self.open = False
-
- def __iter__(self):
- return self
-
- def __next__(self):
- returnValue = self.getNextReadPair()
- if self.reachedEnd:
- raise StopIteration
- else:
- return returnValue
-
- def __str__(self):
- return "Fastq file pair object at %s and %s" %(self.pe1Path, self.pe2Path)
-
-
-class FastqValidationError(Exception):
- pass
-
-
-class FastqFormatError(Exception):
- pass
-
-
-def validPairedEndMetadata(pe1:ReadMetadataLine, pe2:ReadMetadataLine):
- matchFields = ["instrumentName",
- "runID",
- "flowcellID",
- "laneNumber",
- "tileNumber",
- "xCoordinate",
- "yCoordinate",
- "index"]
- for field in matchFields:
- pe1Value = getattr(pe1, field)
- pe2Value = getattr(pe2, field)
- if not pe1Value == pe2Value:
- logger.error("Mismatch on %s" %matchFields)
- return False
- if not ((pe1.direction == 1 and pe2.direction == 2) or (pe2.direction == 1 and pe1.direction == 2)):
- return False
- return True
-
-
-def validFastqFile(path:str):
- readCount = 0
- fastq = FastqFile(path, fullValidation=True)
- read = fastq.getNextRead()
- while read:
- try:
- read = fastq.getNextRead()
- readCount += 1
- except Exception as error:
- logger.error(error)
- return False
- fastq.close()
- return readCount
-
-
-def validFastqPair(pe1Path:str, pe2Path:str):
- readCount = 0
- fastqPair = FastqFilePair(pe1Path, pe2Path, fullValidation=True)
- read = fastqPair.getNextReadPair()
- while read:
- try:
- read = fastqPair.getNextReadPair()
- readCount += 1
- except Exception as error:
- logger.error(error)
- fastqPair.close()
- return False
- fastqPair.close()
- return readCount
-
-
-def estimateReadLength(path:str, samplesize:int=100, getVariance = False):
- lengths = []
- fastq = FastqFile(path)
- read = fastq.getNextRead()
- while read:
- lengths.append(len(read.sequence))
- if len(lengths) >= samplesize:
- break
- read = fastq.getNextRead()
- meanReadLength = sum(lengths)/len(lengths)
- if getVariance:
- import statistics
- lengthVariance = statistics.variance(lengths)
- return round(meanReadLength), lengthVariance
- return round(meanReadLength)
-
-
-def getLongestReadInFile(path:str):
- longestReadLength = 0
- fastq = FastqFile(path)
- for read in fastq:
- if len(read.sequence) > longestReadLength:
- longestReadLength = len(read.sequence)
- fastq.close()
- return longestReadLength
-
-
-def countReads(path:str):
- readCount = 0
- fastq = FastqFile(path)
- read = fastq.getNextRead()
- while read:
- readCount += 1
- read = fastq.getNextRead()
- fastq.close()
- return readCount
-
-
-def findQualityScoreEncoding(path:str, lineLimit:int=100):
- from .. import qualityScore
- candidates = qualityScore.qualityScoreHandler.makeEncodingTable()
- for i in range(len(candidates)):
- candidates[i].eliminated = False
- fastq = FastqFile(path, qualityScoreScheme=qualityScore.qualityScoreHandler.encodingSchemes.sanger)
- line = fastq.getNextRead()
- lineCount = 0
- while line:
- for candidate in candidates:
- candidate.qualifyWithQualityString(line.quality)
- remaining = len([scheme for scheme in candidates if not scheme.eliminated])
- lineCount += 1
- if lineLimit > 0:
- if lineCount >= lineLimit:
- break
- if remaining == 0:
- logger.error("No valid quality scoring scheme found for fastq file %s" %path)
- fastq.close()
- return None
- elif remaining == 1:
- break
- for candidate in candidates:
- if not candidate.eliminated:
- del candidate.eliminated
- fastq.close()
- return candidate
-
-
-def findSamplesInFolder(directory:str, namingStandard:typing.Type[fileNamingStandards.NamingStandard] = fileNamingStandards.ZymoServicesNamingStandard):
- import os
- if not os.path.isdir(directory):
- raise NotADirectoryError("%s is not a directory or not found." % directory)
- fastqFileInfoList = []
- expectedEndings = fileNamingStandards.expectedEndings
- for item in os.listdir(directory):
- isFastqFile = False
- for expectedEnding in expectedEndings:
- if item.endswith(expectedEnding):
- isFastqFile = True
- break
- if not isFastqFile:
- continue
- filePath = os.path.join(directory, item)
- fastqFileInfoList.append(namingStandard(filePath))
- return fastqFileInfoList
-
-
-def getSamplePairTableFromFolder(directory:str, namingStandard:typing.Type[fileNamingStandards.NamingStandard] = fileNamingStandards.ZymoServicesNamingStandard):
- def hasMate(fastq:fileNamingStandards.NamingStandard, potentialMates:list):
- for potentialMate in potentialMates:
- if fastq.sameSample(potentialMate):
- return potentialMate
- return False
- allFastqs = findSamplesInFolder(directory, namingStandard)
- pairedFastqs = {"unpaired":[]}
- forwardFiles = [fastq for fastq in allFastqs if fastq.direction == 1]
- reverseFiles = [fastq for fastq in allFastqs if fastq.direction == 2]
- for fastq in forwardFiles:
- foundMate = hasMate(fastq, reverseFiles)
- if foundMate:
- reverseFiles.remove(foundMate)
- pairedFastqs[fastq.sampleID] = (fastq, foundMate)
- else:
- pairedFastqs["unpaired"].append(fastq)
- for fastq in reverseFiles:
- pairedFastqs["unpaired"].append(fastq)
- if not pairedFastqs["unpaired"]:
- del pairedFastqs["unpaired"]
- return pairedFastqs
-
-
-
-
-if __name__ == "__main__":
- test = getSamplePairTableFromFolder("c:/Users/mweinstein/dada2big/input/sequence")
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/fastq/fileNamingStandards.py b/miqScoreShotgunPublicSupport/formatReaders/fastq/fileNamingStandards.py
deleted file mode 100644
index 92965c5..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/fastq/fileNamingStandards.py
+++ /dev/null
@@ -1,75 +0,0 @@
# File extensions treated as FASTQ when scanning folders (plain and gzip-compressed variants).
expectedEndings = [".fastq", ".fq", ".fastq.gz", ".fq.gz"]
-
class NamingStandard(object):
    """Base class describing how sample metadata (group, sample number, read
    direction) is encoded in a FASTQ file name. Subclasses implement
    getSampleInfo for their specific convention."""

    __slots__ = ["fileName", "fileDirectory", "filePath", "sampleNumber", "group", "direction", "sampleID"]

    def __init__(self, filePath:str):
        self.filePath = filePath
        self.fileDirectory, self.fileName = self.separateNameAndDirectory(filePath)
        self.group, self.sampleNumber, self.direction = self.getSampleInfo(self.fileName)
        # (group, sampleNumber) uniquely identifies a sample regardless of read direction
        self.sampleID = (self.group, self.sampleNumber)

    def separateNameAndDirectory(self, path:str):
        """Split a path into (directory, fileName)."""
        import os
        directory, name = os.path.split(path)
        return directory, name

    def getSampleInfo(self, fileName:str):
        """Parse (group, sampleNumber, direction) from a file name. Must be overridden."""
        raise RuntimeError("This function should always be getting overridden. If you see this, someone called the base class by mistake.")

    def sameSample(self, other):
        """True if *other* refers to the same sample (same group and number), ignoring read direction."""
        if not isinstance(other, NamingStandard):
            raise TypeError("Can only check for same sample in another naming standard type")
        return self.group == other.group and self.sampleNumber == other.sampleNumber

    def __str__(self):
        return self.filePath

    def __hash__(self):
        return hash(self.filePath)

    def __eq__(self, other):
        # BUGFIX: the original compared against other.sample, an attribute that does
        # not exist (the slot is named sampleNumber), so equality checks always
        # raised AttributeError.
        return self.group == other.group and self.sampleNumber == other.sampleNumber and self.direction == other.direction

    def __ne__(self, other):
        return not self.__eq__(other)

    def __xor__(self, other):
        # a ^ b is shorthand for a.sameSample(b)
        return self.sameSample(other)
-
-
class ZymoServicesNamingStandard(NamingStandard):
    """Zymo Services convention: <group>_<sample>_R<direction>.<extension>."""

    def getSampleInfo(self, fileName:str):
        """Parse the three underscore-delimited fields from the name's stem."""
        stem = fileName.split(".")[0]
        group, sample, readTag = stem.split("_")
        return group, sample, int(readTag.replace("R", ""))
-
-
class IlluminaStandard(NamingStandard):
    """Illumina-style convention: underscore-delimited name whose first two
    fields are group and sample, with the read tag (R1/R2) in the third."""

    def getSampleInfo(self, fileName:str):
        """Parse group, sample, and read direction from the first three underscore fields."""
        fields = fileName.split(".")[0].split("_")
        return fields[0], fields[1], int(fields[2].replace("R", ""))
-
-
class ManualNamingStandard(NamingStandard):
    """Naming standard where the caller supplies group, sample number, and read
    direction explicitly instead of parsing them from the file name."""

    # The parent already declares every slot; the original redeclared the same
    # names here, which allocates duplicate descriptors and wastes per-instance
    # memory (a documented __slots__ pitfall). This subclass adds no new slots.
    __slots__ = []

    def __init__(self, filePath: str, group:str, number:int, direction:int):
        # Validate before touching any state (the original assigned first and
        # validated afterwards, leaving a partially-built object on failure).
        if direction not in [1, 2]:
            raise ValueError("Read direction must be either 1 or 2. %s was given" %direction)
        self.filePath = filePath
        self.fileDirectory, self.fileName = self.separateNameAndDirectory(filePath)
        self.group = group
        self.sampleNumber = number
        self.direction = direction
        self.sampleID = (self.group, self.sampleNumber)
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/gzipIdentifier.py b/miqScoreShotgunPublicSupport/formatReaders/gzipIdentifier.py
deleted file mode 100644
index 4674712..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/gzipIdentifier.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import os
-import gzip
-import binascii
-
def isGzipped(path:str):
    """Return True if the file at *path* is a readable gzip file.

    Checks the two-byte gzip magic number first, then confirms the stream is
    actually decompressible by reading a few bytes through gzip.

    :param path: path of the file to examine
    :raises FileNotFoundError: if the file does not exist
    """
    if not os.path.isfile(path):
        raise FileNotFoundError("Unable to determine if file %s is gzipped because that file does not exist." %path)
    # BUGFIX: the original leaked both file handles (neither was closed on the
    # success or early-return paths); use context managers instead.
    with open(path, 'rb') as rawFile:
        firstTwoBytes = rawFile.read(2)
    if not binascii.hexlify(firstTwoBytes) == b'1f8b':
        return False
    try:
        # Magic number alone can be a coincidence; verify the stream decompresses.
        with gzip.open(path, 'rb') as gzippedFile:
            gzippedFile.read(10)
    except OSError:
        return False
    return True
diff --git a/miqScoreShotgunPublicSupport/formatReaders/qualityScore/__init__.py b/miqScoreShotgunPublicSupport/formatReaders/qualityScore/__init__.py
deleted file mode 100644
index 5f50203..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/qualityScore/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-from . import qualityScoreHandler
-
-__all__ = ["qualityScoreHandler"]
\ No newline at end of file
diff --git a/miqScoreShotgunPublicSupport/formatReaders/qualityScore/qualityScoreHandler.py b/miqScoreShotgunPublicSupport/formatReaders/qualityScore/qualityScoreHandler.py
deleted file mode 100644
index 0c7f4b6..0000000
--- a/miqScoreShotgunPublicSupport/formatReaders/qualityScore/qualityScoreHandler.py
+++ /dev/null
@@ -1,165 +0,0 @@
-import logging
-import math
-import typing
-logger = logging.getLogger(__name__)
-
class EncodingScheme(object):
    """One FASTQ quality-score encoding: its ASCII offset, legal character
    range, and the formulas that convert between scores and error probabilities."""

    def __init__(self, name:str, base:int, startCharacter:str, endCharacter:str, pErrorToScore:typing.Callable, scoreToPError:typing.Callable):
        self.name = name
        self.base = base  # ASCII offset added to a score when encoding it as a character
        self.characterSet = self.makeCharacterSet(startCharacter, endCharacter)
        self.range = self.calculateRange(startCharacter, endCharacter)
        self.fromPErrorFormula = pErrorToScore
        self.toPErrorFormula = scoreToPError
        # BUGFIX: previously created lazily in qualifyWithQualityString via a
        # try/except AttributeError hack; initialize here so it always exists.
        self.eliminated = False

    def makeCharacterSet(self, start:str, end:str):
        """Return the inclusive list of legal quality characters for this encoding."""
        rangeStart = ord(start)
        rangeEnd = ord(end) + 1
        return [chr(asciiValue) for asciiValue in range(rangeStart, rangeEnd)]

    def calculateRange(self, start:str, end:str):
        """Width of the legal character range (end minus start, in ASCII code points)."""
        return ord(end) - ord(start)

    def toPError(self, score: typing.Union[int, str]):
        """Convert a numeric score or a single encoded character to an error probability.

        :raises ValueError: if given a string longer than one character
        """
        if isinstance(score, str):  # was type(score) == str; isinstance also covers subclasses
            if len(score) == 1:
                score = convertCharacterToScore(score, self.base)
            else:
                logger.critical("Attempt to convert multiple characters to error probability. Function can only handle one conversion per call.")
                raise ValueError("Attempt to get pError for entire string. Need one value at a time. String: %s" %score)
        return self.toPErrorFormula(score)

    def scoreFromPError(self, pError:float, round:bool=True):
        """Convert an error probability to a numeric score (rounded by default)."""
        return self.fromPErrorFormula(pError, round)

    def encodedFromPError(self, pError:float):
        """Convert an error probability straight to its encoded quality character."""
        return chr(self.scoreFromPError(pError, round=True) + self.base)

    def qualifyWithQualityString(self, qualityString:str):
        """Mark this encoding as eliminated if the string contains any character
        outside its legal set. Once eliminated, it stays eliminated."""
        if not self.eliminated:
            qualityString = str(qualityString)
            for character in qualityString:
                if not character in self.characterSet:
                    self.eliminated = True
                    break

    def __str__(self):
        return self.name

    def __eq__(self, other):
        # Name-based equality; accepts either a bare name string or another scheme.
        if not type(other) in [str, EncodingScheme]:
            raise TypeError("Unable to compare encoding scheme types with anything but string or other EncodingScheme objects")
        return self.name == str(other)
-
-
def makeEncodingTable():
    """Build the list of candidate quality encodings, ordered from most to least likely."""
    phredConverters = (pErrorToPhred, phredToPError)
    return [
        EncodingScheme("Sanger/Illumina 1.8+", 33, "!", "I", *phredConverters),
        EncodingScheme("Illumina 1.8+", 33, "!", "J", *phredConverters),
        EncodingScheme("Illumina 1.5-7", 64, "B", "i", *phredConverters),
        EncodingScheme("Illumina 1.3-4", 64, "@", "h", *phredConverters),
        EncodingScheme("Solexa", 64, ";", "h", pErrorToSolexa, solexaToPError),
        EncodingScheme("Pacbio", 33, "!", "~", *phredConverters),
    ]
-
def convertCharacterToScore(character, base:int=33):
    """Decode one quality character into its integer score (ASCII value minus offset)."""
    asciiValue = ord(character)
    return asciiValue - base
-
-
def convertToNumericArray(qualityString, base: int = 33):
    """Convert an entire quality string into a tuple of integer scores.

    :param qualityString: encoded quality string
    :param base: ASCII offset of the encoding (default 33, Sanger/Illumina 1.8+)
    :return: tuple of integer quality scores, one per character
    """
    # Generator expression replaces the original manual append loop.
    return tuple(convertCharacterToScore(character, base) for character in qualityString)
-
-
def pErrorToPhred(pError:float, roundValue:bool=True):
    """Convert an error probability into a Phred score: -10 * log10(pError).

    Returns an int when roundValue is True (the default), otherwise the raw float.
    """
    phredScore = -10 * (math.log(pError, 10))
    return round(phredScore) if roundValue else phredScore
-
-
def phredToPError(phred:[int, float]):
    """Inverse of pErrorToPhred: error probability for a Phred score, 10^(-phred/10)."""
    exponent = -phred / 10
    return 10 ** exponent
-
-
def pErrorToSolexa(pError:float, roundValue:bool=True):
    """Convert an error probability to a Solexa score: -10 * log10(pError / (1 - pError)).

    Solexa scores the log-odds of error rather than the plain log probability.
    Returns an int when roundValue is True (the default), otherwise the raw float.
    """
    errorOdds = pError / (1 - pError)
    solexaScore = -10 * (math.log(errorOdds, 10))
    return round(solexaScore) if roundValue else solexaScore
-
-
def solexaToPError(solexa:[int, float]):
    """Inverse of pErrorToSolexa: recover the error probability from a Solexa
    (log-odds) score as 1 / (10^(solexa/10) + 1)."""
    oddsTerm = 10 ** (solexa / 10)
    return 1 / (oddsTerm + 1)
-
-
class _Encodings(object):
    """Internal registry exposing each known EncodingScheme as an attribute."""

    def __init__(self):
        phredConverters = (pErrorToPhred, phredToPError)
        self.sanger = EncodingScheme("Sanger/Illumina 1.8+", 33, "!", "I", *phredConverters)
        self.illumina = EncodingScheme("Illumina 1.8+", 33, "!", "J", *phredConverters)
        self.illumina1_8 = self.illumina  # alias: 1.8+ is the current Illumina default
        self.illumina1_5 = EncodingScheme("Illumina 1.5-7", 64, "B", "i", *phredConverters)
        self.illumina1_3 = EncodingScheme("Illumina 1.3-4", 64, "@", "h", *phredConverters)
        self.solexa = EncodingScheme("Solexa", 64, ";", "h", pErrorToSolexa, solexaToPError)
        self.pacbio = EncodingScheme("Pacbio", 33, "!", "~", *phredConverters)


# Shared singleton used as the default encoding source throughout this module.
encodingSchemes = _Encodings()
-
-
def cumulativeExpectedErrorArray(qualityString:str, encoding:EncodingScheme=encodingSchemes.illumina):
    """Return the running total of expected errors at each position of a quality string.

    :param qualityString: encoded quality string
    :param encoding: EncodingScheme used to decode each character
    :return: list where element i is the expected-error sum over positions 0..i
    """
    # Local renamed from the original, which shadowed this function's own name.
    expectedErrors = []
    runningTotal = 0.0
    for character in str(qualityString):
        runningTotal += encoding.toPError(character)
        expectedErrors.append(runningTotal)
    return expectedErrors
-
-
def cumulativeExpectedErrorArrayDada2Exact(qualityString:str, encoding:EncodingScheme=encodingSchemes.illumina):
    """Like cumulativeExpectedErrorArray, but computes the per-character Phred
    conversion inline (10^(-score/10)) to reproduce DADA2's exact arithmetic.

    :param qualityString: encoded quality string
    :param encoding: EncodingScheme supplying the ASCII base offset
    :return: list where element i is the expected-error sum over positions 0..i
    """
    expectedErrors = []
    runningTotal = 0.0
    for character in str(qualityString):
        phredScore = ord(character) - encoding.base
        runningTotal += 10 ** (-phredScore / 10)
        expectedErrors.append(runningTotal)
    return expectedErrors
-
-
def convertQualityString(qualityString:str, inputScheme:EncodingScheme, outputScheme:EncodingScheme):
    """Re-encode a quality string from one encoding scheme to another.

    :param qualityString: quality string encoded under inputScheme
    :param inputScheme: EncodingScheme the string is currently encoded in
    :param outputScheme: EncodingScheme to convert the string to
    :return: the quality string re-encoded under outputScheme
    """
    qualityString = str(qualityString)
    # "".join replaces the original quadratic string "+=" accumulation.
    if inputScheme.fromPErrorFormula == outputScheme.fromPErrorFormula:
        # Same scoring formula: a constant ASCII offset shift suffices.
        baseDifference = inputScheme.base - outputScheme.base
        return "".join(chr(ord(character) - baseDifference) for character in qualityString)
    # Different formulas: round-trip each character through its error probability.
    return "".join(outputScheme.encodedFromPError(inputScheme.toPError(character))
                   for character in qualityString)
-
-
-if __name__ == "__main__":
- test = makeEncodingTable()
- convert = encodingSchemes.illumina.encodedFromPError(0.0001)
- func1 = phredToPError == phredToPError
- func2 = solexaToPError == phredToPError
- testString = "CCCCCGG7FGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGGFGGGGGGFFGGGGGG9EGGF8F8FGGFGGGFGGFGGGGFG8FFGEGGGGGGGEFGGGGGGCFGGFFGEGEDFGGGGCFGFGG9,CACGGGGEGGEGGFGFGFGGEEGGGF8EGDDGGGGFGGGFFGGEGGGD*:>ECCB:AFG>)::+@>CFFG?FFD><>FE8DFF>>F31CEC*1<)9FF=**.68*: