
Dask Scheduler host/port Not Written to Skein Key-Value Storage When YARN Application Restarts #135

Open
gallamine opened this issue Feb 5, 2021 · 5 comments

Comments

@gallamine

We are running a Dask cluster using Dask-Yarn on AWS EMR, with the Scheduler in local mode. To make the system more robust to failures, we allow multiple attempts of the YARN application using skein.ApplicationSpec(..., max_attempts=2).
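For readers unfamiliar with skein's retry setting, here is a minimal sketch of an application spec with it set (service details elided; all values are illustrative, not our production config):

```yaml
# Illustrative skein application spec fragment.
# max_attempts > 1 lets YARN restart the whole application,
# including the ApplicationMaster, after a node failure.
name: dask
max_attempts: 2
services:
  dask.worker:
    resources:
      memory: 2 GiB
      vcores: 2
```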

If the node running the ApplicationMaster fails, the YARN application makes a second attempt: it restarts itself and all of the workers. However, the Scheduler never sees these new workers. It sits there happily waiting on workers and blocking our application flow, since there is pending work to be done but no workers to do it on.

The problem is that when the YARN application retries after the ApplicationMaster has failed, the scheduler's address and port are not written to the key/value store, so the new workers don't know where to connect. This line (https://github.com/dask/dask-yarn/blob/master/dask_yarn/core.py#L558) does not appear to re-run on the second application attempt.

To verify this, on the first YARN application attempt you can navigate to http://appmaster:20888/proxy/attempt_num/kv and see the dask.scheduler and dask.dashboard URLs listed in the KV storage. If you then shut down the node running the ApplicationMaster, the application starts a second attempt. Navigate to http://new_appmaster:20888/proxy/second_attempt_num/kv and you'll see that nothing is listed in the KV storage, and no workers in the Dask dashboard.

Question(s):

  1. Is there a workaround to this problem so that my YARN Application can be robust to failures and restart?
  2. Is there a way for my Scheduler to recognize that it has no workers and eventually timeout?
  3. Is there a way to use the distributed.as_completed() method with Futures that somehow timeout in the event that the Scheduler loses all its workers and they never come back?

Environment:

  • Dask version: 2.24.0, Dask-yarn: 0.8.1
  • Python version: 3.6
  • Operating System: AWS Linux
@jcrist
Member

jcrist commented Feb 5, 2021

Are you using deploy_mode="local" for this? I believe it should do the proper thing with deploy_mode="remote", since the scheduler will be restarted as well. That is the default deploy mode, but if you're using the example EMR bootstrap script it is automatically configured to local instead (as most users of EMR want the interactive behavior). See https://github.com/dask/dask-yarn/blob/master/deployment_resources/aws-emr/bootstrap-dask#L157.
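For context, the linked bootstrap script sets this through dask's YAML configuration; a sketch of the relevant fragment (the file path is illustrative; the key is dask-yarn's yarn.deploy-mode):

```yaml
# e.g. /etc/dask/yarn.yaml
# "remote" runs the scheduler in its own YARN container, so it is
# restarted along with the rest of the application on a second attempt;
# the EMR bootstrap script sets this to "local" instead.
yarn:
  deploy-mode: remote
```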

Is there a way for my Scheduler to recognize that it has no workers and eventually timeout?

Not really. There's a way to make a job shut down if it has no pending work (by setting an idle timeout), but not if it has no workers. Having no workers is usually a temporary state (the cluster will scale back up), so this is a bit of an uncommon ask.
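For reference, the idle timeout mentioned here is a distributed scheduler setting; a hedged configuration sketch (the key is distributed.scheduler.idle-timeout; the value is illustrative):

```yaml
# Shut the scheduler down after an hour with no pending work.
# Note this triggers on "no tasks", not on "no workers".
distributed:
  scheduler:
    idle-timeout: 1h
```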

Is there a way to use the distributed.as_completed() method with Futures that somehow timeout in the event that the Scheduler loses all its workers and they never come back?

Same as the above, no, not really. You could monitor this yourself by periodically checking Client.scheduler_info()['workers'] and handling that case in your own code.
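One way to wire up that polling is a small watchdog that raises when the worker count stays at zero for too long. This is a minimal sketch, not a dask or dask-yarn API: the helper name and timeouts are made up, and with a real client you would pass lambda: len(client.scheduler_info()['workers']) as the counter.

```python
import time


def wait_for_workers(get_worker_count, timeout=120, poll=5,
                     clock=time.monotonic, sleep=time.sleep):
    """Block until get_worker_count() > 0, raising TimeoutError if the
    count stays at zero for `timeout` seconds."""
    deadline = clock() + timeout
    while clock() < deadline:
        if get_worker_count() > 0:
            return
        sleep(poll)
    raise TimeoutError("scheduler has no workers; assuming the cluster is lost")
```

Calling something like this before each as_completed() loop turns a silent hang into an exception the client code can catch and convert into a retry or a clean failure.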

I suggest trying with a remote scheduler (unless that's already what you're doing) and seeing if that fixes things.

@gallamine
Author

gallamine commented Feb 5, 2021

@jcrist we originally started with the remote scheduler but found that it doubled our failure footprint: if either the node running the Scheduler or the node running the ApplicationMaster failed, the cluster would fail. That meant two nodes (AppMaster and Scheduler) presented failure points for our workflow instead of one (AppMaster).

I tried running the Scheduler inside the ApplicationMaster container, but dask-yarn didn't recognize this configuration and started a local Scheduler alongside the one running in the ApplicationMaster.

Is there a way to force the ApplicationMaster and Scheduler to run on the same physical node when running the Scheduler in remote mode?

Edit to address your question: I'm leaving the Scheduler out of the list of services so it's created by default in local mode.

@jcrist
Member

jcrist commented Feb 5, 2021

I tried running the Scheduler inside the ApplicationMaster container but dask-yarn didn't recognize this configuration and then started a local Scheduler alongside the Scheduler running the Application.

Hmmm, I could see how that would happen. I think this will require a patch to dask-yarn. The check here should be expanded to also check if the application master has a script field configured (and if it does, will assume the scheduler is running there). Is this a PR you'd be interested in making?
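A hedged sketch of what that expanded check might look like (the function and attribute names are my guesses at dask-yarn's internals, not the actual code):

```python
def scheduler_in_cluster(spec):
    """Guess whether the scheduler runs inside the YARN application.

    dask-yarn today only looks for a 'dask.scheduler' service; the
    proposed change would also treat a script configured on the
    application master as evidence the user starts the scheduler there.
    """
    if "dask.scheduler" in spec.services:
        return True
    return bool(getattr(spec.master, "script", None))
```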


If the scheduler goes down and restarts, how do you handle that from your client code? Dask (and dask-yarn) isn't really designed for this, so you must be doing something tricky to make it work.

@gallamine
Author

If the scheduler goes down and restarts, how do you handle that from your client code?

This is essentially the problem I'm trying to mitigate: reducing as many failure scenarios as possible. It seems to me that I'd prefer the AppMaster and Scheduler to live and die together, so to speak. Otherwise I double my failure footprint by putting the Scheduler on a separate node from the AppMaster (either node failing would cause the cluster to fail). The number of different options for restarts and retries is a little complex, so I'm hunting for a setup that is either robust to restarts or fails immediately in a way my client code can deal with. Right now we run into situations where the client hangs because the workers and scheduler end up in a strange state.

Is this a PR you'd be interested in making?

Yes, possibly. It seems like a straightforward addition but I need to talk to my teammates to see what they think about our options.

BTW, thank you for all your contributions! I appreciate them.

@jcrist
Member

jcrist commented Feb 5, 2021

If you run the scheduler on the application master, the cluster will properly handle restarts (the whole application will restart - scheduler, application master, and workers). But your client code won't handle this properly. Dask isn't designed to handle scheduler failures gracefully - usually in this case users resubmit the whole job. You could do this yourself in the client code by running a loop with a try-except block that tries to reconnect on scheduler failure using YarnCluster.from_application_id, but any client-side handling of a failed scheduler node would be on you.
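That resubmit-on-failure loop can be sketched as below. YarnCluster.from_application_id is real dask-yarn API, but everything else here (helper name, retry counts, the exception types taken to signal a dead scheduler) is illustrative; the reconnection step is passed in as a callable so the retry logic itself stays generic:

```python
import time


def run_with_reconnect(run_job, connect, max_retries=3, backoff=30,
                       sleep=time.sleep):
    """Run `run_job(client)`; on a lost scheduler, reconnect with
    `connect()` and resubmit the whole job, up to max_retries times.

    With dask-yarn, connect() might look like:
        cluster = YarnCluster.from_application_id(app_id)
        return Client(cluster)
    """
    last_error = None
    for _ in range(max_retries):
        client = connect()
        try:
            return run_job(client)
        except (OSError, TimeoutError) as exc:  # illustrative failure signals
            last_error = exc
            sleep(backoff)
    raise RuntimeError("job failed after %d reconnect attempts" % max_retries) from last_error
```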

Usually we recommend that users write their jobs so that if the scheduler fails, they restart their client script as well. This wouldn't play well with YARN application restarts as currently written, but it would if the scheduler and your client-side script were both running on the application master container.
