From 10340f86a4991e58f678025d55f33dce34c45a5b Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 7 May 2024 00:51:38 -0700 Subject: [PATCH 1/7] [k8s] Add support for autoscaling kubernetes clusters (#3513) * Add Karpenter label formatter. * add autoscaler support * lint * lint * comments * comments * lint --- docs/source/reference/config.rst | 19 ++++++++++ sky/cli.py | 21 +++++++++-- sky/clouds/kubernetes.py | 22 +++++++---- sky/provision/kubernetes/utils.py | 63 ++++++++++++++++++++++++++++--- sky/utils/kubernetes_enums.py | 7 ++++ sky/utils/schemas.py | 7 ++++ 6 files changed, 121 insertions(+), 18 deletions(-) diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 3c1c02dd659..641ebede5e5 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -326,6 +326,25 @@ Available fields and semantics: # Default: 10 seconds provision_timeout: 10 + # Autoscaler configured in the Kubernetes cluster (optional) + # + # This field informs SkyPilot about the cluster autoscaler used in the + # Kubernetes cluster. Setting this field disables pre-launch checks for + # GPU capacity in the cluster and SkyPilot relies on the autoscaler to + # provision nodes with the required GPU capacity. + # + # Remember to set provision_timeout accordingly when using an autoscaler. + # + # Supported values: gke, karpenter, generic + # gke: uses cloud.google.com/gke-accelerator label to identify GPUs on nodes + # karpenter: uses karpenter.k8s.aws/instance-gpu-name label to identify GPUs on nodes + # generic: uses skypilot.co/accelerator labels to identify GPUs on nodes + # Refer to https://skypilot.readthedocs.io/en/latest/reference/kubernetes/kubernetes-setup.html#setting-up-gpu-support + # for more details on setting up labels for GPU support. + # + # Default: null (no autoscaler, autodetect label format for GPU nodes) + autoscaler: gke + # Additional fields to override the pod fields used by SkyPilot (optional) # # Any key:value pairs added here would get added to the pod spec used to diff --git a/sky/cli.py b/sky/cli.py index 485703e4caf..8d60de53e87 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -2998,6 +2998,11 @@ def _output(): name, quantity = None, None + # Kubernetes specific bools + cloud_is_kubernetes = isinstance(cloud_obj, clouds.Kubernetes) + kubernetes_autoscaling = kubernetes_utils.get_autoscaler_type( + ) is not None + if accelerator_str is None: result = service_catalog.list_accelerator_counts( gpus_only=True, @@ -3005,16 +3010,17 @@ def _output(): region_filter=region, ) - if (len(result) == 0 and cloud_obj is not None and - cloud_obj.is_same_cloud(clouds.Kubernetes())): + if len(result) == 0 and cloud_is_kubernetes: yield kubernetes_utils.NO_GPU_ERROR_MESSAGE + if kubernetes_autoscaling: + yield '\n' + yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE return # "Common" GPUs # If cloud is kubernetes, we want to show all GPUs here, even if # they are not listed as common in SkyPilot. - if (cloud_obj is not None and - cloud_obj.is_same_cloud(clouds.Kubernetes())): + if cloud_is_kubernetes: for gpu, _ in sorted(result.items()): gpu_table.add_row([gpu, _list_to_str(result.pop(gpu))]) else: @@ -3038,9 +3044,16 @@ def _output(): other_table.add_row([gpu, _list_to_str(qty)]) yield from other_table.get_string() yield '\n\n' + if (cloud_is_kubernetes or + cloud is None) and kubernetes_autoscaling: + yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE + yield '\n\n' else: yield ('\n\nHint: use -a/--all to see all accelerators ' '(including non-common ones) and pricing.') + if (cloud_is_kubernetes or + cloud is None) and kubernetes_autoscaling: + yield kubernetes_utils.KUBERNETES_AUTOSCALER_NOTE return else: # Parse accelerator string diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index 9777a28948b..be9111feac5 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -337,14 +337,20 @@ def _make(instance_list): gpu_task_cpus, gpu_task_memory, acc_count, acc_type).name) # Check if requested instance type will fit in the cluster. - # TODO(romilb): This will fail early for autoscaling clusters. - fits, reason = kubernetes_utils.check_instance_fits( - chosen_instance_type) - if not fits: - logger.debug(f'Instance type {chosen_instance_type} does ' - 'not fit in the Kubernetes cluster. ' - f'Reason: {reason}') - return [], [] + autoscaler_type = kubernetes_utils.get_autoscaler_type() + if autoscaler_type is None: + # If autoscaler is not set, check if the instance type fits in the + # cluster. Else, rely on the autoscaler to provision the right + # instance type without running checks. Worst case, if autoscaling + # fails, the pod will be stuck in pending state until + # provision_timeout, after which failover will be triggered. + fits, reason = kubernetes_utils.check_instance_fits( + chosen_instance_type) + if not fits: + logger.debug(f'Instance type {chosen_instance_type} does ' + 'not fit in the Kubernetes cluster. ' + f'Reason: {reason}') + return [], [] # No fuzzy lists for Kubernetes return _make([chosen_instance_type]), [] diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index c7c19680e07..b0b27f121fe 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -35,6 +35,12 @@ (e.g., skypilot.co/accelerator) are setup correctly. \ To further debug, run: sky check.' +KUBERNETES_AUTOSCALER_NOTE = ( + 'Note: Kubernetes cluster autoscaling is enabled. ' + 'All GPUs that can be provisioned may not be listed ' + 'here. Refer to your autoscaler\'s node pool ' + 'configuration to see the list of supported GPUs.') + # TODO(romilb): Add links to docs for configuration instructions when ready. ENDPOINTS_DEBUG_MESSAGE = ('Additionally, make sure your {endpoint_type} ' 'is configured correctly. ' @@ -178,13 +184,31 @@ def get_accelerator_from_label_value(cls, value: str) -> str: f'Invalid accelerator name in GKE cluster: {value}') +class KarpenterLabelFormatter(SkyPilotLabelFormatter): + """Karpeneter label formatter + Karpenter uses the label `karpenter.k8s.aws/instance-gpu-name` to identify + the GPU type. Details: https://karpenter.sh/docs/reference/instance-types/ + The naming scheme is same as the SkyPilot formatter, so we inherit from it. + """ + LABEL_KEY = 'karpenter.k8s.aws/instance-gpu-name' + + # LABEL_FORMATTER_REGISTRY stores the label formats SkyPilot will try to # discover the accelerator type from. The order of the list is important, as -# it will be used to determine the priority of the label formats. +# it will be used to determine the priority of the label formats when +# auto-detecting the GPU label type. LABEL_FORMATTER_REGISTRY = [ - SkyPilotLabelFormatter, CoreWeaveLabelFormatter, GKELabelFormatter + SkyPilotLabelFormatter, CoreWeaveLabelFormatter, GKELabelFormatter, + KarpenterLabelFormatter ] +# Mapping of autoscaler type to label formatter +AUTOSCALER_TO_LABEL_FORMATTER = { + kubernetes_enums.KubernetesAutoscalerType.GKE: GKELabelFormatter, + kubernetes_enums.KubernetesAutoscalerType.KARPENTER: KarpenterLabelFormatter, # pylint: disable=line-too-long + kubernetes_enums.KubernetesAutoscalerType.GENERIC: SkyPilotLabelFormatter, +} + def detect_gpu_label_formatter( ) -> Tuple[Optional[GPULabelFormatter], Dict[str, List[Tuple[str, str]]]]: @@ -348,10 +372,26 @@ def get_gpu_label_key_value(acc_type: str, check_mode=False) -> Tuple[str, str]: # Check if the cluster has GPU resources # TODO(romilb): This assumes the accelerator is a nvidia GPU. We # need to support TPUs and other accelerators as well. - # TODO(romilb): This will fail early for autoscaling clusters. - # For AS clusters, we may need a way for users to specify GPU node pools - # to use since the cluster may be scaling up from zero nodes and may not - # have any GPU nodes yet. + # TODO(romilb): Currently, we broadly disable all GPU checks if autoscaling + # is configured in config.yaml since the cluster may be scaling up from + # zero nodes and may not have any GPU nodes yet. In the future, we should + # support pollingthe clusters for autoscaling information, such as the + # node pools configured etc. + + autoscaler_type = get_autoscaler_type() + if autoscaler_type is not None: + # If autoscaler is set in config.yaml, override the label key and value + # to the autoscaler's format and bypass the GPU checks. + if check_mode: + # If check mode is enabled and autoscaler is set, we can return + # early since we assume the cluster autoscaler will handle GPU + # node provisioning. + return '', '' + formatter = AUTOSCALER_TO_LABEL_FORMATTER.get(autoscaler_type) + assert formatter is not None, ('Unsupported autoscaler type:' + f' {autoscaler_type}') + return formatter.get_label_key(), formatter.get_label_value(acc_type) + has_gpus, cluster_resources = detect_gpu_resource() if has_gpus: # Check if the cluster has GPU labels setup correctly @@ -1310,3 +1350,14 @@ def get_head_pod_name(cluster_name_on_cloud: str): # label, but since we know the naming convention, we can directly return the # head pod name. return f'{cluster_name_on_cloud}-head' + + +def get_autoscaler_type( +) -> Optional[kubernetes_enums.KubernetesAutoscalerType]: + """Returns the autoscaler type by reading from config""" + autoscaler_type = skypilot_config.get_nested(['kubernetes', 'autoscaler'], + None) + if autoscaler_type is not None: + autoscaler_type = kubernetes_enums.KubernetesAutoscalerType( + autoscaler_type) + return autoscaler_type diff --git a/sky/utils/kubernetes_enums.py b/sky/utils/kubernetes_enums.py index a08e95b4a08..6ebe924ea47 100644 --- a/sky/utils/kubernetes_enums.py +++ b/sky/utils/kubernetes_enums.py @@ -36,3 +36,10 @@ class KubernetesPortMode(enum.Enum): INGRESS = 'ingress' LOADBALANCER = 'loadbalancer' PODIP = 'podip' + + +class KubernetesAutoscalerType(enum.Enum): + """Enum for the different types of cluster autoscalers for Kubernetes.""" + GKE = 'gke' + KARPENTER = 'karpenter' + GENERIC = 'generic' diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 4ea74714f6c..bea6523ae05 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -645,6 +645,13 @@ def get_config_schema(): 'provision_timeout': { 'type': 'integer', }, + 'autoscaler': { + 'type': 'string', + 'case_insensitive_enum': [ + type.value + for type in kubernetes_enums.KubernetesAutoscalerType + ] + }, } }, 'oci': { From 904aa5cb6b59a550d39ae87a2bbfe32e4c53b8b4 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 7 May 2024 02:03:30 -0700 Subject: [PATCH 2/7] [AWS] Fix config syntax for IAM profile (#3514) * Fix syntax for IAM profile * fix doc * Update docs/source/reference/config.rst Co-authored-by: Tian Xia * format --------- Co-authored-by: Tian Xia --- docs/source/reference/config.rst | 18 ++++++++++-------- sky/backends/backend_utils.py | 4 ++-- sky/skypilot_config.py | 4 +++- sky/utils/schemas.py | 31 ++++++++++++++++++++++--------- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 641ebede5e5..19d943fb855 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -120,18 +120,20 @@ Available fields and semantics: # instances. SkyPilot will auto-create and reuse a service account (IAM # role) for AWS instances. # - # Customized service account (IAM role): or + # Customized service account (IAM role): or # - : apply the service account with the specified name to all instances. # Example: # remote_identity: my-service-account-name - # - : A dict mapping from the cluster name (pattern) to the service account name to use. - # NOTE: If none of the wildcard expressions in the dict match the cluster name, LOCAL_CREDENTIALS will be used. - # To specify your default, use "*" as the wildcard expression. - # Example: + # - : A list of single-element dict mapping from the cluster name (pattern) + # to the service account name to use. The matching of the cluster name is done in the same order + # as the list. + # NOTE: If none of the wildcard expressions in the dict match the cluster name, LOCAL_CREDENTIALS will be used. + # To specify your default, use "*" as the wildcard expression. + # Example: # remote_identity: - # my-cluster-name: my-service-account-1 - # sky-serve-controller-*: my-service-account-2 - # "*": my-default-service-account + # - my-cluster-name: my-service-account-1 + # - sky-serve-controller-*: my-service-account-2 + # - "*": my-default-service-account # # Two caveats of SERVICE_ACCOUNT for multicloud users: # diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 24fde0466f7..1b344ea6e43 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -801,8 +801,8 @@ def write_cluster_config( (str(cloud).lower(), 'remote_identity'), 'LOCAL_CREDENTIALS') if remote_identity is not None and not isinstance(remote_identity, str): for profile in remote_identity: - if fnmatch.fnmatchcase(cluster_name, profile): - remote_identity = remote_identity[profile] + if fnmatch.fnmatchcase(cluster_name, list(profile.keys())[0]): + remote_identity = list(profile.values())[0] break if remote_identity != 'LOCAL_CREDENTIALS': if not cloud.supports_service_account_on_remote(): diff --git a/sky/skypilot_config.py b/sky/skypilot_config.py index 10fc90fa850..5b205e2692a 100644 --- a/sky/skypilot_config.py +++ b/sky/skypilot_config.py @@ -152,7 +152,9 @@ def _try_load_config() -> None: common_utils.validate_schema( _dict, schemas.get_config_schema(), - f'Invalid config YAML ({config_path}): ', + f'Invalid config YAML ({config_path}). See: ' + 'https://skypilot.readthedocs.io/en/latest/reference/config.html. ' # pylint: disable=line-too-long + 'Error: ', skip_none=False) logger.debug('Config syntax check passed.') diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index bea6523ae05..e42924d348d 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -531,15 +531,28 @@ def get_cluster_schema(): _REMOTE_IDENTITY_SCHEMA_AWS = { 'remote_identity': { - 'oneOf': [{ - 'type': 'string' - }, { - 'type': 'object', - 'required': [], - 'additionalProperties': { - 'type': 'string', - }, - }] + 'oneOf': [ + { + 'type': 'string' + }, + { + # A list of single-element dict to pretain the order. + # Example: + # remote_identity: + # - my-cluster1-*: my-iam-role-1 + # - my-cluster2-*: my-iam-role-2 + # - "*"": my-iam-role-3 + 'type': 'array', + 'items': { + 'type': 'object', + 'additionalProperties': { + 'type': 'string' + }, + 'maxProperties': 1, + 'minProperties': 1, + }, + } + ] } } From 071614a7033e767aef71fcb1393f7ee76aca8fa6 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 7 May 2024 13:49:03 -0700 Subject: [PATCH 3/7] [Core] Allow a very long user program (#3512) * Allow a very long user program * Fix commnet * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Tian Xia * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Tian Xia --------- Co-authored-by: Tian Xia --- sky/backends/cloud_vm_ray_backend.py | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index f916d931b5f..4d0fdb8d68b 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3144,6 +3144,36 @@ def _exec_code_on_head( code = job_lib.JobLibCodeGen.queue_job(job_id, job_submit_cmd) job_submit_cmd = ' && '.join([mkdir_code, create_script_code, code]) + if len(job_submit_cmd) > 120 * 1024: + # The maximum size of a command line arguments is 128 KB, i.e. the + # command executed with /bin/sh should be less than 128KB. + # https://github.com/torvalds/linux/blob/master/include/uapi/linux/binfmts.h + # If a user have very long run or setup commands, the generated + # command may exceed the limit, as we encode the script in base64 + # and directly include it in the job submission command. If the + # command is too long, we instead write it to a file, rsync and + # execute it. + # We use 120KB as a threshold to be safe for other arguments that + # might be added during ssh. + ssh_credentials = backend_utils.ssh_credential_from_yaml( + handle.cluster_yaml, handle.docker_user, handle.ssh_user) + head_ssh_port = handle.head_ssh_port + runner = command_runner.SSHCommandRunner(handle.head_ip, + port=head_ssh_port, + **ssh_credentials) + with tempfile.NamedTemporaryFile('w', prefix='sky_app_') as fp: + fp.write(codegen) + fp.flush() + script_path = os.path.join(SKY_REMOTE_APP_DIR, + f'sky_job_{job_id}') + # We choose to sync code + exec, because the alternative of 'ray + # submit' may not work as it may use system python (python2) to + # execute the script. Happens for AWS. + runner.rsync(source=fp.name, + target=script_path, + up=True, + stream_logs=False) + job_submit_cmd = f'{mkdir_code} && {code}' if managed_job_dag is not None: # Add the managed job to job queue database. From 40a0f570537c4c4e9b81b641ce98a9c34bc593ff Mon Sep 17 00:00:00 2001 From: JGSweets Date: Wed, 8 May 2024 01:42:32 -0500 Subject: [PATCH 4/7] [Core][AWS] Allow assumption of AWS Credentials provided in ECS Containers (#3503) * feat: add container-role * fix: comment * fix: update gitignore to include .env files --- .gitignore | 2 +- sky/clouds/aws.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f1dbf59a52f..efa74dd744b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,4 @@ sky_logs/ sky/clouds/service_catalog/data_fetchers/*.csv .vscode/ .idea/ - +.env diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index 1fef481d8d0..b2d76e7b7df 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -81,6 +81,8 @@ class AWSIdentityType(enum.Enum): IAM_ROLE = 'iam-role' + CONTAINER_ROLE = 'container-role' + # Name Value Type Location # ---- ----- ---- -------- # profile None None @@ -545,6 +547,12 @@ def check_credentials(cls) -> Tuple[bool, Optional[str]]: # jobs-controller) created by an SSO account, i.e. the VM will be # assigned the IAM role: skypilot-v1. hints = f'AWS IAM role is set.{single_cloud_hint}' + elif identity_type == AWSIdentityType.CONTAINER_ROLE: + # Similar to the IAM ROLE, an ECS container may not store credentials + # in the~/.aws/credentials file. So we don't check for the existence of + # the file. i.e. the container will be assigned the IAM role of the + # task: skypilot-v1. + hints = f'AWS container-role is set.{single_cloud_hint}' else: # This file is required because it is required by the VMs launched on # other clouds to access private s3 buckets and resources like EC2. @@ -604,6 +612,8 @@ def _is_access_key_of_type(type_str: str) -> bool: return AWSIdentityType.SSO elif _is_access_key_of_type(AWSIdentityType.IAM_ROLE.value): return AWSIdentityType.IAM_ROLE + elif _is_access_key_of_type(AWSIdentityType.CONTAINER_ROLE.value): + return AWSIdentityType.CONTAINER_ROLE elif _is_access_key_of_type(AWSIdentityType.ENV.value): return AWSIdentityType.ENV else: From d09827b0ee4592c02574871fbb7875a961642bf6 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 7 May 2024 23:43:36 -0700 Subject: [PATCH 5/7] [Azure] Optimize autostopping speed for azure (#3519) Optimize autostopping speed for azure --- sky/skylet/autostop_lib.py | 8 +++++- sky/skylet/providers/azure/config.py | 2 ++ sky/skylet/providers/azure/node_provider.py | 28 +++++++++++++-------- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/sky/skylet/autostop_lib.py b/sky/skylet/autostop_lib.py index 687c04f5211..130e39fb425 100644 --- a/sky/skylet/autostop_lib.py +++ b/sky/skylet/autostop_lib.py @@ -75,10 +75,16 @@ def set_autostopping_started() -> None: configs.set_config(_AUTOSTOP_INDICATOR, str(psutil.boot_time())) -def get_is_autostopping_payload() -> str: +def get_is_autostopping() -> bool: """Returns whether the cluster is in the process of autostopping.""" result = configs.get_config(_AUTOSTOP_INDICATOR) is_autostopping = (result == str(psutil.boot_time())) + return is_autostopping + + +def get_is_autostopping_payload() -> str: + """Payload for whether the cluster is in the process of autostopping.""" + is_autostopping = get_is_autostopping() return common_utils.encode_payload(is_autostopping) diff --git a/sky/skylet/providers/azure/config.py b/sky/skylet/providers/azure/config.py index 0c1827a1141..a19273761ba 100644 --- a/sky/skylet/providers/azure/config.py +++ b/sky/skylet/providers/azure/config.py @@ -120,6 +120,8 @@ def _configure_resource_group(config): create_or_update = get_azure_sdk_function( client=resource_client.deployments, function_name="create_or_update" ) + # TODO (skypilot): this takes a long time (> 40 seconds) for stopping an + # azure VM, and this can be called twice during ray down. outputs = ( create_or_update( resource_group_name=resource_group, diff --git a/sky/skylet/providers/azure/node_provider.py b/sky/skylet/providers/azure/node_provider.py index 4b315f23589..068930eb390 100644 --- a/sky/skylet/providers/azure/node_provider.py +++ b/sky/skylet/providers/azure/node_provider.py @@ -15,6 +15,7 @@ bootstrap_azure, get_azure_sdk_function, ) +from sky.skylet import autostop_lib from sky.skylet.providers.command_runner import SkyDockerCommandRunner from sky.provision import docker_utils @@ -61,16 +62,23 @@ class AzureNodeProvider(NodeProvider): def __init__(self, provider_config, cluster_name): NodeProvider.__init__(self, provider_config, cluster_name) - # TODO(suquark): This is a temporary patch for resource group. - # By default, Ray autoscaler assumes the resource group is still here even - # after the whole cluster is destroyed. However, now we deletes the resource - # group after tearing down the cluster. To comfort the autoscaler, we need - # to create/update it here, so the resource group always exists. - from sky.skylet.providers.azure.config import _configure_resource_group - - _configure_resource_group( - {"cluster_name": cluster_name, "provider": provider_config} - ) + if not autostop_lib.get_is_autostopping(): + # TODO(suquark): This is a temporary patch for resource group. + # By default, Ray autoscaler assumes the resource group is still + # here even after the whole cluster is destroyed. However, now we + # deletes the resource group after tearing down the cluster. To + # comfort the autoscaler, we need to create/update it here, so the + # resource group always exists. + # + # We should not re-configure the resource group again, when it is + # running on the remote VM and the autostopping is in progress, + # because the VM is running which guarantees the resource group + # exists. + from sky.skylet.providers.azure.config import _configure_resource_group + + _configure_resource_group( + {"cluster_name": cluster_name, "provider": provider_config} + ) subscription_id = provider_config["subscription_id"] self.cache_stopped_nodes = provider_config.get("cache_stopped_nodes", True) # Sky only supports Azure CLI credential for now. From 0a03995f896f5f7775c24a506f4586af930a24a6 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 7 May 2024 23:49:39 -0700 Subject: [PATCH 6/7] [k8s] SkyServe on Kubernetes (#3377) * playing around * wip with hacks * wip refactor get_endpoints * working get_endpoints * wip * fixed circular import * Working for ingress and loadbalancer svc * lint * add purging from #3094 * Use local catalog on the controller too * use externalip if available * add dshm_size_limit * optimize dependency installation * Add todo * optimize ingress * fix * fix * remove autostop timing * Fix URLs for raw IP:ports * fixes * wip * SA wip * Allow use of service accounts through remote_identity field * Make purge work for no clusters in kubeconfig * Handle ingress namespace not present * setup optimizations and critical SA key fix * fix docs * fix docs * Add support for skypilot.co/external-ip annotation for ingress * Remove dshm_size_limit * Undo kind changes * Update service account docs * minor docs * update comment * is_same_cloud to cloud_in_list * refactor query_ports to use head_ip * autodown + http prefixing in callers * fix ssh key issues when user hash is reused * linting * lint * lint, HOST_CONTROLLERS * add serve smoke tests for k8s * disallow file_mounts and workdir if no storage cloud is enabled * minor * lint * update fastchat to use --host 127.0.0.1 * extend timeout * docs comments * rename to port * add to core.py * docstrs * add docs on exec based auth * expand elif * add lb comment * refactor * refactor * fix docs build * add PODIP mode support * make ssh services optional * nits * Revert "make ssh services optional" This reverts commit 87d4d25daff8471241eefb9349e18a0d8af1264b. * Revert "add PODIP mode support" This reverts commit 750d4d4dbeaea470ceb8bd7b708fd82dccbb5e81. * nits * use 0.0.0.0 when on k8s; use common impl for other clouds * return dict instead of raising errors in core.endpoints() * lint * merge fixes * merge fixes * merge fixes * lint * fix smoke tests * fix smoke tests * comment * add enum for remote identity * lint * add skip_status_check * remove zone requirement * fix timings for test * silence curl download * move jq from yaml to test_minimal * move jq from yaml to test_minimal --- .../cloud-setup/cloud-permissions/index.rst | 1 + .../cloud-permissions/kubernetes.rst | 234 ++++++++++++++++++ docs/source/reference/config.rst | 32 ++- .../reference/kubernetes/kubernetes-setup.rst | 18 +- docs/source/serving/sky-serve.rst | 3 +- examples/serve/gorilla/gorilla.yaml | 3 +- examples/serve/vicuna-v1.5.yaml | 3 +- llm/llama-2/chatbot-hf.yaml | 3 +- llm/vicuna-llama-2/serve.yaml | 3 +- llm/vicuna/serve-openai-api-endpoint.yaml | 3 +- llm/vicuna/serve.yaml | 3 +- sky/adaptors/kubernetes.py | 6 +- sky/authentication.py | 2 +- sky/backends/backend_utils.py | 122 ++++++++- sky/backends/cloud_vm_ray_backend.py | 8 + sky/cli.py | 111 ++++----- sky/clouds/kubernetes.py | 61 +++-- sky/core.py | 20 ++ sky/provision/__init__.py | 16 +- sky/provision/aws/__init__.py | 1 - sky/provision/aws/instance.py | 10 - sky/provision/azure/__init__.py | 1 - sky/provision/azure/instance.py | 11 - sky/provision/common.py | 41 ++- sky/provision/gcp/__init__.py | 1 - sky/provision/gcp/instance.py | 10 - sky/provision/kubernetes/config.py | 153 ++++++++++-- sky/provision/kubernetes/instance.py | 2 +- sky/provision/kubernetes/network.py | 4 +- sky/provision/kubernetes/network_utils.py | 12 +- sky/provision/kubernetes/utils.py | 93 ++++++- sky/serve/constants.py | 2 +- sky/serve/controller.py | 20 +- sky/serve/core.py | 10 +- sky/serve/load_balancer.py | 18 +- sky/serve/replica_managers.py | 25 +- sky/serve/serve_utils.py | 23 +- sky/templates/kubernetes-ray.yml.j2 | 81 +++++- sky/utils/common_utils.py | 29 ++- sky/utils/controller_utils.py | 47 +++- .../kubernetes/generate_static_kubeconfig.sh | 137 ++++++++++ sky/utils/schemas.py | 27 +- tests/skyserve/auto_restart.yaml | 1 - tests/skyserve/http/kubernetes.yaml | 15 ++ tests/skyserve/llm/service.yaml | 9 +- tests/skyserve/restart/user_bug.yaml | 1 - tests/test_smoke.py | 32 +-- 47 files changed, 1227 insertions(+), 241 deletions(-) create mode 100644 docs/source/cloud-setup/cloud-permissions/kubernetes.rst create mode 100755 sky/utils/kubernetes/generate_static_kubeconfig.sh create mode 100644 tests/skyserve/http/kubernetes.yaml diff --git a/docs/source/cloud-setup/cloud-permissions/index.rst b/docs/source/cloud-setup/cloud-permissions/index.rst index 873cbf339fc..e2a1aaf16ae 100644 --- a/docs/source/cloud-setup/cloud-permissions/index.rst +++ b/docs/source/cloud-setup/cloud-permissions/index.rst @@ -20,3 +20,4 @@ Table of Contents aws gcp vsphere + kubernetes diff --git a/docs/source/cloud-setup/cloud-permissions/kubernetes.rst b/docs/source/cloud-setup/cloud-permissions/kubernetes.rst new file mode 100644 index 00000000000..5318d76b1a3 --- /dev/null +++ b/docs/source/cloud-setup/cloud-permissions/kubernetes.rst @@ -0,0 +1,234 @@ +.. _cloud-permissions-kubernetes: + +Kubernetes +========== + +When running outside your Kubernetes cluster, SkyPilot uses your local ``~/.kube/config`` file +for authentication and creating resources on your Kubernetes cluster. + +When running inside your Kubernetes cluster (e.g., as a Spot controller or Serve controller), +SkyPilot can operate using either of the following three authentication methods: + +1. **Using your local kubeconfig file**: In this case, SkyPilot will + copy your local ``~/.kube/config`` file to the controller pod and use it for + authentication. This is the default method when running inside the cluster, + and no additional configuration is required. + + .. note:: + + If your cluster uses exec based authentication in your ``~/.kube/config`` file + (e.g., GKE uses exec auth by default), SkyPilot may not be able to authenticate using this method. In this case, + consider using the service account methods below. + +2. **Creating a service account**: SkyPilot can automatically create the service + account and roles for itself to manage resources in the Kubernetes cluster. + To use this method, set ``remote_identity: SERVICE_ACCOUNT`` to your + Kubernetes configuration in the :ref:`~/.sky/config.yaml ` file: + + .. code-block:: yaml + + kubernetes: + remote_identity: SERVICE_ACCOUNT + + For details on the permissions that are granted to the service account, + refer to the `Permissions required for SkyPilot`_ section below. + +3. **Using a custom service account**: If you have a custom service account + with the `necessary permissions `__, you can configure + SkyPilot to use it by adding this to your :ref:`~/.sky/config.yaml ` file: + + .. code-block:: yaml + + kubernetes: + remote_identity: your-service-account-name + +.. note:: + + Service account based authentication applies only when the remote SkyPilot + cluster (including spot and serve controller) is launched inside the + Kubernetes cluster. When running outside the cluster (e.g., on AWS), + SkyPilot will use the local ``~/.kube/config`` file for authentication. + +Below are the permissions required by SkyPilot and an example service account YAML that you can use to create a service account with the necessary permissions. + +.. _k8s-permissions: + +Permissions required for SkyPilot +--------------------------------- + +SkyPilot requires permissions equivalent to the following roles to be able to manage the resources in the Kubernetes cluster: + +.. code-block:: yaml + + # Namespaced role for the service account + # Required for creating pods, services and other necessary resources in the namespace. + # Note these permissions only apply in the namespace where SkyPilot is deployed. + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-sa-role + namespace: default + rules: + - apiGroups: ["*"] + resources: ["*"] + verbs: ["*"] + --- + # ClusterRole for accessing cluster-wide resources. Details for each resource below: + kind: ClusterRole + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-sa-cluster-role + namespace: default + labels: + parent: skypilot + rules: + - apiGroups: [""] + resources: ["nodes"] # Required for getting node resources. + verbs: ["get", "list", "watch"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["clusterroles", "clusterrolebindings"] # Required for launching more SkyPilot clusters from within the pod. + verbs: ["get", "list", "watch"] + - apiGroups: ["node.k8s.io"] + resources: ["runtimeclasses"] # Required for autodetecting the runtime class of the nodes. + verbs: ["get", "list", "watch"] + --- + # Optional: If using ingresses, role for accessing ingress service IP + apiVersion: rbac.authorization.k8s.io/v1 + kind: Role + metadata: + namespace: ingress-nginx + name: sky-sa-role-ingress-nginx + rules: + - apiGroups: [""] + resources: ["services"] + verbs: ["list", "get"] + +These roles must apply to both the user account configured in the kubeconfig file and the service account used by SkyPilot (if configured). + +.. _k8s-sa-example: + +Example using Custom Service Account +------------------------------------ + +To create a service account that has the necessary permissions for SkyPilot, you can use the following YAML: + +.. code-block:: yaml + + # create-sky-sa.yaml + kind: ServiceAccount + apiVersion: v1 + metadata: + name: sky-sa + namespace: default + labels: + parent: skypilot + --- + # Role for the service account + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-sa-role + namespace: default + labels: + parent: skypilot + rules: + - apiGroups: ["*"] # Required for creating pods, services, secrets and other necessary resources in the namespace. + resources: ["*"] + verbs: ["*"] + --- + # RoleBinding for the service account + kind: RoleBinding + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-sa-rb + namespace: default + labels: + parent: skypilot + subjects: + - kind: ServiceAccount + name: sky-sa + roleRef: + kind: Role + name: sky-sa-role + apiGroup: rbac.authorization.k8s.io + --- + # Role for accessing ingress resources + apiVersion: rbac.authorization.k8s.io/v1 + kind: Role + metadata: + namespace: ingress-nginx + name: sky-sa-role-ingress-nginx + rules: + - apiGroups: [""] + resources: ["services"] + verbs: ["list", "get", "watch"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["roles", "rolebindings"] + verbs: ["list", "get", "watch"] + --- + # RoleBinding for accessing ingress resources + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + name: sky-sa-rolebinding-ingress-nginx + namespace: ingress-nginx + subjects: + - kind: ServiceAccount + name: sky-sa + namespace: default + roleRef: + kind: Role + name: sky-sa-role-ingress-nginx + apiGroup: rbac.authorization.k8s.io + --- + # ClusterRole for the service account + kind: ClusterRole + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + name: sky-sa-cluster-role + namespace: default + labels: + parent: skypilot + rules: + - apiGroups: [""] + resources: ["nodes"] # Required for getting node resources. + verbs: ["get", "list", "watch"] + - apiGroups: ["rbac.authorization.k8s.io"] + resources: ["clusterroles", "clusterrolebindings"] # Required for launching more SkyPilot clusters from within the pod. + verbs: ["get", "list", "watch"] + - apiGroups: ["node.k8s.io"] + resources: ["runtimeclasses"] # Required for autodetecting the runtime class of the nodes. + verbs: ["get", "list", "watch"] + - apiGroups: ["networking.k8s.io"] # Required for exposing services. + resources: ["ingressclasses"] + verbs: ["get", "list", "watch"] + --- + # ClusterRoleBinding for the service account + apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + name: sky-sa-cluster-role-binding + namespace: default + labels: + parent: skypilot + subjects: + - kind: ServiceAccount + name: sky-sa + namespace: default + roleRef: + kind: ClusterRole + name: sky-sa-cluster-role + apiGroup: rbac.authorization.k8s.io + +Create the service account using the following command: + +.. code-block:: bash + + $ kubectl apply -f create-sky-sa.yaml + +After creating the service account, configure SkyPilot to use it through ``~/.sky/config.yaml``: + +.. code-block:: yaml + + kubernetes: + remote_identity: sky-sa # Or your service account name diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 19d943fb855..e7594142331 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -205,21 +205,21 @@ Available fields and semantics: # Reserved capacity (optional). - # + # # Whether to prioritize reserved instance types/locations (considered as 0 # cost) in the optimizer. - # + # # If you have "automatically consumed" reservations in your GCP project: # Setting this to true guarantees the optimizer will pick any matching # reservation and GCP will auto consume your reservation, and setting to # false means optimizer uses regular, non-zero pricing in optimization (if # by chance any matching reservation is selected, GCP still auto consumes # the reservation). - # + # # If you have "specifically targeted" reservations (set by the # `specific_reservations` field below): This field will automatically be set # to true. - # + # # Default: false. prioritize_reservations: false # @@ -298,6 +298,30 @@ Available fields and semantics: # Default: loadbalancer ports: loadbalancer + # Identity to use for all Kubernetes pods (optional). + # + # LOCAL_CREDENTIALS: The user's local ~/.kube/config will be uploaded to the + # Kubernetes pods created by SkyPilot. They are used for authenticating with + # the Kubernetes API server and launching new pods (e.g., for + # spot/serve controllers). + # + # SERVICE_ACCOUNT: Local ~/.kube/config is not uploaded to Kubernetes pods. + # SkyPilot will auto-create and reuse a service account with necessary roles + # in the user's namespace. + # + # : The name of a service account to use for all Kubernetes pods. + # This service account must exist in the user's namespace and have all + # necessary permissions. Refer to https://skypilot.readthedocs.io/en/latest/cloud-setup/cloud-permissions/kubernetes.html + # for details on the roles required by the service account. + # + # Using SERVICE_ACCOUNT or a custom service account only affects Kubernetes + # instances. Local ~/.kube/config will still be uploaded to non-Kubernetes + # instances (e.g., a serve controller on GCP or AWS may need to provision + # Kubernetes resources). + # + # Default: 'LOCAL_CREDENTIALS'. + remote_identity: my-k8s-service-account + # Attach custom metadata to Kubernetes objects created by SkyPilot # # Uses the same schema as Kubernetes metadata object: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#objectmeta-v1-meta diff --git a/docs/source/reference/kubernetes/kubernetes-setup.rst b/docs/source/reference/kubernetes/kubernetes-setup.rst index fca4d539327..3ed1b8c89f0 100644 --- a/docs/source/reference/kubernetes/kubernetes-setup.rst +++ b/docs/source/reference/kubernetes/kubernetes-setup.rst @@ -382,7 +382,7 @@ To use this mode: # ingress-nginx-controller LoadBalancer 10.24.4.254 35.202.58.117 80:31253/TCP,443:32699/TCP .. note:: - If the ``EXTERNAL-IP`` field is ````, you must manually assign an External IP. + If the ``EXTERNAL-IP`` field is ````, you may manually assign it an External IP. This can be done by patching the service with an IP that can be accessed from outside the cluster. If the service type is ``NodePort``, you can set the ``EXTERNAL-IP`` to any node's IP address: @@ -395,6 +395,22 @@ To use this mode: If the ``EXTERNAL-IP`` field is left as ````, SkyPilot will use ``localhost`` as the external IP for the Ingress, and the endpoint may not be accessible from outside the cluster. +.. note:: + If you cannot update the ``EXTERNAL-IP`` field of the service, you can also + specify the Ingress IP or hostname through the ``skypilot.co/external-ip`` + annotation on the ``ingress-nginx-controller`` service. In this case, + having a valid ``EXTERNAL-IP`` field is not required. + + For example, if your ``ingress-nginx-controller`` service is ``NodePort``: + + .. code-block:: bash + + # Add skypilot.co/external-ip annotation to the nginx ingress service. + # Replace in the following command with the IP you select. + # Can be any node's IP if using NodePort service type. + $ kubectl annotate service ingress-nginx-controller skypilot.co/external-ip= -n ingress-nginx + + 3. Update the :ref:`SkyPilot config ` at :code:`~/.sky/config` to use the ingress mode. .. code-block:: yaml diff --git a/docs/source/serving/sky-serve.rst b/docs/source/serving/sky-serve.rst index eb2daa6ffb8..1c4ee3f2751 100644 --- a/docs/source/serving/sky-serve.rst +++ b/docs/source/serving/sky-serve.rst @@ -308,11 +308,12 @@ Let's bring up a real LLM chat service with FastChat + Vicuna. We'll use the `Vi conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path lmsys/vicuna-${MODEL_SIZE}b-v1.3 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/examples/serve/gorilla/gorilla.yaml b/examples/serve/gorilla/gorilla.yaml index ee46aa94568..e3072d816fb 100644 --- a/examples/serve/gorilla/gorilla.yaml +++ b/examples/serve/gorilla/gorilla.yaml @@ -35,11 +35,12 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path gorilla-llm/gorilla-falcon-7b-hf-v0 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/examples/serve/vicuna-v1.5.yaml b/examples/serve/vicuna-v1.5.yaml index c94115ea3d7..0f659e85697 100644 --- a/examples/serve/vicuna-v1.5.yaml +++ b/examples/serve/vicuna-v1.5.yaml @@ -34,11 +34,12 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path lmsys/vicuna-${MODEL_SIZE}b-v1.5 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/llm/llama-2/chatbot-hf.yaml b/llm/llama-2/chatbot-hf.yaml index 4c0132e4dd4..992c01346e6 100644 --- a/llm/llama-2/chatbot-hf.yaml +++ b/llm/llama-2/chatbot-hf.yaml @@ -24,12 +24,13 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path meta-llama/Llama-2-${MODEL_SIZE}b-chat-hf \ --num-gpus $SKYPILOT_NUM_GPUS_PER_NODE 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/llm/vicuna-llama-2/serve.yaml b/llm/vicuna-llama-2/serve.yaml index 0a98dab5d26..69f89f2fc28 100644 --- a/llm/vicuna-llama-2/serve.yaml +++ b/llm/vicuna-llama-2/serve.yaml @@ -27,11 +27,12 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path /skypilot-vicuna 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/llm/vicuna/serve-openai-api-endpoint.yaml b/llm/vicuna/serve-openai-api-endpoint.yaml index 247043ee3c2..639dfadc6d6 100644 --- a/llm/vicuna/serve-openai-api-endpoint.yaml +++ b/llm/vicuna/serve-openai-api-endpoint.yaml @@ -19,11 +19,12 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path lmsys/vicuna-${MODEL_SIZE}b-v1.3 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/llm/vicuna/serve.yaml b/llm/vicuna/serve.yaml index d458112a42f..49185fcea20 100644 --- a/llm/vicuna/serve.yaml +++ b/llm/vicuna/serve.yaml @@ -19,11 +19,12 @@ run: | conda activate chatbot echo 'Starting controller...' - python -u -m fastchat.serve.controller > ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ --model-path lmsys/vicuna-${MODEL_SIZE}b-v1.3 2>&1 \ + --host 127.0.0.1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/sky/adaptors/kubernetes.py b/sky/adaptors/kubernetes.py index ce6f93a8905..f4c84d3f578 100644 --- a/sky/adaptors/kubernetes.py +++ b/sky/adaptors/kubernetes.py @@ -55,9 +55,9 @@ def _load_config(): ' If you were running a local Kubernetes ' 'cluster, run `sky local up` to start the cluster.') else: - err_str = ( - 'Failed to load Kubernetes configuration. ' - f'Please check if your kubeconfig file is valid.{suffix}') + err_str = ('Failed to load Kubernetes configuration. ' + 'Please check if your kubeconfig file exists at ' + f'~/.kube/config and is valid.{suffix}') err_str += '\nTo disable Kubernetes for SkyPilot: run `sky check`.' with ux_utils.print_exception_no_traceback(): raise ValueError(err_str) from None diff --git a/sky/authentication.py b/sky/authentication.py index 581fdc12c7f..966dad670c5 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -408,7 +408,7 @@ def setup_kubernetes_authentication(config: Dict[str, Any]) -> Dict[str, Any]: # Add the user's public key to the SkyPilot cluster. public_key_path = os.path.expanduser(PUBLIC_SSH_KEY_PATH) secret_name = clouds.Kubernetes.SKY_SSH_KEY_SECRET_NAME - secret_field_name = clouds.Kubernetes.SKY_SSH_KEY_SECRET_FIELD_NAME + secret_field_name = clouds.Kubernetes().ssh_key_secret_field_name namespace = kubernetes_utils.get_current_kube_config_context_namespace() k8s = kubernetes.kubernetes with open(public_key_path, 'r', encoding='utf-8') as f: diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 1b344ea6e43..b0856ed1909 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -48,7 +48,9 @@ from sky.utils import common_utils from sky.utils import controller_utils from sky.utils import env_options +from sky.utils import resources_utils from sky.utils import rich_utils +from sky.utils import schemas from sky.utils import subprocess_utils from sky.utils import timeline from sky.utils import ux_utils @@ -109,6 +111,9 @@ # Remote dir that holds our runtime files. _REMOTE_RUNTIME_FILES_DIR = '~/.sky/.runtime_files' +_ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, ' + 'please retry after a while.') + # Include the fields that will be used for generating tags that distinguishes # the cluster in ray, to avoid the stopped cluster being discarded due to # updates in the yaml template. @@ -798,13 +803,14 @@ def write_cluster_config( assert cluster_name is not None excluded_clouds = [] remote_identity = skypilot_config.get_nested( - (str(cloud).lower(), 'remote_identity'), 'LOCAL_CREDENTIALS') + (str(cloud).lower(), 'remote_identity'), + schemas.REMOTE_IDENTITY_DEFAULT) if remote_identity is not None and not isinstance(remote_identity, str): for profile in remote_identity: if fnmatch.fnmatchcase(cluster_name, list(profile.keys())[0]): remote_identity = list(profile.values())[0] break - if remote_identity != 'LOCAL_CREDENTIALS': + if remote_identity != schemas.RemoteIdentityOptions.LOCAL_CREDENTIALS.value: if not cloud.supports_service_account_on_remote(): raise exceptions.InvalidCloudConfigs( 'remote_identity: SERVICE_ACCOUNT is specified in ' @@ -2712,3 +2718,115 @@ def check_stale_runtime_on_remote(returncode: int, stderr: str, f'not interrupted): {colorama.Style.BRIGHT}sky start -f -y ' f'{cluster_name}{colorama.Style.RESET_ALL}' f'\n--- Details ---\n{stderr.strip()}\n') + + +def get_endpoints(cluster: str, + port: Optional[Union[int, str]] = None, + skip_status_check: bool = False) -> Dict[int, str]: + """Gets the endpoint for a given cluster and port number (endpoint). + + Args: + cluster: The name of the cluster. + port: The port number to get the endpoint for. If None, endpoints + for all ports are returned. + skip_status_check: Whether to skip the status check for the cluster. + This is useful when the cluster is known to be in a INIT state + and the caller wants to query the endpoints. Used by serve + controller to query endpoints during cluster launch when multiple + services may be getting launched in parallel (and as a result, + the controller may be in INIT status due to a concurrent launch). + + Returns: A dictionary of port numbers to endpoints. If endpoint is None, + the dictionary will contain all ports:endpoints exposed on the cluster. + If the endpoint is not exposed yet (e.g., during cluster launch or + waiting for cloud provider to expose the endpoint), an empty dictionary + is returned. + + Raises: + ValueError: if the port is invalid or the cloud provider does not + support querying endpoints. + exceptions.ClusterNotUpError: if the cluster is not in UP status. + """ + # Cast endpoint to int if it is not None + if port is not None: + try: + port = int(port) + except ValueError: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Invalid endpoint {port!r}.') from None + cluster_records = get_clusters(include_controller=True, + refresh=False, + cluster_names=[cluster]) + cluster_record = cluster_records[0] + if (not skip_status_check and + cluster_record['status'] != status_lib.ClusterStatus.UP): + with ux_utils.print_exception_no_traceback(): + raise exceptions.ClusterNotUpError( + f'Cluster {cluster_record["name"]!r} ' + 'is not in UP status.', cluster_record['status']) + handle = cluster_record['handle'] + if not isinstance(handle, backends.CloudVmRayResourceHandle): + with ux_utils.print_exception_no_traceback(): + raise ValueError('Querying IP address is not supported ' + f'for cluster {cluster!r} with backend ' + f'{get_backend_from_handle(handle).NAME}.') + + launched_resources = handle.launched_resources + cloud = launched_resources.cloud + try: + cloud.check_features_are_supported( + launched_resources, {clouds.CloudImplementationFeatures.OPEN_PORTS}) + except exceptions.NotSupportedError: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Querying endpoints is not supported ' + f'for cluster {cluster!r} on {cloud}.') from None + + config = common_utils.read_yaml(handle.cluster_yaml) + port_details = provision_lib.query_ports(repr(cloud), + handle.cluster_name_on_cloud, + handle.launched_resources.ports, + head_ip=handle.head_ip, + provider_config=config['provider']) + + # Validation before returning the endpoints + if port is not None: + # If the requested endpoint was not to be exposed + port_set = resources_utils.port_ranges_to_set( + handle.launched_resources.ports) + if port not in port_set: + logger.warning(f'Port {port} is not exposed on ' + f'cluster {cluster!r}.') + return {} + # If the user requested a specific port endpoint, check if it is exposed + if port not in port_details: + error_msg = (f'Port {port} not exposed yet. ' + f'{_ENDPOINTS_RETRY_MESSAGE} ') + if handle.launched_resources.cloud.is_same_cloud( + clouds.Kubernetes()): + # Add Kubernetes specific debugging info + error_msg += (kubernetes_utils.get_endpoint_debug_message()) + logger.warning(error_msg) + return {} + return {port: port_details[port][0].url()} + else: + if not port_details: + # If cluster had no ports to be exposed + if handle.launched_resources.ports is None: + logger.warning(f'Cluster {cluster!r} does not have any ' + 'ports to be exposed.') + return {} + # Else ports have not been exposed even though they exist. + # In this case, ask the user to retry. + else: + error_msg = (f'No endpoints exposed yet. ' + f'{_ENDPOINTS_RETRY_MESSAGE} ') + if handle.launched_resources.cloud.is_same_cloud( + clouds.Kubernetes()): + # Add Kubernetes specific debugging info + error_msg += \ + kubernetes_utils.get_endpoint_debug_message() + logger.warning(error_msg) + return {} + return { + port_num: urls[0].url() for port_num, urls in port_details.items() + } diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 4d0fdb8d68b..ca152a75f91 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -4007,6 +4007,14 @@ def post_teardown_cleanup(self, pass except exceptions.PortDoesNotExistError: logger.debug('Ports do not exist. Skipping cleanup.') + except Exception as e: # pylint: disable=broad-except + if purge: + logger.warning( + f'Failed to cleanup ports. Skipping since purge is ' + f'set. Details: ' + f'{common_utils.format_exception(e, use_bracket=True)}') + else: + raise # The cluster file must exist because the cluster_yaml will only # be removed after the cluster entry in the database is removed. diff --git a/sky/cli.py b/sky/cli.py index 8d60de53e87..2e863f2eef7 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -52,7 +52,6 @@ from sky import exceptions from sky import global_user_state from sky import jobs as managed_jobs -from sky import provision as provision_lib from sky import serve as serve_lib from sky import sky_logging from sky import status_lib @@ -1650,71 +1649,28 @@ def status(all: bool, refresh: bool, ip: bool, endpoints: bool, head_ip = handle.external_ips()[0] if show_endpoints: - launched_resources = handle.launched_resources - cloud = launched_resources.cloud - try: - cloud.check_features_are_supported( - launched_resources, - {clouds.CloudImplementationFeatures.OPEN_PORTS}) - except exceptions.NotSupportedError: - with ux_utils.print_exception_no_traceback(): - raise ValueError('Querying endpoints is not supported ' - f'for {cloud}.') from None - - config = common_utils.read_yaml(handle.cluster_yaml) - port_details = provision_lib.query_ports( - repr(cloud), handle.cluster_name_on_cloud, - handle.launched_resources.ports, config['provider']) - - if endpoint is not None: - # If cluster had no ports to be exposed - ports_set = resources_utils.port_ranges_to_set( - handle.launched_resources.ports) - if endpoint not in ports_set: - with ux_utils.print_exception_no_traceback(): - raise ValueError(f'Port {endpoint} is not exposed ' - 'on cluster ' - f'{cluster_record["name"]!r}.') - # If the user requested a specific port endpoint - if endpoint not in port_details: - error_msg = (f'Port {endpoint} not exposed yet. ' - f'{_ENDPOINTS_RETRY_MESSAGE} ') - if handle.launched_resources.cloud.is_same_cloud( - clouds.Kubernetes()): - # Add Kubernetes specific debugging info - error_msg += ( - kubernetes_utils.get_endpoint_debug_message()) - with ux_utils.print_exception_no_traceback(): - raise RuntimeError(error_msg) - click.echo(port_details[endpoint][0].url(ip=head_ip)) - return - - if not port_details: - # If cluster had no ports to be exposed - if handle.launched_resources.ports is None: - with ux_utils.print_exception_no_traceback(): - raise ValueError('Cluster does not have any ports ' - 'to be exposed.') - # Else wait for the ports to be exposed - else: - error_msg = (f'No endpoints exposed yet. ' - f'{_ENDPOINTS_RETRY_MESSAGE} ') - if handle.launched_resources.cloud.is_same_cloud( - clouds.Kubernetes()): - # Add Kubernetes specific debugging info - error_msg += \ - kubernetes_utils.get_endpoint_debug_message() - with ux_utils.print_exception_no_traceback(): - raise RuntimeError(error_msg) - - for port, urls in port_details.items(): - click.echo( - f'{colorama.Fore.BLUE}{colorama.Style.BRIGHT}{port}' - f'{colorama.Style.RESET_ALL}: ' - f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' - f'{urls[0].url(ip=head_ip)}{colorama.Style.RESET_ALL}') + if endpoint: + cluster_endpoint = core.endpoints(cluster_record['name'], + endpoint).get( + endpoint, None) + if not cluster_endpoint: + raise click.Abort( + f'Endpoint {endpoint} not found for cluster ' + f'{cluster_record["name"]!r}.') + click.echo(cluster_endpoint) + else: + cluster_endpoints = core.endpoints(cluster_record['name']) + assert isinstance(cluster_endpoints, dict) + if not cluster_endpoints: + raise click.Abort(f'No endpoint found for cluster ' + f'{cluster_record["name"]!r}.') + for port, port_endpoint in cluster_endpoints.items(): + click.echo( + f'{colorama.Fore.BLUE}{colorama.Style.BRIGHT}{port}' + f'{colorama.Style.RESET_ALL}: ' + f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' + f'{port_endpoint}{colorama.Style.RESET_ALL}') return - click.echo(head_ip) return hints = [] @@ -2590,6 +2546,17 @@ def down( def _hint_or_raise_for_down_jobs_controller(controller_name: str): + """Helper function to check job controller status before tearing it down. + + Raises helpful exceptions and errors if the controller is not in a safe + state to be torn down. + + Raises: + RuntimeError: if failed to get the job queue. + exceptions.NotSupportedError: if the controller is not in a safe state + to be torn down (e.g., because it has jobs running or + it is in init state) + """ controller = controller_utils.Controllers.from_name(controller_name) assert controller is not None, controller_name @@ -2633,6 +2600,17 @@ def _hint_or_raise_for_down_jobs_controller(controller_name: str): def _hint_or_raise_for_down_sky_serve_controller(controller_name: str): + """Helper function to check serve controller status before tearing it down. + + Raises helpful exceptions and errors if the controller is not in a safe + state to be torn down. + + Raises: + RuntimeError: if failed to get the service status. + exceptions.NotSupportedError: if the controller is not in a safe state + to be torn down (e.g., because it has services running or + it is in init state) + """ controller = controller_utils.Controllers.from_name(controller_name) assert controller is not None, controller_name with rich_utils.safe_status('[bold cyan]Checking for live services[/]'): @@ -2756,7 +2734,8 @@ def _down_or_stop_clusters( # managed job or service. We should make this check atomic # with the termination. hint_or_raise(controller_name) - except exceptions.ClusterOwnerIdentityMismatchError as e: + except (exceptions.ClusterOwnerIdentityMismatchError, + RuntimeError) as e: if purge: click.echo(common_utils.format_exception(e)) else: diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index be9111feac5..5740e0ed9b1 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -14,6 +14,7 @@ from sky.provision.kubernetes import utils as kubernetes_utils from sky.utils import common_utils from sky.utils import resources_utils +from sky.utils import schemas if typing.TYPE_CHECKING: # Renaming to avoid shadowing variables. @@ -36,9 +37,8 @@ class Kubernetes(clouds.Cloud): """Kubernetes.""" SKY_SSH_KEY_SECRET_NAME = 'sky-ssh-keys' - SKY_SSH_KEY_SECRET_FIELD_NAME = \ - f'ssh-publickey-{common_utils.get_user_hash()}' SKY_SSH_JUMP_NAME = 'sky-ssh-jump-pod' + SKY_DEFAULT_SERVICE_ACCOUNT_NAME = 'skypilot-service-account' PORT_FORWARD_PROXY_CMD_TEMPLATE = \ 'kubernetes-port-forward-proxy-command.sh.j2' PORT_FORWARD_PROXY_CMD_PATH = '~/.sky/port-forward-proxy-cmd.sh' @@ -52,6 +52,8 @@ class Kubernetes(clouds.Cloud): timeout = skypilot_config.get_nested(['kubernetes', 'provision_timeout'], 10) + _SUPPORTS_SERVICE_ACCOUNT_ON_REMOTE = True + _DEFAULT_NUM_VCPUS = 2 _DEFAULT_MEMORY_CPU_RATIO = 1 _DEFAULT_MEMORY_CPU_RATIO_WITH_GPU = 4 # Allocate more memory for GPU tasks @@ -71,13 +73,6 @@ class Kubernetes(clouds.Cloud): 'tiers are not ' 'supported in ' 'Kubernetes.', - # Kubernetes may be using exec-based auth, which may not work by - # directly copying the kubeconfig file to the controller. - # Support for service accounts for auth will be added in #3377, which - # will allow us to support hosting controllers. - clouds.CloudImplementationFeatures.HOST_CONTROLLERS: 'Kubernetes can ' - 'not host ' - 'controllers.', } IMAGE_CPU = 'skypilot:cpu-ubuntu-2004' @@ -86,11 +81,24 @@ class Kubernetes(clouds.Cloud): PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT STATUS_VERSION = clouds.StatusVersion.SKYPILOT + @property + def ssh_key_secret_field_name(self): + # Use a fresh user hash to avoid conflicts in the secret object naming. + # This can happen when the controller is reusing the same user hash + # through USER_ID_ENV_VAR but has a different SSH key. + fresh_user_hash = common_utils.get_user_hash(force_fresh_hash=True) + return f'ssh-publickey-{fresh_user_hash}' + @classmethod def _unsupported_features_for_resources( cls, resources: 'resources_lib.Resources' ) -> Dict[clouds.CloudImplementationFeatures, str]: unsupported_features = cls._CLOUD_UNSUPPORTED_FEATURES + is_exec_auth, message = kubernetes_utils.is_kubeconfig_exec_auth() + if is_exec_auth: + assert isinstance(message, str), message + unsupported_features[ + clouds.CloudImplementationFeatures.HOST_CONTROLLERS] = message return unsupported_features @classmethod @@ -261,6 +269,23 @@ def make_deploy_resources_variables( port_mode = network_utils.get_port_mode(None) + remote_identity = skypilot_config.get_nested( + ('kubernetes', 'remote_identity'), schemas.REMOTE_IDENTITY_DEFAULT) + if (remote_identity == + schemas.RemoteIdentityOptions.LOCAL_CREDENTIALS.value): + # SA name doesn't matter since automounting credentials is disabled + k8s_service_account_name = 'default' + k8s_automount_sa_token = 'false' + elif (remote_identity == + schemas.RemoteIdentityOptions.SERVICE_ACCOUNT.value): + # Use the default service account + k8s_service_account_name = self.SKY_DEFAULT_SERVICE_ACCOUNT_NAME + k8s_automount_sa_token = 'true' + else: + # User specified a custom service account + k8s_service_account_name = remote_identity + k8s_automount_sa_token = 'true' + fuse_device_required = bool(resources.requires_fuse) deploy_vars = { @@ -279,6 +304,8 @@ def make_deploy_resources_variables( 'k8s_acc_label_value': k8s_acc_label_value, 'k8s_ssh_jump_name': self.SKY_SSH_JUMP_NAME, 'k8s_ssh_jump_image': ssh_jump_image, + 'k8s_service_account_name': k8s_service_account_name, + 'k8s_automount_sa_token': k8s_automount_sa_token, 'k8s_fuse_device_required': fuse_device_required, # Namespace to run the FUSE device manager in 'k8s_fuse_device_manager_namespace': _SKY_SYSTEM_NAMESPACE, @@ -357,16 +384,12 @@ def _make(instance_list): @classmethod def check_credentials(cls) -> Tuple[bool, Optional[str]]: - if os.path.exists(os.path.expanduser(CREDENTIAL_PATH)): - # Test using python API - try: - return kubernetes_utils.check_credentials() - except Exception as e: # pylint: disable=broad-except - return (False, 'Credential check failed: ' - f'{common_utils.format_exception(e)}') - else: - return (False, 'Credentials not found - ' - f'check if {CREDENTIAL_PATH} exists.') + # Test using python API + try: + return kubernetes_utils.check_credentials() + except Exception as e: # pylint: disable=broad-except + return (False, 'Credential check failed: ' + f'{common_utils.format_exception(e)}') def get_credential_file_mounts(self) -> Dict[str, str]: if os.path.exists(os.path.expanduser(CREDENTIAL_PATH)): diff --git a/sky/core.py b/sky/core.py index c71a3fa9734..b5ecc483354 100644 --- a/sky/core.py +++ b/sky/core.py @@ -109,6 +109,26 @@ def status(cluster_names: Optional[Union[str, List[str]]] = None, cluster_names=cluster_names) +def endpoints(cluster: str, + port: Optional[Union[int, str]] = None) -> Dict[int, str]: + """Gets the endpoint for a given cluster and port number (endpoint). + + Args: + cluster: The name of the cluster. + port: The port number to get the endpoint for. If None, endpoints + for all ports are returned.. + + Returns: A dictionary of port numbers to endpoints. If endpoint is None, + the dictionary will contain all ports:endpoints exposed on the cluster. + + Raises: + ValueError: if the cluster is not UP or the endpoint is not exposed. + RuntimeError: if the cluster has no ports to be exposed or no endpoints + are exposed yet. + """ + return backend_utils.get_endpoints(cluster=cluster, port=port) + + @usage_lib.entrypoint def cost_report() -> List[Dict[str, Any]]: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 9dc73a54a53..2f9a5bda44c 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -41,8 +41,12 @@ def _wrapper(*args, **kwargs): module = globals().get(module_name) assert module is not None, f'Unknown provider: {module_name}' - impl = getattr(module, func.__name__) - return impl(*args, **kwargs) + impl = getattr(module, func.__name__, None) + if impl: + return impl(*args, **kwargs) + + # If implementation does not exist, fall back to default implementation + return func(provider_name, *args, **kwargs) return _wrapper @@ -141,13 +145,19 @@ def query_ports( provider_name: str, cluster_name_on_cloud: str, ports: List[str], + head_ip: Optional[str] = None, provider_config: Optional[Dict[str, Any]] = None, ) -> Dict[int, List[common.Endpoint]]: """Query details about ports on a cluster. + If head_ip is provided, it may be used by the cloud implementation to + return the endpoint without querying the cloud provider. If head_ip is not + provided, the cloud provider will be queried to get the endpoint info. + Returns a dict with port as the key and a list of common.Endpoint. """ - raise NotImplementedError + del provider_name, provider_config, cluster_name_on_cloud # unused + return common.query_ports_passthrough(ports, head_ip) @_route_to_cloud_impl diff --git a/sky/provision/aws/__init__.py b/sky/provision/aws/__init__.py index bcbe646f219..e569d3b042e 100644 --- a/sky/provision/aws/__init__.py +++ b/sky/provision/aws/__init__.py @@ -5,7 +5,6 @@ from sky.provision.aws.instance import get_cluster_info from sky.provision.aws.instance import open_ports from sky.provision.aws.instance import query_instances -from sky.provision.aws.instance import query_ports from sky.provision.aws.instance import run_instances from sky.provision.aws.instance import stop_instances from sky.provision.aws.instance import terminate_instances diff --git a/sky/provision/aws/instance.py b/sky/provision/aws/instance.py index b9fdf80326d..bdf1650665f 100644 --- a/sky/provision/aws/instance.py +++ b/sky/provision/aws/instance.py @@ -876,13 +876,3 @@ def get_cluster_info( instances=instances, head_instance_id=head_instance_id, ) - - -def query_ports( - cluster_name_on_cloud: str, - ports: List[str], - provider_config: Optional[Dict[str, Any]] = None, -) -> Dict[int, List[common.Endpoint]]: - """See sky/provision/__init__.py""" - return common.query_ports_passthrough(cluster_name_on_cloud, ports, - provider_config) diff --git a/sky/provision/azure/__init__.py b/sky/provision/azure/__init__.py index 9c87fc907db..b83dbb462d9 100644 --- a/sky/provision/azure/__init__.py +++ b/sky/provision/azure/__init__.py @@ -2,4 +2,3 @@ from sky.provision.azure.instance import cleanup_ports from sky.provision.azure.instance import open_ports -from sky.provision.azure.instance import query_ports diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index dc7b23dee5c..de5c7cbf0e9 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -4,7 +4,6 @@ from sky import sky_logging from sky.adaptors import azure -from sky.provision import common from sky.utils import ux_utils logger = sky_logging.init_logger(__name__) @@ -94,13 +93,3 @@ def cleanup_ports( # Azure will automatically cleanup network security groups when cleanup # resource group. So we don't need to do anything here. del cluster_name_on_cloud, ports, provider_config # Unused. - - -def query_ports( - cluster_name_on_cloud: str, - ports: List[str], - provider_config: Optional[Dict[str, Any]] = None, -) -> Dict[int, List[common.Endpoint]]: - """See sky/provision/__init__.py""" - return common.query_ports_passthrough(cluster_name_on_cloud, ports, - provider_config) diff --git a/sky/provision/common.py b/sky/provision/common.py index 75178b15623..dbcb9e659e6 100644 --- a/sky/provision/common.py +++ b/sky/provision/common.py @@ -4,7 +4,7 @@ import os from typing import Any, Dict, List, Optional, Tuple -from sky.utils.resources_utils import port_ranges_to_set +from sky.utils import resources_utils # NOTE: we can use pydantic instead of dataclasses or namedtuples, because # pydantic provides more features like validation or parsing from @@ -201,7 +201,7 @@ class Endpoint: pass @abc.abstractmethod - def url(self, ip: str): + def url(self, override_ip: Optional[str] = None) -> str: raise NotImplementedError @@ -211,44 +211,41 @@ class SocketEndpoint(Endpoint): port: Optional[int] host: str = '' - def url(self, ip: str): - if not self.host: - self.host = ip - return f'{self.host}{":" + str(self.port) if self.port else ""}' + def url(self, override_ip: Optional[str] = None) -> str: + host = override_ip if override_ip else self.host + return f'{host}{":" + str(self.port) if self.port else ""}' @dataclasses.dataclass class HTTPEndpoint(SocketEndpoint): - """HTTP endpoint accesible via a url.""" + """HTTP endpoint accessible via a url.""" path: str = '' - def url(self, ip: str): - del ip # Unused. - return f'http://{os.path.join(super().url(self.host), self.path)}' + def url(self, override_ip: Optional[str] = None) -> str: + host = override_ip if override_ip else self.host + return f'http://{os.path.join(super().url(host), self.path)}' @dataclasses.dataclass class HTTPSEndpoint(SocketEndpoint): - """HTTPS endpoint accesible via a url.""" + """HTTPS endpoint accessible via a url.""" path: str = '' - def url(self, ip: str): - del ip # Unused. - return f'https://{os.path.join(super().url(self.host), self.path)}' + def url(self, override_ip: Optional[str] = None) -> str: + host = override_ip if override_ip else self.host + return f'https://{os.path.join(super().url(host), self.path)}' def query_ports_passthrough( - cluster_name_on_cloud: str, ports: List[str], - provider_config: Optional[Dict[str, Any]] = None, + head_ip: Optional[str], ) -> Dict[int, List[Endpoint]]: - """Common function to query ports for AWS, GCP and Azure. + """Common function to get endpoints for AWS, GCP and Azure. - Returns a list of socket endpoint with empty host and the input ports.""" - del cluster_name_on_cloud, provider_config # Unused. - ports = list(port_ranges_to_set(ports)) + Returns a list of socket endpoint using head_ip and ports.""" + assert head_ip is not None, head_ip + ports = list(resources_utils.port_ranges_to_set(ports)) result: Dict[int, List[Endpoint]] = {} for port in ports: - result[port] = [SocketEndpoint(port=port)] - + result[port] = [SocketEndpoint(port=port, host=head_ip)] return result diff --git a/sky/provision/gcp/__init__.py b/sky/provision/gcp/__init__.py index fdadd5345e2..0d24a577690 100644 --- a/sky/provision/gcp/__init__.py +++ b/sky/provision/gcp/__init__.py @@ -5,7 +5,6 @@ from sky.provision.gcp.instance import get_cluster_info from sky.provision.gcp.instance import open_ports from sky.provision.gcp.instance import query_instances -from sky.provision.gcp.instance import query_ports from sky.provision.gcp.instance import run_instances from sky.provision.gcp.instance import stop_instances from sky.provision.gcp.instance import terminate_instances diff --git a/sky/provision/gcp/instance.py b/sky/provision/gcp/instance.py index e7f69f8c6eb..35c8ae44dc8 100644 --- a/sky/provision/gcp/instance.py +++ b/sky/provision/gcp/instance.py @@ -615,13 +615,3 @@ def cleanup_ports( firewall_rule_name = provider_config['firewall_rule'] instance_utils.GCPComputeInstance.delete_firewall_rule( project_id, firewall_rule_name) - - -def query_ports( - cluster_name_on_cloud: str, - ports: List[str], - provider_config: Optional[Dict[str, Any]] = None, -) -> Dict[int, List[common.Endpoint]]: - """See sky/provision/__init__.py""" - return common.query_ports_passthrough(cluster_name_on_cloud, ports, - provider_config) diff --git a/sky/provision/kubernetes/config.py b/sky/provision/kubernetes/config.py index ef1926ac9ce..d5c30133ef2 100644 --- a/sky/provision/kubernetes/config.py +++ b/sky/provision/kubernetes/config.py @@ -30,12 +30,47 @@ def bootstrap_instances( if config.provider_config.get('fuse_device_required', False): _configure_fuse_mounting(config.provider_config) - if not config.provider_config.get('_operator'): - # These steps are unecessary when using the Operator. + requested_service_account = config.node_config['spec']['serviceAccountName'] + if requested_service_account == 'skypilot-service-account': + # If the user has requested a different service account (via pod_config + # in ~/.sky/config.yaml), we assume they have already set up the + # necessary roles and role bindings. + # If not, set up the roles and bindings for skypilot-service-account + # here. _configure_autoscaler_service_account(namespace, config.provider_config) - _configure_autoscaler_role(namespace, config.provider_config) - _configure_autoscaler_role_binding(namespace, config.provider_config) - + _configure_autoscaler_role(namespace, + config.provider_config, + role_field='autoscaler_role') + _configure_autoscaler_role_binding( + namespace, + config.provider_config, + binding_field='autoscaler_role_binding') + _configure_autoscaler_cluster_role(namespace, config.provider_config) + _configure_autoscaler_cluster_role_binding(namespace, + config.provider_config) + if config.provider_config.get('port_mode', 'loadbalancer') == 'ingress': + logger.info('Port mode is set to ingress, setting up ingress role ' + 'and role binding.') + try: + _configure_autoscaler_role(namespace, + config.provider_config, + role_field='autoscaler_ingress_role') + _configure_autoscaler_role_binding( + namespace, + config.provider_config, + binding_field='autoscaler_ingress_role_binding') + except kubernetes.api_exception() as e: + # If namespace is not found, we will ignore the error + if e.status == 404: + logger.info( + 'Namespace not found - is your nginx ingress installed?' + ' Skipping ingress role and role binding setup.') + else: + raise e + + elif requested_service_account != 'default': + logger.info(f'Using service account {requested_service_account!r}, ' + 'skipping role and role binding setup.') return config @@ -214,9 +249,16 @@ def _configure_autoscaler_service_account( f'{created_msg(account_field, name)}') -def _configure_autoscaler_role(namespace: str, - provider_config: Dict[str, Any]) -> None: - role_field = 'autoscaler_role' +def _configure_autoscaler_role(namespace: str, provider_config: Dict[str, Any], + role_field: str) -> None: + """ Reads the role from the provider config, creates if it does not exist. + + Args: + namespace: The namespace to create the role in. + provider_config: The provider config. + role_field: The field in the provider config that contains the role. + """ + if role_field not in provider_config: logger.info('_configure_autoscaler_role: ' f'{not_provided_msg(role_field)}') @@ -225,8 +267,8 @@ def _configure_autoscaler_role(namespace: str, role = provider_config[role_field] if 'namespace' not in role['metadata']: role['metadata']['namespace'] = namespace - elif role['metadata']['namespace'] != namespace: - raise InvalidNamespaceError(role_field, namespace) + else: + namespace = role['metadata']['namespace'] name = role['metadata']['name'] field_selector = f'metadata.name={name}' @@ -245,8 +287,16 @@ def _configure_autoscaler_role(namespace: str, def _configure_autoscaler_role_binding(namespace: str, - provider_config: Dict[str, Any]) -> None: - binding_field = 'autoscaler_role_binding' + provider_config: Dict[str, Any], + binding_field: str) -> None: + """ Reads the role binding from the config, creates if it does not exist. + + Args: + namespace: The namespace to create the role binding in. + provider_config: The provider config. + binding_field: The field in the provider config that contains the role + """ + if binding_field not in provider_config: logger.info('_configure_autoscaler_role_binding: ' f'{not_provided_msg(binding_field)}') @@ -255,8 +305,10 @@ def _configure_autoscaler_role_binding(namespace: str, binding = provider_config[binding_field] if 'namespace' not in binding['metadata']: binding['metadata']['namespace'] = namespace - elif binding['metadata']['namespace'] != namespace: - raise InvalidNamespaceError(binding_field, namespace) + rb_namespace = namespace + else: + rb_namespace = binding['metadata']['namespace'] + for subject in binding['subjects']: if 'namespace' not in subject: subject['namespace'] = namespace @@ -268,7 +320,7 @@ def _configure_autoscaler_role_binding(namespace: str, name = binding['metadata']['name'] field_selector = f'metadata.name={name}' accounts = (kubernetes.auth_api().list_namespaced_role_binding( - namespace, field_selector=field_selector).items) + rb_namespace, field_selector=field_selector).items) if len(accounts) > 0: assert len(accounts) == 1 logger.info('_configure_autoscaler_role_binding: ' @@ -277,11 +329,80 @@ def _configure_autoscaler_role_binding(namespace: str, logger.info('_configure_autoscaler_role_binding: ' f'{not_found_msg(binding_field, name)}') - kubernetes.auth_api().create_namespaced_role_binding(namespace, binding) + kubernetes.auth_api().create_namespaced_role_binding(rb_namespace, binding) logger.info('_configure_autoscaler_role_binding: ' f'{created_msg(binding_field, name)}') +def _configure_autoscaler_cluster_role(namespace, + provider_config: Dict[str, Any]) -> None: + role_field = 'autoscaler_cluster_role' + if role_field not in provider_config: + logger.info('_configure_autoscaler_cluster_role: ' + f'{not_provided_msg(role_field)}') + return + + role = provider_config[role_field] + if 'namespace' not in role['metadata']: + role['metadata']['namespace'] = namespace + elif role['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(role_field, namespace) + + name = role['metadata']['name'] + field_selector = f'metadata.name={name}' + accounts = (kubernetes.auth_api().list_cluster_role( + field_selector=field_selector).items) + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info('_configure_autoscaler_cluster_role: ' + f'{using_existing_msg(role_field, name)}') + return + + logger.info('_configure_autoscaler_cluster_role: ' + f'{not_found_msg(role_field, name)}') + kubernetes.auth_api().create_cluster_role(role) + logger.info( + f'_configure_autoscaler_cluster_role: {created_msg(role_field, name)}') + + +def _configure_autoscaler_cluster_role_binding( + namespace, provider_config: Dict[str, Any]) -> None: + binding_field = 'autoscaler_cluster_role_binding' + if binding_field not in provider_config: + logger.info('_configure_autoscaler_cluster_role_binding: ' + f'{not_provided_msg(binding_field)}') + return + + binding = provider_config[binding_field] + if 'namespace' not in binding['metadata']: + binding['metadata']['namespace'] = namespace + elif binding['metadata']['namespace'] != namespace: + raise InvalidNamespaceError(binding_field, namespace) + for subject in binding['subjects']: + if 'namespace' not in subject: + subject['namespace'] = namespace + elif subject['namespace'] != namespace: + subject_name = subject['name'] + raise InvalidNamespaceError( + binding_field + f' subject {subject_name}', namespace) + + name = binding['metadata']['name'] + field_selector = f'metadata.name={name}' + accounts = (kubernetes.auth_api().list_cluster_role_binding( + field_selector=field_selector).items) + if len(accounts) > 0: + assert len(accounts) == 1 + logger.info('_configure_autoscaler_cluster_role_binding: ' + f'{using_existing_msg(binding_field, name)}') + return + + logger.info('_configure_autoscaler_cluster_role_binding: ' + f'{not_found_msg(binding_field, name)}') + kubernetes.auth_api().create_cluster_role_binding(binding) + logger.info('_configure_autoscaler_cluster_role_binding: ' + f'{created_msg(binding_field, name)}') + + def _configure_ssh_jump(namespace, config: common.ProvisionConfig): """Creates a SSH jump pod to connect to the cluster. diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 51484f1f579..9129d39e586 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -218,7 +218,7 @@ def _wait_for_pods_to_run(namespace, new_nodes): node.metadata.name, namespace) # Continue if pod and all the containers within the - # pod are succesfully created and running. + # pod are successfully created and running. if pod.status.phase == 'Running' and all( container.state.running for container in pod.status.container_statuses): diff --git a/sky/provision/kubernetes/network.py b/sky/provision/kubernetes/network.py index 4abde138b1a..61870cb9119 100644 --- a/sky/provision/kubernetes/network.py +++ b/sky/provision/kubernetes/network.py @@ -73,7 +73,7 @@ def _open_ports_using_ingress( 'https://github.com/kubernetes/ingress-nginx/blob/main/docs/deploy/index.md.' # pylint: disable=line-too-long ) - # Prepare service names, ports, for template rendering + # Prepare service names, ports, for template rendering service_details = [ (f'{cluster_name_on_cloud}-skypilot-service--{port}', port, _PATH_PREFIX.format(cluster_name_on_cloud=cluster_name_on_cloud, @@ -177,9 +177,11 @@ def _cleanup_ports_for_ingress( def query_ports( cluster_name_on_cloud: str, ports: List[str], + head_ip: Optional[str] = None, provider_config: Optional[Dict[str, Any]] = None, ) -> Dict[int, List[common.Endpoint]]: """See sky/provision/__init__.py""" + del head_ip # unused assert provider_config is not None, 'provider_config is required' port_mode = network_utils.get_port_mode( provider_config.get('port_mode', None)) diff --git a/sky/provision/kubernetes/network_utils.py b/sky/provision/kubernetes/network_utils.py index 19abe01888d..836d75af41f 100644 --- a/sky/provision/kubernetes/network_utils.py +++ b/sky/provision/kubernetes/network_utils.py @@ -199,11 +199,17 @@ def get_ingress_external_ip_and_ports( ingress_service = ingress_services[0] if ingress_service.status.load_balancer.ingress is None: - # Try to use assigned external IP if it exists, - # otherwise return 'localhost' + # We try to get an IP/host for the service in the following order: + # 1. Try to use assigned external IP if it exists + # 2. Use the skypilot.co/external-ip annotation in the service + # 3. Otherwise return 'localhost' + ip = None if ingress_service.spec.external_i_ps is not None: ip = ingress_service.spec.external_i_ps[0] - else: + elif ingress_service.metadata.annotations is not None: + ip = ingress_service.metadata.annotations.get( + 'skypilot.co/external-ip', None) + if ip is None: ip = 'localhost' ports = ingress_service.spec.ports http_port = [port for port in ports if port.name == 'http'][0].node_port diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index b0b27f121fe..4c26c0c2199 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -18,6 +18,7 @@ from sky.utils import common_utils from sky.utils import env_options from sky.utils import kubernetes_enums +from sky.utils import schemas from sky.utils import ux_utils DEFAULT_NAMESPACE = 'default' @@ -549,20 +550,106 @@ def check_credentials(timeout: int = kubernetes.API_TIMEOUT) -> \ except Exception as e: # pylint: disable=broad-except return False, ('An error occurred: ' f'{common_utils.format_exception(e, use_bracket=True)}') - # If we reach here, the credentials are valid and Kubernetes cluster is up + + # If we reach here, the credentials are valid and Kubernetes cluster is up. + # We now do softer checks to check if exec based auth is used and to + # see if the cluster is GPU-enabled. + + _, exec_msg = is_kubeconfig_exec_auth() + # We now check if GPUs are available and labels are set correctly on the # cluster, and if not we return hints that may help debug any issues. # This early check avoids later surprises for user when they try to run # `sky launch --gpus ` and the optimizer does not list Kubernetes as a # provider if their cluster GPUs are not setup correctly. + gpu_msg = '' try: _, _ = get_gpu_label_key_value(acc_type='', check_mode=True) except exceptions.ResourcesUnavailableError as e: # If GPUs are not available, we return cluster as enabled (since it can # be a CPU-only cluster) but we also return the exception message which # serves as a hint for how to enable GPU access. - return True, f'{e}' - return True, None + gpu_msg = str(e) + if exec_msg and gpu_msg: + return True, f'{gpu_msg}\n Additionally, {exec_msg}' + elif gpu_msg: + return True, gpu_msg + elif exec_msg: + return True, exec_msg + else: + return True, None + + +def is_kubeconfig_exec_auth() -> Tuple[bool, Optional[str]]: + """Checks if the kubeconfig file uses exec-based authentication + + Exec-based auth is commonly used for authenticating with cloud hosted + Kubernetes services, such as GKE. Here is an example snippet from a + kubeconfig using exec-based authentication for a GKE cluster: + - name: mycluster + user: + exec: + apiVersion: client.authentication.k8s.io/v1beta1 + command: /Users/romilb/google-cloud-sdk/bin/gke-gcloud-auth-plugin + installHint: Install gke-gcloud-auth-plugin ... + provideClusterInfo: true + + + Using exec-based authentication is problematic when used in conjunction + with kubernetes.remote_identity = LOCAL_CREDENTIAL in ~/.sky/config.yaml. + This is because the exec-based authentication may not have the relevant + dependencies installed on the remote cluster or may have hardcoded paths + that are not available on the remote cluster. + + Returns: + bool: True if exec-based authentication is used and LOCAL_CREDENTIAL + mode is used for remote_identity in ~/.sky/config.yaml. + str: Error message if exec-based authentication is used, None otherwise + """ + k8s = kubernetes.kubernetes + try: + k8s.config.load_kube_config() + except kubernetes.config_exception(): + # Using service account token or other auth methods, continue + return False, None + + # Get active context and user from kubeconfig using k8s api + _, current_context = k8s.config.list_kube_config_contexts() + target_username = current_context['context']['user'] + + # K8s api does not provide a mechanism to get the user details from the + # context. We need to load the kubeconfig file and parse it to get the + # user details. + kubeconfig_path = os.path.expanduser( + os.getenv('KUBECONFIG', + k8s.config.kube_config.KUBE_CONFIG_DEFAULT_LOCATION)) + # Load the kubeconfig file as a dictionary + with open(kubeconfig_path, 'r', encoding='utf-8') as f: + kubeconfig = yaml.safe_load(f) + + user_details = kubeconfig['users'] + + # Find user matching the target username + user_details = next( + user for user in user_details if user['name'] == target_username) + + remote_identity = skypilot_config.get_nested( + ('kubernetes', 'remote_identity'), schemas.REMOTE_IDENTITY_DEFAULT) + if ('exec' in user_details.get('user', {}) and remote_identity + == schemas.RemoteIdentityOptions.LOCAL_CREDENTIALS.value): + ctx_name = current_context['name'] + exec_msg = ('exec-based authentication is used for ' + f'Kubernetes context {ctx_name!r}.' + ' This may cause issues when running Managed Jobs ' + 'or SkyServe controller on Kubernetes. To fix, configure ' + 'SkyPilot to create a service account for running pods by ' + 'adding the following in ~/.sky/config.yaml:\n' + ' kubernetes:\n' + ' remote_identity: SERVICE_ACCOUNT\n' + ' More: https://skypilot.readthedocs.io/en/latest/' + 'reference/config.html') + return True, exec_msg + return False, None def get_current_kube_config_context_name() -> Optional[str]: diff --git a/sky/serve/constants.py b/sky/serve/constants.py index 17d52b2f167..07f3e837ed4 100644 --- a/sky/serve/constants.py +++ b/sky/serve/constants.py @@ -69,7 +69,7 @@ # automatically generated from this start port. CONTROLLER_PORT_START = 20001 LOAD_BALANCER_PORT_START = 30001 -LOAD_BALANCER_PORT_RANGE = '30001-30100' +LOAD_BALANCER_PORT_RANGE = '30001-30020' # Initial version of service. INITIAL_VERSION = 1 diff --git a/sky/serve/controller.py b/sky/serve/controller.py index b9d18d3eb58..8d7964f090b 100644 --- a/sky/serve/controller.py +++ b/sky/serve/controller.py @@ -3,6 +3,7 @@ Responsible for autoscaling and replica management. """ import logging +import os import threading import time import traceback @@ -39,7 +40,7 @@ class SkyServeController: """ def __init__(self, service_name: str, service_spec: serve.SkyServiceSpec, - task_yaml: str, port: int) -> None: + task_yaml: str, host: str, port: int) -> None: self._service_name = service_name self._replica_manager: replica_managers.ReplicaManager = ( replica_managers.SkyPilotReplicaManager(service_name=service_name, @@ -47,6 +48,7 @@ def __init__(self, service_name: str, service_spec: serve.SkyServiceSpec, task_yaml_path=task_yaml)) self._autoscaler: autoscalers.Autoscaler = ( autoscalers.Autoscaler.from_spec(service_name, service_spec)) + self._host = host self._port = port self._app = fastapi.FastAPI() @@ -150,15 +152,25 @@ def configure_logger(): threading.Thread(target=self._run_autoscaler).start() logger.info('SkyServe Controller started on ' - f'http://localhost:{self._port}') + f'http://{self._host}:{self._port}') - uvicorn.run(self._app, host='localhost', port=self._port) + uvicorn.run(self._app, host={self._host}, port=self._port) # TODO(tian): Probably we should support service that will stop the VM in # specific time period. def run_controller(service_name: str, service_spec: serve.SkyServiceSpec, task_yaml: str, controller_port: int): - controller = SkyServeController(service_name, service_spec, task_yaml, + # We expose the controller to the public network when running inside a + # kubernetes cluster to allow external load balancers (example, for + # high availability load balancers) to communicate with the controller. + def _get_host(): + if 'KUBERNETES_SERVICE_HOST' in os.environ: + return '0.0.0.0' + else: + return 'localhost' + + host = _get_host() + controller = SkyServeController(service_name, service_spec, task_yaml, host, controller_port) controller.run() diff --git a/sky/serve/core.py b/sky/serve/core.py index 086fca02984..9680b90de0c 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -186,13 +186,14 @@ def up( # whether the service is already running. If the id is the same # with the current job id, we know the service is up and running # for the first time; otherwise it is a name conflict. + idle_minutes_to_autodown = constants.CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP controller_job_id, controller_handle = sky.launch( task=controller_task, stream_logs=False, cluster_name=controller_name, detach_run=True, - idle_minutes_to_autostop=constants. - CONTROLLER_IDLE_MINUTES_TO_AUTOSTOP, + idle_minutes_to_autostop=idle_minutes_to_autodown, + down=True, retry_until_up=True, _disable_controller_check=True, ) @@ -253,7 +254,10 @@ def up( else: lb_port = serve_utils.load_service_initialization_result( lb_port_payload) - endpoint = f'{controller_handle.head_ip}:{lb_port}' + endpoint = backend_utils.get_endpoints( + controller_handle.cluster_name, lb_port, + skip_status_check=True).get(lb_port) + assert endpoint is not None, 'Did not get endpoint for controller.' sky_logging.print( f'{fore.CYAN}Service name: ' diff --git a/sky/serve/load_balancer.py b/sky/serve/load_balancer.py index b3b8fe5403e..7864e242148 100644 --- a/sky/serve/load_balancer.py +++ b/sky/serve/load_balancer.py @@ -87,7 +87,7 @@ async def _redirect_handler(self, request: fastapi.Request): 'Use "sky serve status [SERVICE_NAME]" ' 'to check the replica status.') - path = f'http://{ready_replica_url}{request.url.path}' + path = f'{ready_replica_url}{request.url.path}' logger.info(f'Redirecting request to {path}') return fastapi.responses.RedirectResponse(url=path) @@ -114,3 +114,19 @@ def run_load_balancer(controller_addr: str, load_balancer_port: int): load_balancer = SkyServeLoadBalancer(controller_url=controller_addr, load_balancer_port=load_balancer_port) load_balancer.run() + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('--controller-addr', + required=True, + default='127.0.0.1', + help='The address of the controller.') + parser.add_argument('--load-balancer-port', + type=int, + required=True, + default=8890, + help='The port where the load balancer listens to.') + args = parser.parse_args() + run_load_balancer(args.controller_addr, args.load_balancer_port) diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index 70e5ba2c6dd..efb3ba3cf48 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -17,6 +17,7 @@ import sky from sky import backends +from sky import core from sky import exceptions from sky import global_user_state from sky import sky_logging @@ -428,7 +429,20 @@ def url(self) -> Optional[str]: handle = self.handle() if handle is None: return None - return f'{handle.head_ip}:{self.replica_port}' + replica_port_int = int(self.replica_port) + try: + endpoint_dict = core.endpoints(handle.cluster_name, + replica_port_int) + except exceptions.ClusterNotUpError: + return None + endpoint = endpoint_dict.get(replica_port_int, None) + if not endpoint: + return None + assert isinstance(endpoint, str), endpoint + # If replica doesn't start with http or https, add http:// + if not endpoint.startswith('http'): + endpoint = 'http://' + endpoint + return endpoint @property def status(self) -> serve_state.ReplicaStatus: @@ -446,6 +460,7 @@ def to_info_dict(self, with_handle: bool) -> Dict[str, Any]: 'name': self.cluster_name, 'status': self.status, 'version': self.version, + 'endpoint': self.url, 'is_spot': self.is_spot, 'launched_at': (cluster_record['launched_at'] if cluster_record is not None else None), @@ -487,7 +502,13 @@ def probe( try: msg = '' # TODO(tian): Support HTTPS in the future. - readiness_path = (f'http://{self.url}{readiness_path}') + url = self.url + if url is None: + logger.info(f'Error when probing {replica_identity}: ' + 'Cannot get the endpoint.') + return self, False, probe_time + readiness_path = (f'{url}{readiness_path}') + logger.info(f'Probing {replica_identity} with {readiness_path}.') if post_data is not None: msg += 'POST' response = requests.post( diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 0814441eb79..8a4387b40c0 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -24,6 +24,7 @@ from sky import exceptions from sky import global_user_state from sky import status_lib +from sky.backends import backend_utils from sky.serve import constants from sky.serve import serve_state from sky.skylet import constants as skylet_constants @@ -725,12 +726,21 @@ def get_endpoint(service_record: Dict[str, Any]) -> str: handle = global_user_state.get_handle_from_cluster_name( SKY_SERVE_CONTROLLER_NAME) assert isinstance(handle, backends.CloudVmRayResourceHandle) - if handle is None or handle.head_ip is None: + if handle is None: return '-' load_balancer_port = service_record['load_balancer_port'] if load_balancer_port is None: return '-' - return f'{handle.head_ip}:{load_balancer_port}' + try: + endpoint = backend_utils.get_endpoints(handle.cluster_name, + load_balancer_port).get( + load_balancer_port, None) + except exceptions.ClusterNotUpError: + return '-' + if endpoint is None: + return '-' + assert isinstance(endpoint, str), endpoint + return endpoint def format_service_table(service_records: List[Dict[str, Any]], @@ -794,7 +804,7 @@ def _format_replica_table(replica_records: List[Dict[str, Any]], return 'No existing replicas.' replica_columns = [ - 'SERVICE_NAME', 'ID', 'VERSION', 'IP', 'LAUNCHED', 'RESOURCES', + 'SERVICE_NAME', 'ID', 'VERSION', 'ENDPOINT', 'LAUNCHED', 'RESOURCES', 'STATUS', 'REGION' ] if show_all: @@ -808,10 +818,11 @@ def _format_replica_table(replica_records: List[Dict[str, Any]], replica_records = replica_records[:_REPLICA_TRUNC_NUM] for record in replica_records: + endpoint = record.get('endpoint', '-') service_name = record['service_name'] replica_id = record['replica_id'] version = (record['version'] if 'version' in record else '-') - replica_ip = '-' + replica_endpoint = endpoint if endpoint else '-' launched_at = log_utils.readable_time_duration(record['launched_at']) resources_str = '-' replica_status = record['status'] @@ -821,8 +832,6 @@ def _format_replica_table(replica_records: List[Dict[str, Any]], replica_handle: 'backends.CloudVmRayResourceHandle' = record['handle'] if replica_handle is not None: - if replica_handle.head_ip is not None: - replica_ip = replica_handle.head_ip resources_str = resources_utils.get_readable_resources_repr( replica_handle, simplify=not show_all) if replica_handle.launched_resources.region is not None: @@ -834,7 +843,7 @@ def _format_replica_table(replica_records: List[Dict[str, Any]], service_name, replica_id, version, - replica_ip, + replica_endpoint, launched_at, resources_str, status_str, diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index c64436c512e..172c958bb3e 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -85,6 +85,81 @@ provider: name: skypilot-service-account-role apiGroup: rbac.authorization.k8s.io + + # Role to access ingress services for fetching IP + autoscaler_ingress_role: + kind: Role + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + namespace: ingress-nginx + name: skypilot-service-account-ingress-role + labels: + parent: skypilot + rules: + - apiGroups: [ "" ] + resources: [ "services" ] + verbs: [ "list", "get", "watch" ] + - apiGroups: [ "rbac.authorization.k8s.io" ] + resources: [ "roles", "rolebindings" ] + verbs: [ "get", "list", "watch" ] + + # RoleBinding to access ingress services for fetching IP + autoscaler_ingress_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: RoleBinding + metadata: + namespace: ingress-nginx + name: skypilot-service-account-ingress-role-binding + labels: + parent: skypilot + subjects: + - kind: ServiceAccount + name: skypilot-service-account + roleRef: + kind: Role + name: skypilot-service-account-ingress-role + apiGroup: rbac.authorization.k8s.io + + # In addition to a role binding, we also need a cluster role binding to give + # the SkyPilot access to the cluster-wide resources such as nodes to get + # node resources. + autoscaler_cluster_role: + kind: ClusterRole + apiVersion: rbac.authorization.k8s.io/v1 + metadata: + labels: + parent: skypilot + name: skypilot-service-account-cluster-role + rules: + - apiGroups: [ "" ] + resources: [ "nodes" ] # Required for getting node resources. + verbs: [ "get", "list", "watch" ] + - apiGroups: [ "rbac.authorization.k8s.io" ] + resources: [ "clusterroles", "clusterrolebindings" ] # Required for launching more SkyPilot clusters from within the pod. + verbs: [ "get", "list", "watch" ] + - apiGroups: [ "node.k8s.io" ] + resources: [ "runtimeclasses" ] # Required for autodetecting the runtime class of the nodes. + verbs: [ "get", "list", "watch" ] + - apiGroups: [ "networking.k8s.io" ] # Required for exposing services. + resources: [ "ingressclasses" ] + verbs: [ "get", "list", "watch" ] + + # Bind cluster role to the service account + autoscaler_cluster_role_binding: + apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + labels: + parent: skypilot + name: skypilot-service-account-cluster-role-binding + subjects: + - kind: ServiceAccount + name: skypilot-service-account + roleRef: + kind: ClusterRole + name: skypilot-service-account-cluster-role + apiGroup: rbac.authorization.k8s.io + services: # Service to expose the head node pod's SSH port. - apiVersion: v1 @@ -154,9 +229,9 @@ available_node_types: container.apparmor.security.beta.kubernetes.io/ray-node: unconfined {% endif %} spec: - # Change this if you altered the autoscaler_service_account above - # or want to provide your own. - serviceAccountName: skypilot-service-account + # serviceAccountName: skypilot-service-account + serviceAccountName: {{k8s_service_account_name}} + automountServiceAccountToken: {{k8s_automount_sa_token}} restartPolicy: Never diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 2abefc6fea0..b5b9027de65 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -61,11 +61,18 @@ def get_usage_run_id() -> str: return _usage_run_id -def get_user_hash() -> str: +def get_user_hash(force_fresh_hash: bool = False) -> str: """Returns a unique user-machine specific hash as a user id. We cache the user hash in a file to avoid potential user_name or hostname changes causing a new user hash to be generated. + + Args: + force_fresh_hash: Bypasses the cached hash in USER_HASH_FILE and the + hash in the USER_ID_ENV_VAR and forces a fresh user-machine hash + to be generated. Used by `kubernetes.ssh_key_secret_field_name` to + avoid controllers sharing the same ssh key field name as the + local client. """ def _is_valid_user_hash(user_hash: Optional[str]) -> bool: @@ -77,12 +84,13 @@ def _is_valid_user_hash(user_hash: Optional[str]) -> bool: return False return len(user_hash) == USER_HASH_LENGTH - user_hash = os.getenv(constants.USER_ID_ENV_VAR) - if _is_valid_user_hash(user_hash): - assert user_hash is not None - return user_hash + if not force_fresh_hash: + user_hash = os.getenv(constants.USER_ID_ENV_VAR) + if _is_valid_user_hash(user_hash): + assert user_hash is not None + return user_hash - if os.path.exists(_USER_HASH_FILE): + if not force_fresh_hash and os.path.exists(_USER_HASH_FILE): # Read from cached user hash file. with open(_USER_HASH_FILE, 'r', encoding='utf-8') as f: # Remove invalid characters. @@ -96,8 +104,13 @@ def _is_valid_user_hash(user_hash: Optional[str]) -> bool: # A fallback in case the hash is invalid. user_hash = uuid.uuid4().hex[:USER_HASH_LENGTH] os.makedirs(os.path.dirname(_USER_HASH_FILE), exist_ok=True) - with open(_USER_HASH_FILE, 'w', encoding='utf-8') as f: - f.write(user_hash) + if not force_fresh_hash: + # Do not cache to file if force_fresh_hash is True since the file may + # be intentionally using a different hash, e.g. we want to keep the + # user_hash for usage collection the same on the jobs/serve controller + # as users' local client. + with open(_USER_HASH_FILE, 'w', encoding='utf-8') as f: + f.write(user_hash) return user_hash diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index b4a312ac1ab..9908fa54286 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -228,6 +228,25 @@ def _get_cloud_dependencies_installation_commands( 'pip list | grep google-cloud-storage > /dev/null 2>&1 || ' 'pip install google-cloud-storage > /dev/null 2>&1') commands.append(f'{gcp.GOOGLE_SDK_INSTALLATION_COMMAND}') + elif isinstance(cloud, clouds.Kubernetes): + commands.append( + f'echo -en "\\r{prefix_str}Kubernetes{empty_str}" && ' + 'pip list | grep kubernetes > /dev/null 2>&1 || ' + 'pip install "kubernetes>=20.0.0" > /dev/null 2>&1 &&' + # Install k8s + skypilot dependencies + 'sudo bash -c "if ' + '! command -v curl &> /dev/null || ' + '! command -v socat &> /dev/null || ' + '! command -v netcat &> /dev/null; ' + 'then apt update && apt install curl socat netcat -y; ' + 'fi" && ' + # Install kubectl + '(command -v kubectl &>/dev/null || ' + '(curl -s -LO "https://dl.k8s.io/release/' + '$(curl -L -s https://dl.k8s.io/release/stable.txt)' + '/bin/linux/amd64/kubectl" && ' + 'sudo install -o root -g root -m 0755 ' + 'kubectl /usr/local/bin/kubectl))') if controller == Controllers.JOBS_CONTROLLER: if isinstance(cloud, clouds.IBM): commands.append( @@ -239,11 +258,6 @@ def _get_cloud_dependencies_installation_commands( commands.append(f'echo -en "\\r{prefix_str}OCI{empty_str}" && ' 'pip list | grep oci > /dev/null 2>&1 || ' 'pip install oci > /dev/null 2>&1') - elif isinstance(cloud, clouds.Kubernetes): - commands.append( - f'echo -en "\\r{prefix_str}Kubernetes{empty_str}" && ' - 'pip list | grep kubernetes > /dev/null 2>&1 || ' - 'pip install "kubernetes>=20.0.0" > /dev/null 2>&1') elif isinstance(cloud, clouds.RunPod): commands.append( f'echo -en "\\r{prefix_str}RunPod{empty_str}" && ' @@ -671,7 +685,28 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task', # whenever task.storage_mounts is non-empty. logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.' f'{colorama.Style.RESET_ALL} See: sky storage ls') - task.sync_storage_mounts() + try: + task.sync_storage_mounts() + except ValueError as e: + if 'No enabled cloud for storage' in str(e): + data_src = None + if has_local_source_paths_file_mounts: + data_src = 'file_mounts' + if has_local_source_paths_workdir: + if data_src: + data_src += ' and workdir' + else: + data_src = 'workdir' + store_enabled_clouds = ', '.join(storage_lib.STORE_ENABLED_CLOUDS) + with ux_utils.print_exception_no_traceback(): + raise exceptions.NotSupportedError( + f'Unable to use {data_src} - no cloud with object store ' + 'is enabled. Please enable at least one cloud with ' + f'object store support ({store_enabled_clouds}) by running ' + f'`sky check`, or remove {data_src} from your task.' + '\nHint: If you do not have any cloud access, you may still' + ' download data and code over the network using curl or ' + 'other tools in the `setup` section of the task.') from None # Step 5: Add the file download into the file mounts, such as # /original-dst: s3://spot-fm-file-only-bucket-name/file-0 diff --git a/sky/utils/kubernetes/generate_static_kubeconfig.sh b/sky/utils/kubernetes/generate_static_kubeconfig.sh new file mode 100755 index 00000000000..30ea929177a --- /dev/null +++ b/sky/utils/kubernetes/generate_static_kubeconfig.sh @@ -0,0 +1,137 @@ +#!/bin/bash +# This script creates a new k8s Service Account and generates a kubeconfig with +# its credentials. This Service Account has all the necessary permissions for +# SkyPilot. The kubeconfig is written in the current directory. +# +# You must configure your local kubectl to point to the right k8s cluster and +# have admin-level access. +# +# Note: all of the k8s resources are created in namespace "skypilot". If you +# delete any of these objects, SkyPilot will stop working. +# +# You can override the default namespace "skypilot" using the +# SKYPILOT_NAMESPACE environment variable. +# You can override the default service account name "skypilot-sa" using the +# SKYPILOT_SA_NAME environment variable. + +set -eu -o pipefail + +# Allow passing in common name and username in environment. If not provided, +# use default. +SKYPILOT_SA=${SKYPILOT_SA_NAME:-skypilot-sa} +NAMESPACE=${SKYPILOT_NAMESPACE:-default} + +# Set OS specific values. +if [[ "$OSTYPE" == "linux-gnu" ]]; then + BASE64_DECODE_FLAG="-d" +elif [[ "$OSTYPE" == "darwin"* ]]; then + BASE64_DECODE_FLAG="-D" +elif [[ "$OSTYPE" == "linux-musl" ]]; then + BASE64_DECODE_FLAG="-d" +else + echo "Unknown OS ${OSTYPE}" + exit 1 +fi + +echo "Creating the Kubernetes Service Account with minimal RBAC permissions." +kubectl apply -f - < kubeconfig < ~/controller.log 2>&1 & + python -u -m fastchat.serve.controller --host 127.0.0.1 > ~/controller.log 2>&1 & sleep 10 echo 'Starting model worker...' python -u -m fastchat.serve.model_worker \ - --model-path lmsys/$MODEL_NAME 2>&1 \ + --host 127.0.0.1 \ + --model-path lmsys/$MODEL_NAME 2>&1 \ | tee model_worker.log & echo 'Waiting for model worker to start...' diff --git a/tests/skyserve/restart/user_bug.yaml b/tests/skyserve/restart/user_bug.yaml index b3cbf9e907d..959e725d23d 100644 --- a/tests/skyserve/restart/user_bug.yaml +++ b/tests/skyserve/restart/user_bug.yaml @@ -8,7 +8,6 @@ service: resources: ports: 8080 cpus: 2+ - use_spot: True workdir: tests/skyserve/restart diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 78305f4d559..c0469abb109 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -284,11 +284,13 @@ def test_minimal(generic_cloud: str): # Ensure the raylet process has the correct file descriptor limit. f'sky exec {name} "prlimit -n --pid=\$(pgrep -f \'raylet/raylet --raylet_socket_name\') | grep \'"\'1048576 1048576\'"\'"', f'sky logs {name} 2 --status', # Ensure the job succeeded. + # Install jq for the next test. + f'sky exec {name} \'sudo apt-get update && sudo apt-get install -y jq\'', # Check the cluster info f'sky exec {name} \'echo "$SKYPILOT_CLUSTER_INFO" | jq .cluster_name | grep {name}\'', - f'sky logs {name} 3 --status', # Ensure the job succeeded. - f'sky exec {name} \'echo "$SKYPILOT_CLUSTER_INFO" | jq .cloud | grep -i {generic_cloud}\'', f'sky logs {name} 4 --status', # Ensure the job succeeded. + f'sky exec {name} \'echo "$SKYPILOT_CLUSTER_INFO" | jq .cloud | grep -i {generic_cloud}\'', + f'sky logs {name} 5 --status', # Ensure the job succeeded. ], f'sky down -y {name}', _get_timeout(generic_cloud), @@ -3243,8 +3245,16 @@ def test_skyserve_azure_http(): run_one_test(test) +@pytest.mark.kubernetes +@pytest.mark.serve +def test_skyserve_kubernetes_http(): + """Test skyserve on Kubernetes""" + name = _get_service_name() + test = _get_skyserve_http_test(name, 'kubernetes', 30) + run_one_test(test) + + @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_llm(generic_cloud: str): """Test skyserve with real LLM usecase""" name = _get_service_name() @@ -3272,7 +3282,7 @@ def generate_llm_test_command(prompt: str, expected_output: str) -> str: ], ], _TEARDOWN_SERVICE.format(name=name), - timeout=25 * 60, + timeout=40 * 60, ) run_one_test(test) @@ -3366,7 +3376,6 @@ def test_skyserve_dynamic_ondemand_fallback(): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_user_bug_restart(generic_cloud: str): """Tests that we restart the service after user bug.""" # TODO(zhwu): this behavior needs some rethinking. @@ -3400,7 +3409,7 @@ def test_skyserve_user_bug_restart(generic_cloud: str): @pytest.mark.serve -@pytest.mark.no_kubernetes +@pytest.mark.no_kubernetes # Replicas on k8s may be running on the same node and have the same public IP def test_skyserve_load_balancer(generic_cloud: str): """Test skyserve load balancer round-robin policy""" name = _get_service_name() @@ -3466,7 +3475,6 @@ def test_skyserve_auto_restart(): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_cancel(generic_cloud: str): """Test skyserve with cancel""" name = _get_service_name() @@ -3492,7 +3500,6 @@ def test_skyserve_cancel(generic_cloud: str): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_update(generic_cloud: str): """Test skyserve with update""" name = _get_service_name() @@ -3521,7 +3528,6 @@ def test_skyserve_update(generic_cloud: str): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_rolling_update(generic_cloud: str): """Test skyserve with rolling update""" name = _get_service_name() @@ -3558,7 +3564,6 @@ def test_skyserve_rolling_update(generic_cloud: str): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_fast_update(generic_cloud: str): """Test skyserve with fast update (Increment version of old replicas)""" name = _get_service_name() @@ -3571,7 +3576,7 @@ def test_skyserve_fast_update(generic_cloud: str): f'{_SERVE_ENDPOINT_WAIT.format(name=name)}; curl -L http://$endpoint | grep "Hi, SkyPilot here"', f'sky serve update {name} --cloud {generic_cloud} --mode blue_green -y tests/skyserve/update/bump_version_after.yaml', # sleep to wait for update to be registered. - 'sleep 120', + 'sleep 30', # 2 on-deamnd (ready) + 1 on-demand (provisioning). ( _check_replica_in_status( @@ -3585,7 +3590,7 @@ def test_skyserve_fast_update(generic_cloud: str): # Test rolling update f'sky serve update {name} --cloud {generic_cloud} -y tests/skyserve/update/bump_version_before.yaml', # sleep to wait for update to be registered. - 'sleep 30', + 'sleep 15', # 2 on-deamnd (ready) + 1 on-demand (shutting down). _check_replica_in_status(name, [(2, False, 'READY'), (1, False, 'SHUTTING_DOWN')]), @@ -3600,7 +3605,6 @@ def test_skyserve_fast_update(generic_cloud: str): @pytest.mark.serve -@pytest.mark.no_kubernetes def test_skyserve_update_autoscale(generic_cloud: str): """Test skyserve update with autoscale""" name = _get_service_name() @@ -3637,8 +3641,8 @@ def test_skyserve_update_autoscale(generic_cloud: str): @pytest.mark.serve +@pytest.mark.no_kubernetes # Spot instances are not supported in Kubernetes @pytest.mark.parametrize('mode', ['rolling', 'blue_green']) -@pytest.mark.no_kubernetes def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str): """Test skyserve with update that changes autoscaler""" name = _get_service_name() + mode From 12c156aa93ec46cdc058d50287bc070ec3035338 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 8 May 2024 00:32:27 -0700 Subject: [PATCH 7/7] [k8s] Disable autostop for controller on kubernetes (#3521) * playing around * wip with hacks * wip refactor get_endpoints * working get_endpoints * wip * fixed circular import * Working for ingress and loadbalancer svc * lint * add purging from #3094 * Use local catalog on the controller too * use externalip if available * add dshm_size_limit * optimize dependency installation * Add todo * optimize ingress * fix * fix * remove autostop timing * Fix URLs for raw IP:ports * fixes * wip * SA wip * Allow use of service accounts through remote_identity field * Make purge work for no clusters in kubeconfig * Handle ingress namespace not present * setup optimizations and critical SA key fix * fix docs * fix docs * Add support for skypilot.co/external-ip annotation for ingress * Remove dshm_size_limit * Undo kind changes * Update service account docs * minor docs * update comment * is_same_cloud to cloud_in_list * refactor query_ports to use head_ip * autodown + http prefixing in callers * fix ssh key issues when user hash is reused * linting * lint * lint, HOST_CONTROLLERS * add serve smoke tests for k8s * disallow file_mounts and workdir if no storage cloud is enabled * minor * lint * update fastchat to use --host 127.0.0.1 * extend timeout * docs comments * rename to port * add to core.py * docstrs * add docs on exec based auth * expand elif * add lb comment * refactor * refactor * fix docs build * add PODIP mode support * make ssh services optional * nits * Revert "make ssh services optional" This reverts commit 87d4d25daff8471241eefb9349e18a0d8af1264b. * Revert "add PODIP mode support" This reverts commit 750d4d4dbeaea470ceb8bd7b708fd82dccbb5e81. * nits * use 0.0.0.0 when on k8s; use common impl for other clouds * return dict instead of raising errors in core.endpoints() * lint * merge fixes * merge fixes * merge fixes * lint * fix smoke tests * fix smoke tests * comment * add enum for remote identity * lint * disable autostop for kubernetes * add skip_status_check * remove zone requirement * fix timings for test * silence curl download * move jq from yaml to test_minimal * move jq from yaml to test_minimal * add assert * lint * lint --- sky/backends/cloud_vm_ray_backend.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index ca152a75f91..e17845f4989 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1991,9 +1991,21 @@ def provision_with_retries( cloud_user = None else: cloud_user = to_provision.cloud.get_current_user_identity() + + requested_features = self._requested_features.copy() + # Skip stop feature for Kubernetes jobs controller. + if isinstance(to_provision.cloud, clouds.Kubernetes + ) and controller_utils.Controllers.from_name( + cluster_name + ) == controller_utils.Controllers.JOBS_CONTROLLER: + assert (clouds.CloudImplementationFeatures.STOP + in requested_features), requested_features + requested_features.remove( + clouds.CloudImplementationFeatures.STOP) + # Skip if to_provision.cloud does not support requested features to_provision.cloud.check_features_are_supported( - to_provision, self._requested_features) + to_provision, requested_features) config_dict = self._retry_zones( to_provision, @@ -4053,6 +4065,16 @@ def set_autostop(self, # The core.autostop() function should have already checked that the # cloud and resources support requested autostop. if idle_minutes_to_autostop is not None: + # Skip auto-stop for Kubernetes clusters. + if isinstance(handle.launched_resources.cloud, clouds.Kubernetes): + # We should hit this code path only for the jobs controller on + # Kubernetes clusters. + assert (controller_utils.Controllers.from_name( + handle.cluster_name) == controller_utils.Controllers. + JOBS_CONTROLLER), handle.cluster_name + logger.info('Auto-stop is not supported for Kubernetes ' + 'clusters. Skipping.') + return # Check if we're stopping spot assert (handle.launched_resources is not None and