Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explicitly set a host when using NodePort Service #352

Closed
wants to merge 12 commits into from
Closed
33 changes: 27 additions & 6 deletions dask_kubernetes/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def __init__(self, scheduler: str, name=None, **kwargs):


class Scheduler(Pod):
"""A Remote Dask Scheduler controled by Kubernetes
"""A Remote Dask Scheduler controlled by Kubernetes
Parameters
----------
idle_timeout: str, optional
Expand Down Expand Up @@ -191,14 +191,16 @@ async def start(self, **kwargs):
self.address = line.split("Scheduler at:")[1].strip()
await asyncio.sleep(0.1)

self.service = await self._create_service()
self.service = await self._create_service(
scheduler_service_type=self.kwargs.get("scheduler_service_type")
)
self.address = "tcp://{name}.{namespace}:{port}".format(
name=self.service.metadata.name,
namespace=self.namespace,
port=SCHEDULER_PORT,
)
self.external_address = await get_external_address_for_scheduler_service(
self.core_api, self.service
self.core_api, self.service, nodeport_host=self.kwargs["nodeport_host"]
)

self.pdb = await self._create_pdb()
Expand All @@ -214,7 +216,7 @@ async def close(self, **kwargs):
)
await super().close(**kwargs)

async def _create_service(self):
async def _create_service(self, scheduler_service_type=None):
service_template_dict = dask.config.get("kubernetes.scheduler-service-template")
self.service_template = clean_service_template(
make_service_from_dict(service_template_dict)
Expand All @@ -225,7 +227,8 @@ async def _create_service(self):
self.service_template.spec.selector["dask.org/cluster-name"] = self.cluster_name
if self.service_template.spec.type is None:
self.service_template.spec.type = dask.config.get(
"kubernetes.scheduler-service-type"
"kubernetes.scheduler-service-type",
override_with=scheduler_service_type,
)
await self.core_api.create_namespaced_service(
self.namespace, self.service_template
Expand Down Expand Up @@ -315,7 +318,11 @@ class KubeCluster(SpecCluster):
env: Dict[str, str]
Dictionary of environment variables to pass to worker pod
host: str
Listen address for local scheduler. Defaults to 0.0.0.0
Listen address for local scheduler. Only used when deploy_mode == "local". Defaults to
0.0.0.0
nodeport_host: str
When scheduler_service_type == ``"NodePort"``, select a specific node IP to connect to.
When this is not set the node IP will be autodetected.
port: int
Port of local scheduler
auth: List[ClusterAuth] (optional)
Expand All @@ -329,6 +336,9 @@ class KubeCluster(SpecCluster):
Timeout, in seconds, to wait for the remote scheduler service to be ready.
Defaults to 30 seconds.
Set to 0 to disable the timeout (not recommended).
scheduler_service_type: str
The Service type used to expose the scheduler. Can be one of ``"ClusterIP"``,
``"NodePort"`` or ``"LoadBalancer"``. Defaults to ``"ClusterIP"``
deploy_mode: str (optional)
Run the scheduler as "local" or "remote".
Defaults to ``"remote"``.
Expand Down Expand Up @@ -415,6 +425,8 @@ def __init__(
security=None,
scheduler_service_wait_timeout=None,
scheduler_pod_template=None,
scheduler_service_type=None,
nodeport_host=None,
**kwargs
):
if isinstance(pod_template, str):
Expand Down Expand Up @@ -451,6 +463,9 @@ def __init__(
"kubernetes.scheduler-service-wait-timeout",
override_with=scheduler_service_wait_timeout,
)
self._scheduler_service_type = dask.config.get(
"kubernetes.scheduler-service-type", override_with=scheduler_service_type
)
self.security = security
if self.security and not isinstance(
self.security, distributed.security.Security
Expand All @@ -461,6 +476,10 @@ def __init__(
self.host = dask.config.get("kubernetes.host", override_with=host)
self.port = dask.config.get("kubernetes.port", override_with=port)
self.env = dask.config.get("kubernetes.env", override_with=env)

self.nodeport_host = dask.config.get(
"kubernetes.scheduler-nodeport-host", override_with=nodeport_host
)
self.auth = auth
self.kwargs = kwargs
super().__init__(**self.kwargs)
Expand Down Expand Up @@ -578,6 +597,8 @@ async def _start(self):
"idle_timeout": self._idle_timeout,
"service_wait_timeout_s": self._scheduler_service_wait_timeout,
"pod_template": self.scheduler_pod_template,
"nodeport_host": self.nodeport_host,
"scheduler_service_type": self._scheduler_service_type,
**common_options,
},
}
Expand Down
1 change: 1 addition & 0 deletions dask_kubernetes/kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ kubernetes:
dashboard_address: ":8787"

scheduler-service-type: "ClusterIP"
scheduler-nodeport-host: null
# Timeout to wait for the scheduler service to be up (in seconds)
# Set it to 0 to wait indefinitely (not recommended)
scheduler-service-wait-timeout: 30
Expand Down
30 changes: 24 additions & 6 deletions dask_kubernetes/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import os
import random
from time import time
import yaml

import dask
import kubernetes_asyncio as kubernetes
import pytest

import dask
import yaml
from dask.distributed import Client, wait
from distributed.utils import tmpfile
from distributed.utils_test import captured_logger

import dask_kubernetes
from dask_kubernetes import (
KubeCluster,
Expand All @@ -20,9 +22,7 @@
KubeConfig,
KubeAuth,
)
from distributed.utils import tmpfile
from distributed.utils_test import captured_logger

from dask_kubernetes.utils import get_external_address_for_scheduler_service

TEST_DIR = os.path.abspath(os.path.join(__file__, ".."))
CONFIG_DEMO = os.path.join(TEST_DIR, "config-demo.yaml")
Expand Down Expand Up @@ -390,6 +390,24 @@ async def test_constructor_parameters(k8s_cluster, pod_spec):
assert pod.metadata.generate_name == "myname"


@pytest.mark.asyncio
async def test_passing_nodeport_host_to_scheduler_will_set_correct_host(
    k8s_cluster, pod_spec
):
    """A user-supplied ``nodeport_host`` becomes the host of the external address.

    ``None`` is passed in place of the core API client, so the expected
    address can only come from the explicit ``nodeport_host`` argument.
    """
    from dask_kubernetes.objects import make_service_from_dict

    # Minimal NodePort service exposing a single port named "comm".
    comm_port = {"port": 8786, "name": "comm"}
    service_spec = {"spec": {"ports": [comm_port], "type": "NodePort"}}
    nodeport_service = make_service_from_dict(service_spec)

    resolved = await get_external_address_for_scheduler_service(
        None, nodeport_service, nodeport_host="10.10.10.10"
    )
    assert resolved == "tcp://10.10.10.10:8786"


@pytest.mark.asyncio
async def test_reject_evicted_workers(cluster):
cluster.scale(1)
Expand Down
9 changes: 6 additions & 3 deletions dask_kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def namespace_default():


async def get_external_address_for_scheduler_service(
core_api, service, port_forward_cluster_ip=None
core_api, service, port_forward_cluster_ip=None, nodeport_host=None
):
"""Take a service object and return the scheduler address."""
[port] = [
Expand All @@ -53,8 +53,11 @@ async def get_external_address_for_scheduler_service(
lb = service.status.load_balancer.ingress[0]
host = lb.hostname or lb.ip
elif service.spec.type == "NodePort":
nodes = await core_api.list_node()
host = nodes.items[0].status.addresses[0].address
if nodeport_host is not None:
host = nodeport_host
else:
nodes = await core_api.list_node()
host = nodes.items[0].status.addresses[0].address
elif service.spec.type == "ClusterIP":
try:
# Try to resolve the service name. If we are inside the cluster this should succeed.
Expand Down