[WIP] Unsafe shuffle writer support in RSS #53
base: master
Conversation
(cherry picked from commit a876e216c65f9381f6f2e551d701db978db995ee)
Hi @mayurdb.
This looks like a great improvement!
I understand that this is a WIP PR, but please let me leave a few comments from my side.
Thanks!
  .createWithDefault(true)
val unsafeShuffleWriterBufferSize: ConfigEntry[Long] =
  ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize")
    .doc("Use unsafe shuffle writer")
Looks like the description should be updated, as it is the same as for the previous config entry.
ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize") | ||
.doc("Use unsafe shuffle writer") | ||
.longConf | ||
.createWithDefault(5*1024l*1024) |
Spark uses bytesConf for memory sizes, so users could specify values as bytes (b), kibibytes (k), mebibytes (m), etc., e.g. 50b, 100k, or 250m. It's also possible to omit a suffix. Should we also use bytesConf, as is done for example for spark.memory.offHeap.size?
It's a good suggestion, we should consider this.
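For reference, a minimal sketch of what the entry could look like with a byte-size config, assuming the RSS ConfigBuilder mirrors Spark's ConfigBuilder API (bytesConf(ByteUnit) and createWithDefaultString are Spark's names; the doc wording here is illustrative, not taken from the PR):

import org.apache.spark.network.util.ByteUnit

val unsafeShuffleWriterBufferSize: ConfigEntry[Long] =
  ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize")
    .doc("Buffer size used by the unsafe shuffle writer") // illustrative wording; also avoids reusing the previous entry's doc
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("5m") // users could also pass e.g. "100k" or a plain byte count

With this shape the default stays at 5 MiB while values like 50b, 100k, or 250m become valid, matching how spark.memory.offHeap.size is declared.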
var totalBufferedSize: Long = 0
var totalSendDataBlockTime: Long = 0

private val writerBufferSize = conf.get(RssOpts.unsafeShuffleWriterBufferSize);
Looks like writerBufferSize is not used.
private val serializerInstance = serializer.newInstance()

def addRecord(partitionId: Int, record: Product2[K, V]): Seq[(Int, Array[Byte])] = {
  var totalLookUpTime = 0L
Looks like totalLookUpTime is only being updated, but never read.
private var recordsWrittenCount: Int = 0

private var totalSerializationTime: Long = 0l

private var totalMemoryFethcWaitTime: Long = 0l
nit: totalMemoryFethcWaitTime => totalMemoryFetchWaitTime
private final int initialSize;

public RssShuffleInMemorySorter(MemoryConsumer consumer, int initialSize, boolean useRadixSort, boolean a) {
The last argument boolean a is not used.
@mayurdb, I've deleted Tested locally in the
Oh that's great. These classes are just package private, so ideally they should just work for all the cases where the package structure is replicated. Also, the compile-time and runtime Spark jars will be different in most cases while using RSS. I actually haven't looked into the issue as I just wanted to try this out and get the performance numbers first. I will check the details of the IllegalAccessError and get back.
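As a side note, a minimal sketch of what "replicating the package structure" means here (the object name is illustrative, not from the PR): declaring the RSS class under Spark's own package gives compile-time access to package-private members, but at runtime the JVM treats classes from different class loaders as belonging to different runtime packages, which is exactly when IllegalAccessError shows up.

// Illustrative only: placing RSS code under Spark's package to reach package-private APIs.
package org.apache.spark.shuffle

private[spark] object RssUnsafeShuffleBridge {
  // Compiles because it shares Spark's package; still fails at runtime with
  // IllegalAccessError if Spark's classes come from a different class loader.
}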
assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
serBuffer.reset();
I remember some serializers like JavaSerializer may use object references in the serialized stream, so each serialized object inside the stream will not be independent. Thus getting the bytes from MyByteArrayOutputStream each time after adding a key/value may not always be safe.
KryoSerializer does not use object references (if I remember correctly), and will be safe here.
Maybe add a check here to make sure the serializer is KryoSerializer?
We should always use KryoSerializer; can we just fail if we are not using Kryo?
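For illustration, a minimal Scala sketch of the fail-fast guard being asked for here (the method name and call site are assumptions, not the PR's code; Spark's own serialized shuffle path gates on the serializer's object-relocation property for the same reason):

import org.apache.spark.serializer.{KryoSerializer, Serializer}

// Fail fast if the serializer may emit back-references between records,
// which would make per-record byte slices unsafe to relocate independently.
private def requireKryo(serializer: Serializer): Unit = {
  if (!serializer.isInstanceOf[KryoSerializer]) {
    throw new IllegalArgumentException(
      "Unsafe shuffle writer requires KryoSerializer so that serialized records are " +
        s"independent and relocatable, but got ${serializer.getClass.getName}")
  }
}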
Seems like this needs to be updated.
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;

public class RadixSort {
I thought you had removed this class and were using the Spark class; why is it still showing up here?
import org.apache.spark.unsafe.array.LongArray;

public class RadixSort {
Can we just remove this class and use the Spark classes?
import org.apache.spark.util.Utils;

final class RssShuffleExternalSorter extends MemoryConsumer {
Can we please add some comments here explaining why this class is being added and what its functionality is?
private long sizeThreshold;

long numRecords = 0l;
Why is this not private?
private MemoryBlock currentPage = null;
private long pageCursor = -1;

long bytesWritten = 0l;
these should be private as well
// Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
// TODO: Fix recordsWritten
Can you please add a note on what we need to fix, or just fix it and remove this TODO?
ConfigBuilder("spark.shuffle.rss.unsafe.writer.bufferSize") | ||
.doc("Use unsafe shuffle writer") | ||
.longConf | ||
.createWithDefault(5*1024l*1024) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its a good suggestion , we should consider this
private val enableMapSideAggregation = shuffleDependency.mapSideCombine && conf.get(RssOpts.enableMapSideAggregation)
val enableMapSideAggregation = shuffleDependency.mapSideCombine && conf.get(RssOpts.enableMapSideAggregation)
Can we make them private?
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.storage.BlockId

class RssTestMemoryManager(conf: SparkConf)
Add some context here for the class
private val writeClientCloseLock = new Object()

var totalCompressionTime: Long = 0
All these variables should be private.
@vladhlinsky did you run these commands from within IntelliJ/any other IDE, or did you run a spark-submit externally and pass the RSS jars? Also, if the command was run externally, can you please confirm whether RSS was used? These classes are package private in Spark. To be able to access them, we need the same package structure, and both the interface and the implementation should be loaded by the same class loader. Looks like I'm hitting the second issue.
@mayurdb, I ran a
Hi @mayurdb, we have also been experiencing memory and map stage latency issues using RSS. We plan to test and work on this implementation as well. Wondering if you have any updates about this PR that you can share with us. Many thanks :)
Key traits
Details
The implementation uses Java's Unsafe APIs to acquire large chunks of memory. For each record, a tuple of the partition Id and the memory location where the record is stored is kept in a metadata array. The advantage of doing this is that the data can be ordered by sorting just the metadata array on the partition Id. Before spilling, data is read in chunks of configurable size and sent over the network. Similar to the above two approaches, this approach also interfaces with the TMM to acquire more memory for storing records or for expanding the metadata array.
Open Source Spark has already implemented the unsafe shuffle writer, and it currently gets used for executing most shuffle writes. Components from Spark's implementation around memory allocation, storing data in memory, and the metadata-based sort were reused in this implementation. The logic around spill triggers and around reading data from memory had to be changed to be compatible with RSS and, more importantly, performant.
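To make the metadata-array idea concrete, here is an illustrative Scala sketch of the packing scheme described above (the bit layout and names are assumptions for illustration; Spark's unsafe writer uses a similar packed record pointer): each record is tracked as a single long carrying the partition Id in the high bits and the record's memory address in the low bits, so sorting the array of longs orders records by partition without touching the data itself.

object PackedPointer {
  // Assumed layout for illustration: high 24 bits = partition id, low 40 bits = address.
  private val AddressBits = 40
  private val AddressMask = (1L << AddressBits) - 1

  def pack(partitionId: Int, recordAddress: Long): Long =
    (partitionId.toLong << AddressBits) | (recordAddress & AddressMask)

  def partitionId(packed: Long): Int = (packed >>> AddressBits).toInt
  def recordAddress(packed: Long): Long = packed & AddressMask
}

// Sorting the packed longs (e.g. with java.util.Arrays.sort) groups records by
// partition id, because the partition id occupies the most significant bits.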
Performance Numbers
TODOs: