Can Rss have stage retry when one server is down? #93

Open
YutingWang98 opened this issue Jan 11, 2023 · 13 comments

@YutingWang98

YutingWang98 commented Jan 11, 2023

Hi, I just found out my Spark job got killed with this error:

Caused by: com.uber.rss.exceptions.RssException: Failed to get node data for zookeeper node: /spark_rss/{cluster}/default/nodes/{server_host_name}
    at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:231)
    ...
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /spark_rss/{cluster}/default/nodes/{server_host_name}
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    ...
    at com.uber.rss.metadata.ZooKeeperServiceRegistry.getServerInfo(ZooKeeperServiceRegistry.java:228)

I then checked ZooKeeper and didn't see this {server_host_name} registered there. So I suspect the node had already been removed from ZooKeeper due to some internal issue, but RSS had picked it up before that happened. When RSS tried to connect, the node was no longer in ZooKeeper, which caused the 'NoNodeException'. It retried and failed 4 times, and the job was then killed.

If this was the reason, maybe RSS needs to allow the connection process to skip nodes that are no longer in ZooKeeper and pick a currently available one? Any thoughts would be appreciated, thanks!
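To make the idea concrete, here is a rough sketch of what I mean, using hypothetical helpers (`lookupServer`, `registeredServers`) rather than the real ZooKeeperServiceRegistry API:

```scala
// Rough sketch only: skip a server that has disappeared from ZooKeeper and
// fall back to another registered one. `lookupServer` and `registeredServers`
// are hypothetical stand-ins, not the real ZooKeeperServiceRegistry API.
def resolveServer(preferred: String,
                  registeredServers: Seq[String],
                  lookupServer: String => Option[String]): Option[String] =
  lookupServer(preferred).orElse {
    // The preferred server is gone from ZooKeeper: try the other registered servers.
    registeredServers.filterNot(_ == preferred)
      .iterator
      .map(lookupServer)
      .collectFirst { case Some(info) => info }
  }

// Example: serverA is no longer registered, so we fall back to serverB.
val known = Map("serverB" -> "serverB:19190")
resolveServer("serverA", Seq("serverA", "serverB"), known.get)  // Some("serverB:19190")
```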

@hiboyang
Contributor

You are right: the server must have gone down and was removed from ZooKeeper after losing its heartbeat with ZooKeeper.

The current RSS implementation assigns a static list of servers at the beginning of the Spark application and does not change the server list while the application is running. This simplifies the logic for tracking which partition is on which server. The downside is that if one RSS server goes down in the middle, the application will fail.

If you want redundancy, you could set "spark.shuffle.rss.replicas" to use multiple replicas, so that if one server holding a shuffle partition goes down, another server still has the data and the application will not fail.
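For example (just a sketch; the value 2 here is only an illustration, not a recommendation):

```scala
import org.apache.spark.SparkConf

// Sketch only: write each shuffle partition to 2 RSS servers for redundancy.
// Note that this roughly doubles the shuffle data written to the RSS servers.
val conf = new SparkConf()
  .set("spark.shuffle.rss.replicas", "2")
```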

@YutingWang98
Author

Thank you for the suggestions @hiboyang! Does this mean the shuffle data written to the servers will be doubled if I set 'spark.shuffle.rss.replicas' to 2? If so, this will work for some small jobs, but may require more disk for really large jobs.

Also, we currently want to use RSS at the cluster level, so using the default values of 1 to 50 wouldn't make sense for all jobs. Do you have any advice on how to specify "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" for different Spark jobs? Thanks!

@YutingWang98
Author

Hi @hiboyang. If 'spark.shuffle.rss.replicas' does write double the amount of data to the servers, we unfortunately won't be able to use it for large jobs with 400+ TB of shuffle data.

So do you think we could just skip connecting to the unreachable server and pick the next available one if this happens during the write stage? And if a server goes down during the read stage, could RSS do what the Spark external shuffle service does and trigger a stage retry to recover the data?

@hiboyang
Contributor

"spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount"

Thank you for the suggestions @hiboyang ! Does this mean the shuffle data written to the server will be doubled if I set 'spark.shuffle.rss.replicas' to 2? If so, this will work with some small jobs, but may require more disk for some really large jobs.

Also, currently we want to use Rss on a cluster level, so using the default values of 1 to 50 wouldn't make sense for all the jobs. Do you have any advice on how to specifiy "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" for different spark jobs? Thanks!

Values for "spark.shuffle.rss.maxServerCount" and "spark.shuffle.rss.minServerCount" are case by case, depending on the Spark job (e.g. how many mappers/reducers, how much shuffle data, etc.). Normally you could start with the default values (or not set them at all), then adjust the values and see the impact on your jobs.
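For example, a per-job override could look like the sketch below; the numbers are placeholders to tune per workload, not recommendations:

```scala
import org.apache.spark.SparkConf

// Sketch only: per-job bounds on how many RSS servers the application may use.
// Tune these based on the number of mappers/reducers and the shuffle data size.
val conf = new SparkConf()
  .set("spark.shuffle.rss.minServerCount", "5")
  .set("spark.shuffle.rss.maxServerCount", "100")
```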

@hiboyang
Contributor

> Hi @hiboyang. If 'spark.shuffle.rss.replicas' does write double the amount of data to the servers, we unfortunately won't be able to use it for large jobs with 400+ TB of shuffle data.
>
> So do you think we could just skip connecting to the unreachable server and pick the next available one if this happens during the write stage? And if a server goes down during the read stage, could RSS do what the Spark external shuffle service does and trigger a stage retry to recover the data?

I see. This is a current limitation: RSS cannot handle a server going down while a Spark job is running unless 'spark.shuffle.rss.replicas' is used. Hope someone can contribute this feature!

@YutingWang98
Author

Thanks for the reply! I will see what I can do to improve this.

YutingWang98 changed the title from "Rss tried to connect to a node that had been removed from zookeeper" to "Can Rss have stage retry when one server is down?" on Jan 25, 2023
@YutingWang98
Author

YutingWang98 commented Jan 25, 2023

Hi @hiboyang! I attempted to contribute stage retry, but there seems to be a difficulty due to the way RSS is implemented. I am wondering if I can get some insight from you.

The issue is the number of tasks to retry. When a server goes down and we want to do a stage retry to recover the files on that server:

  • Spark ESS: only a few tasks are retried, since one mapper only writes data to one server (one server holds files from certain tasks).
  • RSS: all the tasks of the stage need to be retried, since one mapper writes its data to all the servers evenly, so the downed server stores partition files written by all the finished tasks.

So, due to this different map-reduce structure compared to ESS, I suppose the only way to get fault tolerance without using replicas is to retry the entire stage instead of a few tasks? Please correct me if I understand this wrong, and do you think there is a better way to retry and recover the partition files? Thanks!
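A toy illustration of the difference (not RSS code; the ESS placement below is a made-up simplification):

```scala
// Toy illustration only: how many map tasks lose output when one server dies.
val numMaps = 8
val numServers = 4
val deadServer = 2

// ESS-style: a mapper's output stays on the node it ran on, so only the map
// tasks that ran on the dead node are affected (placement m % numServers is made up).
val essAffected = (0 until numMaps).filter(_ % numServers == deadServer)

// RSS-style: every mapper pushes each partition to the server that owns it,
// so every finished map task has some data on the dead server.
val rssAffected = 0 until numMaps

println(s"ESS re-runs ${essAffected.size} of $numMaps map tasks; RSS re-runs ${rssAffected.size}")
```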

@hiboyang
Contributor

Hi @YutingWang98, you are right! This is a limitation of the current RSS. Maybe we could brainstorm ideas sometime on how to improve this.

@mayurdb
Collaborator

mayurdb commented Feb 2, 2023

Hi @YutingWang98, you are right. Given the push nature of RSS, all the tasks in the stage have to be retried to deal with a server going down. This is how we have solved it internally at Uber:

  1. Throw a fetch-failed exception from the RSS client (from both map and reduce tasks) when connectivity to the RSS server is lost
  2. Upon receiving a fetch failure from the RSS clients, clear all map outputs of the tasks completed so far
  3. Re-trigger the shuffle planning on the server: call ShuffleManager.registerShuffle()
  4. Pick a new set of available RSS servers to execute the particular shuffle
  5. Retry all the tasks from the stage

We had to add a patch to the Spark code to handle steps 2 and 3. There are also cases where laggard tasks from the previous stage finish after the re-attempt of the stage has started, which needed to be handled.
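To give a feel for step 1, the read path looks roughly like the sketch below (not our actual patch: `fetchFromRssServer` and `RssConnectionLostException` are hypothetical stand-ins for the real RSS client pieces, and the FetchFailedException constructor differs between Spark 2.x and 3.x):

```scala
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId

// Hypothetical stand-ins for the real RSS client pieces, which are not shown here.
class RssConnectionLostException(msg: String) extends RuntimeException(msg)
def fetchFromRssServer(shuffleId: Int, reduceId: Int,
                       host: String, port: Int): Iterator[Array[Byte]] =
  throw new RssConnectionLostException(s"cannot reach $host:$port")

def readPartition(shuffleId: Int, mapId: Int, reduceId: Int,
                  host: String, port: Int): Iterator[Array[Byte]] =
  try {
    fetchFromRssServer(shuffleId, reduceId, host, port)
  } catch {
    case e: RssConnectionLostException =>
      // Surfacing the failure as a FetchFailedException is what lets Spark's
      // DAGScheduler fail the stage and schedule a re-attempt instead of
      // failing the whole application. (Spark 2.x constructor shown here;
      // Spark 3.x uses a Long mapId and adds a mapIndex argument.)
      throw new FetchFailedException(
        BlockManagerId("rss-server", host, port),
        shuffleId, mapId, reduceId,
        s"Lost connection to RSS server $host:$port", e)
  }
```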

This has been working out quite well for us. @hiboyang @YutingWang98, I can put up a formal design doc as well, where we can brainstorm more ideas. Let me know your thoughts.

@YutingWang98
Author

YutingWang98 commented Feb 2, 2023

Hi @mayurdb, thank you for the reply and for sharing your implementation! I have a question:

If the Spark stages are cascading, one stage may depend on the previous stage's shuffle output. In that case the retry would have to be recursive, almost like rerunning the whole job. So besides retrying the tasks from the current stage (what you mentioned in point 5), do you need to retry all the previous stages as well?

Looking forward to reading more details in your design doc, and we can certainly discuss this further! Thank you.

@hiboyang
Contributor

hiboyang commented Feb 2, 2023

@mayurdb, this is pretty cool, thanks for sharing how you fixed this issue inside Uber! Is it possible to share the patch you made to the Spark code?

@mayurdb
Collaborator

mayurdb commented Mar 20, 2023

@hiboyang @YutingWang98 I have created a pull request with these changes. Can you take a look?
https://github.com/uber/RemoteShuffleService/pull/97/files

@YutingWang98
Author

@mayurdb Thank you for sharing it, will take a look!
