From 88220376eb7b464adb0b0af1b7e2537ba1523d2a Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Sun, 20 Apr 2014 04:58:33 +0000
Subject: [PATCH] Revert "In memory shuffle (cherry-picked from amplab/graphx#135)"

This reverts commit 5b65b1fb8b3d385eeab44f360bd5402b8de02d45.

Conflicts:
	graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
---
 .../spark/storage/BlockObjectWriter.scala    | 24 +++++++++++--------
 .../apache/spark/storage/MemoryStore.scala   |  9 -------
 .../spark/storage/ShuffleBlockManager.scala  | 13 +---------
 .../org/apache/spark/graphx/Pregel.scala     |  5 ----
 4 files changed, 15 insertions(+), 36 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 4d7e1852f..696b930a2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{ByteArrayOutputStream, FileOutputStream, File, OutputStream}
+import java.io.{FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -44,7 +44,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
    */
-  def commit(): Array[Byte]
+  def commit(): Long
 
   /**
    * Reverts writes that haven't been flushed yet. Callers should invoke this function
@@ -106,7 +106,7 @@ private[spark] class DiskBlockObjectWriter(
   /** The file channel, used for repositioning / truncating the file. */
   private var channel: FileChannel = null
   private var bs: OutputStream = null
-  private var fos: ByteArrayOutputStream = null
+  private var fos: FileOutputStream = null
   private var ts: TimeTrackingOutputStream = null
   private var objOut: SerializationStream = null
   private val initialPosition = file.length()
@@ -115,8 +115,9 @@ private[spark] class DiskBlockObjectWriter(
   private var _timeWriting = 0L
 
   override def open(): BlockObjectWriter = {
-    fos = new ByteArrayOutputStream()
+    fos = new FileOutputStream(file, true)
     ts = new TimeTrackingOutputStream(fos)
+    channel = fos.getChannel()
     lastValidPosition = initialPosition
     bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
@@ -129,6 +130,9 @@ private[spark] class DiskBlockObjectWriter(
       if (syncWrites) {
         // Force outstanding writes to disk and track how long it takes
         objOut.flush()
+        val start = System.nanoTime()
+        fos.getFD.sync()
+        _timeWriting += System.nanoTime() - start
       }
       objOut.close()
 
@@ -145,18 +149,18 @@ private[spark] class DiskBlockObjectWriter(
 
   override def isOpen: Boolean = objOut != null
 
-  override def commit(): Array[Byte] = {
+  override def commit(): Long = {
     if (initialized) {
       // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
       // serializer stream and the lower level stream.
       objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
-      lastValidPosition = fos.size()
-      fos.toByteArray
+      lastValidPosition = channel.position()
+      lastValidPosition - prevPos
     } else {
       // lastValidPosition is zero if stream is uninitialized
-      null
+      lastValidPosition
     }
   }
 
@@ -166,7 +170,7 @@ private[spark] class DiskBlockObjectWriter(
       // truncate the file to the last valid position.
       objOut.flush()
       bs.flush()
-      throw new UnsupportedOperationException("Revert temporarily broken due to in memory shuffle code changes.")
+      channel.truncate(lastValidPosition)
     }
   }
 
@@ -178,7 +182,7 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   override def fileSegment(): FileSegment = {
-    new FileSegment(null, initialPosition, bytesWritten)
+    new FileSegment(file, initialPosition, bytesWritten)
   }
 
   // Only valid if called after close()
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 7afae0310..488f1ea96 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.util.{SizeEstimator, Utils}
-import org.apache.spark.serializer.Serializer
 
 /**
  * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
@@ -120,14 +119,6 @@
     }
   }
 
-  /**
-   * A version of getValues that allows a custom serializer. This is used as part of the
-   * shuffle short-circuit code.
-   */
-  def getValues(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
-    getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
-  }
-
   override def remove(blockId: BlockId): Boolean = {
     entries.synchronized {
       val entry = entries.remove(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index f90e9c481..4cd4cdbd9 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -207,17 +207,6 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
-
-  def removeAllShuffleStuff() {
-    for (state <- shuffleStates.values;
-         group <- state.allFileGroups;
-         (mapId, _) <- group.mapIdToIndex.iterator;
-         reducerId <- 0 until group.files.length) {
-      val blockId = new ShuffleBlockId(group.shuffleId, mapId, reducerId)
-      blockManager.removeBlock(blockId, tellMaster = false)
-    }
-    shuffleStates.clear()
-  }
 }
 
 private[spark]
@@ -231,7 +220,7 @@ object ShuffleBlockManager {
      * Stores the absolute index of each mapId in the files of this group. For instance,
      * if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
      */
-    val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
+    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
 
     /**
      * Stores consecutive offsets of blocks into each reducer file, ordered by position in the file.
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index 7fa4967d6..be8e8c0ca 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -154,11 +154,6 @@ object Pregel extends Logging {
 
       logInfo("Pregel finished iteration " + i)
 
-      // Very ugly code to clear the in-memory shuffle data
-      messages.foreachPartition { iter =>
-        SparkEnv.get.blockManager.shuffleBlockManager.removeAllShuffleStuff()
-      }
-
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking=false)
       newVerts.unpersist(blocking=false)
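
For reference, the behavior this revert restores to DiskBlockObjectWriter: commit() flushes the serializer and buffer streams and returns the number of bytes written since the previous commit, taken from the file channel's position, while revertPartialWrites() discards uncommitted writes by truncating the file back to the last committed position (the in-memory variant being reverted returned the buffered bytes from commit() and threw UnsupportedOperationException on revert). Below is a minimal standalone sketch of that commit/truncate contract against a plain FileOutputStream and FileChannel; it is not Spark code, and the SimpleBlockWriter / SimpleBlockWriterExample names are hypothetical.

// Illustrative sketch only -- not Spark code. It mirrors the contract the revert restores:
// commit() reports bytes written since the last commit via the channel position, and
// revertPartialWrites() truncates the file back to the last committed point.
import java.io.{File, FileOutputStream}
import java.nio.channels.FileChannel

class SimpleBlockWriter(file: File) {
  // Append mode, matching the patch's `new FileOutputStream(file, true)`.
  private val fos: FileOutputStream = new FileOutputStream(file, true)
  private val channel: FileChannel = fos.getChannel()
  // Byte offset of the last successful commit; nothing before it is ever undone.
  private var lastValidPosition: Long = file.length()

  def write(bytes: Array[Byte]): Unit = fos.write(bytes)

  // Flush outstanding writes and return how many bytes this commit made durable.
  def commit(): Long = {
    fos.flush()
    val prevPos = lastValidPosition
    lastValidPosition = channel.position()
    lastValidPosition - prevPos
  }

  // Discard writes made after the last commit by truncating the file.
  def revertPartialWrites(): Unit = {
    fos.flush()
    channel.truncate(lastValidPosition)
  }

  def close(): Unit = fos.close()
}

object SimpleBlockWriterExample extends App {
  val f = File.createTempFile("block", ".bin")
  val w = new SimpleBlockWriter(f)
  w.write("hello".getBytes("UTF-8"))
  println(w.commit())       // prints 5: bytes made durable by this commit
  w.write("partial".getBytes("UTF-8"))
  w.revertPartialWrites()   // truncates the file back to the last commit
  w.close()
  println(f.length())       // prints 5
}

With a fresh file, the example commits 5 bytes ("hello"), writes 7 more, reverts, and ends with a 5-byte file -- the same flush / position() / truncate() sequence the restored writer relies on.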