Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spot/Serve] Optimize the translation of filemounts #4016

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -4574,6 +4574,22 @@ def _symlink_node(runner: command_runner.CommandRunner):
f'may already exist. Log: {log_path}')

subprocess_utils.run_in_parallel(_symlink_node, runners)
# In case of compression, inflate (and remove) the file-mounts zip on
# every node. TODO: optimize further by targeting only the specific
# file mounts / workdir instead of the whole remote workdir.
def _decompress_filemount_zips(runner: command_runner.CommandRunner):
    """Unzips skypilot-filemounts.zip on one node, if it is present."""
    zip_filename = (f'{constants.SKY_REMOTE_WORKDIR}'
                    '/skypilot-filemounts.zip')
    # `[ ! -f ... ] ||` makes a missing zip a no-op (returncode 0,
    # e.g. on nodes that did not receive the archive) while still
    # surfacing real unzip/rm failures. The previous
    # `[ -f ... ] && (...) || echo ...` form swallowed unzip errors,
    # since the trailing `echo` reset the returncode to 0.
    # `-o` overwrites existing files non-interactively so unzip can
    # never block on a prompt during a remote run.
    decompress_command = (
        f'[ ! -f {zip_filename} ] || '
        f'(unzip -o {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} '
        f'&& rm {zip_filename})')
    returncode = runner.run(decompress_command, log_path=log_path)
    subprocess_utils.handle_returncode(
        returncode, decompress_command,
        'Failed to inflate or remove skypilot-filemounts.zip, '
        f'check permissions. Log: {log_path}')

subprocess_utils.run_in_parallel(_decompress_filemount_zips, runners)
end = time.time()
logger.debug(f'File mount sync took {end - start} seconds.')

Expand Down
65 changes: 63 additions & 2 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import typing
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import urllib.parse
import zipfile

import colorama

