Can Rss have stage retry when one server is down? #93
You are right that the server was probably down and was removed from ZooKeeper after losing its heartbeat. The current RSS implementation assigns a static list of servers at the beginning of the Spark application and does not change the server list while the application is running. This simplifies the logic for tracking which partition is on which server. The downside is that if one RSS server goes down in the middle, the application will fail. If you want redundancy, you could enable "spark.shuffle.rss.replicas" to use multiple replicas, so if one server for a shuffle partition goes down, there is still another server and the application will not fail.
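For reference, a minimal sketch of how a job might enable replication, assuming the configuration key behaves as described above and that the shuffle manager class name below is the one used by RSS:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes "spark.shuffle.rss.replicas" = 2 writes each shuffle
// partition to two servers, so the job survives one server going down.
// The shuffle manager class name is taken from the RSS setup instructions.
val spark = SparkSession.builder()
  .appName("rss-replication-example")
  .config("spark.shuffle.manager", "org.apache.spark.shuffle.RssShuffleManager")
  .config("spark.shuffle.rss.replicas", "2")
  .getOrCreate()
```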
Thank you for the suggestions @hiboyang! Does this mean the shuffle data written to the servers will be doubled if I set 'spark.shuffle.rss.replicas' to 2? If so, this will work for some small jobs, but may require more disk for some really large jobs. Also, we currently want to use RSS at the cluster level, so using the default values of 1 to 50 wouldn't make sense for all the jobs. Do you have any advice on how to specify "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" for different Spark jobs? Thanks!
Hi, @hiboyang. If 'spark.shuffle.rss.replicas' does write double the data to the servers, we unfortunately won't be able to use it for large jobs with 400+ TB of shuffle data. So do you think we could just skip connecting to the unreachable server and pick the next available one if this happens during the write stage? And if a server goes down during the read stage, can RSS do what Spark's external shuffle service does and trigger a stage retry to recover the data?
Values for "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" are case by case, depending on the Spark job (e.g. how many mappers/reducers, how much shuffle data, etc.). Normally you could start with the default values (or not set them at all), then change the values and see whether there is any impact on your jobs.
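As an illustration of the per-job tuning described above (the values here are hypothetical starting points, not recommendations):

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch: a small job keeps the defaults, while a shuffle-heavy
// job raises the server-count bounds and is then adjusted based on observed
// run times and shuffle volume.
val smallJobConf = new SparkConf()  // leave min/max server count unset

val largeJobConf = new SparkConf()
  .set("spark.shuffle.rss.minServerCount", "20")
  .set("spark.shuffle.rss.maxServerCount", "100")
```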
I see. This is a current limitation: RSS cannot handle a server going down during the Spark job's running time unless 'spark.shuffle.rss.replicas' is used. Hope someone could contribute this feature!
Thanks for the reply! Will see what I can do to improve this.
@hiboyang Hi! I attempted to contribute stage retry, but there seems to be a difficulty due to the implementation of RSS. Wondering if I can get some insights from you. The issue is the number of tasks to retry when a server is down and we want to do a stage retry to recover the files on that server.
So, due to the different map-reduce structure compared to ESS, I suppose the only way to have fault tolerance without using replicas is to retry the entire stage instead of just a few tasks? Please correct me if I understand this wrong, and do you think there is a better way to retry and recover the partition files? Thanks!
Hi @YutingWang98, you are right! This is a limitation of the current RSS. Maybe we could brainstorm ideas sometime on how to improve this.
Hi @YutingWang98, you are right. Given the push nature of RSS, all the tasks in the stage have to be retried to deal with a server going down. This is how we have solved it internally at Uber:
We had to add a patch in the Spark code to handle 2 and 3. There are also cases of laggard tasks from the previous stage finishing after the re-attempt of the stage has started, which needed to be handled. This has been working out quite well for us. @hiboyang @YutingWang98 I can put up a formal design doc as well where we can brainstorm more ideas. Let me know your thoughts.
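To make the mechanism concrete, here is a rough sketch (not the actual Uber patch, whose details aren't shown in this thread) of how a reader-side failure can be surfaced to Spark so the DAGScheduler re-runs the whole map stage. MetadataFetchFailedException is private[spark], so code like this has to live under the org.apache.spark package, and exact exception signatures differ between Spark versions:

```scala
package org.apache.spark.shuffle.rss

import org.apache.spark.shuffle.MetadataFetchFailedException

// Rough sketch: if a shuffle server cannot be reached while reading a
// partition, rethrow the error as a fetch failure so Spark marks the map
// stage as failed and re-runs it (with a push-based shuffle, all map tasks
// of the stage must be retried, as discussed above).
object RssReadFailureHandler {
  def readOrFailStage[T](shuffleId: Int, reduceId: Int)(read: => T): T = {
    try {
      read
    } catch {
      case e: java.io.IOException =>
        throw new MetadataFetchFailedException(
          shuffleId, reduceId,
          s"RSS server unreachable for shuffle $shuffleId, partition $reduceId: ${e.getMessage}")
    }
  }
}
```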
Hi @mayurdb, thank you for the reply and for sharing your implementation! I have a question here: if the Spark stages are cascading, then one stage may depend on the previous stage's shuffle output. In this case the retry should be recursive, almost like rerunning the whole job. So besides retrying tasks from the current stage (what you mentioned in point 5), do you need to retry all the previous stages as well? Looking forward to reading more details in your design doc, and we can sure discuss this more! Thank you.
@mayurdb, this is pretty cool, thanks for sharing how you fixed this issue inside Uber! Is it possible to share the patch you made to the Spark code?
@hiboyang @YutingWang98 I have created a pull request with these changes. Can you take a look?
@mayurdb Thank you for sharing it, will take a look!
Hi, I just found out my Spark job got killed with this error:
I then checked ZooKeeper and didn't see this {server_host_name} registered there. So I suspect it was already removed from ZooKeeper due to some internal issue with the node, but was picked up by RSS before that happened. When RSS tried to connect, the node was no longer in ZooKeeper, which caused the NoNodeException. It retried and failed 4 times, and the job was then killed.
If this was the reason, maybe RSS needs to allow the connection process to skip nodes that are no longer in ZooKeeper and pick a currently available one? Any thoughts would be appreciated, thanks!
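A minimal sketch of the suggested skip behavior, assuming servers are registered as child znodes under some base path (the path "/spark_rss/nodes" and the helper names below are hypothetical, not the actual RSS layout) and using the standard ZooKeeper client:

```scala
import org.apache.zookeeper.ZooKeeper
import scala.collection.JavaConverters._

// Sketch only: zk.exists(...) returns null when the znode is gone, so stale
// candidates (already removed after losing their heartbeat) are filtered out
// before connecting, instead of triggering a NoNodeException and killing the job.
def pickLiveServers(zk: ZooKeeper, basePath: String, candidates: Seq[String]): Seq[String] =
  candidates.filter(host => zk.exists(s"$basePath/$host", false) != null)

// Alternatively, re-list the currently registered servers and pick from those.
def refreshLiveServers(zk: ZooKeeper, basePath: String): Seq[String] =
  zk.getChildren(basePath, false).asScala.toSeq
```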