Feature/ximmer coverage #173

Open · wants to merge 23 commits into master
2 changes: 2 additions & 0 deletions .gitignore
@@ -32,3 +32,5 @@ src/main/scala/org/biodatageeks/sequila/pileup/PileupRunner.scala

page/resources/_gen/
page/public/

data/
2 changes: 1 addition & 1 deletion build.sbt
@@ -48,7 +48,7 @@ libraryDependencies += "io.projectglow" %% "glow-spark3" % "1.0.1" excludeAll (E
libraryDependencies += "com.intel.gkl" % "gkl" % "0.8.8"
libraryDependencies += "org.openjdk.jol" % "jol-core" % "0.16" % "provided"
libraryDependencies += "com.github.jsr203hadoop" % "jsr203hadoop" % "1.0.3"

libraryDependencies += "org.skyscreamer" % "jsonassert" % "1.5.1"



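The jsonassert dependency added above enables order-insensitive JSON comparison in tests. A minimal sketch of the kind of assertion it allows (the object below is hypothetical and not part of this PR):

import org.skyscreamer.jsonassert.JSONAssert

object JsonAssertSketch extends App {
  val expected = """{"contig": "chr1", "coverage": 7}"""
  val actual   = """{"coverage": 7, "contig": "chr1", "note": "extra fields are ignored in lenient mode"}"""
  // strict = false (lenient): field order and additional fields in `actual` do not fail the assertion
  JSONAssert.assertEquals(expected, actual, false)
}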
19 changes: 19 additions & 0 deletions gcp_testy
@@ -0,0 +1,19 @@
#!/bin/bash
cores=( "local[1]" "local[2]" "local[4]" "local[8]" "local[16]" )
results=()

for i in "${!cores[@]}"; do
for j in 1 2 3 4 5; do
START=$(date +%s.%N)
spark-submit -v --master ${cores[i]} --driver-memory 50g --class org.biodatageeks.sequila.apps.XimmerApp /home/k_kobylinski98/sequila/target/scala-2.12/sequila-assembly-0.1.0.jar -b /home/k_kobylinski98/ximmer_3/data_gs -t /home/k_kobylinski98/ximmer_3/data_gs/big.bed -o /home/k_kobylinski98/ximmer_3/data_gs/ximmer_sequila -f /home/k_kobylinski98/ximmer_3/data/hg38.fasta xhmm conifer exomedepth codex cnmops
END=$(date +%s.%N)
exec_time=$(echo "$END - $START" | bc)
results+=( "$exec_time" )
echo "$exec_time " >> ./wyniki_xhmm
done
done

echo "-----------------------------"
for i in "${results[@]}"; do
echo "$i"
done
23 changes: 23 additions & 0 deletions src/main/scala/org/biodatageeks/sequila/apps/SequilaApp.scala
@@ -1,6 +1,10 @@
package org.biodatageeks.sequila.apps

import htsjdk.samtools.ValidationStringency
import org.apache.spark.sql.{SequilaSession, SparkSession}
import org.biodatageeks.sequila.rangejoins.IntervalTree.IntervalTreeJoinStrategyOptim
import org.biodatageeks.sequila.utils.InternalParams
import org.seqdoop.hadoop_bam.util.SAMHeaderReader

trait SequilaApp {
def createSequilaSession(): SequilaSession = {
@@ -15,4 +19,23 @@ trait SequilaApp {
spark.sparkContext.setLogLevel("WARN")
ss
}

def createSparkSessionWithExtraStrategy(sparkSave: Boolean = false): SparkSession = {
val spark = SparkSession
.builder()
.config("spark.master", "local[4]")
.config(InternalParams.saveAsSparkFormat, sparkSave)
.getOrCreate()

spark.sqlContext.setConf(InternalParams.useJoinOrder, "true")
spark.experimental.extraStrategies = new IntervalTreeJoinStrategyOptim(spark) :: Nil
spark
.sparkContext
.setLogLevel("OFF")
spark
.sparkContext
.hadoopConfiguration.set(SAMHeaderReader.VALIDATION_STRINGENCY_PROPERTY, ValidationStringency.SILENT.toString)

spark
}
}
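A minimal sketch (the driver below is hypothetical, not part of this PR) of how an application mixing in this trait could use the new helper and wrap the resulting session in a SequilaSession, as XimmerDist does later in this changeset:

package org.biodatageeks.sequila.apps

import org.apache.spark.sql.SequilaSession

object ExampleCoverageDriver extends SequilaApp {
  def main(args: Array[String]): Unit = {
    // sparkSave is left at its default (false)
    val spark = createSparkSessionWithExtraStrategy()
    val ss = SequilaSession(spark)
    // queries issued through `ss` can use the registered interval-tree join strategy
    ss.sql("SELECT 1 AS sanity_check").show()
    spark.stop()
  }
}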
104 changes: 104 additions & 0 deletions src/main/scala/org/biodatageeks/sequila/apps/XimmerDist.scala
@@ -0,0 +1,104 @@
/**
* Created by Krzysztof Kobyliński
*/

package org.biodatageeks.sequila.apps

import org.apache.spark.sql.{DataFrame, Row, SequilaSession}
import org.biodatageeks.sequila.apps.PileupApp.createSparkSessionWithExtraStrategy
import org.biodatageeks.sequila.utils.SystemFilesUtil._
import org.biodatageeks.sequila.ximmer.converters._
import org.biodatageeks.sequila.ximmer.{PerBaseCoverage, TargetCounts}
import org.rogach.scallop.ScallopConf

import java.nio.file.{Files, Paths}
import scala.collection.mutable

object XimmerDist {

class RunConf(args: Array[String]) extends ScallopConf(args) {
val bam_dir = opt[String](required = true)
val targets = opt[String](required = true)
val fasta = opt[String](required = true)
val output_path = opt[String](required = true)
val spark_save = opt[Boolean](default = Some(false))
val callers = trailArg[List[String]](required = true)
verify()
}

def main(args: Array[String]): Unit = {
val runConf = new RunConf(args)
val spark = createSparkSessionWithExtraStrategy(runConf.spark_save())
val ss = SequilaSession(spark)

val bamFiles = findBamFiles(runConf.bam_dir())

val shouldCallXhmm = runConf.callers().contains("xhmm")
val shouldCallConifer = runConf.callers().contains("conifer")
val shouldCallCodex = runConf.callers().contains("codex")
val shouldCallCnmops = runConf.callers().contains("cnmops")
val shouldCallExomedepth = runConf.callers().contains("exomedepth")

var targetCounts = mutable.SortedMap[String, (Array[Row], Long)]()

if (shouldCalculateTargetCounts(runConf)) {
val targetCountsResult = new TargetCounts().calculateTargetCounts(spark, runConf.targets(), bamFiles, shouldCallConifer)
targetCounts = targetCountsResult map {case (sampleName, (dataFrame, readsNr)) =>
(sampleName, (dataFrame.collect(), readsNr))
}
}

if (shouldCallCodex) {
val codexOutput = runConf.output_path() + "/codex"
Files.createDirectories(Paths.get(codexOutput))
new CodexConverter().convertToCodexFormat(targetCounts, codexOutput)
}

if (shouldCallCnmops) {
val cnmopsOutput = runConf.output_path() + "/cnmops"
Files.createDirectories(Paths.get(cnmopsOutput))
new CnMopsConverter().convertToCnMopsFormat(targetCounts, cnmopsOutput)
}

if (shouldCallConifer) {
convertToConiferFormat(targetCounts, runConf.output_path())
}

if (shouldCallExomedepth) {
val exomedepthOutput = runConf.output_path() + "/exomedepth"
Files.createDirectories(Paths.get(exomedepthOutput))
new ExomeDepthConverter().convertToExomeDepthFormat(targetCounts, exomedepthOutput)
}

if (shouldCallXhmm) {
val perBaseResults = new PerBaseCoverage().calculatePerBaseCoverage(ss, bamFiles, runConf.targets(), runConf.fasta())
convertToXhmmFormat(runConf.output_path(), perBaseResults)
}
}

private def shouldCalculateTargetCounts(runConf: RunConf): Boolean = {
val callers = List("cnmops", "exomedepth", "conifer", "codex")
runConf.callers().exists(callers.contains)
}

private def convertToXhmmFormat(outputPath: String, perBaseResult: mutable.Map[String, (DataFrame, DataFrame)]) : Unit = {
val xhmmOutput = outputPath + "/xhmm"
Files.createDirectories(Paths.get(xhmmOutput))
val converter = new GngsConverter()

perBaseResult.foreach(pair =>
converter.calculateStatsAndConvertToGngsFormat(xhmmOutput, pair._1, pair._2._1, pair._2._2)
)
}

private def convertToConiferFormat(targetCountResult: mutable.Map[String, (Array[Row], Long)], outputPath: String) : Unit = {
val coniferOutput = outputPath + "/conifer"
Files.createDirectories(Paths.get(coniferOutput))
val converter = new ConiferConverter()

for (sampleResult <- targetCountResult) {
converter.convertToConiferFormat(sampleResult, coniferOutput)
}
}

}
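For reference, a minimal sketch of launching XimmerDist, e.g. from a local test (the paths are hypothetical, and the flag spellings assume Scallop derives them verbatim from the underscore-named options above; on a cluster the same arguments would follow the assembly jar in spark-submit):

package org.biodatageeks.sequila.apps

object XimmerDistLocalRun {
  def main(args: Array[String]): Unit = {
    XimmerDist.main(Array(
      "--bam_dir", "/data/samples",           // directory scanned for *.bam files
      "--targets", "/data/targets.bed",       // target regions
      "--fasta", "/data/hg38.fasta",          // reference FASTA, needed for the per-base (xhmm) path
      "--output_path", "/data/ximmer_out",    // per-caller subdirectories are created underneath
      "xhmm", "conifer", "codex"              // trailing args select which caller inputs to prepare
    ))
  }
}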
@@ -242,7 +242,7 @@ trait BDGAlignFileReaderWriter [T <: BDGAlignInputFormat]{
case Columns.POS => r.getStart
case Columns.RNEXT => DataQualityFuncs.cleanContig(r.getMateReferenceName) //FIXME: to check if the mapping is correct
case Columns.PNEXT => r.getMateAlignmentStart //FIXME: to check if the mapping is correct
case Columns.TLEN => r.getLengthOnReference //FIXME: to check if the mapping is correct
case Columns.TLEN => r.getInferredInsertSize
case s if s.startsWith("tag_") => {
val fields = ScalaFuncs.classAccessors[Alignment]
val tagField = fields.filter(_.name.toString == s).head
@@ -46,12 +46,12 @@ object PileupMethods {
val result =
if (conf.value.coverageOnly && conf.value.useVectorizedOrcWriter) {
aggregates
.toCoverageVectorizedWriter(bounds, output, conf)
.toCoverageVectorizedWriter(refPath, bounds, output, conf)
.count()
spark.sparkContext.emptyRDD[InternalRow]
}
else if (conf.value.coverageOnly){
aggregates.toCoverage(bounds)
aggregates.toCoverage(refPath,bounds)
}
else if(conf.value.useVectorizedOrcWriter) {
aggregates.toPileupVectorizedWriter(refPath, bounds, output, conf)
@@ -51,4 +51,30 @@ class SequilaConverter (spark: SparkSession) extends Serializable with PileupCon
spark.createDF(perBase.collect().toList, CommonPileupFormat.schemaAltsQualsString.fields.toList )
}

def convertToPerBaseOutput(df: DataFrame): DataFrame = {
val perBase = df.rdd.flatMap( { r => {
val chr = r.getString(SequilaSchema.contig)
val start = r.getInt(SequilaSchema.position_start)
val end = r.getInt(SequilaSchema.position_end) + 1
val ref = r.getString(SequilaSchema.ref)
val cov = r.getShort(SequilaSchema.cov)
val array = new Array[(String, Int, Int, String, Short)](end - start)

var cnt = 0
var position = start

while (position < end) {
if (ref == AlignmentConstants.REF_SYMBOL)
array(cnt) = (chr, position, position, "R", cov)
else
array(cnt) = (chr, position, position, ref.substring(cnt, cnt + 1), cov)
cnt += 1
position += 1
}
array
}
})
spark.createDataFrame(perBase)
}

}
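As a worked illustration of the expansion above (a standalone sketch, not part of the PR, with the literal "R" standing in for AlignmentConstants.REF_SYMBOL): a blocked record spanning positions 100-102 with coverage 7 becomes three single-position rows, one base per row:

object PerBaseExpansionSketch extends App {
  // one blocked record: contig, start, inclusive end, block sequence (or "R"), coverage
  val (chr, start, end, ref, cov) = ("chr1", 100, 102, "ACT", 7.toShort)

  // mirrors the loop in convertToPerBaseOutput: one output row per position in the block
  val rows = (start to end).map { pos =>
    val base = if (ref == "R") "R" else ref.substring(pos - start, pos - start + 1)
    (chr, pos, pos, base, cov)
  }

  rows.foreach(println)  // (chr1,100,100,A,7), (chr1,101,101,C,7), (chr1,102,102,T,7)
}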
@@ -180,14 +180,14 @@ case class AggregateRDD(rdd: RDD[ContigAggregate]) {
}
}

def toCoverage(bounds: Broadcast[Array[PartitionBounds]] ) : RDD[InternalRow] = {
def toCoverage(refPath: String, bounds: Broadcast[Array[PartitionBounds]] ) : RDD[InternalRow] = {

this.rdd.mapPartitionsWithIndex { (index, part) =>

part.map { agg => {
val contigMap = new mutable.HashMap[String, Array[Byte]]()
contigMap += (agg.contig -> UTF8String.fromString(agg.contig).getBytes)
PileupProjection.contigByteMap=contigMap
val reference = new Reference(refPath)
val contigMap = reference.getNormalizedContigMap
PileupProjection.setContigMap(contigMap)

var cov, ind, i, currPos = 0
val allPos = false
@@ -232,15 +232,15 @@ case class AggregateRDD(rdd: RDD[ContigAggregate]) {
}


def toCoverageVectorizedWriter(bounds: Broadcast[Array[PartitionBounds]], output: Seq[Attribute], conf: Broadcast[Conf]) : RDD[Int] = {
def toCoverageVectorizedWriter(refPath: String, bounds: Broadcast[Array[PartitionBounds]], output: Seq[Attribute], conf: Broadcast[Conf]) : RDD[Int] = {

logger.info(s"### Coverage: using Vectorized ORC output optimization with ${conf.value.orcCompressCodec} compression")
this.rdd.mapPartitionsWithIndex { (index, part) =>

part.map { agg => {
val contigMap = new mutable.HashMap[String, Array[Byte]]()
contigMap += (agg.contig -> UTF8String.fromString(agg.contig).getBytes)
PileupProjection.contigByteMap=contigMap
val reference = new Reference(refPath)
val contigMap = reference.getNormalizedContigMap
PileupProjection.setContigMap(contigMap)

var cov, ind, i, currPos = 0
val maxIndex = FastMath.findMaxIndex(agg.events)
@@ -55,6 +55,6 @@ object InternalParams {
final val minOverlap = "spark.biodatageeks.rangejoin.minOverlap"
final val intervalHolderClass = "spark.biodatageeks.rangejoin.intervalHolderClass"


final val saveAsSparkFormat = "spark.biodatageeks.saveAsSparkFormat"

}
@@ -0,0 +1,35 @@
/**
* Created by Krzysztof Kobyliński
*/

package org.biodatageeks.sequila.utils

import java.io.File

object SystemFilesUtil {

def findBamFiles(dirPath: String) : List[String] = {
val dir = new File(dirPath)
if (!dir.exists() || !dir.isDirectory) {
throw new IllegalArgumentException("Directory path should be provided")
}

dir.listFiles
.filter(_.isFile)
.filter(_.getName.endsWith(".bam"))
.map(_.getPath)
.map(x => x.replace("\\", "/"))
.toList
.sorted
}

def getFilename(filePath: String) : String = {
val file = new File(filePath)
if (!file.exists() || file.isDirectory) {
throw new IllegalArgumentException("Filepath should be provided")
}

file.getName.split('.')(0)
}

}
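A short usage sketch for the helpers above (the paths are hypothetical):

import org.biodatageeks.sequila.utils.SystemFilesUtil._

object SystemFilesUtilSketch extends App {
  // lists *.bam files in the directory, with backslashes normalized to forward slashes, sorted
  val bams = findBamFiles("/data/samples")         // e.g. List("/data/samples/s1.bam", "/data/samples/s2.bam")
  // returns the part of the file name before the first dot
  val sample = getFilename("/data/samples/s1.bam") // "s1"
  println(bams)
  println(sample)
}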