
Client submit with workers doesn’t handle new joining workers correctly #8862

Open
YuriFeigin opened this issue Sep 3, 2024 · 1 comment
Labels
bug Something is broken scheduling

Comments

@YuriFeigin

Using client.submit() with the workers argument will run tasks on workers outside the allowed list if a worker named in that list joins the cluster after the submit.

To reproduce it, first run a scheduler and two workers:

dask scheduler
dask worker tcp://192.0.0.100:8786 --nthreads 1 --name a-0
dask worker tcp://192.0.0.100:8786 --nthreads 1 --name a-1

Then submit the jobs:

import time
from distributed import get_worker
from dask.distributed import Client

client = Client("tcp://192.0.0.100:8786")


def run_me(i):
    worker = get_worker()
    print(f"worker name {worker.name}, run {i}")
    time.sleep(30)


futures = []
for i in range(100):
    futures.append(client.submit(run_me, i, workers=["a-0", "b-0"]))

for f in futures:
    f.result()

This works properly: only worker a-0 gets jobs.
Now add one more worker:

dask worker tcp://192.0.0.100:8786 --nthreads 1 --name b-0

Now worker a-1 starts getting jobs as well.
P.S. If instead of adding a worker named b-0 I add a worker named b-1, everything works correctly, so the problem only happens when the newly joining worker's name is part of the workers argument.
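To make the expected behavior concrete, here is a toy model (my own sketch, not Dask's actual scheduler code) of a scheduler that keeps the original name-based restriction and re-checks it on every assignment. Under this model, a-1 is never eligible, even after b-0 joins:

```python
# Toy model of name-based worker restrictions (illustrative only,
# not how distributed's scheduler is implemented).
class ToyScheduler:
    def __init__(self):
        self.workers = set()   # names of connected workers
        self.pending = []      # (task_id, allowed worker names)

    def add_worker(self, name):
        self.workers.add(name)

    def submit(self, task_id, allowed_names):
        self.pending.append((task_id, set(allowed_names)))

    def assign(self):
        """Assign each pending task only to a connected worker whose
        name is in the task's restriction set; keep the rest pending."""
        assignments = {}
        still_pending = []
        for task_id, allowed in self.pending:
            candidates = sorted(self.workers & allowed)
            if candidates:
                assignments[task_id] = candidates[0]
            else:
                still_pending.append((task_id, allowed))
        self.pending = still_pending
        return assignments


s = ToyScheduler()
s.add_worker("a-0")
s.add_worker("a-1")
s.submit("t1", ["a-0", "b-0"])
s.submit("t2", ["a-0", "b-0"])
first = s.assign()
print(first)   # {'t1': 'a-0', 't2': 'a-0'} -- a-1 is never a candidate

s.add_worker("b-0")            # late-joining worker named in the restriction
s.submit("t3", ["a-0", "b-0"])
second = s.assign()
print(second)  # {'t3': 'a-0'} -- picks the first eligible name; still never a-1
```

The reported bug is precisely that the real scheduler stops behaving like this once a restricted name (b-0) joins: tasks start landing on a-1, which is outside the restriction set.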

  • Dask version: 2024.8.1
  • Python version: 3.10.14
  • Operating System: macOS
  • Install method (conda, pip, source): pip
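A possible client-side workaround (untested against this issue, and with the caveat that it can only pin to workers that exist at submit time) is to resolve names to concrete addresses and pass addresses to workers= instead of names. The resolution logic is factored into a plain helper here so it can be demonstrated on a hand-built dict of the shape that client.scheduler_info()["workers"] returns in distributed:

```python
def resolve_worker_names(names, scheduler_workers):
    """Map worker names to their addresses.

    scheduler_workers is a dict of address -> worker info dict, the
    shape returned by client.scheduler_info()["workers"].
    """
    wanted = set(names)
    return sorted(addr for addr, info in scheduler_workers.items()
                  if info.get("name") in wanted)


# Demonstration on a hand-built mapping (no cluster needed); the
# addresses below are made up for illustration.
workers_info = {
    "tcp://192.0.0.100:50001": {"name": "a-0"},
    "tcp://192.0.0.100:50002": {"name": "a-1"},
}
print(resolve_worker_names(["a-0", "b-0"], workers_info))
# ['tcp://192.0.0.100:50001'] -- only a-0 exists yet; a-1 is excluded
```

With a live client this would look like client.submit(run_me, i, workers=resolve_worker_names(["a-0", "b-0"], client.scheduler_info()["workers"])). Note the trade-off: a worker such as b-0 that has not connected yet cannot be included, so this gives predictable placement at the cost of the late-join behavior the name-based restriction is supposed to provide.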
@hendrikmakait hendrikmakait transferred this issue from dask/dask Sep 5, 2024
@fjetter
Member

fjetter commented Oct 15, 2024

I can confirm this. Below is a version that runs in a Jupyter notebook:

import time
from distributed import LocalCluster

cluster = LocalCluster(n_workers=0)
client = cluster.get_client()
print(cluster.dashboard_link)

from distributed import Worker
wa = Worker(cluster.scheduler_comm.address, name="a-0")
await wa

from distributed import get_worker
def run_me(i):
    worker = get_worker()
    print(f"worker name {worker.name}, run {i}")
    time.sleep(5)


futures = []
for i in range(100):
    futures.append(client.submit(run_me, i, workers=["a-0", "b-0"]))

from distributed import Worker
wa1 = Worker(cluster.scheduler_comm.address, name="a-1")
await wa1

from distributed import Worker
wb0 = Worker(cluster.scheduler_comm.address, name="b-0")
await wb0

@jacobtomlinson jacobtomlinson added bug Something is broken scheduling and removed needs triage labels Oct 17, 2024