[Spark 3] RSS performance with Adaptive Skew Join Optimization #101

Open
s0nskar opened this issue Jun 8, 2023 · 3 comments

s0nskar (Member) commented Jun 8, 2023

The current support for RSS with the AQE framework does not provide performant APIs for AQE skew join optimization. To explain: when AQE detects a skewed partition, it tries to divide that partition into multiple subpartitions by batching consecutive mapTaskOutput blocks until each batch reaches the target subpartition size.
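
For clarity, here is a minimal sketch (not Spark or Zeus source; names and sizes are illustrative) of the splitting behaviour described above: consecutive mapTaskOutput sizes are batched until each batch reaches the target sub-partition size.

```scala
// Minimal sketch: split one skewed reduce partition into sub-partitions made of
// consecutive map outputs, each roughly reaching the target size.
object SkewSplitSketch {
  /** @param mapOutputSizes bytes each map task wrote for this reduce partition, indexed by map id
    * @return inclusive (startMapId, endMapId) ranges, one per sub-partition */
  def splitByConsecutiveMapOutputs(mapOutputSizes: Array[Long], targetSize: Long): Seq[(Int, Int)] = {
    val splits = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
    var start = 0
    var acc = 0L
    for (mapId <- mapOutputSizes.indices) {
      acc += mapOutputSizes(mapId)
      if (acc >= targetSize) {
        splits += ((start, mapId)) // sub-partition covers map ids [start, mapId]
        start = mapId + 1
        acc = 0L
      }
    }
    if (start < mapOutputSizes.length) splits += ((start, mapOutputSizes.length - 1))
    splits.toSeq
  }
}
```

Each (startMapId, endMapId) range becomes one reducer task; the next two paragraphs describe why ESS can serve such a range cheaply while RSS cannot.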

For ESS this works seamlessly because each mapTaskOutput is stored in a separate file, so a reducer can fetch exactly the data it requires. For example, if a reducer wants data for mapIDs 5 to 7, it can fetch just those individual blocks (no extra data is fetched).

But with RSS, the mapTaskOutput of all mapper tasks is intertwined in a single partition file. Because of this, RSS has to fetch the full partition file every time and then filter it to get the data for the required mapIDs. This adds a lot of network and compute IO overhead.

Example: for a partition file of 50 GB and a target size of 1 GB, AQE will try to divide this partition into 50 subpartitions to spread the work across 50 executors. Each executor will then have to fetch the 50 GB partition file and discard 49 GB of it to get the required 1 GB of data. Moreover, since only 1 or 2 RSS servers serve partition files to all of the executors, this hurts the network and IO throughput of those servers. We've seen such applications choke a host's outbound network throughput for more than 5 hours and make the host unresponsive, which causes failures in other applications as well.
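
To make the amplification explicit, a quick back-of-the-envelope calculation using the numbers from the example above:

```scala
// Back-of-the-envelope numbers for the 50 GB / 1 GB example (illustrative only).
val partitionBytes = 50L * 1024 * 1024 * 1024             // 50 GB partition file
val targetBytes    = 1L * 1024 * 1024 * 1024              // 1 GB target per sub-partition
val subPartitions  = (partitionBytes / targetBytes).toInt // 50 reducer tasks
val bytesServed    = partitionBytes * subPartitions       // ~2.44 TB pushed out by the RSS servers
val usefulBytes    = partitionBytes                       // only 50 GB of that is actually needed
val amplification  = bytesServed / usefulBytes            // 50x read amplification
```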

We're proposing an approach to solve this issue: create an index file for each partition while writing that partition on the shuffle server, and add a new RSS server API that reads partial partition data from the partition file using the index file. This approach will also require plugging in a custom skew join optimizer rule that divides the partition purely by data size instead of by batches of consecutive mapTaskOutput.
More details in this doc: https://docs.google.com/document/d/1nctmnhSFpvv5V5coJfqjslhcCwEBDneNY10pG76psEE/edit?usp=sharing
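
A rough sketch of what we have in mind (hypothetical names and signatures, not the actual Zeus code or the design in the doc, just to make the idea concrete): the writer records an index entry per block, a size-based splitter cuts the partition into contiguous byte ranges, and a new server API serves only the requested range.

```scala
// Hypothetical sketch of the proposal; names and signatures are illustrative.

// Written alongside the partition file: where each map task's block landed.
case class BlockIndexEntry(mapId: Int, offset: Long, length: Long)

// Size-based skew split: walk the index (assumed to be in file order with
// contiguous blocks) and cut contiguous (offset, length) byte ranges of ~targetSize.
def splitBySize(index: Seq[BlockIndexEntry], targetSize: Long): Seq[(Long, Long)] = {
  val ranges = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
  var rangeStart = 0L
  var acc = 0L
  for (entry <- index) {
    acc += entry.length
    if (acc >= targetSize) {
      ranges += ((rangeStart, acc))
      rangeStart += acc
      acc = 0L
    }
  }
  if (acc > 0) ranges += ((rangeStart, acc))
  ranges.toSeq
}

// New server-side read API: stream only the requested byte range of a partition
// file instead of the whole file, so each reducer fetches ~targetSize bytes.
trait PartitionReader {
  def readPartition(shuffleId: Int, partitionId: Int): Iterator[Array[Byte]]
  def readPartitionRange(shuffleId: Int, partitionId: Int,
                         offset: Long, length: Long): Iterator[Array[Byte]]
}
```

Because the split is expressed purely as byte ranges cut at block boundaries from the index, each reducer's read becomes a single sequential scan on the server and no block is split in half.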

We would love to hear other users' suggestions on our approach, and if they are facing the same issues, how they are tackling them.

s0nskar (Member, Author) commented Jun 16, 2023

cc: @hiboyang for visibility

jerqi commented Jun 25, 2023

@s0nskar The data distribution within a reduce partition is important; otherwise the reader will suffer from random IO, according to our experience in Uniffle (another remote shuffle service implementation, similar to Zeus). You can refer to apache/incubator-uniffle#293.
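
To illustrate the random-IO concern with a hypothetical layout (not how Uniffle or Zeus actually lay out data): if a split is still expressed as a map-id range while map outputs are interleaved in the partition file, each read touches many scattered byte ranges, whereas a byte-range split from an index stays sequential.

```scala
// Hypothetical layout, for illustration only.
case class Block(mapId: Int, offset: Long, length: Long)

// Blocks in the order they landed in the partition file (map outputs interleaved).
val fileLayout = Seq(
  Block(0, 0L, 100L), Block(3, 100L, 80L), Block(1, 180L, 120L),
  Block(3, 300L, 60L), Block(0, 360L, 90L), Block(1, 450L, 70L)
)

// Serving "map ids 0 to 1" requires four separate ranges -> random IO on the server.
val scattered = fileLayout.filter(b => b.mapId <= 1).map(b => (b.offset, b.length))
// scattered == Seq((0,100), (180,120), (360,90), (450,70))
```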

s0nskar (Member, Author) commented Jul 3, 2023

Thanks @jerqi for replying; this sounds like exactly the problem we're facing. After reading the docs, though, I'm a little confused about how exactly Uniffle stores the data. I'll read more of the docs and get back.

s0nskar closed this as completed Jul 3, 2023
s0nskar reopened this Jul 3, 2023