
Help: Scheduler on cluster doesn't seem to work #341

Closed
pvanderlinden opened this issue May 28, 2021 · 8 comments

Comments

@pvanderlinden
Contributor

This might need to be split into several tickets. I just tried to upgrade to a newer version of dask-kubernetes. If I switch on legacy mode, everything seems to work fine. But if I switch to the new mode, where the scheduler runs as a separate pod, I run into several issues, although I might be missing something that would resolve all three of them:

A small issue: the scheduler takes the same name as a worker (so you can't tell which pod is the scheduler by looking at the name), and worse, it also uses the same resource requests (which it doesn't really need). Also, because the scheduler runs as a separate container, it becomes a nightmare when the client pod is killed or crashes (rather than terminating cleanly): nothing gets cleaned up, and instead of the old behaviour (workers exiting after 60 seconds) the workers and the scheduler just stick around forever.

The bigger issue: I can't get it working at all. Both the worker and the client hit pickle errors when trying to connect to the scheduler (distributed.protocol.pickle - INFO - Failed to deserialize), although these seem to be masked by a timeout error.

Is the legacy mode going to disappear in the long run (the name suggests it), or is it safe to keep using it?
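
For reference, this is roughly how I am switching between the two modes (a minimal sketch; deploy_mode='local' is what I use later in this thread, while 'remote' is my understanding of the new scheduler-as-a-pod mode, so treat the exact keyword as an assumption):

from dask_kubernetes import KubeCluster

# Old behaviour: the scheduler runs inside the client process.
cluster = KubeCluster("spec.yml", deploy_mode="local")

# New behaviour: the scheduler runs as its own pod in the cluster.
# cluster = KubeCluster("spec.yml", deploy_mode="remote")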

@pvanderlinden pvanderlinden changed the title from "Scheduler on cluster doesn't seem to work" to "Help: Scheduler on cluster doesn't seem to work" on May 28, 2021
@jacobtomlinson
Member

The scheduler takes the same name as a worker

Yeah we should definitely fix that. Could you please raise a separate issue for this?

Also, because the scheduler runs as a separate container, it becomes a nightmare when the client pod is killed or crashes (rather than terminating cleanly): nothing gets cleaned up, and instead of the old behaviour (workers exiting after 60 seconds) the workers and the scheduler just stick around forever.

The scheduler should exit after it is idle for a certain amount of time. If that isn't working correctly then could you please raise a separate issue for that?
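
For reference, that idle shutdown is driven by distributed's scheduler idle timeout, which can be set via Dask config. A minimal sketch (the value is illustrative, and how the setting reaches the scheduler pod depends on your deployment):

import dask

# Ask the scheduler to shut down after 5 minutes with no activity.
dask.config.set({"distributed.scheduler.idle-timeout": "300s"})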

I can't get it working at all. Both the worker and the client hit pickle errors when trying to connect to the scheduler (distributed.protocol.pickle - INFO - Failed to deserialize), although these seem to be masked by a timeout error.

This sounds like a Dask or Python version mismatch between your local environment and the one that is running on Kubernetes. Can you check that your versions match in both environments?
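
A quick way to check is distributed's built-in version check, assuming you can get a client connected at all:

import dask.distributed as dd

client = dd.Client(cluster)  # cluster being your KubeCluster instance
# Compares Python, dask, distributed, and related package versions across
# the client, scheduler, and workers, and raises/warns on mismatches.
client.get_versions(check=True)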

Is the legacy mode going to disappear in the long run (the name suggests it), or is it safe to keep using it?

This is currently undecided.

In the long term I want to replace all of KubeCluster with an operator-based version that supports dask-ctl, as this will make cluster creation/deletion much more robust, but it is a departure from where things were with the original (now legacy) version.

The current remote scheduler mode is a step towards this, but has drawbacks as you say.

Perhaps instead of modifying KubeCluster any further it would be best to introduce a new class altogether going forward. In that case we could leave KubeCluster roughly as it is today, but it's unlikely we would keep actively supporting it or make any big fixes.

@pvanderlinden
Contributor Author

Created separate tickets for some of the issues:
#342
#343

I can't get it working at all. Both the worker and the client hit pickle errors when trying to connect to the scheduler (distributed.protocol.pickle - INFO - Failed to deserialize), although these seem to be masked by a timeout error.

This sounds like a Dask or Python version mismatch between your local environment and the one that is running on Kubernetes. Can you check that your versions match in both environments?

@jacobtomlinson The only thing that differs is Python 3.7.3 vs 3.7.10, but that works fine if I set deploy_mode='local' with the same version difference. I'm actually not sure what is happening, as I see a ton of errors in the Kubernetes containers. The worker outputs two exceptions at the same time. It looks like the worker is told to connect to the kubectl proxy instance running on the host, which doesn't make sense (54476 is the port used by the kubectl proxy that gets started automatically). Maybe that is the cause of the deserialisation issue?

[rq-dask-worker-bb01c74d-6rlw7x] distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95\x9f\x00\x00\x00\x00\x00\x00\x00}\x94(\x8c\x08job_name\x94\x8c\x06rollup\x94\x8c\x07updates\x94\x8c\x12distributed.queues\x94\x8c\x05Queue\x94\x93\x94)\x81\x94\x8c&queue-f6d9b2b350c840faaeac62337a0ef7dd\x94\x8c\x15tcp://localhost:54476\x94\x86\x94b\x8c\x0bservice_ids\x94]\x94(K\x0eKheu.' 
[rq-dask-worker-bb01c74d-6rlw7x] ConnectionRefusedError: [Errno 111] Connection refused 
[rq-dask-worker-bb01c74d-6rlw7x]  
[rq-dask-worker-bb01c74d-6rlw7x] The above exception was the direct cause of the following exception: 
[rq-dask-worker-bb01c74d-6rlw7x]  
[rq-dask-worker-bb01c74d-6rlw7x] Traceback (most recent call last): 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 287, in connect 
[rq-dask-worker-bb01c74d-6rlw7x]     timeout=min(intermediate_cap, time_left()), 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/asyncio/tasks.py", line 442, in wait_for 
[rq-dask-worker-bb01c74d-6rlw7x]     return fut.result() 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 391, in connect 
[rq-dask-worker-bb01c74d-6rlw7x]     convert_stream_closed_error(self, e) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error 
[rq-dask-worker-bb01c74d-6rlw7x]     ) from exc 
[rq-dask-worker-bb01c74d-6rlw7x] distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x7f5ff6981550>: ConnectionRefusedError: [Errno 111] Connection refused 
[rq-dask-worker-bb01c74d-6rlw7x]  
[rq-dask-worker-bb01c74d-6rlw7x] The above exception was the direct cause of the following exception: 
[rq-dask-worker-bb01c74d-6rlw7x]  
[rq-dask-worker-bb01c74d-6rlw7x] Traceback (most recent call last): 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads 
[rq-dask-worker-bb01c74d-6rlw7x]     return pickle.loads(x) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/queues.py", line 289, in __setstate__ 
[rq-dask-worker-bb01c74d-6rlw7x]     client = get_client(address) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 3446, in get_client 
[rq-dask-worker-bb01c74d-6rlw7x]     return Client(address, timeout=timeout) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 754, in __init__ 
[rq-dask-worker-bb01c74d-6rlw7x]     self.start(timeout=timeout) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 967, in start 
[rq-dask-worker-bb01c74d-6rlw7x]     sync(self.loop, self._start, **kwargs) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 354, in sync 
[rq-dask-worker-bb01c74d-6rlw7x]     raise exc.with_traceback(tb) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 337, in f 
[rq-dask-worker-bb01c74d-6rlw7x]     result[0] = yield future 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 762, in run 
[rq-dask-worker-bb01c74d-6rlw7x]     value = future.result() 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1057, in _start 
[rq-dask-worker-bb01c74d-6rlw7x]     await self._ensure_connected(timeout=timeout) 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1115, in _ensure_connected 
[rq-dask-worker-bb01c74d-6rlw7x]     self.scheduler.address, timeout=timeout, **self.connection_args 
[rq-dask-worker-bb01c74d-6rlw7x]   File "/usr/local/lib/python3.7/site-packages/distributed/comm/core.py", line 311, in connect 
[rq-dask-worker-bb01c74d-6rlw7x]     ) from active_exception 
[rq-dask-worker-bb01c74d-6rlw7x] OSError: Timed out trying to connect to tcp://127.0.0.1:54476 after 10 s 
[rq-dask-worker-bb01c74d-6rlw7x] distributed.worker - WARNING - Could not deserialize task 

@jacobtomlinson
Member

Thanks for raising separate tickets for those.

I am surprised to see the 127.0.0.1 address in that error; it doesn't match up with what I would expect given the code.

self.address = "tcp://{name}.{namespace}:{port}".format(
    name=self.service.metadata.name,
    namespace=self.namespace,
    port=SCHEDULER_PORT,
)
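
So I would expect the worker to receive a cluster-internal service address rather than the client-side proxy. A quick sanity check of what that format produces (the service name and namespace here are hypothetical, and SCHEDULER_PORT = 8786 is an assumption based on the default Dask scheduler port):

SCHEDULER_PORT = 8786  # assumption: default Dask scheduler port
address = "tcp://{name}.{namespace}:{port}".format(
    name="dask-scheduler-example",  # hypothetical service name
    namespace="default",
    port=SCHEDULER_PORT,
)
print(address)  # tcp://dask-scheduler-example.default:8786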

Deserialization issues are almost always a version mismatch. I would be surprised if a minor Python version difference caused this, though. Are you certain that all the package versions in the conda environment match?

I am unable to reproduce this locally, and without a minimal reproducible example this is pretty hard to debug. When you opened this issue there was a template that requests a bunch of info; it would really help if you could provide that.

@pvanderlinden
Contributor Author

I will extract a reproducible example. All versions are the same, and they are the latest dask packages (I created a fresh environment and Docker image before testing today).

I'm not sure why this localhost address is shipped to the worker; it seems to be coming from dask-kubernetes.

@pvanderlinden
Contributor Author

@jacobtomlinson

import dask.distributed as dd
from dask_kubernetes import KubeCluster


def test(a):
    a.put("a")


if __name__ == "__main__":
    cluster = KubeCluster("spec.yml", n_workers=1)
    client = dd.Client(cluster)

    queue = dd.Queue()
    future = client.submit(test, a=queue)
    client.gather(future)
    print(queue.get())

The issue shows up when you ship a Queue to a task, which normally works fine (e.g. with deploy_mode='local').
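
My read of the traceback above (distributed/queues.py, __setstate__ calling get_client(address)) is that the Queue is pickled together with the client's own scheduler address, which in this deploy mode is the local kubectl proxy, so the worker tries to dial an address it cannot reach. A minimal sketch of that pickling pattern, not the distributed implementation:

import pickle


class AddressBoundHandle:
    """Illustrative only: pickles a name plus a scheduler address."""

    def __init__(self, name, scheduler_address):
        self.name = name
        self.scheduler_address = scheduler_address

    def __reduce__(self):
        # Only the name and the client-side scheduler address travel with
        # the pickle; whoever unpickles it reconnects using that address.
        return (AddressBoundHandle, (self.name, self.scheduler_address))


handle = AddressBoundHandle("queue-abc", "tcp://localhost:54476")
restored = pickle.loads(pickle.dumps(handle))
print(restored.scheduler_address)  # tcp://localhost:54476 -- unreachable from a worker pod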

@jacobtomlinson
Member

Could you share your spec.yml too? I'd like to try to reproduce this locally.

@pvanderlinden
Contributor Author

This is the default from the docs (besides the large cpu/mem requests):

kind: Pod
metadata:
  labels:
    foo: bar
spec:
  restartPolicy: Never
  containers:
  - image: daskdev/dask:latest
    imagePullPolicy: IfNotPresent
    args: [dask-worker, --nthreads, '2', --no-dashboard, --memory-limit, 6GB, --death-timeout, '60']
    name: dask
    env:
      - name: EXTRA_PIP_PACKAGES
        value: git+https://github.com/dask/distributed
    resources:
      requests:
        cpu: "50m"
        memory: 500m

@jacobtomlinson
Member

The classic KubeCluster was removed in #890. All users will need to migrate to the Dask Operator. Closing.
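
For anyone migrating, the operator-based API looks roughly like this (a sketch following the dask-kubernetes operator docs; the cluster name and image are illustrative, and the Dask Operator must already be installed in the cluster):

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="example",                    # hypothetical cluster name
    image="ghcr.io/dask/dask:latest",  # illustrative image
    n_workers=2,
)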

@jacobtomlinson jacobtomlinson closed this as not planned on Apr 30, 2024