Commit c1f28bc

[Core] Trigger failover based on whether the previous cluster was ever UP (#2977)

* cluster ever up

* fix comment

* simplify failover logic

* remove stale comments

* Fix comments

* Address comments

* Avoid showing the restarting of ray cluster

* address comments

* format

* Update sky/backends/cloud_vm_ray_backend.py

Co-authored-by: Zongheng Yang <[email protected]>

* Update sky/global_user_state.py

Co-authored-by: Zongheng Yang <[email protected]>

* Fix wait_operation by raising errors

* fix nonetype error

* Fix http exceptions

* fix wait_operations

* fix names API

---------

Co-authored-by: Zongheng Yang <[email protected]>
Michaelvll and concretevitamin authored Jan 18, 2024
1 parent f0bb6ba commit c1f28bc
Showing 10 changed files with 291 additions and 262 deletions.
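
At a high level, the commit replaces the old check "was the previous cluster UP or STOPPED?" with a persisted "was this cluster ever UP?" flag when deciding whether a failed (re)launch may fail over to other locations. The sketch below is illustrative only: ResourcesUnavailableError mirrors the exception raised in the diff, while stop_cluster/terminate_cluster are hypothetical stand-ins for the backend's teardown paths, not SkyPilot APIs.

# Illustrative sketch of the failover rule adopted in this commit; the helper
# functions are hypothetical stand-ins, not SkyPilot APIs.
class ResourcesUnavailableError(Exception):
    def __init__(self, message: str, no_failover: bool = False):
        super().__init__(message)
        self.no_failover = no_failover

def stop_cluster(cluster_name: str) -> None:
    print(f'[stub] stopping {cluster_name}')  # real code stops the VMs

def terminate_cluster(cluster_name: str) -> None:
    print(f'[stub] terminating {cluster_name}')  # real code tears the VMs down

def on_provision_failure(cluster_name: str, prev_cluster_ever_up: bool) -> None:
    if prev_cluster_ever_up:
        # The cluster reached UP at least once, so its disks may hold user
        # data: stop it and block failover to other zones/regions/clouds.
        stop_cluster(cluster_name)
        raise ResourcesUnavailableError(
            f'Failed to launch cluster {cluster_name!r}. To retry, run: '
            f'sky start {cluster_name}', no_failover=True)
    # The cluster never came up, so nothing of value is on it: terminate it
    # and let the provisioner keep failing over.
    terminate_cluster(cluster_name)
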
6 changes: 3 additions & 3 deletions sky/backends/backend_utils.py
@@ -2016,7 +2016,7 @@ def _update_cluster_status(
return global_user_state.get_cluster_from_name(cluster_name)


def _refresh_cluster_record(
def refresh_cluster_record(
cluster_name: str,
*,
force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] = None,
@@ -2086,7 +2086,7 @@ def refresh_cluster_status_handle(
handle of the cluster.
Please refer to the docstring of refresh_cluster_record for the details.
"""
record = _refresh_cluster_record(
record = refresh_cluster_record(
cluster_name,
force_refresh_statuses=force_refresh_statuses,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock)
@@ -2408,7 +2408,7 @@ def _is_local_cluster(record):

def _refresh_cluster(cluster_name):
try:
record = _refresh_cluster_record(
record = refresh_cluster_record(
cluster_name,
force_refresh_statuses=set(status_lib.ClusterStatus),
acquire_per_cluster_status_lock=True)
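
With _refresh_cluster_record made public, callers can fetch the whole cluster record (including the cluster_ever_up field that the cloud_vm_ray_backend.py hunks further down read) instead of only the (status, handle) pair. A hedged usage sketch follows; the import paths and record keys are assumptions taken from the hunks on this page and may not reflect the full schema.

# Hedged usage sketch of the newly public helper; import paths and record
# keys are assumptions based on this diff.
from sky import status_lib
from sky.backends import backend_utils

record = backend_utils.refresh_cluster_record(
    'my-cluster',
    # Query the cloud for clusters stuck in INIT so their real state is known.
    force_refresh_statuses={status_lib.ClusterStatus.INIT},
    acquire_per_cluster_status_lock=False,
)
if record is None:
    prev_status, handle, cluster_ever_up = None, None, False
else:
    prev_status, handle = record['status'], record['handle']
    cluster_ever_up = record['cluster_ever_up']
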
160 changes: 68 additions & 92 deletions sky/backends/cloud_vm_ray_backend.py
@@ -1008,7 +1008,7 @@ def _gcp_handler(blocked_resources: Set['resources_lib.Resources'],
assert zones and len(zones) == 1, zones
zone = zones[0]

if not isinstance(err, provision_common.ProvisionError):
if not isinstance(err, provision_common.ProvisionerError):
logger.warning(f'{colorama.Style.DIM}Got an unparsed error: {err}; '
f'blocking resources by its zone {zone.name}'
f'{colorama.Style.RESET_ALL}')
@@ -1192,13 +1192,15 @@ def __init__(
num_nodes: int,
prev_cluster_status: Optional[status_lib.ClusterStatus],
prev_handle: Optional['CloudVmRayResourceHandle'],
prev_cluster_ever_up: bool,
) -> None:
assert cluster_name is not None, 'cluster_name must be specified.'
self.cluster_name = cluster_name
self.resources = resources
self.num_nodes = num_nodes
self.prev_cluster_status = prev_cluster_status
self.prev_handle = prev_handle
self.prev_cluster_ever_up = prev_cluster_ever_up

def __init__(self,
log_dir: str,
@@ -1222,9 +1224,10 @@ def __init__(self,
self._wheel_hash = wheel_hash

def _yield_zones(
self, to_provision: resources_lib.Resources, num_nodes: int,
cluster_name: str,
prev_cluster_status: Optional[status_lib.ClusterStatus]
self, to_provision: resources_lib.Resources, num_nodes: int,
cluster_name: str,
prev_cluster_status: Optional[status_lib.ClusterStatus],
prev_cluster_ever_up: bool
) -> Iterable[Optional[List[clouds.Zone]]]:
"""Yield zones within the given region to try for provisioning.
@@ -1284,57 +1287,24 @@ def _get_previously_launched_zones() -> Optional[List[clouds.Zone]]:
f'Cluster {cluster_name!r} (status: '
f'{prev_cluster_status.value}) was previously launched '
f'in {cloud} {region.name}. Relaunching in that region.')
# TODO(zhwu): The cluster being killed by cloud provider should
# be tested whether re-launching a cluster killed spot instance
# will recover the data.
yield zones

# TODO(zhwu): update the following logics, since we have added
# the support for refreshing the cluster status from the cloud
# provider.
# If it reaches here: the cluster status in the database gets
# set to INIT, since a launch request was issued but failed.
#
# Cluster with status UP can reach here, if it was killed by the
# cloud provider and no available resources in that region to
# relaunch, which can happen to spot instance.
if prev_cluster_status == status_lib.ClusterStatus.UP:
message = (
f'Failed to connect to the cluster {cluster_name!r}. '
'It is possibly killed by cloud provider or manually '
'in the cloud provider console. To remove the cluster '
f'please run: sky down {cluster_name}')
# Reset to UP (rather than keeping it at INIT), as INIT
# mode will enable failover to other regions, causing
# data lose.
# TODO(zhwu): This is set to UP to be more conservative,
# we may need to confirm whether the cluster is down in all
# cases.
global_user_state.set_cluster_status(
cluster_name, status_lib.ClusterStatus.UP)
# set to either STOPPED or None, since a launch request was issued
# but failed, and the provisioning loop (_retry_zones()) stopped the
# cluster if `cluster_ever_up` is True; or terminated the cluster
# otherwise.
if prev_cluster_ever_up:
message = (f'Failed to launch cluster {cluster_name!r} '
f'(previous status: {prev_cluster_status.value}). '
'To retry launching the cluster, run: '
f'sky start {cluster_name}')
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(message,
no_failover=True)

# Check the *previous* cluster status. If the cluster is previously
# stopped, we should not retry other regions, since the previously
# attached volumes are not visible on another region.
elif prev_cluster_status == status_lib.ClusterStatus.STOPPED:
message = (
'Failed to acquire resources to restart the stopped '
f'cluster {cluster_name} in {region.name}. Please retry '
'again later.')

# Reset to STOPPED (rather than keeping it at INIT), because
# (1) the cluster is not up (2) it ensures future `sky start`
# will disable auto-failover too.
global_user_state.set_cluster_status(
cluster_name, status_lib.ClusterStatus.STOPPED)

with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(message,
no_failover=True)
assert prev_cluster_status == status_lib.ClusterStatus.INIT
assert (prev_cluster_status == status_lib.ClusterStatus.INIT
), prev_cluster_status
message = (f'Failed to launch cluster {cluster_name!r} '
f'(previous status: {prev_cluster_status.value}) '
f'with the original resources: {to_provision}.')
@@ -1390,6 +1360,7 @@ def _retry_zones(
cloud_user_identity: Optional[List[str]],
prev_cluster_status: Optional[status_lib.ClusterStatus],
prev_handle: Optional['CloudVmRayResourceHandle'],
prev_cluster_ever_up: bool,
) -> Dict[str, Any]:
"""The provision retry loop."""
style = colorama.Style
@@ -1406,9 +1377,6 @@ def _retry_zones(

# Get previous cluster status
cluster_exists = prev_cluster_status is not None
is_prev_cluster_healthy = prev_cluster_status in [
status_lib.ClusterStatus.STOPPED, status_lib.ClusterStatus.UP
]

assert to_provision.region is not None, (
to_provision, 'region should have been set by the optimizer.')
@@ -1445,7 +1413,8 @@ def _retry_zones(
)

for zones in self._yield_zones(to_provision, num_nodes, cluster_name,
prev_cluster_status):
prev_cluster_status,
prev_cluster_ever_up):
# Filter out zones that are blocked, if any.
# This optimizes the provision loop by skipping zones that are
# indicated to be unavailable from previous provision attempts.
@@ -1555,7 +1524,7 @@ def _retry_zones(
handle.cluster_name_on_cloud),
num_nodes=num_nodes,
cluster_yaml=handle.cluster_yaml,
is_prev_cluster_healthy=is_prev_cluster_healthy,
prev_cluster_ever_up=prev_cluster_ever_up,
log_dir=self.log_dir)
# NOTE: We will handle the logic of '_ensure_cluster_ray_started'
# in 'provision_utils.post_provision_runtime_setup()' in the caller.
@@ -1572,7 +1541,7 @@ def _retry_zones(
# cluster does not exist. Also we are fast at
# cleaning up clusters now if there is no existing node..
CloudVmRayBackend().post_teardown_cleanup(
handle, terminate=not is_prev_cluster_healthy)
handle, terminate=not prev_cluster_ever_up)
# TODO(suquark): other clouds may have different zone
# blocking strategy. See '_update_blocklist_on_error'
# for details.
@@ -1634,12 +1603,8 @@ def _retry_zones(
# The cluster is not ready. We must perform error recording and/or
# cleanup.

# If cluster was previously UP or STOPPED, stop it; otherwise
# terminate.
# FIXME(zongheng): terminating a potentially live cluster is
# scary. Say: users have an existing cluster that got into INIT, do
# sky launch, somehow failed, then we may be terminating it here.
terminate_or_stop = not is_prev_cluster_healthy
# If cluster was ever up, stop it; otherwise terminate.
terminate_or_stop = not prev_cluster_ever_up
definitely_no_nodes_launched = False
if status == GangSchedulingStatus.HEAD_FAILED:
# ray up failed for the head node.
@@ -1706,10 +1671,10 @@ def _retry_zones(
message = (f'Failed to acquire resources in {to_provision.cloud}. '
'Try changing resource requirements or use another '
'cloud provider.')
# Do not failover to other clouds if the cluster was previously
# UP or STOPPED, since the user can have some data on the cluster.
# Do not failover to other locations if the cluster was ever up, since
# the user can have some data on the cluster.
raise exceptions.ResourcesUnavailableError(
message, no_failover=is_prev_cluster_healthy)
message, no_failover=prev_cluster_ever_up)

# TODO(suquark): Deprecate this method
# once the `provision_utils` is adopted for all the clouds.
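
The hunks above change both teardown and failover to key off prev_cluster_ever_up instead of the previous status being UP or STOPPED. The snippet below merely tabulates how the old and new conditions differ; the interesting row is a cluster that was once UP but currently shows INIT (e.g., an interrupted relaunch), which used to be terminated and failed over but is now stopped with failover disabled.

# Purely illustrative comparison of the old and new no-failover conditions.
cases = [
    # (previous status, cluster_ever_up)
    ('UP', True),       # healthy cluster that hit a capacity error
    ('STOPPED', True),  # stopped cluster being restarted
    ('INIT', True),     # was UP before; a later launch was interrupted
    ('INIT', False),    # first launch never succeeded
]
for prev_status, ever_up in cases:
    old_no_failover = prev_status in ('UP', 'STOPPED')  # pre-commit rule
    new_no_failover = ever_up                           # post-commit rule
    print(f'{prev_status:8} ever_up={ever_up!s:<5} '
          f'old={old_no_failover!s:<5} new={new_no_failover}')
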
@@ -2020,6 +1985,7 @@ def provision_with_retries(
num_nodes = to_provision_config.num_nodes
prev_cluster_status = to_provision_config.prev_cluster_status
prev_handle = to_provision_config.prev_handle
prev_cluster_ever_up = to_provision_config.prev_cluster_ever_up
launchable_retries_disabled = (self._dag is None or
self._optimize_target is None)

@@ -2049,7 +2015,8 @@ def provision_with_retries(
cluster_name=cluster_name,
cloud_user_identity=cloud_user,
prev_cluster_status=prev_cluster_status,
prev_handle=prev_handle)
prev_handle=prev_handle,
prev_cluster_ever_up=prev_cluster_ever_up)
if dryrun:
return config_dict
except (exceptions.InvalidClusterNameError,
@@ -4290,33 +4257,39 @@ def _check_existing_cluster(
# TODO(zhwu): complete the list of exceptions.
"""
record = global_user_state.get_cluster_from_name(cluster_name)
handle_before_refresh = None if record is None else record['handle']
status_before_refresh = None if record is None else record['status']
if record is None:
handle_before_refresh = None
status_before_refresh = None
else:
handle_before_refresh = record['handle']
status_before_refresh = record['status']

prev_cluster_status, handle = (status_before_refresh,
handle_before_refresh)

if not dryrun:
prev_cluster_status, handle = (
backend_utils.refresh_cluster_status_handle(
cluster_name,
# We force refresh for the init status to determine the
# actual state of a previous cluster in INIT state.
#
# This is important for the case, where an existing cluster
# is transitioned into INIT state due to key interruption
# during launching, with the following steps:
# (1) launch, after answering prompt immediately ctrl-c;
# (2) launch again.
# If we don't refresh the state of the cluster and reset it
# back to STOPPED, our failover logic will consider it as an
# abnormal cluster after hitting resources capacity limit on
# the cloud, and will start failover. This is not desired,
# because the user may want to keep the data on the disk of
# that cluster.
force_refresh_statuses={status_lib.ClusterStatus.INIT},
acquire_per_cluster_status_lock=False,
))
# We force refresh any cluster that (1) has INIT status, or (2) has
# autostop set. This is to determine the actual state of such a
# cluster and to make the hint that uses prev_cluster_status more
# accurate.
record = backend_utils.refresh_cluster_record(
cluster_name,
force_refresh_statuses={status_lib.ClusterStatus.INIT},
acquire_per_cluster_status_lock=False,
)
if record is not None:
prev_cluster_status = record['status']
handle = record['handle']
else:
prev_cluster_status = None
handle = None
# We should check the cluster_ever_up after refresh, because if the
cluster is terminated (through the console or autodown), the record will
become None and cluster_ever_up should be considered False.
cluster_ever_up = record is not None and record['cluster_ever_up']
logger.debug(f'cluster_ever_up: {cluster_ever_up}')
logger.debug(f'record: {record}')

if prev_cluster_status is not None:
assert handle is not None
# Cluster already exists.
@@ -4339,7 +4312,8 @@ def _check_existing_cluster(
to_provision,
handle.launched_nodes,
prev_cluster_status=prev_cluster_status,
prev_handle=handle)
prev_handle=handle,
prev_cluster_ever_up=cluster_ever_up)
usage_lib.messages.usage.set_new_cluster()
# Use the task_cloud, because the cloud in `to_provision` can be changed
# later during the retry.
@@ -4394,11 +4368,13 @@ def _check_existing_cluster(
'Tip: to reuse an existing cluster, '
'specify --cluster (-c). '
'Run `sky status` to see existing clusters.')
return RetryingVmProvisioner.ToProvisionConfig(cluster_name,
to_provision,
task.num_nodes,
prev_cluster_status=None,
prev_handle=None)
return RetryingVmProvisioner.ToProvisionConfig(
cluster_name,
to_provision,
task.num_nodes,
prev_cluster_status=None,
prev_handle=None,
prev_cluster_ever_up=False)

def _execute_file_mounts(self, handle: CloudVmRayResourceHandle,
file_mounts: Optional[Dict[Path, Path]]):
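
The commit also touches sky/global_user_state.py (see the commit message), presumably where the cluster_ever_up field read above is persisted; that diff is not shown on this page. As a generic illustration only, and explicitly not SkyPilot's actual schema or API, persisting such a flag could look like the sqlite3 sketch below, where the flag is only ever raised once a cluster reaches UP and never lowered afterwards.

# Generic illustration of persisting an "ever up" flag; table and column
# names are hypothetical, not SkyPilot's real schema.
import sqlite3

conn = sqlite3.connect('state.db')
conn.execute('CREATE TABLE IF NOT EXISTS clusters ('
             'name TEXT PRIMARY KEY, status TEXT, cluster_ever_up INTEGER)')

def set_status(name: str, status: str) -> None:
    # Record the latest status; once a cluster has been UP, keep the flag set.
    ever_up = 1 if status == 'UP' else 0
    conn.execute(
        'INSERT INTO clusters (name, status, cluster_ever_up) '
        'VALUES (?, ?, ?) '
        'ON CONFLICT(name) DO UPDATE SET status = excluded.status, '
        'cluster_ever_up = MAX(clusters.cluster_ever_up, excluded.cluster_ever_up)',
        (name, status, ever_up))
    conn.commit()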