From 7ebc0060196ae485f5111753ba4618c361cb05f9 Mon Sep 17 00:00:00 2001
From: marek
Date: Fri, 12 Sep 2014 08:27:08 +0200
Subject: [PATCH 1/3] Migration to Spark 1.x and Hadoop-BAM 7.x

---
 .idea/uiDesigner.xml                          |  125 --
 .idea/workspace.xml                           | 1244 ++++-------------
 .idea_modules/sparkseq-build.iml              |   81 --
 .idea_modules/sparkseq-core.iml               |  137 --
 .idea_modules/sparkseq-repl.iml               |  138 --
 .idea_modules/sparkseq.iml                    |  139 --
 build.sbt                                     |    2 +-
 sparkseq-core/build.sbt                       |   15 +-
 .../pl/elka/pw/sparkseq/SparkSeqBaseDE.scala  |   24 +-
 .../elka/pw/sparkseq/SparkSeqSimpleJob.scala  |   18 +-
 .../SparkSeqDiffExpr.scala                    |   46 +-
 .../junctions/SparkSeqJunctionAnalysis.scala  |   11 +-
 .../seqAnalysis/SparkSeqAnalysis.scala        |  121 +-
 .../SparkSeqKryoRegistrator.scala             |    6 +-
 sparkseq-repl/build.sbt                       |    4 +-
 15 files changed, 372 insertions(+), 1739 deletions(-)
 delete mode 100644 .idea/uiDesigner.xml
 delete mode 100644 .idea_modules/sparkseq-build.iml
 delete mode 100644 .idea_modules/sparkseq-core.iml
 delete mode 100644 .idea_modules/sparkseq-repl.iml
 delete mode 100644 .idea_modules/sparkseq.iml

diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml
deleted file mode 100644
index 3b00020..0000000
--- a/.idea/uiDesigner.xml
+++ /dev/null
@@ -1,125 +0,0 @@
[125 deleted lines of IDE designer XML; the markup did not survive extraction]

diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index bc30dc3..bc991b8 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -1,7 +1,23 @@
[IDE workspace XML hunk; the markup did not survive extraction]

diff --git a/.idea_modules/sparkseq-build.iml b/.idea_modules/sparkseq-build.iml
deleted file mode 100644
index 7c10ecb..0000000
--- a/.idea_modules/sparkseq-build.iml
+++ /dev/null
@@ -1,81 +0,0 @@
[81 deleted lines of IDE module XML; the markup did not survive extraction]

diff --git a/.idea_modules/sparkseq-core.iml b/.idea_modules/sparkseq-core.iml
deleted file mode 100644
index e06dc39..0000000
--- a/.idea_modules/sparkseq-core.iml
+++ /dev/null
@@ -1,137 +0,0 @@
[137 deleted lines of IDE module XML; the markup did not survive extraction]

diff --git a/.idea_modules/sparkseq-repl.iml b/.idea_modules/sparkseq-repl.iml
deleted file mode 100644
index 092533b..0000000
--- a/.idea_modules/sparkseq-repl.iml
+++ /dev/null
@@ -1,138 +0,0 @@
[138 deleted lines of IDE module XML; the markup did not survive extraction]

diff --git a/.idea_modules/sparkseq.iml b/.idea_modules/sparkseq.iml
deleted file mode 100644
index 6f1c236..0000000
--- a/.idea_modules/sparkseq.iml
+++ /dev/null
@@ -1,139 +0,0 @@
[139 deleted lines of IDE module XML; the markup did not survive extraction]

diff --git a/build.sbt b/build.sbt
index d0fa04c..7c95d8d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,7 @@
organization := "pl.edu.pw.elka" version := "0.1-SNAPSHOT" -scalaVersion := "2.10.3" +scalaVersion := "2.10.4" publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/html/maven")) ) diff --git a/sparkseq-core/build.sbt b/sparkseq-core/build.sbt index 8ef5b52..f1bbf52 100644 --- a/sparkseq-core/build.sbt +++ b/sparkseq-core/build.sbt @@ -8,7 +8,7 @@ organization := "pl.edu.pw.elka" version := "0.1-SNAPSHOT" -scalaVersion := "2.10.3" +scalaVersion := "2.10.4" publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/html/maven"))) @@ -18,15 +18,16 @@ lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HA ScctPlugin.instrumentSettings libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-core" % "0.9.0-incubating", + "org.apache.spark" %% "spark-core" % "1.0.2", "org.scalatest" % "scalatest_2.10" % "2.1.0-RC2" % "test", "org.apache.commons" % "commons-math3" % "3.2", "org.apache.hadoop" % "hadoop-client" % hadoopVersion, - "fi.tkk.ics.hadoop.bam" % "hadoop-bam" % "6.1", - "picard" % "picard" % "1.93", - "samtools" % "samtools" % "1.93", - "tribble" % "tribble" % "1.93", - "variant" % "variant" % "1.93", + "org.seqdoop" % "hadoop-bam" % "7.0.0", + "org.seqdoop" % "htsjdk" % "1.118", + //"picard" % "picard" % "1.93", + //"samtools" % "samtools" % "1.93", + //"tribble" % "tribble" % "1.93", + //"variant" % "variant" % "1.93", "com.github.nscala-time" %% "nscala-time" % "0.8.0" ) diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala index 8f5ea59..0f114a2 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala @@ -15,26 +15,14 @@ */ package pl.elka.pw.sparkseq -import pl.elka.pw.sparkseq.statisticalTests._ -import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis -import pl.elka.pw.sparkseq.statisticalTests._ -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import collection.mutable.ArrayBuffer -import org.apache.hadoop.io.LongWritable -import fi.tkk.ics.hadoop.bam.{BAMInputFormat, SAMRecordWritable} -import pl.elka.pw.sparkseq.conversions.SparkSeqConversions -import scala.util.control._ -import pl.elka.pw.sparkseq.util.SparkSeqContexProperties -import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties -import scala.Array -import org.apache.spark.RangePartitioner -import org.apache.spark.HashPartitioner -import org.apache.spark.SparkConf -import java.io._ -import com.github.nscala_time.time._ import com.github.nscala_time.time.Imports._ +import org.apache.hadoop.io.LongWritable +import org.apache.spark.{SparkConf, SparkContext} +import org.seqdoop.hadoop_bam.{BAMInputFormat, SAMRecordWritable} import pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr +import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis +import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties +import pl.elka.pw.sparkseq.util.SparkSeqContexProperties /** * Created by marek on 2/8/14. 
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala index 2ff2fca..e518893 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala @@ -2,30 +2,16 @@ package pl.elka.pw.sparkseq import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties -import pl.elka.pw.sparkseq.util.SparkSeqContexProperties -import pl.elka.pw.sparkseq.conversions.SparkSeqConversions._ import pl.elka.pw.sparkseq.conversions.SparkSeqConversions import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis +import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties +import pl.elka.pw.sparkseq.util.SparkSeqContexProperties //import org.apache.spark.SparkConf //import org.apache.spark._ -import org.apache.spark.storage._ //import SparkContext._ //import spark._ -import org.apache.spark.HashPartitioner -import org.apache.spark.rdd._ -import fi.tkk.ics.hadoop.bam.BAMInputFormat -import fi.tkk.ics.hadoop.bam.SAMRecordWritable -import org.apache.hadoop.io.LongWritable import org.apache.spark.SparkContext.rddToPairRDDFunctions -import com.esotericsoftware.kryo.Kryo -import collection.mutable.ArrayBuffer -import pl.elka.pw.sparkseq.statisticalTests._ -import java.util.HashMap -import scala.collection.mutable.Map -import java.io.File /** * An example of a simple job implemented using SparkSeq. diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala index b3253c3..ecf646d 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala @@ -73,43 +73,43 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn private val cmDistTable = iSC.textFile(confDir + "cm" + caseSampleNum + "_" + controlSampleNum + "_2.txt") .map(l => l.split("\t")) .map(r => (r.array(0).toDouble, r.array(1).toDouble)) - .toArray + .collect() private val cmDistTableB = iSC.broadcast(cmDistTable) private val genExonsMapB = iSC.broadcast(SparkSeqConversions.BEDFileToHashMap(iSC, confDir + iBEDFile)) private val genExonsMapLookupB = iSC.broadcast(SparkSeqConversions.BEDFileToHashMapGeneExon(iSC, confDir + iBEDFile)) - private def groupSeqAnalysis(iSeqAnalysis: SparkSeqAnalysis, iSampleNum: Int): RDD[(Long, Seq[Int])] = { + private def groupSeqAnalysis(iSeqAnalysis: SparkSeqAnalysis, iSampleNum: Int): RDD[(Long, Iterable[Int])] = { val seqGrouped = iSeqAnalysis.getCoverageBaseRegion(iChr, iStartPos, iEndPos) .map(r => (r._1 % 1000000000000L, r._2)) .groupByKey() - .mapValues(c => if ((iSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](iSampleNum - c.length)(0)) else (c)) + .mapValues(c => if ((iSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](iSampleNum - c.size)(0)) else (c)) return (seqGrouped) } - private def joinSeqAnalysisGroup(iSeqAnalysisGroup1: RDD[(Long, Seq[Int])], iSeqAnalysisGroup2: RDD[(Long, Seq[Int])]): RDD[(Long, (Seq[Int], Seq[Int]))] = { + private def joinSeqAnalysisGroup(iSeqAnalysisGroup1: RDD[(Long, Iterable[Int])], iSeqAnalysisGroup2: RDD[(Long, Iterable[Int])]): RDD[(Long, (Iterable[Int], Iterable[Int]))] = { /* val 
sAnalysisG1 = iSeqAnalysisGroup1.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup1)) val sAnalysisG2 = iSeqAnalysisGroup2.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup2)) val seqJoint = sAnalysisG1.cogroup(sAnalysisG2)*/ - val seqJoint: RDD[(Long, (Seq[Seq[Int]], Seq[Seq[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2) + val seqJoint: RDD[(Long, (Iterable[Iterable[Int]], Iterable[Iterable[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2) val finalSeqJoint = seqJoint // .mapValues(r=>(r._1(0),r._2(0))) .map(r => (r._1, - (if (r._2._1.length == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1(0), - if (r._2._2.length == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2(0))) + (if (r._2._1.size == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1.head, + if (r._2._2.size == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2.head)) ) return (finalSeqJoint) } - private def joinSeqAnalysisGroupRegion(iSeqAnalysisGroup1: RDD[(Long, Seq[Int])], iSeqAnalysisGroup2: RDD[(Long, Seq[Int])]): RDD[(Long, (Seq[Int], Seq[Int]))] = { + private def joinSeqAnalysisGroupRegion(iSeqAnalysisGroup1: RDD[(Long, Iterable[Int])], iSeqAnalysisGroup2: RDD[(Long, Iterable[Int])]): RDD[(Long, (Iterable[Int], Iterable[Int]))] = { /* val sAnalysisG1 = iSeqAnalysisGroup1.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup1)) val sAnalysisG2 = iSeqAnalysisGroup2.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup2)) val seqJoint = sAnalysisG1.cogroup(sAnalysisG2)*/ - val seqJoint: RDD[(Long, (Seq[Seq[Int]], Seq[Seq[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2) + val seqJoint: RDD[(Long, (Iterable[Iterable[Int]], Iterable[Iterable[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2) val finalSeqJoint = seqJoint // .mapValues(r=>(r._1(0),r._2(0))) .map(r => (r._1, - (if (r._2._1.length == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1(0), - if (r._2._2.length == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2(0))) + (if (r._2._1.size == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1.head, + if (r._2._2.size == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2.head)) ) return (finalSeqJoint) } @@ -131,7 +131,7 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn val coalRegions = iRegRDD.map(r => (((if (r._6 == "0") "NEWREG" + r._3._1 else r._5), r._6, r._3._1), (r._1, r._2, r._3, r._4, r._7, r._8, r._9))) /*( (geneId,exonId,chrName), (pval,length,(chr,startPos), foldChange, pctOverlap,avgCountA,avgCountB) ) */ .groupByKey() - .mapValues(r => (r.sortBy(_._3._2))) + .mapValues(r => (r.toArray.sortBy(_._3._2))) .mapPartitions { var k = 0 partitionIterator => @@ -216,9 +216,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn return coalRegions } - private def findContRegionsEqual(iSeq: RDD[((Int, Double), Seq[(Long, Double, Double, Double)])]): + private def findContRegionsEqual(iSeq: RDD[((Int, Double), Iterable[(Long, Double, Double, Double)])]): RDD[(Double, Int, (String, Int), Double, String, String, Double, Double, Double)] = { - val iSeqPart = iSeq.map(r => (r._1._2, r._2.sortBy(_._1))) + val iSeqPart = iSeq.map(r => (r._1._2, r._2.toSeq.sortBy(_._1))) //.partitionBy(new HashPartitioner(iNumTasks * 3)) iSeqPart .mapPartitions { @@ -295,9 +295,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn }.flatMap(r => r) } - private def 
findContRegionsLessEqual(iSeq: RDD[(Int, Seq[(Double, Long, Double, Double, Double)])]) /*(chrId,(pval,position,foldChange) */ + private def findContRegionsLessEqual(iSeq: RDD[(Int, Iterable[(Double, Long, Double, Double, Double)])]) /*(chrId,(pval,position,foldChange) */ : RDD[(Double, Int, (String, Int), Double, String, String, Double, Double, Double)] = { - val iSeqPart = iSeq.mapValues(r => (r.sortBy(_._2))) + val iSeqPart = iSeq.mapValues(r => (r.toSeq.sortBy(_._2))) // .partitionBy(new HashPartitioner(iNumTasks * 3)) iSeqPart .mapPartitions { @@ -495,9 +495,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn //seqGroupControl.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseControl") val seqJointCC = joinSeqAnalysisGroup(seqGroupCase, seqGroupControl) //seqJointCC.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseJoint") - val seqFilterCC = seqJointCC.filter(r => (SparkSeqStats.mean(r._2._1) >= iMinCoverage || SparkSeqStats.mean(r._2._2) >= iMinCoverage)) + val seqFilterCC = seqJointCC.filter(r => (SparkSeqStats.mean(r._2._1.toArray) >= iMinCoverage || SparkSeqStats.mean(r._2._2.toArray) >= iMinCoverage)) //seqFilterCC.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseFilter") - val seqCompTest = computeTwoSampleCvMTest(seqFilterCC) + val seqCompTest = computeTwoSampleCvMTest(seqFilterCC.map(r=>(r._1,(r._2._1.toSeq,r._2._2.toSeq)) ) ) //seqCompTest.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseTest") val seqPValGroup = seqCompTest if (iCoalesceRegDiffPVal == false) @@ -508,7 +508,7 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn .groupByKey() val seqPostPar = { - seqPrePart.partitionBy(new RangePartitioner[Int, Seq[(Double, Long, Double, Double, Double)]](iNumTasks, seqPrePart)) + seqPrePart.partitionBy(new RangePartitioner[Int, Iterable[(Double, Long, Double, Double, Double)]](iNumTasks, seqPrePart)) } seqRegContDERDD = findContRegionsLessEqual(seqPostPar) //seqRegContDERDD.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugRegCoals") @@ -536,16 +536,16 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn HashMap[String, Array[scala.collection.mutable.ArrayBuffer[(String, String, Int, Int)]]]]): RDD[((String), Double, Double)] = { val seqRegCovCase = iSeqAnalCase.getCoverageRegion(iRegions).map(r => ((SparkSeqConversions.stripSampleID(r._1), r._2))) .groupByKey() - .mapValues(c => if ((caseSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](caseSampleNum - c.length)(0)) else (c)) + .mapValues(c => if ((caseSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](caseSampleNum - c.size)(0)) else (c)) val seqRegCovControl = iSeqAnalControl.getCoverageRegion(iRegions).map(r => ((SparkSeqConversions.stripSampleID(r._1), r._2))) .groupByKey() - .mapValues(c => if ((controlSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](controlSampleNum - c.length)(0)) else (c)) + .mapValues(c => if ((controlSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](controlSampleNum - c.size)(0)) else (c)) val jointRegion = joinSeqAnalysisGroupRegion(seqRegCovCase, seqRegCovControl) val permTestRegionD = jointRegion.map { r => val statTests = Array[SparkSeqStatisticalTest](SparkSeqCvM2STest, SparkSeqKS2STest) - val permTest = new SparkSeqAdaptivePermutTest(iNPermut = 10000, iStatTests = statTests, r._2._1, r._2._2) - (SparkSeqConversions.ensemblRegionIdToExonId(r._1), permTest.getPvalue(), SparkSeqStats.mean(r._2._1) / 
SparkSeqStats.mean(r._2._2), r._2._1, r._2._2) + val permTest = new SparkSeqAdaptivePermutTest(iNPermut = 10000, iStatTests = statTests, r._2._1.toSeq, r._2._2.toSeq) + (SparkSeqConversions.ensemblRegionIdToExonId(r._1), permTest.getPvalue(), SparkSeqStats.mean(r._2._1.toSeq) / SparkSeqStats.mean(r._2._2.toSeq), r._2._1, r._2._2) } //permTestRegionD.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/64MB/debugTestStat.txt") val permTestRegion = permTestRegionD.map(r => (r._1, r._2, r._3)) diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala index cd06f11..7997ac1 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala @@ -16,12 +16,13 @@ package pl.elka.pw.sparkseq.junctions -import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis -import org.apache.spark.rdd.{EmptyRDD, RDD} +import htsjdk.samtools.CigarOperator +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD import pl.elka.pw.sparkseq.conversions.SparkSeqConversions +import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis + import scala.collection.mutable.ArrayBuffer -import net.sf.samtools.CigarOperator -import org.apache.spark.SparkContext._ /** * Created by mwiewiorka on 4/4/14. @@ -49,7 +50,7 @@ class SparkSeqJunctionAnalysis(seqAnalysis: SparkSeqAnalysis) extends Serializab } - private def getGapFromCigar(alignStart: Int, cigar: net.sf.samtools.Cigar): Array[Range] = { + private def getGapFromCigar(alignStart: Int, cigar: htsjdk.samtools.Cigar): Array[Range] = { var gapArray = ArrayBuffer[Range]() val numCigElem = cigar.numCigarElements() diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala index 6cc7d9f..e732b20 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala @@ -19,15 +19,16 @@ import org.apache.spark.SparkContext import SparkContext._ import org.apache.spark.rdd._ import org.apache.spark._ -import fi.tkk.ics.hadoop.bam.BAMInputFormat -import fi.tkk.ics.hadoop.bam.SAMRecordWritable +import org.seqdoop.hadoop_bam.BAMInputFormat +import org.seqdoop.hadoop_bam.SAMRecordWritable import org.apache.hadoop.io.LongWritable import scala.util.control._ import scala.collection.mutable.ArrayBuffer import pl.elka.pw.sparkseq.conversions.SparkSeqConversions import java.io.{File, PrintWriter} import pl.elka.pw.sparkseq.util.SparkSeqRegType._ -import net.sf.samtools.SAMUtils._ +import htsjdk.samtools.SAMRecord +import htsjdk.samtools.SAMUtils._ import scala.Function._ import com.sun.org.apache.xpath.internal.operations.Bool @@ -89,7 +90,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor * @param iCigar Cigar string of a read aligments * @return Array of ranges computed from Cigar string. 
*/ - private def genBasesFromCigar(iAlignStart: Int, iCigar: net.sf.samtools.Cigar): Array[Range] = { + private def genBasesFromCigar(iAlignStart: Int, iCigar:htsjdk.samtools.Cigar): Array[Range] = { var nuclReadArray = ArrayBuffer[Range]() val numCigElem = iCigar.numCigarElements() @@ -347,9 +348,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Get all reads from all samples in format (sampleId,ReadObject) - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def getReads(): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def getReads(): RDD[(Int, htsjdk.samtools.SAMRecord)] = { return bamFileFilter } @@ -357,9 +358,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Get all reads for a specific sample in format (sampleId,ReadObject) * @param sampleID ID of a given sample - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def getSampleReads(sampleID: Int): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def getSampleReads(sampleID: Int): RDD[(Int, htsjdk.samtools.SAMRecord)] = { return getReads().filter(r => r._1 == sampleID) } @@ -368,23 +369,23 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor * Set reads of SeqAnalysis object, e.g. after external filtering * @param reads RDD of (sampleID,ReadObject) */ - def setReads(reads: RDD[(Int, net.sf.samtools.SAMRecord)]) = { + def setReads(reads: RDD[(Int, htsjdk.samtools.SAMRecord)]) = { bamFileFilter = reads } /** * Generic method for filtering out all reads using the condition provided: _.1 refers to sampleID, _.2 to ReadObject . - * @param filterCond ((Int, net.sf.samtools.SAMRecord)) => Boolean - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @param filterCond ((Int, htsjdk.samtools.SAMRecord)) => Boolean + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterReads(filterCond: ((Int, net.sf.samtools.SAMRecord)) => Boolean): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReads(filterCond: ((Int, htsjdk.samtools.SAMRecord)) => Boolean): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(filterCond) return bamFileFilter } - def filterReads(filterCond: ((Int, net.sf.samtools.SAMRecord), (Int, net.sf.samtools.SAMRecord)) => Boolean): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReads(filterCond: ((Int, htsjdk.samtools.SAMRecord), (Int, htsjdk.samtools.SAMRecord)) => Boolean): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => filterCond(r, r)) return bamFileFilter @@ -393,15 +394,15 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the sample id * @param sampleCond Condition on the sample id - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterSample(sampleCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterSample(sampleCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => sampleCond(r._1)) return bamFileFilter } - def filterSample(sampleCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterSample(sampleCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => sampleCond(r._1, r._1)) return bamFileFilter @@ -410,15 +411,15 @@ 
class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the quality of mapping * @param qaulityCond - Conditions on the quality of read mapping - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord] */ - def filterMappingQuality(qaulityCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterMappingQuality(qaulityCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => qaulityCond(r._2.getMappingQuality)) return bamFileFilter } - def filterMappingQuality(qaulityCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterMappingQuality(qaulityCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => qaulityCond(r._2.getMappingQuality, r._2.getMappingQuality)) return bamFileFilter @@ -427,14 +428,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the base qualities of a given read * @param baseQualCond Conditions on the quality of read bases - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterBaseQualities(baseQualCond: (Array[Int] => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterBaseQualities(baseQualCond: (Array[Int] => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => baseQualCond(r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r)))) return bamFileFilter } - def filterBaseQualities(baseQualCond: ((Array[Int], Array[Int]) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterBaseQualities(baseQualCond: ((Array[Int], Array[Int]) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => baseQualCond(r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r)), r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r)))) return bamFileFilter @@ -443,14 +444,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the reference name * @param refNameCond Condition on reference name - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterReferenceName(refNameCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReferenceName(refNameCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => refNameCond(SparkSeqConversions.standardizeChr(r._2.getReferenceName))) return bamFileFilter } - def filterReferenceName(refNameCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReferenceName(refNameCond: ((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => refNameCond(SparkSeqConversions.standardizeChr(r._2.getReferenceName), SparkSeqConversions.standardizeChr(r._2.getReferenceName))) return bamFileFilter } @@ -458,14 +459,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the start of alignment * @param alignStartCond Condition on the start of the alignment - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, 
htsjdk.samtools.SAMRecord)] */ - def filterAlignmentStart(alignStartCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentStart(alignStartCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignStartCond(r._2.getAlignmentStart)) return bamFileFilter } - def filterAlignmentStart(alignStartCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentStart(alignStartCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignStartCond(r._2.getAlignmentStart, r._2.getAlignmentStart)) return bamFileFilter } @@ -473,14 +474,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the end of alignment * @param alignEndCond Condition on the end of the alignment - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterAlignmentEnd(alignEndCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentEnd(alignEndCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignEndCond(r._2.getAlignmentEnd)) return bamFileFilter } - def filterAlignmentEnd(alignEndCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentEnd(alignEndCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignEndCond(r._2.getAlignmentEnd, r._2.getAlignmentEnd)) return bamFileFilter } @@ -489,14 +490,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor * Generic method for filtering reads using conditions on the merged flags. 
* More info http://picard.sourceforge.net/explain-flags.html * @param flagCond Condition on the merged flags - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterFlags(flagCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterFlags(flagCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => flagCond(r._2.getFlags)) return bamFileFilter } - def filterFlags(flagCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterFlags(flagCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => flagCond(r._2.getFlags, r._2.getFlags)) return bamFileFilter } @@ -504,9 +505,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the mapped flag * @param unmapFlagCond Condition on the end of the mapped flag - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, nhtsjdk.samtools.SAMRecord)] */ - def filterUnmappedFlag(unmapFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterUnmappedFlag(unmapFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => unmapFlagCond(r._2.getReadUnmappedFlag)) return bamFileFilter } @@ -514,9 +515,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the duplicate flag * @param dupFlagCond Condition on the end of the duplicate flag - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterDuplicateReadFlag(dupFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterDuplicateReadFlag(dupFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => dupFlagCond(r._2.getDuplicateReadFlag)) return bamFileFilter } @@ -524,9 +525,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the primary flag * @param notPrimFlagCond Condition on the end of the primary flag - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterNotPrimaryAlignFlag(notPrimFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterNotPrimaryAlignFlag(notPrimFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => notPrimFlagCond(r._2.getNotPrimaryAlignmentFlag)) return bamFileFilter } @@ -534,14 +535,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the read name * @param readNameCond Condition on the end of the read name - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterReadName(readNameCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReadName(readNameCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => readNameCond(r._2.getReadName)) return bamFileFilter } - def filterReadName(readNameCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReadName(readNameCond: 
((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => readNameCond(r._2.getReadName, r._2.getReadName)) return bamFileFilter } @@ -549,14 +550,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the read length * @param readLengthCond Condition on the end of the read length - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterReadLength(readLengthCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReadLength(readLengthCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => readLengthCond(r._2.getReadLength)) return bamFileFilter } - def filterReadLength(readLengthCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterReadLength(readLengthCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => readLengthCond(r._2.getReadLength, r._2.getReadLength)) return bamFileFilter } @@ -564,14 +565,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the CIGAR string * @param cigarStringCond Condition on the end of the CIGAR string - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterCigarString(cigarStringCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterCigarString(cigarStringCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => cigarStringCond(r._2.getCigarString)) return bamFileFilter } - def filterCigarString(cigarStringCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterCigarString(cigarStringCond: ((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => cigarStringCond(r._2.getCigarString, r._2.getCigarString)) return bamFileFilter } @@ -580,14 +581,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the CIGAR object * @param cigarCond Condition on the end of the CIGAR object - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int,htsjdk.samtools.SAMRecord)] */ - def filterCigar(cigarCond: (net.sf.samtools.Cigar => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterCigar(cigarCond: (htsjdk.samtools.Cigar => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => cigarCond(r._2.getCigar)) return bamFileFilter } - def filterCigar(cigarCond: ((net.sf.samtools.Cigar, net.sf.samtools.Cigar) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterCigar(cigarCond: ((htsjdk.samtools.Cigar, htsjdk.samtools.Cigar) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => cigarCond(r._2.getCigar, r._2.getCigar)) return bamFileFilter } @@ -595,14 +596,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor /** * Method for filtering reads using conditions on the alignment length covered by read incl. gaps * @param alignLenCond Condition on the end of the alignmrnent length covered by read incl. 
gaps - * @return RDD[(Int, net.sf.samtools.SAMRecord)] + * @return RDD[(Int, htsjdk.samtools.SAMRecord)] */ - def filterAlignmentLength(alignLenCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentLength(alignLenCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignLenCond(r._2.getCigar.getPaddedReferenceLength)) return bamFileFilter } - def filterAlignmentLength(alignLenCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = { + def filterAlignmentLength(alignLenCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = { bamFileFilter = bamFileFilter.filter(r => alignLenCond(r._2.getCigar.getPaddedReferenceLength, r._2.getCigar.getPaddedReferenceLength)) return bamFileFilter } @@ -613,7 +614,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor .map(r => (r._1._2, (r._1._1, r._2))) .groupByKey() .sortByKey() - .mapValues(r => r.sortBy(r => r._1)) + .mapValues(r => r.toSeq.sortBy(r => r._1)) .collect() var samplesHeader: String = "" val samplesIDSort = samplesID.sortBy(r => r) @@ -661,7 +662,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor .map(r => (r._1._2, (r._1._1, r._2))) .groupByKey() .sortByKey() - .mapValues(r => r.sortBy(r => r._1)) + .mapValues(r => r.toSeq.sortBy(r => r._1)) .collect() var samplesHeader: String = "" val samplesIDSort = samplesID.sortBy(r => r) @@ -722,7 +723,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor .map(r => (r._1._2, (r._1._1, r._2))) .groupByKey() .sortByKey() - .mapValues(r => r.sortBy(r => r._1)) + .mapValues(r => r.toSeq.sortBy(r => r._1)) .collect() val samplesIDSort = samplesID.sortBy(r => r) var i = 0 @@ -925,7 +926,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor * @param iAsc If results should be sorted ascending (by default) * @return RDD of((sampleID,(chrName,alignStart)),read object) */ - def sortReadsByAlign(iAsc: Boolean = true): RDD[((Int, (String, Int)), net.sf.samtools.SAMRecord)] = { + def sortReadsByAlign(iAsc: Boolean = true): RDD[((Int, (String, Int)), htsjdk.samtools.SAMRecord)] = { val sortReads = getReads .map(r => ((r._1, SparkSeqConversions.chrToLong(r._2.getReferenceName), r._2.getAlignmentStart), r._2)) @@ -941,7 +942,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor * @param iAsc If results should be sorted ascending (by default) * @return RDD of((sampleID,(chrName,alignStart)),read object) */ - def sortSampleReadsByAlign(iSampleID: Int, iAsc: Boolean = true): RDD[((Int, (String, Int)), net.sf.samtools.SAMRecord)] = { + def sortSampleReadsByAlign(iSampleID: Int, iAsc: Boolean = true): RDD[((Int, (String, Int)), htsjdk.samtools.SAMRecord)] = { val sortReads = getSampleReads(iSampleID) .map(r => ((r._1, SparkSeqConversions.chrToLong(r._2.getReferenceName), r._2.getAlignmentStart), r._2)) diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala index 6a1c53e..583f164 100644 --- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala +++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala @@ -16,8 +16,6 @@ package pl.elka.pw.sparkseq.serialization import com.esotericsoftware.kryo.Kryo -import 
org.apache.spark.serializer.KryoRegistrator
-import pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr

 /**
  * Class for registering various classes with KryoSerializer.
@@ -29,10 +27,10 @@ class SparkSeqKryoRegistrator extends org.apache.spark.serializer.KryoRegistrato
    */
   override def registerClasses(kryo: Kryo) {
     kryo.register(classOf[fi.tkk.ics.hadoop.bam.SAMRecordWritable])
-    kryo.register(classOf[net.sf.samtools.Cigar])
+    kryo.register(classOf[htsjdk.samtools.Cigar])
     kryo.register(classOf[fi.tkk.ics.hadoop.bam.BAMInputFormat])
     kryo.register(classOf[org.apache.hadoop.io.LongWritable])
-    kryo.register(classOf[net.sf.samtools.BAMRecord])
+    kryo.register(classOf[htsjdk.samtools.BAMRecord])
     //kryo.register(classOf[pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr])
     //kryo.register(classOf[scala.collection.Traversable[_]], new ScalaCollectionSerializer(kryo))
     //kryo.register(classOf[scala.Product], new ScalaProductSerializer(kryo))

diff --git a/sparkseq-repl/build.sbt b/sparkseq-repl/build.sbt
index bc52cca..32423b7 100644
--- a/sparkseq-repl/build.sbt
+++ b/sparkseq-repl/build.sbt
@@ -4,14 +4,14 @@ version := "0.1-SNAPSHOT"

 organization := "pl.edu.pw.elka"

-scalaVersion := "2.10.3"
+scalaVersion := "2.10.4"

 publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/maven")) )

 ScctPlugin.instrumentSettings

 libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
+  "org.apache.spark" %% "spark-core" % "1.0.2",
   "org.scalatest" % "scalatest_2.10" % "2.1.0-RC2" % "test",
   "com.github.nscala-time" %% "nscala-time" % "0.8.0",
   "pl.edu.pw.elka" %% "sparkseq-core" % "0.1-SNAPSHOT"

From c3630faa169bdcd6309e5709874c109866da354e Mon Sep 17 00:00:00 2001
From: marek
Date: Mon, 15 Sep 2014 21:17:43 +0200
Subject: [PATCH 2/3] Migration to Apache Spark 1.1

---
 .idea/workspace.xml     | 198 ++++++++++++++++++++++------------
 sparkseq-core/build.sbt |   2 +-
 2 files changed, 112 insertions(+), 88 deletions(-)

diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index bc991b8..27cd518 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -2,20 +2,6 @@
[IDE workspace XML hunks; only the diff markers survived extraction]
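The recurring Seq-to-Iterable edits in the Scala sources of PATCH 1/3 follow from collection-API changes in Spark 1.x: groupByKey and cogroup now return Iterable[V], which offers size and head but no length, apply(0) or sortBy, and RDD.toArray was deprecated in favour of collect(). A standalone sketch of those idioms (not code from this patch; the local master and app name are placeholders):

    import org.apache.spark.SparkContext._
    import org.apache.spark.{SparkConf, SparkContext}

    object Spark1xIterableSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("spark1x-iterable-sketch").setMaster("local[2]"))

        val coverage = sc.parallelize(Seq((1L, 10), (1L, 7), (2L, 3)))

        // groupByKey yields Iterable[Int]: convert with toSeq/toArray before sorting,
        // and use size instead of length.
        val grouped = coverage.groupByKey()                 // RDD[(Long, Iterable[Int])]
        val sorted  = grouped.mapValues(_.toSeq.sortBy(identity))
        val counts  = grouped.mapValues(_.size)

        // cogroup also returns Iterable on both sides, so the first group is taken
        // with head rather than (0).
        val heads = grouped.cogroup(grouped).mapValues { case (a, b) => (a.head, b.head) }

        // RDD.toArray is deprecated in Spark 1.x; use collect().
        println(sorted.collect().toSeq)
        println(counts.collect().toSeq)
        println(heads.collect().toSeq)
        sc.stop()
      }
    }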