The current RSS support for the AQE framework does not provide performant APIs for AQE skew join optimization. To explain: when AQE detects a skewed partition, it tries to divide that partition into multiple subpartitions by batching consecutive mapTaskOutputs until each batch reaches the target subpartition size.
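As an illustration, here is a minimal Scala sketch of that batching logic (the function name and signature are ours, not Spark's internal API): given the per-map output sizes for the skewed reduce partition, it produces consecutive (startMapIndex, endMapIndex) ranges that each approach the target size.

```scala
// Batch consecutive map outputs of one reduce partition until ~targetSize is reached.
// mapOutputSizes(i) = bytes that map task i wrote for this reduce partition.
def splitByConsecutiveMapOutputs(
    mapOutputSizes: Array[Long],
    targetSize: Long): Seq[(Int, Int)] = {
  val ranges = scala.collection.mutable.ArrayBuffer.empty[(Int, Int)]
  var start = 0
  var acc = 0L
  mapOutputSizes.zipWithIndex.foreach { case (size, i) =>
    if (acc > 0 && acc + size > targetSize) {
      ranges += ((start, i))   // endMapIndex is exclusive
      start = i
      acc = 0L
    }
    acc += size
  }
  if (start < mapOutputSizes.length) ranges += ((start, mapOutputSizes.length))
  ranges.toSeq
}
```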
For ESS this works seamlessly because each mapTaskOutput is stored in a separate file segment, so a reducer can fetch exactly the data it requires. Say a reducer wants data for mapIDs 5 to 7: it can fetch just those individual blocks, with no extra data transferred.
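For example (a hedged illustration using Spark's standard shuffle block naming; the concrete IDs are made up for this example):

```scala
// With ESS, a reducer that needs mapIDs 5..7 of reduce partition 12 can request
// exactly those blocks by id, since each map's output for the partition is an
// individually addressable (indexed) file segment on the shuffle server.
val shuffleId    = 0          // assumed values, just for the example
val reduceId     = 12
val wantedMapIds = 5 to 7
val blockIds = wantedMapIds.map(mapId => s"shuffle_${shuffleId}_${mapId}_${reduceId}")
// => Vector("shuffle_0_5_12", "shuffle_0_6_12", "shuffle_0_7_12")
// Only these three segments cross the network; nothing else is fetched.
```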
But with RSS, the mapTaskOutput of all mapper tasks is interleaved in a single partition file. Because of this, RSS has to fetch the full partition file every time and then filter out everything except the required mapIDs, which adds a lot of network and compute IO overhead.
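In simplified form, that read path looks like the sketch below (the record layout and reader shape are hypothetical, only meant to show where the waste comes from):

```scala
case class ShuffleRecord(mapId: Int, payload: Array[Byte])

// The client streams the *whole* partition file and keeps only the records
// whose mapId falls in the range AQE assigned to this reducer.
def readForMapRange(
    fullPartition: Iterator[ShuffleRecord],   // e.g. the entire 50 GB stream
    startMapId: Int,
    endMapId: Int): Iterator[ShuffleRecord] =
  fullPartition.filter(r => r.mapId >= startMapId && r.mapId < endMapId)
// The filter itself is cheap; the cost is that every byte of the partition
// still leaves the shuffle server and crosses the network before being dropped.
```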
Example: for a partition file of 50 GB and a target size of 1 GB, AQE will try to divide the partition into 50 subpartitions so the work is spread across 50 executors. Each executor will then have to fetch the 50 GB partition file and filter out 49 GB of data to get its required 1 GB. Moreover, since only 1 or 2 RSS servers serve the partition files to all of these executors, their network and IO throughput suffers. We've seen such applications choking the hosts' outbound network throughput for more than 5 hours and making them unresponsive, which leads to the failure of other applications as well.
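The read amplification in this example works out as follows (back-of-the-envelope arithmetic only):

```scala
val GiB            = 1024L * 1024 * 1024
val partitionBytes = 50 * GiB                        // size of the skewed partition file
val targetBytes    = 1 * GiB                         // AQE target subpartition size
val subPartitions  = partitionBytes / targetBytes    // 50 reducers
val totalFetched   = subPartitions * partitionBytes  // 50 * 50 GB = 2500 GB pulled from RSS
val usefulBytes    = partitionBytes                  // only 50 GB is actually needed
val amplification  = totalFetched / usefulBytes      // 50x network/IO read amplification
```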
We're proposing an approach to solve this issue by creating an index file for each partition while writing that partition on the shuffle server, and adding a new RSS server API to read partial partition data from the partition file using the index file. This approach will also require a custom Skew Join Optimizer operator to be plugged in, one that divides the partition based purely on data size instead of batching consecutive mapTaskOutputs; a rough sketch is shown below.
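The sketch below outlines the idea; all names and signatures here are hypothetical placeholders, and the design doc linked below has the authoritative details.

```scala
// One index entry per mapTaskOutput, recorded while the partition file is written.
case class IndexEntry(mapId: Long, offset: Long, length: Long)

// Size-based splitting: group index entries (assumed sorted by offset and
// contiguous) into byte ranges of roughly targetSize, independent of mapId batching.
def toByteRanges(index: Seq[IndexEntry], targetSize: Long): Seq[(Long, Long)] = {
  val ranges = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
  var rangeStart = 0L
  var acc = 0L
  index.foreach { e =>
    if (acc > 0 && acc + e.length > targetSize) {
      ranges += ((rangeStart, acc))   // (offset, length) within the partition file
      rangeStart = e.offset
      acc = 0L
    }
    acc += e.length
  }
  if (acc > 0) ranges += ((rangeStart, acc))
  ranges.toSeq
}

// Hypothetical new server API: read only [offset, offset + length) of the
// partition file instead of streaming the whole thing.
trait ShuffleServerReadApi {
  def readPartitionRange(appId: String, shuffleId: Int, partitionId: Int,
                         offset: Long, length: Long): Iterator[Array[Byte]]
}
```

With this, each of the 50 executors in the example above would fetch roughly 1 GB instead of 50 GB, and the shuffle servers would serve about 50 GB in total instead of 2500 GB.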
More details on this doc - https://docs.google.com/document/d/1nctmnhSFpvv5V5coJfqjslhcCwEBDneNY10pG76psEE/edit?usp=sharing
We would love to hear other users' suggestions on our approach, and if they are facing the same issues, how they are tackling them.
@s0nskar The data distribution of the reduce partition is important; otherwise the reader will suffer from random IO, according to our experience in Uniffle (another remote shuffle service, similar to Zeus). You can refer to apache/incubator-uniffle#293.
Thanks @jerqi for replying, this sounds like the exact problem that we're facing. Although after reading the docs I'm a little confused about how exactly Uniffle is storing the data. I'll read more from the docs and get back.