diff --git a/pom.xml b/pom.xml
index 5482b8c..5e34cdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -279,9 +279,22 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.18</version>
+ <version>2.18.1</version>
<configuration>
- <skipTests>true</skipTests>
+ <includes>
+   <include>**/Test*.java</include>
+   <include>**/*Test.java</include>
+   <include>**/*TestCase.java</include>
+   <include>**/*Suite.java</include>
+ </includes>
+ <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <additionalClasspathElements>
+   <additionalClasspathElement>${test_classpath}</additionalClasspathElement>
+ </additionalClasspathElements>
</configuration>
@@ -291,7 +304,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
- <filereports>${project.build.directory}/ToonaTestSuite.txt</filereports>
+ <filereports>MarlinTestSuite.txt</filereports>
<argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
<systemProperties>
  <spark.test.home>${session.executionRootDirectory}</spark.test.home>
@@ -302,6 +315,13 @@
false
true
+ <additionalClasspathElements>
+   <additionalClasspathElement>${test_classpath}</additionalClasspathElement>
+ </additionalClasspathElements>
diff --git a/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala b/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
index 7eda6f3..61e69d4 100644
--- a/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
+++ b/src/main/scala/edu/nju/pasalab/marlin/matrix/BlockMatrix.scala
@@ -80,12 +80,10 @@ class BlockMatrix(
val mat = BDM.zeros[Double](m, n)
blocks.collect().foreach {
case (blkID, matrix) =>
- val rowStart = blkID.row
- val colStart = blkID.column
- matrix.activeIterator.foreach {
- case ((i, j), v) =>
- mat(rowStart * mostBlkRowLen + i, colStart * mostBlkColLen + j) = v
- }
+ val rowStart = blkID.row * mostBlkRowLen
+ val colStart = blkID.column * mostBlkColLen
+ mat(rowStart until rowStart + matrix.rows,
+ colStart until colStart + matrix.cols) := matrix
}
mat
}
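
Note: the rewritten `toBreeze` relies on Breeze's slice assignment, and it stays correct for edge blocks smaller than `mostBlkRowLen`/`mostBlkColLen` because the ranges come from `matrix.rows`/`matrix.cols`. A minimal standalone sketch of the idiom:

```scala
import breeze.linalg.{DenseMatrix => BDM}

val mat = BDM.zeros[Double](4, 4)
val block = BDM((1.0, 2.0), (3.0, 4.0))
// copy the 2x2 block into rows 2-3, columns 0-1 with one slice assignment
mat(2 until 4, 0 until 2) := block
```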
@@ -302,7 +300,7 @@ class BlockMatrix(
}else {
(blkId.column + 1) * colBlkSize
}
- (blkId, (blk.asInstanceOf[BDM[Double]] *
+ (BlockID(blkId.row, 0), (blk.asInstanceOf[BDM[Double]] *
Bb.value(startRow until endRow, ::)).asInstanceOf[BDM[Double]])
}.reduceByKey(_ + _)
new BlockMatrix(blocks, numRows(), B.cols, numBlksByRow(), numBlksByCol())
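
Note: the fix here keys every partial product by the output block (`BlockID(blkId.row, 0)`) so that `reduceByKey(_ + _)` can sum them into one block per result row; keying by the input `blkId` left the partials unsummed. A toy sketch of the pattern, assuming a live `SparkContext` `sc` and Marlin's `BlockID`:

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x style)
import breeze.linalg.{DenseMatrix => BDM}

// two partial products destined for the same output block (row 0 of the result)
val partials = sc.parallelize(Seq(
  (BlockID(0, 0), BDM((1.0, 0.0), (0.0, 1.0))),
  (BlockID(0, 0), BDM((2.0, 0.0), (0.0, 2.0)))))
val summed = partials.reduceByKey(_ + _) // one summed block per BlockID
```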
diff --git a/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala b/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
index 2001a6e..ab25fc0 100644
--- a/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
+++ b/src/main/scala/edu/nju/pasalab/marlin/matrix/DenseVecMatrix.scala
@@ -184,7 +184,7 @@ class DenseVecMatrix(
}
result
}
-
+
/**
* Matrix-matrix multiply
@@ -196,8 +196,8 @@ class DenseVecMatrix(
* @return result in BlockMatrix type
*/
def multiply(other: DistributedMatrix,
- cores: Int,
- broadcastThreshold: Int = 300): DistributedMatrix = {
+ cores: Int,
+ broadcastThreshold: Int = 300): DistributedMatrix = {
require(numCols == other.numRows(),
s"Dimension mismatch during matrix-matrix multiplication: ${numCols()} vs ${other.numRows()}")
other match {
@@ -222,9 +222,9 @@ class DenseVecMatrix(
val broadSize = broadcastThreshold * 1024 * 1024 / 8
if (that.numRows() * that.numCols() <= broadSize) {
this.multiply(that.toBreeze())
- } else if(this.numRows() * this.numCols() <= broadSize ){
+ } else if (this.numRows() * this.numCols() <= broadSize) {
that.multiplyBy(this.toBreeze())
- }else{
+ } else {
val splitMethod =
MTUtils.splitMethod(numRows(), numCols(), other.numCols(), cores)
multiply(that, splitMethod)
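
For context on the branch above: `broadSize` converts the megabyte threshold into an element count. With the default `broadcastThreshold = 300`, that is 300 * 1024 * 1024 / 8 = 39,321,600 doubles, roughly a 6270 x 6270 dense matrix; whichever operand fits under the bound is collected and broadcast, and only when neither fits does the fully distributed block multiplication run.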
@@ -241,9 +241,9 @@ class DenseVecMatrix(
require(numRows() == other.numRows(), s"Dimension mismatch: ${numRows()} vs ${other.numRows()}")
other match {
case that: DenseVecMatrix =>
- val result = rows.join(that.rows).map(t => {
- (t._1, BDV(t._2._1.toArray ++: t._2._2.toArray))
- })
+ val result = rows.join(that.rows).mapValues { case (v1, v2) =>
+ BDV(v1.toArray ++: v2.toArray)
+ }
new DenseVecMatrix(result, numRows(), numCols() + that.numCols())
case that: BlockMatrix =>
val thatDenVec = that.toDenseVecMatrix()
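
Note: the `mapValues` form keeps the join's partitioning and avoids the `t._2._1` tuple accessors. A self-contained sketch of the column-bind pattern, assuming a `SparkContext` `sc`:

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x style)
import breeze.linalg.{DenseVector => BDV}

val left  = sc.parallelize(Seq((0L, BDV(1.0, 2.0)), (1L, BDV(3.0, 4.0))))
val right = sc.parallelize(Seq((0L, BDV(5.0)), (1L, BDV(6.0))))
// join on the row index, then concatenate each pair of row vectors
val bound = left.join(right).mapValues { case (v1, v2) => BDV(v1.toArray ++ v2.toArray) }
```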
@@ -252,7 +252,7 @@ class DenseVecMatrix(
throw new IllegalArgumentException("Do not support this type " + that.getClass + " for cBind operation")
}
}
-
+
/**
* multiply a elementary matrix on the left to apply row switching transformations
@@ -266,7 +266,7 @@ class DenseVecMatrix(
s"Dimension mismatch, row permutation matrix: ${permutation.length} vs $nRows")
val index = rows.context.parallelize(permutation.zipWithIndex.toSeq, getClusterCores())
.map(t => (t._1.toLong, t._2.toLong))
- val result = rows.join(index).map(t => (t._2._2, t._2._1))
+ val result = rows.join(index).map { case (_, (v, id2)) => (id2, v) }
new DenseVecMatrix(result, numRows(), numCols())
}
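
Note: row switching is just a join against the permutation index followed by relabeling each row with its target index. Sketch, under the same `sc` assumption:

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x style)
import breeze.linalg.{DenseVector => BDV}

val rows = sc.parallelize(Seq((0L, BDV(1.0)), (1L, BDV(2.0))))
val perm = sc.parallelize(Seq((0L, 1L), (1L, 0L))) // old index -> new index
// after the join, keep the vector and relabel it with the new index
val exchanged = rows.join(perm).map { case (_, (v, newIdx)) => (newIdx, v) }
```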
@@ -282,7 +282,7 @@ class DenseVecMatrix(
*
* @param mode in which manner should the result be calculated, locally or distributed
*/
-def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
+ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
require(numRows() == numCols(),
s"LU decompose only support square matrix: ${numRows()} v.s ${numCols()}")
object LUmode extends Enumeration {
@@ -299,7 +299,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
}
val (luResult: BlockMatrix, perm: Array[Int]) = computeMode match {
- case LUmode.LocalBreeze => {
+ case LUmode.LocalBreeze =>
val brz = toBreeze()
val lu = brzLU(brz)
val pArray = (0 until lu._2.length).toArray
@@ -308,11 +308,10 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
pArray(i) = pArray(lu._2(i) - 1)
pArray(lu._2(i) - 1) = tmp
}
- val blk = rows.context.parallelize(Seq((new BlockID(0, 0), lu._1)), 1)
+ val blk = rows.context.parallelize(Seq((BlockID(0, 0), lu._1)), 1)
(new BlockMatrix(blk, lu._1.rows, lu._1.cols, 1, 1), pArray)
- }
- case LUmode.DistSpark =>
+ case LUmode.DistSpark =>
val subMatrixBaseSize = rows.context.getConf.getInt("marlin.lu.basesize", 1000)
val numBlksByRow, numBlksByCol = math.ceil(numRows().toDouble / subMatrixBaseSize.toDouble).toInt
val subMatrixBase = math.ceil(numRows().toDouble / numBlksByRow.toDouble).toInt
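
Note: dropping `new` at the `BlockID(...)` call sites assumes `BlockID` is a case class (or at least has a companion `apply`); the call sites are otherwise unchanged. Hypothetical shape, for illustration only:

```scala
// hypothetical sketch of Marlin's BlockID; the real declaration may differ
case class BlockID(row: Int, column: Int, seq: Int = 0)

val id  = BlockID(0, 0)    // companion apply, equivalent to new BlockID(0, 0)
val id2 = BlockID(1, 2, 7) // with an explicit sequence number
```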
@@ -394,44 +393,43 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val matThirdEmit = matThird
.mapPartitions({
- iter =>
- iter.flatMap(t => {
- val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
- for (j <- 0 until nSplitNum) {
- //val seq = t._1.row * nSplitNum * kSplitNum + (j+i+1) * kSplitNum + t._1.column
- val seq = 0
- array(j) = (new BlockID(t._1.row, (j + i + 1), seq), t._2)
+ iter =>
+ iter.flatMap { case (blkId, blk) =>
+ val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
+ for (j <- 0 until nSplitNum) {
+ // val seq = blkId.row * nSplitNum * kSplitNum + (j + i + 1) * kSplitNum + blkId.column
+ val seq = 0
+ array(j) = (BlockID(blkId.row, (j + i + 1), seq), blk)
+ }
+ array
}
- array
- })
- }) //.partitionBy(partitioner)
+ }) //.partitionBy(partitioner)
val matSecondEmit = matSecond
.mapPartitions({
- iter =>
- iter.flatMap(t => {
- val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
- for (j <- 0 until mSplitNum) {
- //val seq = (j + i + 1) * nSplitNum * kSplitNum + t._1.column * kSplitNum + t._1.row
- val seq = 0
- array(j) = (new BlockID(j + i + 1, t._1.column, seq), t._2)
+ iter =>
+ iter.flatMap { case (blkId, blk) =>
+ val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
+ for (j <- 0 until mSplitNum) {
+ val seq = 0
+ array(j) = (BlockID(j + i + 1, blkId.column, seq), blk)
+ }
+ array
}
- array
- })
- }) //.partitionBy(partitioner)
+ }) //.partitionBy(partitioner)
println("matThirdEmit and matSecondEmit same partitioner? "
+ (matSecondEmit.partitioner == matThirdEmit.partitioner).toString)
val mult = matThirdEmit.join(matSecondEmit, partitioner)
- .mapValues(t => {
- val mat = (t._1.asInstanceOf[BDM[Double]] * (bdata.value \ t._2.asInstanceOf[BDM[Double]]))
- .asInstanceOf[BDM[Double]]
- (mat)
- }) //.partitionBy(partitioner)
+ .mapValues { case (blk1, blk2) =>
+ val mat = (blk1.asInstanceOf[BDM[Double]] * (bdata.value \ blk2.asInstanceOf[BDM[Double]]))
+ .asInstanceOf[BDM[Double]]
+ mat
+ } //.partitionBy(partitioner)
blkMat = matForth
.join(mult, partitioner).mapValues(t => t._1 - t._2) //.partitionBy(partitioner).cache()
.cache()
- //println(blkMat.toDebugString)
}
}
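
Note: the `join` + `mapValues` pipeline above is the classic blocked-LU (Schur complement) update, A22 <- A22 - A21 * (A11 \ A12). The per-block arithmetic, as a local Breeze sketch (values are illustrative):

```scala
import breeze.linalg.{DenseMatrix => BDM}

val a11 = BDM((4.0, 0.0), (0.0, 4.0)) // pivot block (the broadcast bdata.value)
val a21 = BDM((2.0, 0.0), (0.0, 2.0)) // block below the pivot (matThird side)
val a12 = BDM((1.0, 2.0), (3.0, 4.0)) // block right of the pivot (matSecond side)
val a22 = BDM((5.0, 5.0), (5.0, 5.0)) // trailing block (matForth)
// mirrors blk1 * (bdata.value \ blk2), followed by the matForth - mult subtraction
val updated = a22 - a21 * (a11 \ a12)
```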
@@ -443,32 +441,31 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val bpArray = blkMat.context.broadcast(pArray)
blkMat = blkMat.mapPartitions(iter =>
- iter.map(block =>
- if (block._1.row > block._1.column) {
+ iter.map { case (blkId, blk) =>
+ if (blkId.row > blkId.column) {
val array = bpArray.value
- .slice(subMatrixBase * block._1.row,
- if (block._1.row == numBlksByRow - 1) {
+ .slice(subMatrixBase * blkId.row,
+ if (blkId.row == numBlksByRow - 1) {
numRows().toInt
}
else {
- subMatrixBase * block._1.row + subMatrixBase
+ subMatrixBase * blkId.row + subMatrixBase
})
val permutation = BDM.zeros[Double](array.length, array.length)
for (j <- 0 until array.length) {
- permutation.update(j, array(j) - subMatrixBase * block._1.row, 1.0)
+ permutation.update(j, array(j) - subMatrixBase * blkId.row, 1.0)
}
- (block._1, permutation * block._2)
+ (blkId, permutation * blk)
} else {
- block
+ (blkId, blk)
}
- ), true).cache()
+ }, true).cache()
val result = new BlockMatrix(blkMat, numRows(), numCols(), numBlksByRow, numBlksByCol)
// println("cnt:" + result.blocks.count())
(result, pArray)
}
-
(luResult, perm)
}
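
Note: the permutation blocks built above place a 1 at `(j, array(j) - offset)`, so left-multiplying picks out source rows. Locally:

```scala
import breeze.linalg.{DenseMatrix => BDM}

val order = Array(2, 0, 1) // result row j takes source row order(j)
val p = BDM.zeros[Double](3, 3)
for (j <- order.indices) p(j, order(j)) = 1.0
val m = BDM((1.0, 1.0, 1.0), (2.0, 2.0, 2.0), (3.0, 3.0, 3.0))
val reordered = p * m      // rows of m in the order 2, 0, 1
```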
@@ -500,9 +497,9 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
case LUmode.LocalBreeze =>
val brz = toBreeze()
val l = brzCholesky(brz)
- val blk = rows.context.parallelize(Seq((new BlockID(0, 0), l)), 1)
- (new BlockMatrix(blk, l.rows, l.cols, 1, 1))
- case LUmode.DistSpark => {
+ val blk = rows.context.parallelize(Seq((BlockID(0, 0), l)), 1)
+ new BlockMatrix(blk, l.rows, l.cols, 1, 1)
+ case LUmode.DistSpark =>
val subMatrixBaseSize = rows.context.getConf.getInt("marlin.cholesky.basesize", 1000)
val numBlksByRow, numBlksByCol = math.ceil(numRows().toDouble / subMatrixBaseSize.toDouble).toInt
val subMatrixBase = math.ceil(numRows().toDouble / numBlksByRow.toDouble).toInt
@@ -512,7 +509,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
blkMat.cache()
println("numBlkByRow is: " + numBlksByRow)
- println("original partitioner: " + partitioner.toString())
+ println("original partitioner: " + partitioner.toString)
val scatterRdds = Array.ofDim[RDD[(BlockID, BDM[Double])]](numBlksByRow - 1, 2)
for (i <- 0 until numBlksByRow) {
@@ -537,19 +534,19 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val nSplitNum = numBlksByRow - (i + 1)
val mult = scatterRdds(i)(1)
.mapPartitions({
- iter =>
- iter.flatMap(t => {
- val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum + 1)
- for (j <- 0 until array.length) {
- if (j < t._1.row - i) {
- array(j) = (new BlockID(t._1.row, j + i + 1, 0), t._2)
- } else {
- array(j) = (new BlockID(i + j, t._1.row, 0), t._2.t)
+ iter =>
+ iter.flatMap { case (blkId, blk) =>
+ val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum + 1)
+ for (j <- 0 until array.length) {
+ if (j < blkId.row - i) {
+ array(j) = (BlockID(blkId.row, j + i + 1, 0), blk)
+ } else {
+ array(j) = (BlockID(i + j, blkId.row, 0), blk.t)
+ }
}
+ array
}
- array
- })
- }).reduceByKey(partitioner, (a, b) => if (a.isTranspose) b * a else a * b)
+ }).reduceByKey(partitioner, (a, b) => if (a.isTranspose) b * a else a * b)
blkMat = blksForth.join(mult, partitioner).mapValues(t => t._1 - t._2).cache()
}
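
Note: the combiner passed to `reduceByKey` uses Breeze's `isTranspose` flag to order the product when one operand was emitted as a transposed view (`blk.t`). A local illustration:

```scala
import breeze.linalg.{DenseMatrix => BDM}

val a  = BDM((1.0, 2.0), (3.0, 4.0))
val at = a.t // a transposed view: at.isTranspose == true
// same ordering rule as the reduceByKey combiner above
def combine(x: BDM[Double], y: BDM[Double]): BDM[Double] =
  if (x.isTranspose) y * x else x * y
val prod = combine(at, a) // evaluates a * a.t
```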
@@ -560,7 +557,6 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
}
//blkMat.partitionBy(partitioner)
new BlockMatrix(blkMat)
- }
}
println("row:" + luResult.blocks.count())
luResult
@@ -588,14 +584,12 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
}
val luResult: BlockMatrix = computeMode match {
- case LUmode.LocalBreeze => {
+ case LUmode.LocalBreeze =>
val mat = toBreeze()
val inverse = brzInv(mat)
- val blk = rows.context.parallelize(Seq((new BlockID(0, 0), inverse)), 1)
+ val blk = rows.context.parallelize(Seq((BlockID(0, 0), inverse)), 1)
new BlockMatrix(blk, inverse.rows, inverse.cols, 1, 1)
- }
-
- case LUmode.DistSpark => {
+ case LUmode.DistSpark =>
val subMatrixBaseSize = rows.context.getConf.getInt("marlin.inverse.basesize", 1000)
val numBlksByRow, numBlksByCol = math.ceil(numRows().toDouble / subMatrixBaseSize.toDouble).toInt
val subMatrixBase = math.ceil(numRows().toDouble / numBlksByRow.toDouble).toInt
@@ -645,40 +639,38 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val matThirdEmit = matThird
.mapPartitions({
- iter =>
- iter.flatMap(t => {
- val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
- for (j <- 0 until nSplitNum) {
- //val seq = t._1.row * nSplitNum * kSplitNum + (j+i+1) * kSplitNum + t._1.column
- val seq = 0
- array(j) = (new BlockID(t._1.row, (j + i + 1), seq), t._2)
- }
- array
- })
- }) //.partitionBy(partitioner)
+ iter =>
+ iter.flatMap { case (blkId, blk) =>
+ val array = Array.ofDim[(BlockID, BDM[Double])](nSplitNum)
+ for (j <- 0 until nSplitNum) {
+ val seq = 0
+ array(j) = (BlockID(blkId.row, j + i + 1, seq), blk)
+ }
+ array
+ }
+ }) //.partitionBy(partitioner)
val matSecondEmit = matSecond
.mapPartitions({
- iter =>
- iter.flatMap(t => {
- val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
- for (j <- 0 until mSplitNum) {
- //val seq = (j + i + 1) * nSplitNum * kSplitNum + t._1.column * kSplitNum + t._1.row
- val seq = 0
- array(j) = (new BlockID(j + i + 1, t._1.column, seq), t._2)
+ iter =>
+ iter.flatMap { case (blkId, blk) =>
+ val array = Array.ofDim[(BlockID, BDM[Double])](mSplitNum)
+ for (j <- 0 until mSplitNum) {
+ val seq = 0
+ array(j) = (BlockID(j + i + 1, blkId.column, seq), blk)
+ }
+ array
}
- array
- })
- }) //.partitionBy(partitioner)
+ }) //.partitionBy(partitioner)
println("matThirdEmit and matSecondEmit same partitioner? "
+ (matSecondEmit.partitioner == matThirdEmit.partitioner).toString)
val mult = matThirdEmit.join(matSecondEmit, partitioner)
- .mapValues(t => {
- val mat = (t._1.asInstanceOf[BDM[Double]] * binv.value * t._2.asInstanceOf[BDM[Double]])
- .asInstanceOf[BDM[Double]]
- (mat)
- }) //.partitionBy(partitioner)
+ .mapValues { case (blk1, blk2) =>
+ val mat = (blk1.asInstanceOf[BDM[Double]] * binv.value * blk2.asInstanceOf[BDM[Double]])
+ .asInstanceOf[BDM[Double]]
+ mat
+ } //.partitionBy(partitioner)
blkMat = matForth
.join(mult, partitioner).mapValues(t => t._1 - t._2) //.partitionBy(partitioner).cache()
.cache()
@@ -696,77 +688,75 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val FourthEmit = blkMat.mapPartitions({
iter =>
- iter.flatMap(t => {
+ iter.flatMap { case (blkId, blk) =>
val array = Array.ofDim[(BlockID, BDM[Double])](n)
for (j <- 0 until array.length) {
- //val seq = t._1.row * k * n + (i+j) * k + t._1.column
- val seq = t._1.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + t._1.column
- array(j) = (new BlockID(t._1.row, (i + j), seq), t._2)
+ val seq = blkId.row * numBlksByRow * numBlksByCol + (i + j) * numBlksByCol + blkId.column
+ array(j) = (BlockID(blkId.row, i + j, seq), blk)
}
array
- })
+ }
})
val ThirdEmit = thirdMat.mapPartitions({
iter =>
- iter.flatMap(t => {
+ iter.flatMap { case (blkId, blk) =>
val array = Array.ofDim[(BlockID, BDM[Double])](m)
for (j <- 0 until array.length) {
- //val seq = (i + 1 + j) * k * n + t._1.column * k + t._1.row
- val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + t._1.column * numBlksByCol + t._1.row
- array(j) = (new BlockID((i + j + 1), t._1.column, seq), t._2)
+ val seq = (i + 1 + j) * numBlksByRow * numBlksByCol + blkId.column * numBlksByCol + blkId.row
+ array(j) = (BlockID((i + j + 1), blkId.column, seq), blk)
}
array
- })
+ }
})
val multThird = FourthEmit.join(ThirdEmit)
- .map(t => {
- val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
- .asInstanceOf[BDM[Double]]
- (new BlockID(t._1.row, t._1.column), mat)
- }).reduceByKey(_ + _) //.partitionBy(partitioner)
+ .map { case (blkId, (blk1, blk2)) =>
+ val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+ .asInstanceOf[BDM[Double]]
+ (BlockID(blkId.row, blkId.column), mat)
+ }.reduceByKey(_ + _) //.partitionBy(partitioner)
multThird.count()
m = 1
n = numBlksByRow - i - 1
k = n
val SecondEmit = secondMat.mapPartitions(iter =>
- iter.flatMap(t => {
+ iter.flatMap { case (blkId, blk) =>
val array = Array.ofDim[(BlockID, BDM[Double])](n)
for (j <- 0 until array.length) {
- val seq = t._1.row * k * n + (i + j + 1) * k + t._1.column
- array(j) = (new BlockID(t._1.row, i + j + 1, seq), t._2)
+ val seq = blkId.row * k * n + (i + j + 1) * k + blkId.column
+ array(j) = (BlockID(blkId.row, i + j + 1, seq), blk)
}
array
- }))
+ })
val FourthEmit2 = blkMat.mapPartitions(iter =>
- iter.flatMap(t => {
+ iter.flatMap { case (blkId, blk) =>
val array = Array.ofDim[(BlockID, BDM[Double])](m)
for (j <- 0 until array.length) {
- val seq = (i + j) * k * n + t._1.column * k + t._1.row
- array(j) = (new BlockID(i + j, t._1.column, seq), t._2)
+ val seq = (i + j) * k * n + blkId.column * k + blkId.row
+ array(j) = (BlockID(i + j, blkId.column, seq), blk)
}
array
- }))
+ })
val multSecond = SecondEmit.join(FourthEmit2)
- .map(t => {
- val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
- .asInstanceOf[BDM[Double]]
- (new BlockID(t._1.row, t._1.column), mat)
- }).reduceByKey(_ + _) //.partitionBy(partitioner)
+ .map { case (blkId, (blk1, blk2)) =>
+ val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+ .asInstanceOf[BDM[Double]]
+ (BlockID(blkId.row, blkId.column), mat)
+ }.reduceByKey(_ + _) //.partitionBy(partitioner)
val multFirst = scatterRdds(i)(1)
- .map(t => (new BlockID(t._1.column, t._1.row), t._2))
+ .map { case (blkId, blk) => (BlockID(blkId.column, blkId.row), blk) }
.join(multThird)
.mapPartitions(iter =>
- iter.map(t => {
- val mat = (t._2._1.asInstanceOf[BDM[Double]] * t._2._2.asInstanceOf[BDM[Double]])
- .asInstanceOf[BDM[Double]]
- (new BlockID(i, i), mat)
- }))
+ iter.map { case (blkId, (blk1, blk2)) =>
+ val mat = (blk1.asInstanceOf[BDM[Double]] * blk2.asInstanceOf[BDM[Double]])
+ .asInstanceOf[BDM[Double]]
+ (BlockID(i, i), mat)
+ })
.reduceByKey(_ + _)
.join(scatterRdds(i)(0))
.mapValues(t => t._1 + t._2)
@@ -776,7 +766,6 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
.partitionBy(partitioner).cache()
}
new BlockMatrix(blkMat, numRows(), numCols(), numBlksByRow, numBlksByCol)
- }
}
luResult
}
@@ -788,18 +777,18 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
*/
final def add(other: DistributedMatrix): DenseVecMatrix = {
other match {
- case that: DenseVecMatrix =>
+ case that: DenseVecMatrix =>
require(numRows() == that.numRows(), s"Dimension mismatch: ${numRows()} vs ${that.numRows()}")
require(numCols() == that.numCols, s"Dimension mismatch: ${numCols()} vs ${that.numCols()}")
val result = rows.join(that.rows).mapPartitions(iter => {
- iter.map(t =>
- (t._1, ((t._2._1 + t._2._2).asInstanceOf[BDV[Double]])))
+ iter.map { case (id, (v1, v2)) =>
+ (id, (v1 + v2).asInstanceOf[BDV[Double]]) }
}, true)
new DenseVecMatrix(result, numRows(), numCols())
- case that: BlockMatrix =>
+ case that: BlockMatrix =>
add(that.toDenseVecMatrix())
- case that: DistributedMatrix =>
+ case that: DistributedMatrix =>
throw new IllegalArgumentException("Do not support this type " + that.getClass + "for add operation")
}
@@ -924,7 +913,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
require(numRows() == other.numRows(), s"row dimension mismatch ${numRows()} vs ${other.numRows()}")
require(numCols() == other.numCols(), s"column dimension mismatch ${numCols()} vs ${other.numCols()}")
other match {
- case that: DenseVecMatrix =>
+ case that: DenseVecMatrix =>
val result = rows.join(that.rows).mapPartitions(iter => {
iter.map(t => {
val array = t._2._1.toArray.zip(t._2._2.toArray).map(x => x._1 * x._2)
@@ -932,7 +921,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
})
}, true)
new DenseVecMatrix(result, numRows(), numCols())
- case that: BlockMatrix =>
+ case that: BlockMatrix =>
dotProduct(that.toDenseVecMatrix())
}
}
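
Note: `dotProduct` is a per-row Hadamard product; the zip-and-multiply over arrays is equivalent to Breeze's element-wise multiply. Locally:

```scala
import breeze.linalg.{DenseVector => BDV}

val v1 = BDV(1.0, 2.0, 3.0)
val v2 = BDV(4.0, 5.0, 6.0)
// the zip-and-multiply used in dotProduct...
val hadamard = BDV(v1.toArray.zip(v2.toArray).map { case (x, y) => x * y })
// ...is equivalent to Breeze's element-wise product (spelled *:* in newer Breeze versions)
val same = v1 :* v2
```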
@@ -1298,7 +1287,7 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
val (index, vec) = iterator.next()
mat(index.toInt - blockRow * mBlockRowSize, ::) := vec.t
}
- (new BlockID(blockRow, 0), mat)
+ (BlockID(blockRow, 0), mat)
}
})
new BlockMatrix(result, numRows(), numCols(), blksByRow, blksByCol)
@@ -1322,25 +1311,25 @@ def luDecompose(mode: String = "auto"): (BlockMatrix, Array[Int]) = {
}
}).groupByKey()
.mapPartitions(iter =>
- iter.map { case (blkId, iterable) =>
- val colBase = blkId.column * mBlockColSize
- val rowBase = blkId.row * mBlockRowSize
- var smRows = mBlockRowSize
- if ((rowBase + mBlockRowSize - 1) >= mRows) {
- smRows = mRows - rowBase
- }
- var smCols = mBlockColSize
- if ((colBase + mBlockColSize - 1) >= mColumns) {
- smCols = mColumns - colBase
- }
- val mat = BDM.zeros[Double](smRows, smCols)
- val iterator = iterable.iterator
- while (iterator.hasNext) {
- val (index, vector) = iterator.next()
- mat((index - rowBase).toInt, ::) := vector.t
- }
- (blkId, mat)
- }, true)
+ iter.map { case (blkId, iterable) =>
+ val colBase = blkId.column * mBlockColSize
+ val rowBase = blkId.row * mBlockRowSize
+ var smRows = mBlockRowSize
+ if ((rowBase + mBlockRowSize - 1) >= mRows) {
+ smRows = mRows - rowBase
+ }
+ var smCols = mBlockColSize
+ if ((colBase + mBlockColSize - 1) >= mColumns) {
+ smCols = mColumns - colBase
+ }
+ val mat = BDM.zeros[Double](smRows, smCols)
+ val iterator = iterable.iterator
+ while (iterator.hasNext) {
+ val (index, vector) = iterator.next()
+ mat((index - rowBase).toInt, ::) := vector.t
+ }
+ (blkId, mat)
+ }, true)
new BlockMatrix(result, numRows(), numCols(), blksByRow, blksByCol)
}
}
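
Note: the `smRows`/`smCols` logic above shrinks the last block row/column to fit the matrix edge, and the placement loop maps absolute row indices to block-local ones. A compact local equivalent (values are illustrative):

```scala
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

val mRows = 5; val mBlockRowSize = 2
val rowBase = 4                                       // first absolute row of the last block
val smRows = math.min(mBlockRowSize, mRows - rowBase) // edge block shrinks to 1 row
val mat = BDM.zeros[Double](smRows, 3)
mat(0, ::) := BDV(1.0, 2.0, 3.0).t                    // absolute row 4 lands in local row 0
```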
diff --git a/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala b/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
index d8740c4..b77d17b 100644
--- a/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
+++ b/src/test/scala/edu/nju/pasalab/marlin/matrix/MatrixSuite.scala
@@ -147,7 +147,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
test("generate random sparse matrix"){
val s = MTUtils.randomSpaVecMatrix(sc, 10, 8, 0.3)
-// s.rows.collect().foreach{ case(a, b) => println(b.data.mkString(","))}
assert(s.nRows == 10L)
}
@@ -196,8 +195,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
assert(mat.multiply(2).toBreeze() === addSelf)
assert(mat.divide(2).toBreeze() === divide2)
val ma = new BlockMatrix(blocks)
- // val denVecMat = new DenseVecMatrix(rows)
-
assert(ma.add(1).toBreeze() === eleAdd1)
assert(ma.add(ma).toBreeze() === addSelf)
assert(ma.add(mat).toBreeze() === addSelf)
@@ -226,16 +223,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
assert(mat.getSubMatrix(1, 2, 1, 2).toBreeze() === sub1212)
}
-// test("DenseVecMatrix multiply a DenseVecMatrix in block-approach") {
-// val mat = new DenseVecMatrix(indexRows)
-// val result = mat.multiplyHama(mat, 2)
-// val blkSeq = result.blocks.collect().toSeq
-// assert(blkSeq.contains(new BlockID(0, 0), BDM((11.0, 10.0), (23.0, 24.0))))
-// assert(blkSeq.contains(new BlockID(0, 1), BDM((9.0, 8.0), (25.0, 26.0))))
-// assert(blkSeq.contains(new BlockID(1, 0), BDM((7.0, 11.0), (6.0, 7.0))))
-// assert(blkSeq.contains(new BlockID(1, 1), BDM((15.0, 19.0), (8.0, 9.0))))
-// }
-
test("DenseVecMatrix multiply a DenseVecMatrix, and select broadcast-approach") {
val mat = new DenseVecMatrix(indexRows)
val result = mat.multiply(mat, 2)
@@ -436,10 +423,6 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
val blk1 = mat.toBlockMatrix(2, 2)
val blk2 = mat.toBlockMatrix(1, 4)
val m = blk1.toBlockMatrix(2, 1)
- println(s"this toBreeze: ${m.toBreeze()}")
- println(s"other toBreeze: ${blk2.toBreeze()}")
- println(s"block matrix info: numRows: ${m.numRows()}, numCols: ${m.numCols()}, " +
- s"blksByRow: ${m.numBlksByRow()}, blksByCol: ${m.numBlksByCol()}")
val result = m.multiply(blk2)
val expected = BDM(
(11.0, 10.0, 9.0, 8.0),
@@ -462,9 +445,7 @@ class MatrixSuite extends FunSuite with LocalSparkContext {
(7.0, 11.0, 15.0, 19.0),
(6.0, 7.0, 8.0, 9.0))
val result = blkMat.multiply(local)
-// result.print()
assert(result.toBreeze() === expected)
-
}
}