Mappers in RSS send the shuffle data for any given partition to a single RSS server, so that reducers can read that partition's shuffle data from a single location. To support this, each mapper maintains a hashmap of (partition ID -> shuffle data for that partition). It works as follows (a minimal sketch appears after the list):
Maintain a hashmap from partition ID to a buffer.
After a new record is received, get the shuffle partition it must be sent to using the shuffle dependency.
The record is serialized and appended to the buffer for its partition ID; the buffer is created first if it is not already present.
If, after the insertion, the buffer grows beyond 32 KB (configurable), send the contents for that partition ID to the appropriate RSS server.
If the size of the entire hashmap grows beyond a configured value, all the data in the hashmap is sent to the RSS servers.
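A minimal sketch of this flow, under the assumption of a per-partition 32 KB threshold and a global spill threshold; the names (HashMapShuffleBuffer, sendToRssServer, the threshold constants) are illustrative, not the actual RSS code:

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

class HashMapShuffleBuffer {
    private static final int PER_PARTITION_SPILL_BYTES = 32 * 1024;   // configurable
    private static final long TOTAL_SPILL_BYTES = 64L * 1024 * 1024;  // configurable

    private final Map<Integer, ByteArrayOutputStream> buffers = new HashMap<>();
    private long totalBytes = 0;

    void addRecord(int partitionId, byte[] serializedRecord) {
        // One hashmap lookup per record; the buffer is created lazily.
        ByteArrayOutputStream buf =
            buffers.computeIfAbsent(partitionId, p -> new ByteArrayOutputStream());
        buf.write(serializedRecord, 0, serializedRecord.length);
        totalBytes += serializedRecord.length;

        // Per-partition spill once the buffer crosses 32 KB.
        if (buf.size() > PER_PARTITION_SPILL_BYTES) {
            totalBytes -= buf.size();
            sendToRssServer(partitionId, buf.toByteArray()); // note: extra copy
            buf.reset();
        }
        // Global spill once the whole hashmap crosses the configured limit.
        if (totalBytes > TOTAL_SPILL_BYTES) {
            spillAll();
        }
    }

    private void spillAll() {
        for (Map.Entry<Integer, ByteArrayOutputStream> e : buffers.entrySet()) {
            if (e.getValue().size() > 0) {
                sendToRssServer(e.getKey(), e.getValue().toByteArray()); // extra copy
                e.getValue().reset();
            }
        }
        totalBytes = 0;
    }

    private void sendToRssServer(int partitionId, byte[] data) {
        // Placeholder for the network call to the RSS server owning this partition.
    }
}
```

Even in this simplified form, the costs discussed below are visible: a hashmap lookup per record, many small buffers that each spill at 32 KB, and a copy of the buffered bytes before every send.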
This approach leads to performance degradation when the number of records and/or the amount of shuffle data is very high, mainly because of:
Cost of hashmap lookups -
Even though the hashmap itself is not large (each mapper's hashmap has at most one key per shuffle partition), one hashmap lookup per record adds significant overhead. It is very common for a Spark application to read a billion or more records, so the lookups become a measurable overhead even though each one takes constant time.
High serialization cost -
Individual per-key buffers are allowed to grow only to 32 KB, which leads to a very high number of spills per mapper. For a 1 TB shuffle, we have seen this number be as high as 10e9.
Within a single mapper task, one buffer and one serializer stream writing to that buffer are created per hashmap key. Because data for one partition can be sent over multiple spills, more than one buffer and its accompanying serializer stream may be created per partition. In the worst case, when records are large, the number of such allocations within a map task can approach the number of records.
Such a large number of small Java object allocations and buffer resizes causes memory pressure and therefore GC, i.e., more frequent and longer pauses, which further adds to latency.
Excessive internal copying of data -
The current implementation copies the data from the hashmap into a new buffer before spilling. This not only adds latency but also creates memory pressure, which in itself can further degrade application performance.
Proposal
Use unsafe memory management for RSS, in line with Spark's unsafe memory management. The proposed approach:
Stores the map output data in serialized form
Buffers the data in memory as much as possible. Chunks the data before sending it to RSS servers.
Avoids any extra copying of the data before spill
Uses Java's unsafe APIs to acquire large chunks of memory for storing the serialized records.
After a record is received, it is serialized and stored at an offset in an already acquired memory page.
A tuple of (partition ID, memory location where the record is stored) is maintained in an array.
The array is sorted by partition ID before spilling. Once sorted, the data is read from the accompanying pointer locations (see the sketch after this list).
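A rough sketch of the pointer-array idea, modeled on the design of Spark's unsafe shuffle writer. For clarity it uses an on-heap byte[] page instead of Unsafe-allocated memory, a single page, and an assumed pointer layout (partition ID in the high bits, offset in the low 40 bits); all class and constant names here are illustrative, not the proposed implementation:

```java
import java.util.Arrays;

class PointerArraySorterSketch {
    private static final int PAGE_SIZE = 64 * 1024 * 1024;  // one large page

    private final byte[] page = new byte[PAGE_SIZE];
    private int pageCursor = 0;

    // Each entry packs (partitionId << 40) | offsetInPage, so sorting the
    // longs also groups records by partition.
    private long[] pointers = new long[1 << 20];
    private int numRecords = 0;

    /** Copies an already-serialized record into the page and records a pointer. */
    void insertRecord(int partitionId, byte[] serialized) {
        // Page-full handling omitted for brevity.
        int offset = pageCursor;
        writeInt(page, offset, serialized.length);           // length prefix
        System.arraycopy(serialized, 0, page, offset + 4, serialized.length);
        pageCursor += 4 + serialized.length;

        if (numRecords == pointers.length) {
            pointers = Arrays.copyOf(pointers, numRecords * 2);
        }
        pointers[numRecords++] = ((long) partitionId << 40) | (offset & 0xFFFFFFFFFFL);
    }

    /** Sorts pointers by partition, then reads records back in partition order. */
    void spill() {
        Arrays.sort(pointers, 0, numRecords);
        for (int i = 0; i < numRecords; i++) {
            int partitionId = (int) (pointers[i] >>> 40);
            int offset = (int) (pointers[i] & 0xFFFFFFFFFFL);
            int length = readInt(page, offset);
            // Chunk and send bytes page[offset + 4 .. offset + 4 + length)
            // for partitionId to its RSS server; no intermediate copy needed.
        }
        numRecords = 0;
        pageCursor = 0;
    }

    private static void writeInt(byte[] buf, int off, int v) {
        buf[off] = (byte) (v >>> 24); buf[off + 1] = (byte) (v >>> 16);
        buf[off + 2] = (byte) (v >>> 8); buf[off + 3] = (byte) v;
    }

    private static int readInt(byte[] buf, int off) {
        return ((buf[off] & 0xFF) << 24) | ((buf[off + 1] & 0xFF) << 16)
             | ((buf[off + 2] & 0xFF) << 8) | (buf[off + 3] & 0xFF);
    }
}
```

The key difference from the hashmap approach is that records are written once into a large page and only an 8-byte pointer per record is tracked and sorted, avoiding per-partition buffers, per-record hashmap lookups, and the extra copy before spilling.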
For the implementation, components from Spark's unsafe shuffle writer can be reused.
Based on the POC, here are the performance numbers