From a262531d981a9134ef35c4e879d01d5a49773692 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Wed, 28 Jul 2021 22:00:50 +0200 Subject: [PATCH 01/10] Explicitly set a host when using NodePort Service Previously, if the service type was a NodePort, KubeCluster would attempt to discover the IP address by listing nodes. This is a set of permissions that not all users will have, so this PR adds the ability to explicitly set a known host. --- .gitignore | 3 +++ dask_kubernetes/core.py | 5 +++-- dask_kubernetes/tests/test_async.py | 10 +++++++++- dask_kubernetes/utils.py | 7 ++++--- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index f6da992d2..fc48097b3 100644 --- a/.gitignore +++ b/.gitignore @@ -138,3 +138,6 @@ credentials.csv *.pub *.rdp *_rsa + +# IDE settings +.idea \ No newline at end of file diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py index c467566ca..52f3751c4 100644 --- a/dask_kubernetes/core.py +++ b/dask_kubernetes/core.py @@ -152,7 +152,7 @@ def __init__(self, scheduler: str, name=None, **kwargs): class Scheduler(Pod): - """A Remote Dask Scheduler controled by Kubernetes + """A Remote Dask Scheduler controlled by Kubernetes Parameters ---------- idle_timeout: str, optional @@ -198,7 +198,7 @@ async def start(self, **kwargs): port=SCHEDULER_PORT, ) self.external_address = await get_external_address_for_scheduler_service( - self.core_api, self.service + self.core_api, self.service, host=self.kwargs["host"] ) self.pdb = await self._create_pdb() @@ -578,6 +578,7 @@ async def _start(self): "idle_timeout": self._idle_timeout, "service_wait_timeout_s": self._scheduler_service_wait_timeout, "pod_template": self.scheduler_pod_template, + "host": self.host, **common_options, }, } diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index 8eae3e9db..133934ea0 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -23,7 +23,6 @@ from distributed.utils import tmpfile from distributed.utils_test import captured_logger - TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") FAKE_CERT = os.path.join(TEST_DIR, "fake-cert-file") @@ -390,6 +389,15 @@ async def test_constructor_parameters(k8s_cluster, pod_spec): assert pod.metadata.generate_name == "myname" +@pytest.mark.asyncio +@pytest.mark.xfail(reason="Can't communicate with kind nodes") +async def test_passing_host_to_nodeport_service(k8s_cluster, pod_spec): + with dask.config.set({"kubernetes.scheduler-service-type": "NodePort"}): + async with KubeCluster(pod_spec, host="127.0.0.1", **cluster_kwargs) as cluster: + assert cluster.scheduler.service_template.spec.type == "NodePort" + assert cluster.scheduler.external_address == "127.0.0.1" + + @pytest.mark.asyncio async def test_reject_evicted_workers(cluster): cluster.scale(1) diff --git a/dask_kubernetes/utils.py b/dask_kubernetes/utils.py index 363a22a53..b78f1ce4a 100644 --- a/dask_kubernetes/utils.py +++ b/dask_kubernetes/utils.py @@ -41,7 +41,7 @@ def namespace_default(): async def get_external_address_for_scheduler_service( - core_api, service, port_forward_cluster_ip=None + core_api, service, port_forward_cluster_ip=None, host=None ): """Take a service object and return the scheduler address.""" [port] = [ @@ -53,8 +53,9 @@ async def get_external_address_for_scheduler_service( lb = service.status.load_balancer.ingress[0] host = lb.hostname or lb.ip elif service.spec.type == "NodePort": - nodes = await core_api.list_node() - host = nodes.items[0].status.addresses[0].address + if host is None: + nodes = await core_api.list_node() + host = nodes.items[0].status.addresses[0].address elif service.spec.type == "ClusterIP": try: # Try to resolve the service name. If we are inside the cluster this should succeeed. From 6f7dd7ead0602e34e3a64ce1c07b82d3568cf1ce Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Wed, 11 Aug 2021 21:57:15 +0200 Subject: [PATCH 02/10] Removed IDE specific folders from .gitignore --- .gitignore | 3 --- 1 file changed, 3 deletions(-) diff --git a/.gitignore b/.gitignore index fc48097b3..f6da992d2 100644 --- a/.gitignore +++ b/.gitignore @@ -138,6 +138,3 @@ credentials.csv *.pub *.rdp *_rsa - -# IDE settings -.idea \ No newline at end of file From ed35a14cb8afd574c21b70940549d1a7acd7c94a Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Wed, 11 Aug 2021 23:31:10 +0200 Subject: [PATCH 03/10] Added nodeport_host and scheduler_service_type kwargs to KubeCluster Added the option to set scheduler_service_type in the KubeCluster init Added the nodeport_host option to KubeCluster which allows the enduser to override the default behaviour of listing all nodes to get the IP of a node. --- dask_kubernetes/core.py | 35 +++++++++++++---- dask_kubernetes/kubernetes.yaml | 1 + dask_kubernetes/tests/test_async.py | 60 +++++++++++++++++++++++------ dask_kubernetes/utils.py | 3 +- 4 files changed, 80 insertions(+), 19 deletions(-) diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py index 52f3751c4..ad503f4d6 100644 --- a/dask_kubernetes/core.py +++ b/dask_kubernetes/core.py @@ -191,14 +191,16 @@ async def start(self, **kwargs): self.address = line.split("Scheduler at:")[1].strip() await asyncio.sleep(0.1) - self.service = await self._create_service() + self.service = await self._create_service( + scheduler_service_type=self.kwargs.get("scheduler_service_type") + ) self.address = "tcp://{name}.{namespace}:{port}".format( name=self.service.metadata.name, namespace=self.namespace, port=SCHEDULER_PORT, ) self.external_address = await get_external_address_for_scheduler_service( - self.core_api, self.service, host=self.kwargs["host"] + self.core_api, self.service, nodeport_host=self.kwargs["nodeport_host"] ) self.pdb = await self._create_pdb() @@ -214,7 +216,7 @@ async def close(self, **kwargs): ) await super().close(**kwargs) - async def _create_service(self): + async def _create_service(self, scheduler_service_type=None): service_template_dict = dask.config.get("kubernetes.scheduler-service-template") self.service_template = clean_service_template( make_service_from_dict(service_template_dict) @@ -225,7 +227,8 @@ async def _create_service(self): self.service_template.spec.selector["dask.org/cluster-name"] = self.cluster_name if self.service_template.spec.type is None: self.service_template.spec.type = dask.config.get( - "kubernetes.scheduler-service-type" + "kubernetes.scheduler-service-type", + override_with=scheduler_service_type, ) await self.core_api.create_namespaced_service( self.namespace, self.service_template @@ -315,7 +318,11 @@ class KubeCluster(SpecCluster): env: Dict[str, str] Dictionary of environment variables to pass to worker pod host: str - Listen address for local scheduler. Defaults to 0.0.0.0 + Listen address for local scheduler. Only used when deploy_mode == "local". Defaults to + 0.0.0.0 + nodeport_host: str + When scheduler_service_type == ``"NodePort"``, override KubeCluster's default behaviour + of listing nodes by specifying a nodeport_host. port: int Port of local scheduler auth: List[ClusterAuth] (optional) @@ -329,6 +336,9 @@ class KubeCluster(SpecCluster): Timeout, in seconds, to wait for the remote scheduler service to be ready. Defaults to 30 seconds. Set to 0 to disable the timeout (not recommended). + scheduler_service_type: str + The Service type used to expose the scheduler. Can be one of ``"ClusterIP"``, + ``"NodePort"`` or ``"LoadBalancer"``. Defaults to ``"ClusterIP"`` deploy_mode: str (optional) Run the scheduler as "local" or "remote". Defaults to ``"remote"``. @@ -341,7 +351,8 @@ class KubeCluster(SpecCluster): >>> pod_spec = make_pod_spec(image='daskdev/dask:latest', ... memory_limit='4G', memory_request='4G', ... cpu_limit=1, cpu_request=1, - ... env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'}) + ... env={'EXTRA_PIP_PACKAGES': 'fastparquet + git+https://github.com/dask/distributed'}) >>> cluster = KubeCluster(pod_spec) >>> cluster.scale(10) @@ -415,6 +426,8 @@ def __init__( security=None, scheduler_service_wait_timeout=None, scheduler_pod_template=None, + scheduler_service_type="ClusterIP", + nodeport_host=None, **kwargs ): if isinstance(pod_template, str): @@ -451,6 +464,9 @@ def __init__( "kubernetes.scheduler-service-wait-timeout", override_with=scheduler_service_wait_timeout, ) + self._scheduler_service_type = dask.config.get( + "kubernetes.scheduler-service-type", override_with=scheduler_service_type + ) self.security = security if self.security and not isinstance( self.security, distributed.security.Security @@ -461,6 +477,10 @@ def __init__( self.host = dask.config.get("kubernetes.host", override_with=host) self.port = dask.config.get("kubernetes.port", override_with=port) self.env = dask.config.get("kubernetes.env", override_with=env) + + self.nodeport_host = dask.config.get( + "kubernetes.scheduler-nodeport-host", override_with=nodeport_host + ) self.auth = auth self.kwargs = kwargs super().__init__(**self.kwargs) @@ -578,7 +598,8 @@ async def _start(self): "idle_timeout": self._idle_timeout, "service_wait_timeout_s": self._scheduler_service_wait_timeout, "pod_template": self.scheduler_pod_template, - "host": self.host, + "nodeport_host": self.nodeport_host, + "scheduler_service_type": self._scheduler_service_type, **common_options, }, } diff --git a/dask_kubernetes/kubernetes.yaml b/dask_kubernetes/kubernetes.yaml index ad4b25db5..5b6141289 100644 --- a/dask_kubernetes/kubernetes.yaml +++ b/dask_kubernetes/kubernetes.yaml @@ -14,6 +14,7 @@ kubernetes: dashboard_address: ":8787" scheduler-service-type: "ClusterIP" + scheduler-nodeport-host: null # Timeout to wait for the scheduler service to be up (in seconds) # Set it to 0 to wait indefinitely (not recommended) scheduler-service-wait-timeout: 30 diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index 133934ea0..8a750aa36 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -4,13 +4,15 @@ import os import random from time import time -import yaml +import dask import kubernetes_asyncio as kubernetes import pytest - -import dask +import yaml from dask.distributed import Client, wait +from distributed.utils import tmpfile +from distributed.utils_test import captured_logger + import dask_kubernetes from dask_kubernetes import ( KubeCluster, @@ -20,8 +22,8 @@ KubeConfig, KubeAuth, ) -from distributed.utils import tmpfile -from distributed.utils_test import captured_logger +from dask_kubernetes.core import Scheduler +from dask_kubernetes.utils import get_external_address_for_scheduler_service TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") @@ -390,12 +392,48 @@ async def test_constructor_parameters(k8s_cluster, pod_spec): @pytest.mark.asyncio -@pytest.mark.xfail(reason="Can't communicate with kind nodes") -async def test_passing_host_to_nodeport_service(k8s_cluster, pod_spec): - with dask.config.set({"kubernetes.scheduler-service-type": "NodePort"}): - async with KubeCluster(pod_spec, host="127.0.0.1", **cluster_kwargs) as cluster: - assert cluster.scheduler.service_template.spec.type == "NodePort" - assert cluster.scheduler.external_address == "127.0.0.1" +async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( + k8s_cluster, pod_spec +): + cluster = KubeCluster( + pod_template=pod_spec, + namespace="default", + nodeport_host="10.10.10.10", + scheduler_service_type="NodePort", + **cluster_kwargs, + ) + + scheduler_pod_template = cluster._get_pod_template( + cluster.pod_template, pod_type="scheduler" + ) + scheduler_pod_template = clean_pod_template( + pod_template=scheduler_pod_template, pod_type="scheduler" + ) + scheduler_pod_template = cluster._fill_pod_templates( + scheduler_pod_template, pod_type="scheduler" + ) + + scheduler = Scheduler( + idle_timeout=cluster._idle_timeout, + service_wait_timeout_s=cluster._scheduler_service_wait_timeout, + pod_template=scheduler_pod_template, + nodeport_host=cluster.nodeport_host, + scheduler_service_type=cluster._scheduler_service_type, + cluster=cluster, + core_api=kubernetes.client.CoreV1Api(), + policy_api=kubernetes.client.PolicyV1beta1Api(), + namespace=cluster.namespace, + loop=cluster.loop, + ) + + await scheduler._create_service(scheduler.kwargs["scheduler_service_type"]) + + external_address = await get_external_address_for_scheduler_service( + scheduler.core_api, + scheduler.service, + nodeport_host=scheduler.kwargs["nodeport_host"], + ) + assert external_address == "10.10.10.10" @pytest.mark.asyncio diff --git a/dask_kubernetes/utils.py b/dask_kubernetes/utils.py index b78f1ce4a..617bc6fc0 100644 --- a/dask_kubernetes/utils.py +++ b/dask_kubernetes/utils.py @@ -41,7 +41,7 @@ def namespace_default(): async def get_external_address_for_scheduler_service( - core_api, service, port_forward_cluster_ip=None, host=None + core_api, service, port_forward_cluster_ip=None, nodeport_host=None ): """Take a service object and return the scheduler address.""" [port] = [ @@ -53,6 +53,7 @@ async def get_external_address_for_scheduler_service( lb = service.status.load_balancer.ingress[0] host = lb.hostname or lb.ip elif service.spec.type == "NodePort": + host = nodeport_host if host is None: nodes = await core_api.list_node() host = nodes.items[0].status.addresses[0].address From 3d46f73f99117a9851113ed50e214c70bb7000ad Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Thu, 12 Aug 2021 00:00:17 +0200 Subject: [PATCH 04/10] Added test for setting nodeport_host. Simulates running _start, as kind doesn't support NodePorts --- dask_kubernetes/tests/test_async.py | 96 +++++++++++++++-------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index 8a750aa36..cfdd21aa5 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -3,6 +3,7 @@ import getpass import os import random +import uuid from time import time import dask @@ -23,7 +24,7 @@ KubeAuth, ) from dask_kubernetes.core import Scheduler -from dask_kubernetes.utils import get_external_address_for_scheduler_service +from dask_kubernetes.utils import get_external_address_for_scheduler_service, escape TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") @@ -128,9 +129,9 @@ async def test_logs(remote_cluster): assert len(logs) == 4 for _, log in logs.items(): assert ( - "distributed.scheduler" in log - or "distributed.worker" in log - or "Creating scheduler pod" in log + "distributed.scheduler" in log + or "distributed.worker" in log + or "Creating scheduler pod" in log ) @@ -149,7 +150,7 @@ async def test_diagnostics_link_env_variable(k8s_cluster, pod_spec, user_env): port = cluster.scheduler_info["services"]["dashboard"] assert ( - "foo-" + getpass.getuser() + "-" + str(port) in cluster.dashboard_link + "foo-" + getpass.getuser() + "-" + str(port) in cluster.dashboard_link ) @@ -377,7 +378,7 @@ async def test_pod_template_from_conf(docker_image): async def test_constructor_parameters(k8s_cluster, pod_spec): env = {"FOO": "BAR", "A": 1} async with KubeCluster( - pod_spec, name="myname", env=env, **cluster_kwargs + pod_spec, name="myname", env=env, **cluster_kwargs ) as cluster: pod = cluster.pod_template assert pod.metadata.namespace == "default" @@ -393,7 +394,7 @@ async def test_constructor_parameters(k8s_cluster, pod_spec): @pytest.mark.asyncio async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( - k8s_cluster, pod_spec + k8s_cluster, pod_spec ): cluster = KubeCluster( pod_template=pod_spec, @@ -403,20 +404,24 @@ async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( **cluster_kwargs, ) - scheduler_pod_template = cluster._get_pod_template( + cluster.scheduler_pod_template = cluster._get_pod_template( cluster.pod_template, pod_type="scheduler" ) - scheduler_pod_template = clean_pod_template( - pod_template=scheduler_pod_template, pod_type="scheduler" + cluster.scheduler_pod_template.spec.containers[0].args = ["dask-scheduler"] + cluster.scheduler_pod_template = clean_pod_template( + pod_template=cluster.scheduler_pod_template, pod_type="scheduler" ) - scheduler_pod_template = cluster._fill_pod_templates( - scheduler_pod_template, pod_type="scheduler" + await ClusterAuth.load_first(cluster.auth) + cluster._generate_name = escape(cluster._generate_name.format(user=getpass.getuser(), + uuid=str(uuid.uuid4())[:10])) + cluster.scheduler_pod_template = cluster._fill_pod_templates( + cluster.scheduler_pod_template, pod_type="scheduler" ) scheduler = Scheduler( idle_timeout=cluster._idle_timeout, service_wait_timeout_s=cluster._scheduler_service_wait_timeout, - pod_template=scheduler_pod_template, + pod_template=cluster.scheduler_pod_template, nodeport_host=cluster.nodeport_host, scheduler_service_type=cluster._scheduler_service_type, cluster=cluster, @@ -426,14 +431,15 @@ async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( loop=cluster.loop, ) - await scheduler._create_service(scheduler.kwargs["scheduler_service_type"]) + scheduler.service = await scheduler._create_service(scheduler.kwargs["scheduler_service_type"]) external_address = await get_external_address_for_scheduler_service( scheduler.core_api, scheduler.service, nodeport_host=scheduler.kwargs["nodeport_host"], ) - assert external_address == "10.10.10.10" + assert external_address == "tcp://10.10.10.10:8786" + assert scheduler.service.spec.type == "NodePort" @pytest.mark.asyncio @@ -638,8 +644,8 @@ async def test_repr(cluster): for text in [repr(cluster), str(cluster)]: assert "Box" not in text assert ( - cluster.scheduler.address in text - or cluster.scheduler.external_address in text + cluster.scheduler.address in text + or cluster.scheduler.external_address in text ) @@ -682,19 +688,19 @@ async def test_maximum(cluster): def test_default_toleration(pod_spec): tolerations = pod_spec.to_dict()["spec"]["tolerations"] assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org/dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org_dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations def test_default_toleration_preserved(docker_image): @@ -714,24 +720,24 @@ def test_default_toleration_preserved(docker_image): ) tolerations = pod_spec.to_dict()["spec"]["tolerations"] assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org/dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org_dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "example.org/toleration", - "operator": "Exists", - "effect": "NoSchedule", - } in tolerations + "key": "example.org/toleration", + "operator": "Exists", + "effect": "NoSchedule", + } in tolerations @pytest.mark.asyncio From 7eec453e35afa891a58317263b1c30eaa2c7b385 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Thu, 12 Aug 2021 00:02:15 +0200 Subject: [PATCH 05/10] Linting --- dask_kubernetes/tests/test_async.py | 83 +++++++++++++++-------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index cfdd21aa5..c71c19c86 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -129,9 +129,9 @@ async def test_logs(remote_cluster): assert len(logs) == 4 for _, log in logs.items(): assert ( - "distributed.scheduler" in log - or "distributed.worker" in log - or "Creating scheduler pod" in log + "distributed.scheduler" in log + or "distributed.worker" in log + or "Creating scheduler pod" in log ) @@ -150,7 +150,7 @@ async def test_diagnostics_link_env_variable(k8s_cluster, pod_spec, user_env): port = cluster.scheduler_info["services"]["dashboard"] assert ( - "foo-" + getpass.getuser() + "-" + str(port) in cluster.dashboard_link + "foo-" + getpass.getuser() + "-" + str(port) in cluster.dashboard_link ) @@ -378,7 +378,7 @@ async def test_pod_template_from_conf(docker_image): async def test_constructor_parameters(k8s_cluster, pod_spec): env = {"FOO": "BAR", "A": 1} async with KubeCluster( - pod_spec, name="myname", env=env, **cluster_kwargs + pod_spec, name="myname", env=env, **cluster_kwargs ) as cluster: pod = cluster.pod_template assert pod.metadata.namespace == "default" @@ -394,7 +394,7 @@ async def test_constructor_parameters(k8s_cluster, pod_spec): @pytest.mark.asyncio async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( - k8s_cluster, pod_spec + k8s_cluster, pod_spec ): cluster = KubeCluster( pod_template=pod_spec, @@ -412,8 +412,11 @@ async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( pod_template=cluster.scheduler_pod_template, pod_type="scheduler" ) await ClusterAuth.load_first(cluster.auth) - cluster._generate_name = escape(cluster._generate_name.format(user=getpass.getuser(), - uuid=str(uuid.uuid4())[:10])) + cluster._generate_name = escape( + cluster._generate_name.format( + user=getpass.getuser(), uuid=str(uuid.uuid4())[:10] + ) + ) cluster.scheduler_pod_template = cluster._fill_pod_templates( cluster.scheduler_pod_template, pod_type="scheduler" ) @@ -431,7 +434,9 @@ async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( loop=cluster.loop, ) - scheduler.service = await scheduler._create_service(scheduler.kwargs["scheduler_service_type"]) + scheduler.service = await scheduler._create_service( + scheduler.kwargs["scheduler_service_type"] + ) external_address = await get_external_address_for_scheduler_service( scheduler.core_api, @@ -644,8 +649,8 @@ async def test_repr(cluster): for text in [repr(cluster), str(cluster)]: assert "Box" not in text assert ( - cluster.scheduler.address in text - or cluster.scheduler.external_address in text + cluster.scheduler.address in text + or cluster.scheduler.external_address in text ) @@ -688,19 +693,19 @@ async def test_maximum(cluster): def test_default_toleration(pod_spec): tolerations = pod_spec.to_dict()["spec"]["tolerations"] assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org/dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org_dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations def test_default_toleration_preserved(docker_image): @@ -720,24 +725,24 @@ def test_default_toleration_preserved(docker_image): ) tolerations = pod_spec.to_dict()["spec"]["tolerations"] assert { - "key": "k8s.dask.org/dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org/dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "k8s.dask.org_dedicated", - "operator": "Equal", - "value": "worker", - "effect": "NoSchedule", - "toleration_seconds": None, - } in tolerations + "key": "k8s.dask.org_dedicated", + "operator": "Equal", + "value": "worker", + "effect": "NoSchedule", + "toleration_seconds": None, + } in tolerations assert { - "key": "example.org/toleration", - "operator": "Exists", - "effect": "NoSchedule", - } in tolerations + "key": "example.org/toleration", + "operator": "Exists", + "effect": "NoSchedule", + } in tolerations @pytest.mark.asyncio From 055fb24c5bee36580723d6c600aa596185ba7fd8 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Fri, 13 Aug 2021 19:55:03 +0200 Subject: [PATCH 06/10] Update dask_kubernetes/core.py Co-authored-by: Jacob Tomlinson --- dask_kubernetes/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py index ad503f4d6..cbb5f0b16 100644 --- a/dask_kubernetes/core.py +++ b/dask_kubernetes/core.py @@ -321,8 +321,8 @@ class KubeCluster(SpecCluster): Listen address for local scheduler. Only used when deploy_mode == "local". Defaults to 0.0.0.0 nodeport_host: str - When scheduler_service_type == ``"NodePort"``, override KubeCluster's default behaviour - of listing nodes by specifying a nodeport_host. + When scheduler_service_type == ``"NodePort"``, select a specific node IP to connect to. + When this is not set the node IP will be autodetected. port: int Port of local scheduler auth: List[ClusterAuth] (optional) From 177447a14c62e25b60e877f5cb976e93d62410f2 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Fri, 13 Aug 2021 19:55:09 +0200 Subject: [PATCH 07/10] Update dask_kubernetes/utils.py Co-authored-by: Jacob Tomlinson --- dask_kubernetes/utils.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask_kubernetes/utils.py b/dask_kubernetes/utils.py index 617bc6fc0..2a255b5ed 100644 --- a/dask_kubernetes/utils.py +++ b/dask_kubernetes/utils.py @@ -53,8 +53,9 @@ async def get_external_address_for_scheduler_service( lb = service.status.load_balancer.ingress[0] host = lb.hostname or lb.ip elif service.spec.type == "NodePort": - host = nodeport_host - if host is None: + if nodeport_host is not None: + host = nodeport_host + else: nodes = await core_api.list_node() host = nodes.items[0].status.addresses[0].address elif service.spec.type == "ClusterIP": From d808dd4cb2364f9cf67df5e7c82118d882df6115 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Fri, 13 Aug 2021 19:56:43 +0200 Subject: [PATCH 08/10] Update dask_kubernetes/tests/test_async.py Co-authored-by: Jacob Tomlinson --- dask_kubernetes/tests/test_async.py | 51 +++-------------------------- 1 file changed, 5 insertions(+), 46 deletions(-) diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index c71c19c86..8409c9c36 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -396,55 +396,14 @@ async def test_constructor_parameters(k8s_cluster, pod_spec): async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( k8s_cluster, pod_spec ): - cluster = KubeCluster( - pod_template=pod_spec, - namespace="default", - nodeport_host="10.10.10.10", - scheduler_service_type="NodePort", - **cluster_kwargs, - ) - - cluster.scheduler_pod_template = cluster._get_pod_template( - cluster.pod_template, pod_type="scheduler" - ) - cluster.scheduler_pod_template.spec.containers[0].args = ["dask-scheduler"] - cluster.scheduler_pod_template = clean_pod_template( - pod_template=cluster.scheduler_pod_template, pod_type="scheduler" - ) - await ClusterAuth.load_first(cluster.auth) - cluster._generate_name = escape( - cluster._generate_name.format( - user=getpass.getuser(), uuid=str(uuid.uuid4())[:10] - ) - ) - cluster.scheduler_pod_template = cluster._fill_pod_templates( - cluster.scheduler_pod_template, pod_type="scheduler" - ) - - scheduler = Scheduler( - idle_timeout=cluster._idle_timeout, - service_wait_timeout_s=cluster._scheduler_service_wait_timeout, - pod_template=cluster.scheduler_pod_template, - nodeport_host=cluster.nodeport_host, - scheduler_service_type=cluster._scheduler_service_type, - cluster=cluster, - core_api=kubernetes.client.CoreV1Api(), - policy_api=kubernetes.client.PolicyV1beta1Api(), - namespace=cluster.namespace, - loop=cluster.loop, - ) - - scheduler.service = await scheduler._create_service( - scheduler.kwargs["scheduler_service_type"] - ) - + from dask_kubernetes.objects import make_service_from_dict + + service = make_service_from_dict({"spec": {"ports": [{"port": 8786, "name": "comm"}], "type": "NodePort"}}) + external_address = await get_external_address_for_scheduler_service( - scheduler.core_api, - scheduler.service, - nodeport_host=scheduler.kwargs["nodeport_host"], + None, service, nodeport_host="10.10.10.10", ) assert external_address == "tcp://10.10.10.10:8786" - assert scheduler.service.spec.type == "NodePort" @pytest.mark.asyncio From a8427f1c22c2c38dc1050b97b190d1fcd2c2c33f Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Fri, 13 Aug 2021 19:58:40 +0200 Subject: [PATCH 09/10] Ran pre-commit --- dask_kubernetes/core.py | 3 +-- dask_kubernetes/tests/test_async.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py index cbb5f0b16..fb17858da 100644 --- a/dask_kubernetes/core.py +++ b/dask_kubernetes/core.py @@ -351,8 +351,7 @@ class KubeCluster(SpecCluster): >>> pod_spec = make_pod_spec(image='daskdev/dask:latest', ... memory_limit='4G', memory_request='4G', ... cpu_limit=1, cpu_request=1, - ... env={'EXTRA_PIP_PACKAGES': 'fastparquet - git+https://github.com/dask/distributed'}) + ... env={'EXTRA_PIP_PACKAGES': 'fastparquet git+https://github.com/dask/distributed'}) >>> cluster = KubeCluster(pod_spec) >>> cluster.scale(10) diff --git a/dask_kubernetes/tests/test_async.py b/dask_kubernetes/tests/test_async.py index 8409c9c36..2bbdc2a74 100644 --- a/dask_kubernetes/tests/test_async.py +++ b/dask_kubernetes/tests/test_async.py @@ -3,7 +3,6 @@ import getpass import os import random -import uuid from time import time import dask @@ -23,8 +22,7 @@ KubeConfig, KubeAuth, ) -from dask_kubernetes.core import Scheduler -from dask_kubernetes.utils import get_external_address_for_scheduler_service, escape +from dask_kubernetes.utils import get_external_address_for_scheduler_service TEST_DIR = os.path.abspath(os.path.join(__file__, "..")) CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml") @@ -397,11 +395,15 @@ async def test_passing_nodeport_host_to_scheduler_will_set_correct_host( k8s_cluster, pod_spec ): from dask_kubernetes.objects import make_service_from_dict - - service = make_service_from_dict({"spec": {"ports": [{"port": 8786, "name": "comm"}], "type": "NodePort"}}) - + + service = make_service_from_dict( + {"spec": {"ports": [{"port": 8786, "name": "comm"}], "type": "NodePort"}} + ) + external_address = await get_external_address_for_scheduler_service( - None, service, nodeport_host="10.10.10.10", + None, + service, + nodeport_host="10.10.10.10", ) assert external_address == "tcp://10.10.10.10:8786" From c62838bce322fbc72bdb69f401245eafc94671e8 Mon Sep 17 00:00:00 2001 From: Anders Bogsnes Date: Fri, 13 Aug 2021 20:00:07 +0200 Subject: [PATCH 10/10] Changed scheduler_service_type kwarg to be None --- dask_kubernetes/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_kubernetes/core.py b/dask_kubernetes/core.py index fb17858da..41d9598e5 100644 --- a/dask_kubernetes/core.py +++ b/dask_kubernetes/core.py @@ -425,7 +425,7 @@ def __init__( security=None, scheduler_service_wait_timeout=None, scheduler_pod_template=None, - scheduler_service_type="ClusterIP", + scheduler_service_type=None, nodeport_host=None, **kwargs ):