Expand Down Expand Up @@ -848,7 +849,8 @@ def from_metadata(cls, metadata: StorageMetadata,

def add_store(self,
store_type: Union[str, StoreType],
region: Optional[str] = None) -> AbstractStore:
region: Optional[str] = None,
compress_local: bool = False) -> AbstractStore:
cblmemo marked this conversation as resolved.
Show resolved Hide resolved
"""Initializes and adds a new store to the storage.

Invoked by the optimizer after it has selected a store to
Expand Down Expand Up @@ -922,7 +924,10 @@ def add_store(self,
self._add_store(store)

# Upload source to store
self._sync_store(store)
if compress_local:
self._compress_sync_store(store)
else:
self._sync_store(store)

return store

Expand Down Expand Up @@ -986,6 +991,62 @@ def sync_all_stores(self):
for _, store in self.stores.items():
self._sync_store(store)

def _compress_sync_store(self, store: AbstractStore):
    """Same as _sync_store, but zips the local source before uploading.

    The source files are packed into a single
    ``skypilot-filemounts.zip`` staged in a dedicated directory, the
    store is temporarily pointed at that directory so only the zip is
    uploaded, and the staging artifacts are removed afterwards
    (on both success and failure).

    Args:
        store: The store to upload to.

    Raises:
        exceptions.StorageUploadError: re-raised after cleanup if the
            upload fails; any error during zipping is likewise
            re-raised after cleanup.
    """
    if self.source is None:
        # Nothing local to compress; fall back to the normal sync.
        self._sync_store(store)
        return
    # Stage the zip in its own directory so that pointing the store at
    # it uploads the zip and nothing else.
    zip_dir = os.path.join(os.getcwd(), f'{store.name}-compressed')
    zip_filename = os.path.join(zip_dir, 'skypilot-filemounts.zip')
    filepaths = ([self.source]
                 if isinstance(self.source, str) else self.source)

    def _cleanup() -> None:
        # Remove the staged zip and its directory, if present.
        if os.path.exists(zip_filename):
            os.remove(zip_filename)
        if os.path.exists(zip_dir):
            os.rmdir(zip_dir)

    try:
        os.makedirs(zip_dir, exist_ok=True)
        with zipfile.ZipFile(zip_filename, 'w',
                             zipfile.ZIP_DEFLATED) as zipf:
            for filepath in filepaths:
                expanded = os.path.expanduser(filepath)
                if os.path.isdir(expanded):
                    for root, _, files in os.walk(expanded):
                        for file in files:
                            # Never pack the archive into itself.
                            if file == 'skypilot-filemounts.zip':
                                continue
                            abspath = os.path.join(root, file)
                            # Arcname is the path relative to the
                            # mounted directory. No trailing '/': a
                            # trailing separator would mark the entry
                            # as a directory in the zip.
                            zipf.write(
                                abspath,
                                os.path.relpath(abspath, expanded))
                else:
                    logger.info(f'Filepath Zipping {filepath}')
                    # Read from the expanded path, but keep the
                    # user-facing (unexpanded) path as the arcname.
                    zipf.write(expanded,
                               filepath.replace(os.path.sep, '/'))
    except BaseException:
        # Equivalent to a bare `except`, spelled explicitly: clean up
        # the partial zip on *any* error (incl. KeyboardInterrupt),
        # then re-raise.
        _cleanup()
        raise
    original_source = store.source
    try:
        # Temporarily point the store at the staging dir so that only
        # the zip is uploaded; the original source is restored below.
        store.source = zip_dir
        store.upload()
    except exceptions.StorageUploadError:
        _cleanup()
        logger.error(f'Could not upload {self.source!r} to store '
                     f'name {store.name!r}.')
        if store.is_sky_managed:
            global_user_state.set_storage_status(
                self.name, StorageStatus.UPLOAD_FAILED)
        raise
    finally:
        # Restore the store's real source whether or not the upload
        # succeeded (the old code clobbered it with a list).
        store.source = original_source
    _cleanup()
    # Upload succeeded - update state.
    if store.is_sky_managed:
        global_user_state.set_storage_status(self.name,
                                             StorageStatus.READY)

def _sync_store(self, store: AbstractStore):
"""Runs the upload routine for the store and handles failures"""

Expand Down
90 changes: 90 additions & 0 deletions sky/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,96 @@ def sync_storage_mounts(self) -> None:
raise ValueError(f'Storage Type {store_type} '
'does not exist!')

def compress_local_sync_storage_mounts(self) -> None:
    """(INTERNAL) Eagerly syncs local storage mounts to cloud storage
    after compression.

    Behaves like sync_storage_mounts(), except that workdir storages
    (name starting with 'skypilot-workdir') have their local source
    zipped and uploaded as a single archive to cut per-file upload
    overhead. After syncing up, COPY-mode storage mounts are
    translated into regular file_mounts of the form
    ``{ /remote/path: {s3,gs,..}://<bucket path> }``.

    Raises:
        ValueError: if a storage plan selects a store type that is not
            handled by the translation below.
    """
    for storage in self.storage_mounts.values():
        if len(storage.stores) == 0:
            store_type, store_region = self._get_preferred_store()
            self.storage_plans[storage] = store_type
            # Compress only workdir mounts; other file mounts are
            # synced as-is. Guard against a None name, which is
            # possible for externally-sourced storages.
            # NOTE(review): consider a config flag to toggle the
            # compression feature globally.
            is_workdir = (storage.name is not None and
                          storage.name.startswith('skypilot-workdir'))
            storage.add_store(store_type,
                              store_region,
                              compress_local=is_workdir)
        else:
            # We will download the first store that is added to remote.
            self.storage_plans[storage] = list(storage.stores.keys())[0]

    for mnt_path, storage in self.storage_mounts.items():
        if storage.mode != storage_lib.StorageMode.COPY:
            continue
        # Compute the bucket URI for the planned store type, then
        # register a single file mount (dst -> bucket URI).
        store_type = self.storage_plans[storage]
        if store_type is storage_lib.StoreType.S3:
            # TODO: allow for Storage mounting of different clouds
            if isinstance(storage.source,
                          str) and storage.source.startswith('s3://'):
                blob_path = storage.source
            else:
                assert storage.name is not None, storage
                blob_path = 's3://' + storage.name
        elif store_type is storage_lib.StoreType.GCS:
            if isinstance(storage.source,
                          str) and storage.source.startswith('gs://'):
                blob_path = storage.source
            else:
                assert storage.name is not None, storage
                blob_path = 'gs://' + storage.name
        elif store_type is storage_lib.StoreType.AZURE:
            if (isinstance(storage.source, str) and
                    data_utils.is_az_container_endpoint(storage.source)):
                blob_path = storage.source
            else:
                assert storage.name is not None, storage
                store_object = storage.stores[
                    storage_lib.StoreType.AZURE]
                assert isinstance(store_object,
                                  storage_lib.AzureBlobStore)
                storage_account_name = store_object.storage_account_name
                blob_path = data_utils.AZURE_CONTAINER_URL.format(
                    storage_account_name=storage_account_name,
                    container_name=storage.name)
        elif store_type is storage_lib.StoreType.R2:
            if (storage.source is not None and
                    not isinstance(storage.source, list) and
                    storage.source.startswith('r2://')):
                blob_path = storage.source
            else:
                blob_path = 'r2://' + storage.name
        elif store_type is storage_lib.StoreType.IBM:
            if isinstance(storage.source,
                          str) and storage.source.startswith('cos://'):
                # source is a cos bucket's uri
                blob_path = storage.source
            else:
                # source is a bucket name; extract the region from
                # rclone.conf to build the COS URI.
                assert storage.name is not None, storage
                cos_region = data_utils.Rclone.get_region_from_rclone(
                    storage.name, data_utils.Rclone.RcloneClouds.IBM)
                blob_path = f'cos://{cos_region}/{storage.name}'
        else:
            with ux_utils.print_exception_no_traceback():
                raise ValueError(f'Storage Type {store_type} '
                                 'does not exist!')
        self.update_file_mounts({mnt_path: blob_path})

def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]:
"""Returns file mounts of the form (dst=VM path, src=local path).

Expand Down
3 changes: 2 additions & 1 deletion sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.'
f'{colorama.Style.RESET_ALL} See: sky storage ls')
try:
task.sync_storage_mounts()
# Optimize filemount translation
task.compress_local_sync_storage_mounts()
warrickhe marked this conversation as resolved.
Show resolved Hide resolved
except ValueError as e:
if 'No enabled cloud for storage' in str(e):
data_src = None
Expand Down
Loading