diff --git a/.idea/uiDesigner.xml b/.idea/uiDesigner.xml
deleted file mode 100644
index 3b00020..0000000
--- a/.idea/uiDesigner.xml
+++ /dev/null
@@ -1,125 +0,0 @@
-[125 deleted lines of IntelliJ GUI Designer settings elided]
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
deleted file mode 100644
index bc30dc3..0000000
--- a/.idea/workspace.xml
+++ /dev/null
@@ -1,1473 +0,0 @@
-[1473 deleted lines of IntelliJ workspace state elided; surviving fragments: localhost / 5050, change-list timestamps 1393147893320 … 1396696268359, "false", "Scala", "1.6", "sparkseq-repl", "SBT: asm:asm:3.1"]
diff --git a/.idea_modules/sparkseq-build.iml b/.idea_modules/sparkseq-build.iml
deleted file mode 100644
index 7c10ecb..0000000
--- a/.idea_modules/sparkseq-build.iml
+++ /dev/null
@@ -1,81 +0,0 @@
-[81 deleted lines of the IntelliJ module file for sparkseq-build elided]
diff --git a/.idea_modules/sparkseq-core.iml b/.idea_modules/sparkseq-core.iml
deleted file mode 100644
index e06dc39..0000000
--- a/.idea_modules/sparkseq-core.iml
+++ /dev/null
@@ -1,137 +0,0 @@
-[137 deleted lines of the IntelliJ module file for sparkseq-core elided]
diff --git a/.idea_modules/sparkseq-repl.iml b/.idea_modules/sparkseq-repl.iml
deleted file mode 100644
index 092533b..0000000
--- a/.idea_modules/sparkseq-repl.iml
+++ /dev/null
@@ -1,138 +0,0 @@
-[138 deleted lines of the IntelliJ module file for sparkseq-repl elided]
diff --git a/.idea_modules/sparkseq.iml b/.idea_modules/sparkseq.iml
deleted file mode 100644
index 6f1c236..0000000
--- a/.idea_modules/sparkseq.iml
+++ /dev/null
@@ -1,139 +0,0 @@
-[139 deleted lines of the IntelliJ module file for sparkseq elided]
diff --git a/build.sbt b/build.sbt
index d0fa04c..7c95d8d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,7 @@ organization := "pl.edu.pw.elka"
version := "0.1-SNAPSHOT"
-scalaVersion := "2.10.3"
+scalaVersion := "2.10.4"
publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/html/maven")) )
diff --git a/sparkseq-core/build.sbt b/sparkseq-core/build.sbt
index 8ef5b52..0fe669b 100644
--- a/sparkseq-core/build.sbt
+++ b/sparkseq-core/build.sbt
@@ -8,7 +8,7 @@ organization := "pl.edu.pw.elka"
version := "0.1-SNAPSHOT"
-scalaVersion := "2.10.3"
+scalaVersion := "2.10.4"
publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/html/maven")))
@@ -18,15 +18,16 @@ lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HA
ScctPlugin.instrumentSettings
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
+ "org.apache.spark" %% "spark-core" % "1.1.0",
"org.scalatest" % "scalatest_2.10" % "2.1.0-RC2" % "test",
"org.apache.commons" % "commons-math3" % "3.2",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
- "fi.tkk.ics.hadoop.bam" % "hadoop-bam" % "6.1",
- "picard" % "picard" % "1.93",
- "samtools" % "samtools" % "1.93",
- "tribble" % "tribble" % "1.93",
- "variant" % "variant" % "1.93",
+ "org.seqdoop" % "hadoop-bam" % "7.0.0",
+ "org.seqdoop" % "htsjdk" % "1.118",
+ //"picard" % "picard" % "1.93",
+ //"samtools" % "samtools" % "1.93",
+ //"tribble" % "tribble" % "1.93",
+ //"variant" % "variant" % "1.93",
"com.github.nscala-time" %% "nscala-time" % "0.8.0"
)
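
The dependency swap above is the heart of this patch: Hadoop-BAM moves from the old fi.tkk.ics.hadoop.bam 6.1 artifact to org.seqdoop 7.0.0, and the four standalone jars (picard, samtools, tribble, variant 1.93) are replaced by HTSJDK, into which those libraries were merged. In the sources this is chiefly a package rename, as the later hunks show; a minimal before/after sketch of the affected imports:

    // Before: Hadoop-BAM 6.1 and the net.sf.samtools classes from samtools 1.93
    import fi.tkk.ics.hadoop.bam.{BAMInputFormat, SAMRecordWritable}
    import net.sf.samtools.{Cigar, CigarOperator, SAMRecord}

    // After: Hadoop-BAM 7.0.0 and HTSJDK 1.118 under the org.seqdoop coordinates
    import org.seqdoop.hadoop_bam.{BAMInputFormat, SAMRecordWritable}
    import htsjdk.samtools.{Cigar, CigarOperator, SAMRecord}
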
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala
index 8f5ea59..0f114a2 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqBaseDE.scala
@@ -15,26 +15,14 @@
*/
package pl.elka.pw.sparkseq
-import pl.elka.pw.sparkseq.statisticalTests._
-import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
-import pl.elka.pw.sparkseq.statisticalTests._
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import collection.mutable.ArrayBuffer
-import org.apache.hadoop.io.LongWritable
-import fi.tkk.ics.hadoop.bam.{BAMInputFormat, SAMRecordWritable}
-import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
-import scala.util.control._
-import pl.elka.pw.sparkseq.util.SparkSeqContexProperties
-import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties
-import scala.Array
-import org.apache.spark.RangePartitioner
-import org.apache.spark.HashPartitioner
-import org.apache.spark.SparkConf
-import java.io._
-import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._
+import org.apache.hadoop.io.LongWritable
+import org.apache.spark.{SparkConf, SparkContext}
+import org.seqdoop.hadoop_bam.{BAMInputFormat, SAMRecordWritable}
import pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr
+import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
+import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties
+import pl.elka.pw.sparkseq.util.SparkSeqContexProperties
/**
* Created by marek on 2/8/14.
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala
index 2ff2fca..e518893 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/SparkSeqSimpleJob.scala
@@ -2,30 +2,16 @@
package pl.elka.pw.sparkseq
import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties
-import pl.elka.pw.sparkseq.util.SparkSeqContexProperties
-import pl.elka.pw.sparkseq.conversions.SparkSeqConversions._
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
+import pl.elka.pw.sparkseq.serialization.SparkSeqKryoProperties
+import pl.elka.pw.sparkseq.util.SparkSeqContexProperties
//import org.apache.spark.SparkConf
//import org.apache.spark._
-import org.apache.spark.storage._
//import SparkContext._
//import spark._
-import org.apache.spark.HashPartitioner
-import org.apache.spark.rdd._
-import fi.tkk.ics.hadoop.bam.BAMInputFormat
-import fi.tkk.ics.hadoop.bam.SAMRecordWritable
-import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext.rddToPairRDDFunctions
-import com.esotericsoftware.kryo.Kryo
-import collection.mutable.ArrayBuffer
-import pl.elka.pw.sparkseq.statisticalTests._
-import java.util.HashMap
-import scala.collection.mutable.Map
-import java.io.File
/**
* An example of a simple job implemented using SparkSeq.
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala
index b3253c3..ecf646d 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/differentialExpression/SparkSeqDiffExpr.scala
@@ -73,43 +73,43 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
private val cmDistTable = iSC.textFile(confDir + "cm" + caseSampleNum + "_" + controlSampleNum + "_2.txt")
.map(l => l.split("\t"))
.map(r => (r.array(0).toDouble, r.array(1).toDouble))
- .toArray
+ .collect()
private val cmDistTableB = iSC.broadcast(cmDistTable)
private val genExonsMapB = iSC.broadcast(SparkSeqConversions.BEDFileToHashMap(iSC, confDir + iBEDFile))
private val genExonsMapLookupB = iSC.broadcast(SparkSeqConversions.BEDFileToHashMapGeneExon(iSC, confDir + iBEDFile))
- private def groupSeqAnalysis(iSeqAnalysis: SparkSeqAnalysis, iSampleNum: Int): RDD[(Long, Seq[Int])] = {
+ private def groupSeqAnalysis(iSeqAnalysis: SparkSeqAnalysis, iSampleNum: Int): RDD[(Long, Iterable[Int])] = {
val seqGrouped = iSeqAnalysis.getCoverageBaseRegion(iChr, iStartPos, iEndPos)
.map(r => (r._1 % 1000000000000L, r._2))
.groupByKey()
- .mapValues(c => if ((iSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](iSampleNum - c.length)(0)) else (c))
+ .mapValues(c => if ((iSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](iSampleNum - c.size)(0)) else (c))
return (seqGrouped)
}
- private def joinSeqAnalysisGroup(iSeqAnalysisGroup1: RDD[(Long, Seq[Int])], iSeqAnalysisGroup2: RDD[(Long, Seq[Int])]): RDD[(Long, (Seq[Int], Seq[Int]))] = {
+ private def joinSeqAnalysisGroup(iSeqAnalysisGroup1: RDD[(Long, Iterable[Int])], iSeqAnalysisGroup2: RDD[(Long, Iterable[Int])]): RDD[(Long, (Iterable[Int], Iterable[Int]))] = {
/* val sAnalysisG1 = iSeqAnalysisGroup1.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup1))
val sAnalysisG2 = iSeqAnalysisGroup2.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup2))
val seqJoint = sAnalysisG1.cogroup(sAnalysisG2)*/
- val seqJoint: RDD[(Long, (Seq[Seq[Int]], Seq[Seq[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2)
+ val seqJoint: RDD[(Long, (Iterable[Iterable[Int]], Iterable[Iterable[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2)
val finalSeqJoint = seqJoint
// .mapValues(r=>(r._1(0),r._2(0)))
.map(r => (r._1,
- (if (r._2._1.length == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1(0),
- if (r._2._2.length == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2(0)))
+ (if (r._2._1.size == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1.head,
+ if (r._2._2.size == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2.head))
)
return (finalSeqJoint)
}
- private def joinSeqAnalysisGroupRegion(iSeqAnalysisGroup1: RDD[(Long, Seq[Int])], iSeqAnalysisGroup2: RDD[(Long, Seq[Int])]): RDD[(Long, (Seq[Int], Seq[Int]))] = {
+ private def joinSeqAnalysisGroupRegion(iSeqAnalysisGroup1: RDD[(Long, Iterable[Int])], iSeqAnalysisGroup2: RDD[(Long, Iterable[Int])]): RDD[(Long, (Iterable[Int], Iterable[Int]))] = {
/* val sAnalysisG1 = iSeqAnalysisGroup1.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup1))
val sAnalysisG2 = iSeqAnalysisGroup2.partitionBy(new RangePartitioner[Long, Seq[Int]](72, iSeqAnalysisGroup2))
val seqJoint = sAnalysisG1.cogroup(sAnalysisG2)*/
- val seqJoint: RDD[(Long, (Seq[Seq[Int]], Seq[Seq[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2)
+ val seqJoint: RDD[(Long, (Iterable[Iterable[Int]], Iterable[Iterable[Int]]))] = iSeqAnalysisGroup1.cogroup(iSeqAnalysisGroup2)
val finalSeqJoint = seqJoint
// .mapValues(r=>(r._1(0),r._2(0)))
.map(r => (r._1,
- (if (r._2._1.length == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1(0),
- if (r._2._2.length == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2(0)))
+ (if (r._2._1.size == 0) ArrayBuffer.fill[Int](caseSampleNum)(0) else r._2._1.head,
+ if (r._2._2.size == 0) ArrayBuffer.fill[Int](controlSampleNum)(0) else r._2._2.head))
)
return (finalSeqJoint)
}
@@ -131,7 +131,7 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
val coalRegions = iRegRDD.map(r => (((if (r._6 == "0") "NEWREG" + r._3._1 else r._5), r._6, r._3._1), (r._1, r._2, r._3, r._4, r._7, r._8, r._9)))
/*( (geneId,exonId,chrName), (pval,length,(chr,startPos), foldChange, pctOverlap,avgCountA,avgCountB) ) */
.groupByKey()
- .mapValues(r => (r.sortBy(_._3._2)))
+ .mapValues(r => (r.toArray.sortBy(_._3._2)))
.mapPartitions {
var k = 0
partitionIterator =>
@@ -216,9 +216,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
return coalRegions
}
- private def findContRegionsEqual(iSeq: RDD[((Int, Double), Seq[(Long, Double, Double, Double)])]):
+ private def findContRegionsEqual(iSeq: RDD[((Int, Double), Iterable[(Long, Double, Double, Double)])]):
RDD[(Double, Int, (String, Int), Double, String, String, Double, Double, Double)] = {
- val iSeqPart = iSeq.map(r => (r._1._2, r._2.sortBy(_._1)))
+ val iSeqPart = iSeq.map(r => (r._1._2, r._2.toSeq.sortBy(_._1)))
//.partitionBy(new HashPartitioner(iNumTasks * 3))
iSeqPart
.mapPartitions {
@@ -295,9 +295,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
}.flatMap(r => r)
}
- private def findContRegionsLessEqual(iSeq: RDD[(Int, Seq[(Double, Long, Double, Double, Double)])]) /*(chrId,(pval,position,foldChange) */
+ private def findContRegionsLessEqual(iSeq: RDD[(Int, Iterable[(Double, Long, Double, Double, Double)])]) /*(chrId,(pval,position,foldChange) */
: RDD[(Double, Int, (String, Int), Double, String, String, Double, Double, Double)] = {
- val iSeqPart = iSeq.mapValues(r => (r.sortBy(_._2)))
+ val iSeqPart = iSeq.mapValues(r => (r.toSeq.sortBy(_._2)))
// .partitionBy(new HashPartitioner(iNumTasks * 3))
iSeqPart
.mapPartitions {
@@ -495,9 +495,9 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
//seqGroupControl.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseControl")
val seqJointCC = joinSeqAnalysisGroup(seqGroupCase, seqGroupControl)
//seqJointCC.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseJoint")
- val seqFilterCC = seqJointCC.filter(r => (SparkSeqStats.mean(r._2._1) >= iMinCoverage || SparkSeqStats.mean(r._2._2) >= iMinCoverage))
+ val seqFilterCC = seqJointCC.filter(r => (SparkSeqStats.mean(r._2._1.toArray) >= iMinCoverage || SparkSeqStats.mean(r._2._2.toArray) >= iMinCoverage))
//seqFilterCC.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseFilter")
- val seqCompTest = computeTwoSampleCvMTest(seqFilterCC)
+ val seqCompTest = computeTwoSampleCvMTest(seqFilterCC.map(r => (r._1, (r._2._1.toSeq, r._2._2.toSeq))))
//seqCompTest.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugBaseTest")
val seqPValGroup = seqCompTest
if (iCoalesceRegDiffPVal == false)
@@ -508,7 +508,7 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
.groupByKey()
val seqPostPar = {
- seqPrePart.partitionBy(new RangePartitioner[Int, Seq[(Double, Long, Double, Double, Double)]](iNumTasks, seqPrePart))
+ seqPrePart.partitionBy(new RangePartitioner[Int, Iterable[(Double, Long, Double, Double, Double)]](iNumTasks, seqPrePart))
}
seqRegContDERDD = findContRegionsLessEqual(seqPostPar)
//seqRegContDERDD.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/debugRegCoals")
@@ -536,16 +536,16 @@ class SparkSeqDiffExpr(iSC: SparkContext, iSeqAnalCase: SparkSeqAnalysis, iSeqAn
HashMap[String, Array[scala.collection.mutable.ArrayBuffer[(String, String, Int, Int)]]]]): RDD[((String), Double, Double)] = {
val seqRegCovCase = iSeqAnalCase.getCoverageRegion(iRegions).map(r => ((SparkSeqConversions.stripSampleID(r._1), r._2)))
.groupByKey()
- .mapValues(c => if ((caseSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](caseSampleNum - c.length)(0)) else (c))
+ .mapValues(c => if ((caseSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](caseSampleNum - c.size)(0)) else (c))
val seqRegCovControl = iSeqAnalControl.getCoverageRegion(iRegions).map(r => ((SparkSeqConversions.stripSampleID(r._1), r._2)))
.groupByKey()
- .mapValues(c => if ((controlSampleNum - c.length) > 0) (c ++ ArrayBuffer.fill[Int](controlSampleNum - c.length)(0)) else (c))
+ .mapValues(c => if ((controlSampleNum - c.size) > 0) (c ++ ArrayBuffer.fill[Int](controlSampleNum - c.size)(0)) else (c))
val jointRegion = joinSeqAnalysisGroupRegion(seqRegCovCase, seqRegCovControl)
val permTestRegionD = jointRegion.map {
r =>
val statTests = Array[SparkSeqStatisticalTest](SparkSeqCvM2STest, SparkSeqKS2STest)
- val permTest = new SparkSeqAdaptivePermutTest(iNPermut = 10000, iStatTests = statTests, r._2._1, r._2._2)
- (SparkSeqConversions.ensemblRegionIdToExonId(r._1), permTest.getPvalue(), SparkSeqStats.mean(r._2._1) / SparkSeqStats.mean(r._2._2), r._2._1, r._2._2)
+ val permTest = new SparkSeqAdaptivePermutTest(iNPermut = 10000, iStatTests = statTests, r._2._1.toSeq, r._2._2.toSeq)
+ (SparkSeqConversions.ensemblRegionIdToExonId(r._1), permTest.getPvalue(), SparkSeqStats.mean(r._2._1.toSeq) / SparkSeqStats.mean(r._2._2.toSeq), r._2._1, r._2._2)
}
//permTestRegionD.saveAsTextFile("hdfs://sparkseq002.cloudapp.net:9000/BAM/64MB/debugTestStat.txt")
val permTestRegion = permTestRegionD.map(r => (r._1, r._2, r._3))
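
The recurring Seq[Int] → Iterable[Int] edits in this file track an API change in Spark itself: since Spark 1.0, groupByKey and cogroup return RDD[(K, Iterable[V])] rather than RDD[(K, Seq[V])], so code that indexed a group (r._2._1(0)), took its .length, or sorted it must first materialize it via .head, .size, .toSeq or .toArray, exactly as the hunks above do. A self-contained sketch of the pattern, with invented sample data:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    object IterableMigrationSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
        val pairs = sc.parallelize(Seq((1L, 10), (1L, 20), (2L, 30)))
        val grouped = pairs.groupByKey() // RDD[(Long, Iterable[Int])] as of Spark 1.0
        // Iterable has no apply(i), length or sortBy, so materialize first:
        val sorted = grouped.mapValues(vs => vs.toSeq.sorted)
        val heads = grouped.mapValues(vs => if (vs.isEmpty) 0 else vs.head) // was vs(0)
        println(sorted.collect().toList)
        println(heads.collect().toList)
        sc.stop()
      }
    }
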
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala
index cd06f11..7997ac1 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/junctions/SparkSeqJunctionAnalysis.scala
@@ -16,12 +16,13 @@
package pl.elka.pw.sparkseq.junctions
-import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import htsjdk.samtools.CigarOperator
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
+import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis
+
import scala.collection.mutable.ArrayBuffer
-import net.sf.samtools.CigarOperator
-import org.apache.spark.SparkContext._
/**
* Created by mwiewiorka on 4/4/14.
@@ -49,7 +50,7 @@ class SparkSeqJunctionAnalysis(seqAnalysis: SparkSeqAnalysis) extends Serializab
}
- private def getGapFromCigar(alignStart: Int, cigar: net.sf.samtools.Cigar): Array[Range] = {
+ private def getGapFromCigar(alignStart: Int, cigar: htsjdk.samtools.Cigar): Array[Range] = {
var gapArray = ArrayBuffer[Range]()
val numCigElem = cigar.numCigarElements()
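
getGapFromCigar above scans each read's CIGAR for skipped regions. For orientation, a standalone sketch of the HTSJDK calls it relies on, assuming the htsjdk 1.118 API declared in build.sbt (the CIGAR string and start position here are invented; the real code takes both from the SAMRecord):

    import htsjdk.samtools.{Cigar, CigarOperator, TextCigarCodec}

    // Decode a CIGAR string and report its N (skipped-region) elements,
    // i.e. the splice-junction gaps that getGapFromCigar collects.
    val cigar: Cigar = TextCigarCodec.getSingleton.decode("30M200N45M")
    var refPos = 1000 // hypothetical alignment start
    for (i <- 0 until cigar.numCigarElements()) {
      val elem = cigar.getCigarElement(i)
      if (elem.getOperator == CigarOperator.N)
        println("gap: " + refPos + ".." + (refPos + elem.getLength - 1))
      if (elem.getOperator.consumesReferenceBases())
        refPos += elem.getLength
    }
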
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala
index 6cc7d9f..e732b20 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/seqAnalysis/SparkSeqAnalysis.scala
@@ -19,15 +19,16 @@ import org.apache.spark.SparkContext
import SparkContext._
import org.apache.spark.rdd._
import org.apache.spark._
-import fi.tkk.ics.hadoop.bam.BAMInputFormat
-import fi.tkk.ics.hadoop.bam.SAMRecordWritable
+import org.seqdoop.hadoop_bam.BAMInputFormat
+import org.seqdoop.hadoop_bam.SAMRecordWritable
import org.apache.hadoop.io.LongWritable
import scala.util.control._
import scala.collection.mutable.ArrayBuffer
import pl.elka.pw.sparkseq.conversions.SparkSeqConversions
import java.io.{File, PrintWriter}
import pl.elka.pw.sparkseq.util.SparkSeqRegType._
-import net.sf.samtools.SAMUtils._
+import htsjdk.samtools.SAMRecord
+import htsjdk.samtools.SAMUtils._
import scala.Function._
import com.sun.org.apache.xpath.internal.operations.Bool
@@ -89,7 +90,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
* @param iCigar Cigar string of a read alignment
* @return Array of ranges computed from Cigar string.
*/
- private def genBasesFromCigar(iAlignStart: Int, iCigar: net.sf.samtools.Cigar): Array[Range] = {
+ private def genBasesFromCigar(iAlignStart: Int, iCigar: htsjdk.samtools.Cigar): Array[Range] = {
var nuclReadArray = ArrayBuffer[Range]()
val numCigElem = iCigar.numCigarElements()
@@ -347,9 +348,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Get all reads from all samples in format (sampleId,ReadObject)
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def getReads(): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def getReads(): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
return bamFileFilter
}
@@ -357,9 +358,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Get all reads for a specific sample in format (sampleId,ReadObject)
* @param sampleID ID of a given sample
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def getSampleReads(sampleID: Int): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def getSampleReads(sampleID: Int): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
return getReads().filter(r => r._1 == sampleID)
}
@@ -368,23 +369,23 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
* Set reads of SeqAnalysis object, e.g. after external filtering
* @param reads RDD of (sampleID,ReadObject)
*/
- def setReads(reads: RDD[(Int, net.sf.samtools.SAMRecord)]) = {
+ def setReads(reads: RDD[(Int, htsjdk.samtools.SAMRecord)]) = {
bamFileFilter = reads
}
/**
* Generic method for filtering out all reads using the condition provided: ._1 refers to sampleID, ._2 to ReadObject.
- * @param filterCond ((Int, net.sf.samtools.SAMRecord)) => Boolean
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @param filterCond ((Int, htsjdk.samtools.SAMRecord)) => Boolean
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterReads(filterCond: ((Int, net.sf.samtools.SAMRecord)) => Boolean): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReads(filterCond: ((Int, htsjdk.samtools.SAMRecord)) => Boolean): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(filterCond)
return bamFileFilter
}
- def filterReads(filterCond: ((Int, net.sf.samtools.SAMRecord), (Int, net.sf.samtools.SAMRecord)) => Boolean): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReads(filterCond: ((Int, htsjdk.samtools.SAMRecord), (Int, htsjdk.samtools.SAMRecord)) => Boolean): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => filterCond(r, r))
return bamFileFilter
@@ -393,15 +394,15 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the sample id
* @param sampleCond Condition on the sample id
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterSample(sampleCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterSample(sampleCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => sampleCond(r._1))
return bamFileFilter
}
- def filterSample(sampleCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterSample(sampleCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => sampleCond(r._1, r._1))
return bamFileFilter
@@ -410,15 +411,15 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the quality of mapping
* @param qaulityCond - Conditions on the quality of read mapping
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterMappingQuality(qaulityCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterMappingQuality(qaulityCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => qaulityCond(r._2.getMappingQuality))
return bamFileFilter
}
- def filterMappingQuality(qaulityCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterMappingQuality(qaulityCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => qaulityCond(r._2.getMappingQuality, r._2.getMappingQuality))
return bamFileFilter
@@ -427,14 +428,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the base qualities of a given read
* @param baseQualCond Conditions on the quality of read bases
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterBaseQualities(baseQualCond: (Array[Int] => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterBaseQualities(baseQualCond: (Array[Int] => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => baseQualCond(r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r))))
return bamFileFilter
}
- def filterBaseQualities(baseQualCond: ((Array[Int], Array[Int]) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterBaseQualities(baseQualCond: ((Array[Int], Array[Int]) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => baseQualCond(r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r)),
r._2.getBaseQualityString.toCharArray.map(r => fastqToPhred(r))))
return bamFileFilter
@@ -443,14 +444,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the reference name
* @param refNameCond Condition on reference name
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterReferenceName(refNameCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReferenceName(refNameCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => refNameCond(SparkSeqConversions.standardizeChr(r._2.getReferenceName)))
return bamFileFilter
}
- def filterReferenceName(refNameCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReferenceName(refNameCond: ((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => refNameCond(SparkSeqConversions.standardizeChr(r._2.getReferenceName), SparkSeqConversions.standardizeChr(r._2.getReferenceName)))
return bamFileFilter
}
@@ -458,14 +459,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the start of alignment
* @param alignStartCond Condition on the start of the alignment
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterAlignmentStart(alignStartCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentStart(alignStartCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignStartCond(r._2.getAlignmentStart))
return bamFileFilter
}
- def filterAlignmentStart(alignStartCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentStart(alignStartCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignStartCond(r._2.getAlignmentStart, r._2.getAlignmentStart))
return bamFileFilter
}
@@ -473,14 +474,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the end of alignment
* @param alignEndCond Condition on the end of the alignment
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterAlignmentEnd(alignEndCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentEnd(alignEndCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignEndCond(r._2.getAlignmentEnd))
return bamFileFilter
}
- def filterAlignmentEnd(alignEndCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentEnd(alignEndCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignEndCond(r._2.getAlignmentEnd, r._2.getAlignmentEnd))
return bamFileFilter
}
@@ -489,14 +490,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
* Generic method for filtering reads using conditions on the merged flags.
* More info http://picard.sourceforge.net/explain-flags.html
* @param flagCond Condition on the merged flags
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterFlags(flagCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterFlags(flagCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => flagCond(r._2.getFlags))
return bamFileFilter
}
- def filterFlags(flagCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterFlags(flagCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => flagCond(r._2.getFlags, r._2.getFlags))
return bamFileFilter
}
@@ -504,9 +505,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the mapped flag
* @param unmapFlagCond Condition on the end of the mapped flag
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterUnmappedFlag(unmapFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterUnmappedFlag(unmapFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => unmapFlagCond(r._2.getReadUnmappedFlag))
return bamFileFilter
}
@@ -514,9 +515,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the duplicate flag
* @param dupFlagCond Condition on the end of the duplicate flag
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterDuplicateReadFlag(dupFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterDuplicateReadFlag(dupFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => dupFlagCond(r._2.getDuplicateReadFlag))
return bamFileFilter
}
@@ -524,9 +525,9 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the primary flag
* @param notPrimFlagCond Condition on the end of the primary flag
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterNotPrimaryAlignFlag(notPrimFlagCond: (Boolean => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterNotPrimaryAlignFlag(notPrimFlagCond: (Boolean => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => notPrimFlagCond(r._2.getNotPrimaryAlignmentFlag))
return bamFileFilter
}
@@ -534,14 +535,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the read name
* @param readNameCond Condition on the end of the read name
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterReadName(readNameCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReadName(readNameCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => readNameCond(r._2.getReadName))
return bamFileFilter
}
- def filterReadName(readNameCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReadName(readNameCond: ((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => readNameCond(r._2.getReadName, r._2.getReadName))
return bamFileFilter
}
@@ -549,14 +550,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the read length
* @param readLengthCond Condition on the end of the read length
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterReadLength(readLengthCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReadLength(readLengthCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => readLengthCond(r._2.getReadLength))
return bamFileFilter
}
- def filterReadLength(readLengthCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterReadLength(readLengthCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => readLengthCond(r._2.getReadLength, r._2.getReadLength))
return bamFileFilter
}
@@ -564,14 +565,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the CIGAR string
* @param cigarStringCond Condition on the end of the CIGAR string
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterCigarString(cigarStringCond: (String => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterCigarString(cigarStringCond: (String => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => cigarStringCond(r._2.getCigarString))
return bamFileFilter
}
- def filterCigarString(cigarStringCond: ((String, String) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterCigarString(cigarStringCond: ((String, String) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => cigarStringCond(r._2.getCigarString, r._2.getCigarString))
return bamFileFilter
}
@@ -580,14 +581,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the CIGAR object
* @param cigarCond Condition on the end of the CIGAR object
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterCigar(cigarCond: (net.sf.samtools.Cigar => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterCigar(cigarCond: (htsjdk.samtools.Cigar => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => cigarCond(r._2.getCigar))
return bamFileFilter
}
- def filterCigar(cigarCond: ((net.sf.samtools.Cigar, net.sf.samtools.Cigar) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterCigar(cigarCond: ((htsjdk.samtools.Cigar, htsjdk.samtools.Cigar) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => cigarCond(r._2.getCigar, r._2.getCigar))
return bamFileFilter
}
@@ -595,14 +596,14 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
/**
* Method for filtering reads using conditions on the alignment length covered by read incl. gaps
* @param alignLenCond Condition on the end of the alignment length covered by read incl. gaps
- * @return RDD[(Int, net.sf.samtools.SAMRecord)]
+ * @return RDD[(Int, htsjdk.samtools.SAMRecord)]
*/
- def filterAlignmentLength(alignLenCond: (Int => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentLength(alignLenCond: (Int => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignLenCond(r._2.getCigar.getPaddedReferenceLength))
return bamFileFilter
}
- def filterAlignmentLength(alignLenCond: ((Int, Int) => Boolean)): RDD[(Int, net.sf.samtools.SAMRecord)] = {
+ def filterAlignmentLength(alignLenCond: ((Int, Int) => Boolean)): RDD[(Int, htsjdk.samtools.SAMRecord)] = {
bamFileFilter = bamFileFilter.filter(r => alignLenCond(r._2.getCigar.getPaddedReferenceLength, r._2.getCigar.getPaddedReferenceLength))
return bamFileFilter
}
@@ -613,7 +614,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
.map(r => (r._1._2, (r._1._1, r._2)))
.groupByKey()
.sortByKey()
- .mapValues(r => r.sortBy(r => r._1))
+ .mapValues(r => r.toSeq.sortBy(r => r._1))
.collect()
var samplesHeader: String = ""
val samplesIDSort = samplesID.sortBy(r => r)
@@ -661,7 +662,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
.map(r => (r._1._2, (r._1._1, r._2)))
.groupByKey()
.sortByKey()
- .mapValues(r => r.sortBy(r => r._1))
+ .mapValues(r => r.toSeq.sortBy(r => r._1))
.collect()
var samplesHeader: String = ""
val samplesIDSort = samplesID.sortBy(r => r)
@@ -722,7 +723,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
.map(r => (r._1._2, (r._1._1, r._2)))
.groupByKey()
.sortByKey()
- .mapValues(r => r.sortBy(r => r._1))
+ .mapValues(r => r.toSeq.sortBy(r => r._1))
.collect()
val samplesIDSort = samplesID.sortBy(r => r)
var i = 0
@@ -925,7 +926,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
* @param iAsc If results should be sorted ascending (by default)
* @return RDD of((sampleID,(chrName,alignStart)),read object)
*/
- def sortReadsByAlign(iAsc: Boolean = true): RDD[((Int, (String, Int)), net.sf.samtools.SAMRecord)] = {
+ def sortReadsByAlign(iAsc: Boolean = true): RDD[((Int, (String, Int)), htsjdk.samtools.SAMRecord)] = {
val sortReads = getReads
.map(r => ((r._1, SparkSeqConversions.chrToLong(r._2.getReferenceName), r._2.getAlignmentStart), r._2))
@@ -941,7 +942,7 @@ class SparkSeqAnalysis(iSC: SparkContext, iBAMFile: String, iSampleId: Int, iNor
* @param iAsc If results should be sorted ascending (by default)
* @return RDD of((sampleID,(chrName,alignStart)),read object)
*/
- def sortSampleReadsByAlign(iSampleID: Int, iAsc: Boolean = true): RDD[((Int, (String, Int)), net.sf.samtools.SAMRecord)] = {
+ def sortSampleReadsByAlign(iSampleID: Int, iAsc: Boolean = true): RDD[((Int, (String, Int)), htsjdk.samtools.SAMRecord)] = {
val sortReads = getSampleReads(iSampleID)
.map(r => ((r._1, SparkSeqConversions.chrToLong(r._2.getReferenceName), r._2.getAlignmentStart), r._2))
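
Every filter* signature above changes only its record type, from net.sf.samtools.SAMRecord to htsjdk.samtools.SAMRecord; the getters (getMappingQuality, getCigar, getFlags, ...) keep their names, so callers are untouched apart from type annotations. A hypothetical caller-side sketch (threshold and choice of filters invented):

    import htsjdk.samtools.SAMRecord
    import org.apache.spark.rdd.RDD
    import pl.elka.pw.sparkseq.seqAnalysis.SparkSeqAnalysis

    // Chain two of the filters above; each call narrows the internal
    // read RDD in place and returns it.
    def primaryHighQuality(analysis: SparkSeqAnalysis): RDD[(Int, SAMRecord)] = {
      analysis.filterMappingQuality(q => q >= 30)
      analysis.filterNotPrimaryAlignFlag(notPrimary => !notPrimary)
    }
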
diff --git a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala
index 6a1c53e..583f164 100644
--- a/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala
+++ b/sparkseq-core/src/main/scala/pl/elka/pw/sparkseq/serialization/SparkSeqKryoRegistrator.scala
@@ -16,8 +16,6 @@
package pl.elka.pw.sparkseq.serialization
import com.esotericsoftware.kryo.Kryo
-import org.apache.spark.serializer.KryoRegistrator
-import pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr
/**
* Class for registering various classes with KryoSerializer.
@@ -29,10 +27,10 @@ class SparkSeqKryoRegistrator extends org.apache.spark.serializer.KryoRegistrato
*/
override def registerClasses(kryo: Kryo) {
- kryo.register(classOf[fi.tkk.ics.hadoop.bam.SAMRecordWritable])
+ kryo.register(classOf[org.seqdoop.hadoop_bam.SAMRecordWritable])
- kryo.register(classOf[net.sf.samtools.Cigar])
+ kryo.register(classOf[htsjdk.samtools.Cigar])
- kryo.register(classOf[fi.tkk.ics.hadoop.bam.BAMInputFormat])
+ kryo.register(classOf[org.seqdoop.hadoop_bam.BAMInputFormat])
kryo.register(classOf[org.apache.hadoop.io.LongWritable])
- kryo.register(classOf[net.sf.samtools.BAMRecord])
+ kryo.register(classOf[htsjdk.samtools.BAMRecord])
//kryo.register(classOf[pl.elka.pw.sparkseq.differentialExpression.SparkSeqDiffExpr])
//kryo.register(classOf[scala.collection.Traversable[_]], new ScalaCollectionSerializer(kryo))
//kryo.register(classOf[scala.Product], new ScalaProductSerializer(kryo))
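
The re-registrations above only take effect once the registrator is plugged into the Spark configuration. A minimal sketch of the standard Spark 1.x wiring; presumably SparkSeqContexProperties/SparkSeqKryoProperties (imported by the jobs in this patch, not shown here) do the equivalent:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("sparkseq")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "pl.elka.pw.sparkseq.serialization.SparkSeqKryoRegistrator")
    val sc = new SparkContext(conf)
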
diff --git a/sparkseq-repl/build.sbt b/sparkseq-repl/build.sbt
index bc52cca..32423b7 100644
--- a/sparkseq-repl/build.sbt
+++ b/sparkseq-repl/build.sbt
@@ -4,14 +4,14 @@ version := "0.1-SNAPSHOT"
organization := "pl.edu.pw.elka"
-scalaVersion := "2.10.3"
+scalaVersion := "2.10.4"
publishTo := Some(Resolver.file("file", new File("/var/www/maven.sparkseq001.cloudapp.net/maven")) )
ScctPlugin.instrumentSettings
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
+ "org.apache.spark" %% "spark-core" % "1.0.2",
"org.scalatest" % "scalatest_2.10" % "2.1.0-RC2" % "test",
"com.github.nscala-time" %% "nscala-time" % "0.8.0",
"pl.edu.pw.elka" %% "sparkseq-core" % "0.1-SNAPSHOT"