[Proposal] Unsafe memory management in RSS mappers #59

Open

mayurdb opened this issue Dec 7, 2021 · 0 comments

mayurdb (Collaborator) commented Dec 7, 2021

Mappers in RSS send the shuffle data for any given partition to a single RSS server, so that reducers can read that partition's shuffle data from a single location. To achieve this, each mapper maintains a hashmap of (partition ID -> shuffle data for that partition). It works as follows (a minimal sketch follows the list below):

  • Maintain a hashmap from partition ID to a buffer.
  • When a new record is received, determine the shuffle partition it must be sent to using the shuffle dependency.
  • The record is serialized and appended to the buffer for that partition ID; the buffer is created first if it is not already present in the hashmap.
  • If, after the insertion, the buffer grows beyond 32 KB (configurable), the contents for that partition ID are sent to the appropriate RSS server.
  • If the total size of the hashmap exceeds a configured value, all data in the hashmap is sent to the RSS servers.
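
A minimal Java sketch of this write path, assuming hypothetical stand-in interfaces for the partitioner, serializer, and RSS client (these names are illustrative, not the actual RSS classes):

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the current hashmap-based write path.
// Partitioner, Serializer, and RssClient are hypothetical stand-ins.
class HashMapShuffleWriterSketch {
    private static final int PER_PARTITION_SPILL_BYTES = 32 * 1024; // 32 KB, configurable
    private final long totalSpillBytes;                             // configured total threshold
    private final Map<Integer, ByteArrayOutputStream> buffers = new HashMap<>();
    private long totalBuffered = 0;

    HashMapShuffleWriterSketch(long totalSpillBytes) {
        this.totalSpillBytes = totalSpillBytes;
    }

    void write(Object key, Object value, Partitioner partitioner,
               Serializer serializer, RssClient rssClient) {
        int partitionId = partitioner.getPartition(key);            // one lookup per record
        ByteArrayOutputStream buf =
            buffers.computeIfAbsent(partitionId, p -> new ByteArrayOutputStream());
        byte[] serialized = serializer.serialize(key, value);       // serialize the record
        buf.write(serialized, 0, serialized.length);                // append to per-partition buffer
        totalBuffered += serialized.length;

        if (buf.size() > PER_PARTITION_SPILL_BYTES) {               // per-partition spill
            rssClient.send(partitionId, buf.toByteArray());         // copy, then send
            totalBuffered -= buf.size();
            buffers.remove(partitionId);
        } else if (totalBuffered > totalSpillBytes) {               // whole-map spill
            for (Map.Entry<Integer, ByteArrayOutputStream> e : buffers.entrySet()) {
                rssClient.send(e.getKey(), e.getValue().toByteArray());
            }
            buffers.clear();
            totalBuffered = 0;
        }
    }

    // Hypothetical collaborator interfaces, for the sketch only.
    interface Partitioner { int getPartition(Object key); }
    interface Serializer  { byte[] serialize(Object key, Object value); }
    interface RssClient   { void send(int partitionId, byte[] data); }
}
```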

This approach degrades performance when the number of records and/or the amount of shuffle data is very high, mainly because:

  • Cost of hashmap lookups -
    Even though the hashmap is not large (each mapper's hashmap has at most as many keys as there are shuffle partitions), one hashmap lookup per record adds significant overhead. It is very common for a Spark application to read a billion-plus records, so hashmap lookups become an overhead even though each call takes a constant amount of time.

  • High serialization cost -
    Individual per-key buffers are allowed to grow only to 32 KB, which leads to a very high number of spills per mapper; for a 1 TB shuffle we have seen this be as high as 10e9.
    Within a single mapper task, one buffer and one serializer stream writing to that buffer are created per hashmap key. Because the data for one partition can be sent over multiple spills, more than one buffer and its accompanying serializer stream might be created per partition. In the worst case, if records are large, there could be as many allocations as there are records within each map task.
    Such a large number of small Java object allocations and buffer expansions causes memory pressure and thereby GC, i.e., frequent and longer pauses, which also adds to the latency.

  • Excessive internal copying of data -
    The current implementation copies the data from the hashmap into a new buffer before spilling (see the sketch after this list). This not only adds latency but also creates memory pressure, which in itself can further degrade application performance.
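
For illustration only (hypothetical names again), the extra copy looks roughly like this: per-partition buffers are gathered into a fresh combined buffer before being handed to the sender, instead of the existing bytes being sent directly:

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;

// Illustrative only: the spill path copies each per-partition buffer into a new
// combined buffer before sending, rather than sending the buffered bytes directly.
class SpillCopySketch {
    byte[] copyBeforeSpill(Map<Integer, ByteArrayOutputStream> buffers) {
        ByteArrayOutputStream combined = new ByteArrayOutputStream();
        for (ByteArrayOutputStream partitionBuffer : buffers.values()) {
            byte[] copy = partitionBuffer.toByteArray(); // first copy: buffer -> byte[]
            combined.write(copy, 0, copy.length);        // second copy: byte[] -> combined buffer
        }
        return combined.toByteArray();                   // third copy on the way out
    }
}
```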


Proposal
Use unsafe memory management for RSS, along the lines of Spark's unsafe memory management (a sketch of the proposed write path follows the list below):

  • Stores the map output data in serialized form.
  • Buffers the data in memory as much as possible, and chunks the data before sending it to the RSS servers.
  • Avoids any extra copying of the data before a spill.
  • Uses Java's Unsafe APIs to acquire large chunks (pages) of memory for storing the serialized records.
  • When a record is received, it is serialized and stored at an offset in an already-acquired in-memory page.
  • A tuple of (partition ID, memory location where the record is stored) is maintained in an array.
  • The array is sorted by partition ID before spilling. Once sorted, the data is read from the accompanying pointer locations.
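
A rough Java sketch of the proposed write path. For brevity it uses an on-heap byte[] as the page rather than memory acquired via Unsafe / Spark's memory manager, and stores pointers as small long[] tuples rather than packed longs; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Rough sketch of the proposed unsafe-style write path. For brevity this uses an
// on-heap byte[] as the "page"; the actual proposal acquires pages via Unsafe /
// Spark's memory manager. All names are hypothetical stand-ins.
class UnsafeStyleWriterSketch {
    private static final int PAGE_SIZE = 64 * 1024 * 1024;  // assumed page size
    private final byte[] currentPage = new byte[PAGE_SIZE];
    private int pageCursor = 0;

    // One entry per record: (partitionId, offset into page, record length).
    private final List<long[]> recordPointers = new ArrayList<>();

    // Assumes each serialized record fits within a single page.
    void insertRecord(byte[] serializedRecord, int partitionId, RssClient rssClient) {
        if (pageCursor + serializedRecord.length > currentPage.length) {
            spill(rssClient);                                // page full: sort and send
        }
        System.arraycopy(serializedRecord, 0, currentPage, pageCursor, serializedRecord.length);
        recordPointers.add(new long[] {partitionId, pageCursor, serializedRecord.length});
        pageCursor += serializedRecord.length;
    }

    private void spill(RssClient rssClient) {
        // Sort pointers by partition ID so records for one partition are contiguous.
        recordPointers.sort((a, b) -> Long.compare(a[0], b[0]));
        for (long[] ptr : recordPointers) {
            int partitionId = (int) ptr[0];
            int offset = (int) ptr[1];
            int length = (int) ptr[2];
            // Read the serialized bytes straight from the page (no intermediate copy)
            // and hand them to the RSS client, which chunks and sends them.
            rssClient.send(partitionId, currentPage, offset, length);
        }
        recordPointers.clear();
        pageCursor = 0;
    }

    interface RssClient { void send(int partitionId, byte[] page, int offset, int length); }
}
```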

For the implementation, components from Spark's unsafe shuffle writer can be reused; a sketch of the record-pointer encoding used in that style of writer is shown below.
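
For reference, Spark's unsafe shuffle writer keeps one 8-byte word per record that packs the partition ID together with the record's page number and in-page offset, so sorting by partition only has to sort an array of longs. A self-contained illustration in that style follows; the bit widths mirror Spark's PackedRecordPointer (24-bit partition, 13-bit page number, 27-bit offset), but this is a sketch, not Spark's code:

```java
// Illustration of a packed record pointer in the style of Spark's PackedRecordPointer:
// [24-bit partition id | 13-bit page number | 27-bit offset] packed into one long,
// so sorting records by partition only touches a long[] array.
final class PackedPointerSketch {
    static final int PARTITION_BITS = 24;
    static final int PAGE_BITS = 13;
    static final int OFFSET_BITS = 27;

    static long pack(int partitionId, int pageNumber, int offsetInPage) {
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
             | ((long) pageNumber << OFFSET_BITS)
             | (long) offsetInPage;
    }

    static int partitionId(long packed) {
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & ((1 << PAGE_BITS) - 1));
    }

    static int offsetInPage(long packed) {
        return (int) (packed & ((1 << OFFSET_BITS) - 1));
    }

    public static void main(String[] args) {
        long p = pack(42, 3, 123_456);
        System.out.println(partitionId(p) + " " + pageNumber(p) + " " + offsetInPage(p));
    }
}
```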
Based on the POC, here are the performance numbers:
[image: POC performance numbers]
