From 731966e6560dbb6c3f65c7176f8b4fd8b803d1b3 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Sat, 21 Sep 2024 13:30:11 -0700
Subject: [PATCH 1/9] add compression to local files

---
 sky/data/storage.py           | 54 +++++++++++++++++++--
 sky/task.py                   | 89 +++++++++++++++++++++++++++++++++++
 sky/utils/controller_utils.py |  3 +-
 3 files changed, 142 insertions(+), 4 deletions(-)

diff --git a/sky/data/storage.py b/sky/data/storage.py
index 5214799d2f3..faa3a84bf01 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -9,7 +9,7 @@
 import typing
 from typing import Any, Dict, List, Optional, Tuple, Type, Union
 import urllib.parse
-
+import zipfile
 import colorama
 
 from sky import check as sky_check
@@ -848,7 +848,8 @@ def from_metadata(cls, metadata: StorageMetadata,
 
     def add_store(self,
                   store_type: Union[str, StoreType],
-                  region: Optional[str] = None) -> AbstractStore:
+                  region: Optional[str] = None,
+                  compress_local: bool = False) -> AbstractStore:
         """Initializes and adds a new store to the storage.
 
         Invoked by the optimizer after it has selected a store to
@@ -922,7 +923,10 @@ def add_store(self,
         self._add_store(store)
 
         # Upload source to store
-        self._sync_store(store)
+        if compress_local:
+            self._compress_sync_store(store)
+        else:
+            self._sync_store(store)
 
         return store
 
@@ -985,6 +989,50 @@ def sync_all_stores(self):
         """Syncs the source and destinations of all stores in the Storage"""
         for _, store in self.stores.items():
             self._sync_store(store)
+
+    def _compress_sync_store(self, store: AbstractStore):
+        """
+        TODO: WARRICK HE
+        Compresses everything in source and uploads that, specific to local->bucket
+        """
+        zip_filename = os.getcwd()+"/skypilot-source-files.zip" #can change name later
+        filepaths = self.source
+        if type(self.source) == str:
+            filepaths = [self.source]
+        try:
+            with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
+                for filepath in filepaths:
+                    if os.path.isdir(filepath):
+                        for root, dirs, files in os.walk(filepath):
+                            for file in files:
+                                zipf.write(os.path.join(root, file), os.path.join(root.replace(filepath, '', 1), file) + '/')
+                    else:
+                        zipf.write(filepath, filepath.replace(os.path.sep, '/'))
+        except:
+            if os.path.exists(zip_filename):
+                os.remove(zip_filename)
+            raise
+        try:
+            # overwrite source temporarily!
+            store.source = zip_filename
+            store.upload()
+            store.source = filepaths
+        except exceptions.StorageUploadError:
+            if os.path.exists(zip_filename):
+                os.remove(zip_filename)
+            self.source = filepaths
+            logger.error(f'Could not upload {self.source!r} to store '
+                         f'name {store.name!r}.')
+            if store.is_sky_managed:
+                global_user_state.set_storage_status(
+                    self.name, StorageStatus.UPLOAD_FAILED)
+            raise
+        if os.path.exists(zip_filename):
+            os.remove(zip_filename)
+        # Upload succeeded - update state
+        if store.is_sky_managed:
+            global_user_state.set_storage_status(self.name, StorageStatus.READY)
+
 
     def _sync_store(self, store: AbstractStore):
         """Runs the upload routine for the store and handles failures"""
diff --git a/sky/task.py b/sky/task.py
index cebc616dc6d..894f897364d 100644
--- a/sky/task.py
+++ b/sky/task.py
@@ -23,6 +23,7 @@
 from sky.utils import common_utils
 from sky.utils import schemas
 from sky.utils import ux_utils
+import zipfile
 
 if typing.TYPE_CHECKING:
     from sky import resources as resources_lib
@@ -1035,6 +1036,94 @@
                 with ux_utils.print_exception_no_traceback():
                     raise ValueError(f'Storage Type {store_type} '
                                      'does not exist!')
+
+    def compress_local_sync_storage_mounts(self) -> None:
+        """(INTERNAL) Eagerly syncs local storage mounts to cloud storage after compressing
+
+        After syncing up, COPY-mode storage mounts are translated into regular
+        file_mounts of the form ``{ /remote/path: {s3,gs,..}://<bucket path>
+        }``.
+        """
+        for storage in self.storage_mounts.values():
+            if len(storage.stores) == 0:
+                store_type, store_region = self._get_preferred_store()
+                self.storage_plans[storage] = store_type
+                storage.add_store(store_type, store_region, True)
+            else:
+                # We will download the first store that is added to remote.
+                self.storage_plans[storage] = list(storage.stores.keys())[0]
+
+        storage_mounts = self.storage_mounts
+        storage_plans = self.storage_plans
+        for mnt_path, storage in storage_mounts.items():
+            if storage.mode == storage_lib.StorageMode.COPY:
+                store_type = storage_plans[storage]
+                if store_type is storage_lib.StoreType.S3:
+                    # TODO: allow for Storage mounting of different clouds
+                    if isinstance(storage.source,
+                                  str) and storage.source.startswith('s3://'):
+                        blob_path = storage.source
+                    else:
+                        assert storage.name is not None, storage
+                        blob_path = 's3://' + storage.name
+                    self.update_file_mounts({
+                        mnt_path: blob_path,
+                    })
+                elif store_type is storage_lib.StoreType.GCS:
+                    if isinstance(storage.source,
+                                  str) and storage.source.startswith('gs://'):
+                        blob_path = storage.source
+                    else:
+                        assert storage.name is not None, storage
+                        blob_path = 'gs://' + storage.name
+                    self.update_file_mounts({
+                        mnt_path: blob_path,
+                    })
+                elif store_type is storage_lib.StoreType.AZURE:
+                    if (isinstance(storage.source, str) and
+                            data_utils.is_az_container_endpoint(
+                                storage.source)):
+                        blob_path = storage.source
+                    else:
+                        assert storage.name is not None, storage
+                        store_object = storage.stores[
+                            storage_lib.StoreType.AZURE]
+                        assert isinstance(store_object,
+                                          storage_lib.AzureBlobStore)
+                        storage_account_name = store_object.storage_account_name
+                        blob_path = data_utils.AZURE_CONTAINER_URL.format(
+                            storage_account_name=storage_account_name,
+                            container_name=storage.name)
+                    self.update_file_mounts({
+                        mnt_path: blob_path,
+                    })
+                elif store_type is storage_lib.StoreType.R2:
+                    if storage.source is not None and not isinstance(
+                            storage.source,
+                            list) and storage.source.startswith('r2://'):
+                        blob_path = storage.source
+                    else:
+                        blob_path = 'r2://' + storage.name
+                    self.update_file_mounts({
+                        mnt_path: blob_path,
+                    })
+                elif store_type is storage_lib.StoreType.IBM:
+                    if isinstance(storage.source,
+                                  str) and storage.source.startswith('cos://'):
+                        # source is a cos bucket's uri
+                        blob_path = storage.source
+                    else:
+                        # source is a bucket name.
+                        assert storage.name is not None, storage
+                        # extract region from rclone.conf
+                        cos_region = data_utils.Rclone.get_region_from_rclone(
+                            storage.name, data_utils.Rclone.RcloneClouds.IBM)
+                        blob_path = f'cos://{cos_region}/{storage.name}'
+                    self.update_file_mounts({mnt_path: blob_path})
+                else:
+                    with ux_utils.print_exception_no_traceback():
+                        raise ValueError(f'Storage Type {store_type} '
+                                         'does not exist!')
 
     def get_local_to_remote_file_mounts(self) -> Optional[Dict[str, str]]:
         """Returns file mounts of the form (dst=VM path, src=local path).
diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py
index 118f9a2b718..44aa06e6353 100644
--- a/sky/utils/controller_utils.py
+++ b/sky/utils/controller_utils.py
@@ -711,7 +711,8 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
     logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.'
                 f'{colorama.Style.RESET_ALL} See: sky storage ls')
     try:
-        task.sync_storage_mounts()
+        # Optimize filemount translation
+        task.compress_local_sync_storage_mounts()
     except ValueError as e:
         if 'No enabled cloud for storage' in str(e):
             data_src = None

From 07a6bcb40cbfde63fdab7e40709731ec42d06688 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Sun, 22 Sep 2024 10:43:03 -0700
Subject: [PATCH 2/9] Add Decompression Logic For Workdirs

---
 sky/backends/cloud_vm_ray_backend.py | 12 ++++++++++++
 sky/data/storage.py                  | 20 +++++++++++++-----
 sky/task.py                          |  3 ++-
 3 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 4d6e0eb4fb7..4d8a75b3d83 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4574,6 +4574,18 @@ def _symlink_node(runner: command_runner.CommandRunner):
                          f'may already exist. Log: {log_path}')
 
         subprocess_utils.run_in_parallel(_symlink_node, runners)
+
+        #in case of compression, inflate all zip files
+        # can optimize further by going to specific filemounts & workdir only
+        def _decompress_filemount_zips(runner: command_runner.CommandRunner):
+            zip_filename = f"{constants.SKY_REMOTE_WORKDIR}/skypilot-filemounts.zip"
+            decompress_command = (f'unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} && rm {zip_filename}')
+            returncode = runner.run(decompress_command, log_path=log_path)
+            # subprocess_utils.handle_returncode(
+            #     returncode, decompress_command,
+            #     'Failed to inflate or remove skypilot-filemounts.zip, check permissions'
+            #     f'Log: {log_path}')
+        subprocess_utils.run_in_parallel(_decompress_filemount_zips, runners)
         end = time.time()
         logger.debug(f'File mount sync took {end - start} seconds.')
 
diff --git a/sky/data/storage.py b/sky/data/storage.py
index faa3a84bf01..0c5b1c36bf4 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -995,31 +995,40 @@ def _compress_sync_store(self, store: AbstractStore):
         TODO: WARRICK HE
         Compresses everything in source and uploads that, specific to local->bucket
         """
-        zip_filename = os.getcwd()+"/skypilot-source-files.zip" #can change name later
+        zip_filepath = os.path.join(os.getcwd(),f'{store.name}-compressed')
+        zip_filename = os.path.join(zip_filepath,f'skypilot-filemounts.zip') #can change name later
         filepaths = self.source
         if type(self.source) == str:
             filepaths = [self.source]
         try:
-            with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
+            if not os.path.exists(zip_filepath):
+                os.mkdir(zip_filepath)
+            with zipfile.ZipFile(os.path.join(zip_filepath,zip_filename), 'w', zipfile.ZIP_DEFLATED) as zipf:
                 for filepath in filepaths:
-                    if os.path.isdir(filepath):
+                    logger.info(f"Filepath: {filepath}")
+                    if os.path.isdir(os.path.expanduser(filepath)):
                         for root, dirs, files in os.walk(filepath):
                             for file in files:
+                                if file == "skypilot-filemounts.zip":
+                                    continue
                                 zipf.write(os.path.join(root, file), os.path.join(root.replace(filepath, '', 1), file) + '/')
                     else:
+                        logger.info(f"Filepath Zipping {filepath}")
                         zipf.write(filepath, filepath.replace(os.path.sep, '/'))
         except:
             if os.path.exists(zip_filename):
                 os.remove(zip_filename)
+            os.rmdir(zip_filepath)
             raise
         try:
-            # overwrite source temporarily!
-            store.source = zip_filename
+            # overwrite source with zip
+            store.source = zip_filepath
             store.upload()
             store.source = filepaths
         except exceptions.StorageUploadError:
             if os.path.exists(zip_filename):
                 os.remove(zip_filename)
+            os.rmdir(zip_filepath)
             self.source = filepaths
             logger.error(f'Could not upload {self.source!r} to store '
                          f'name {store.name!r}.')
@@ -1029,6 +1038,7 @@ def _compress_sync_store(self, store: AbstractStore):
             raise
         if os.path.exists(zip_filename):
             os.remove(zip_filename)
+        os.rmdir(zip_filepath)
         # Upload succeeded - update state
         if store.is_sky_managed:
             global_user_state.set_storage_status(self.name, StorageStatus.READY)
diff --git a/sky/task.py b/sky/task.py
index 894f897364d..e2bd99b12d3 100644
--- a/sky/task.py
+++ b/sky/task.py
@@ -1048,7 +1048,8 @@ def compress_local_sync_storage_mounts(self) -> None:
         if len(storage.stores) == 0:
             store_type, store_region = self._get_preferred_store()
             self.storage_plans[storage] = store_type
-            storage.add_store(store_type, store_region, True)
+            is_workdir = storage.name.startswith('skypilot-workdir')
+            storage.add_store(store_type, store_region, is_workdir)
         else:
             # We will download the first store that is added to remote.
             self.storage_plans[storage] = list(storage.stores.keys())[0]

From 98f7aaf9c7b39c24b62d55cf824c90d5cd6f6c1d Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Tue, 1 Oct 2024 01:05:10 -0700
Subject: [PATCH 3/9] Add Formatting Changes

---
 sky/backends/cloud_vm_ray_backend.py | 17 +++++++++-------
 sky/data/storage.py                  | 30 ++++++++++++++--------------
 sky/task.py                          |  6 +++---
 3 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 4d8a75b3d83..ba686a4c5f4 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4574,17 +4574,20 @@ def _symlink_node(runner: command_runner.CommandRunner):
                          f'may already exist. Log: {log_path}')
 
         subprocess_utils.run_in_parallel(_symlink_node, runners)
-
         #in case of compression, inflate all zip files
         # can optimize further by going to specific filemounts & workdir only
         def _decompress_filemount_zips(runner: command_runner.CommandRunner):
-            zip_filename = f"{constants.SKY_REMOTE_WORKDIR}/skypilot-filemounts.zip"
-            decompress_command = (f'unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} && rm {zip_filename}')
+            zip_filename = (f'{constants.SKY_REMOTE_WORKDIR}'
+                            '/skypilot-filemounts.zip')
+            decompress_command = (
+                f'unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} '
+                '&& rm {zip_filename}')
             returncode = runner.run(decompress_command, log_path=log_path)
-            # subprocess_utils.handle_returncode(
-            #     returncode, decompress_command,
-            #     'Failed to inflate or remove skypilot-filemounts.zip, check permissions'
-            #     f'Log: {log_path}')
+            subprocess_utils.handle_returncode(
+                returncode, decompress_command,
+                'Failed to inflate or remove skypilot-filemounts.zip, '
Log: {log_path}') + subprocess_utils.run_in_parallel(_decompress_filemount_zips, runners) end = time.time() logger.debug(f'File mount sync took {end - start} seconds.') diff --git a/sky/data/storage.py b/sky/data/storage.py index 0c5b1c36bf4..db14e4e159c 100644 --- a/sky/data/storage.py +++ b/sky/data/storage.py @@ -989,31 +989,32 @@ def sync_all_stores(self): """Syncs the source and destinations of all stores in the Storage""" for _, store in self.stores.items(): self._sync_store(store) - + def _compress_sync_store(self, store: AbstractStore): - """ - TODO: WARRICK HE - Compresses everything in source and uploads that, specific to local->bucket - """ - zip_filepath = os.path.join(os.getcwd(),f'{store.name}-compressed') - zip_filename = os.path.join(zip_filepath,f'skypilot-filemounts.zip') #can change name later + """Same as sync_store, but compresses before uploading""" + zip_filepath = os.path.join(os.getcwd(), f'{store.name}-compressed') + zip_filename = os.path.join(zip_filepath, 'skypilot-filemounts.zip') filepaths = self.source - if type(self.source) == str: + if isinstance(self.source, str): filepaths = [self.source] try: if not os.path.exists(zip_filepath): os.mkdir(zip_filepath) - with zipfile.ZipFile(os.path.join(zip_filepath,zip_filename), 'w', zipfile.ZIP_DEFLATED) as zipf: + with zipfile.ZipFile(os.path.join(zip_filepath, zip_filename), 'w', + zipfile.ZIP_DEFLATED) as zipf: for filepath in filepaths: - logger.info(f"Filepath: {filepath}") + logger.info('Filepath: {filepath}') if os.path.isdir(os.path.expanduser(filepath)): - for root, dirs, files in os.walk(filepath): + for root, _, files in os.walk(filepath): for file in files: - if file == "skypilot-filemounts.zip": + if file == 'skypilot-filemounts.zip': continue - zipf.write(os.path.join(root, file), os.path.join(root.replace(filepath, '', 1), file) + '/') + zipf.write( + os.path.join(root, file), + os.path.join(root.replace(filepath, '', 1), + file) + '/') else: - logger.info(f"Filepath Zipping {filepath}") + logger.info(f'Filepath Zipping {filepath}') zipf.write(filepath, filepath.replace(os.path.sep, '/')) except: if os.path.exists(zip_filename): @@ -1043,7 +1044,6 @@ def _compress_sync_store(self, store: AbstractStore): if store.is_sky_managed: global_user_state.set_storage_status(self.name, StorageStatus.READY) - def _sync_store(self, store: AbstractStore): """Runs the upload routine for the store and handles failures""" diff --git a/sky/task.py b/sky/task.py index e2bd99b12d3..2c14232d9e7 100644 --- a/sky/task.py +++ b/sky/task.py @@ -23,7 +23,6 @@ from sky.utils import common_utils from sky.utils import schemas from sky.utils import ux_utils -import zipfile if typing.TYPE_CHECKING: from sky import resources as resources_lib @@ -1036,9 +1035,10 @@ def sync_storage_mounts(self) -> None: with ux_utils.print_exception_no_traceback(): raise ValueError(f'Storage Type {store_type} ' 'does not exist!') - + def compress_local_sync_storage_mounts(self) -> None: - """(INTERNAL) Eagerly syncs local storage mounts to cloud storage after compressing + """(INTERNAL) Eagerly syncs local storage mounts to cloud storage + after compression After syncing up, COPY-mode storage mounts are translated into regular file_mounts of the form ``{ /remote/path: {s3,gs,..}:// From f7405bfeb11b25aae0321a10764233da06d83e50 Mon Sep 17 00:00:00 2001 From: Warrick He Date: Tue, 1 Oct 2024 01:26:40 -0700 Subject: [PATCH 4/9] Fix None Error --- sky/data/storage.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sky/data/storage.py 
diff --git a/sky/data/storage.py b/sky/data/storage.py
index db14e4e159c..6ae882e3028 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -10,6 +10,7 @@
 from typing import Any, Dict, List, Optional, Tuple, Type, Union
 import urllib.parse
 import zipfile
+
 import colorama
 
 from sky import check as sky_check
@@ -992,6 +993,9 @@ def sync_all_stores(self):
 
     def _compress_sync_store(self, store: AbstractStore):
         """Same as sync_store, but compresses before uploading"""
+        if self.source is None:
+            self._sync_store(store)
+            return
         zip_filepath = os.path.join(os.getcwd(), f'{store.name}-compressed')
         zip_filename = os.path.join(zip_filepath, 'skypilot-filemounts.zip')
         filepaths = self.source

From 77a4044da468d8d23e7c38d893fc1f0d46ba6443 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Tue, 1 Oct 2024 04:47:26 -0700
Subject: [PATCH 5/9] Add Error Reporting

---
 sky/backends/cloud_vm_ray_backend.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index ba686a4c5f4..87249c40746 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4580,15 +4580,16 @@ def _decompress_filemount_zips(runner: command_runner.CommandRunner):
             zip_filename = (f'{constants.SKY_REMOTE_WORKDIR}'
                             '/skypilot-filemounts.zip')
             decompress_command = (
-                f'unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} '
-                '&& rm {zip_filename}')
+                f'[ -f {zip_filename} ] && '
+                f'(unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} '
+                f'&& rm {zip_filename}) || echo "Zip not on this node"')
             returncode = runner.run(decompress_command, log_path=log_path)
             subprocess_utils.handle_returncode(
                 returncode, decompress_command,
                 'Failed to inflate or remove skypilot-filemounts.zip, '
                f'check permissions. Log: {log_path}')
-
-        subprocess_utils.run_in_parallel(_decompress_filemount_zips, runners)
+        subprocess_utils.run_in_parallel(_decompress_filemount_zips,
+                                         runners)
         end = time.time()
         logger.debug(f'File mount sync took {end - start} seconds.')

From d1b663cc191a0abbe6023894157d89c773cef656 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Tue, 1 Oct 2024 04:59:52 -0700
Subject: [PATCH 6/9] fix formatting

---
 sky/backends/cloud_vm_ray_backend.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 87249c40746..88f316182fa 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4588,8 +4588,8 @@ def _decompress_filemount_zips(runner: command_runner.CommandRunner):
                 returncode, decompress_command,
                 'Failed to inflate or remove skypilot-filemounts.zip, '
                 f'check permissions. Log: {log_path}')
-        subprocess_utils.run_in_parallel(_decompress_filemount_zips,
-                                         runners)
+
+        subprocess_utils.run_in_parallel(_decompress_filemount_zips, runners)
         end = time.time()
         logger.debug(f'File mount sync took {end - start} seconds.')

From ba5db23e291a7bd233275ac5463813ccf1cc9e55 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Tue, 1 Oct 2024 19:07:14 -0700
Subject: [PATCH 7/9] Remove Unnecessary Logging

---
 sky/data/storage.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sky/data/storage.py b/sky/data/storage.py
index 6ae882e3028..5b095d140a8 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -1007,7 +1007,6 @@ def _compress_sync_store(self, store: AbstractStore):
             with zipfile.ZipFile(os.path.join(zip_filepath, zip_filename), 'w',
                                  zipfile.ZIP_DEFLATED) as zipf:
                 for filepath in filepaths:
-                    logger.info('Filepath: {filepath}')
                     if os.path.isdir(os.path.expanduser(filepath)):
                         for root, _, files in os.walk(filepath):
                             for file in files:

From faeb223bed2cd3dbaeb3f928c05b7276063c3454 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Mon, 7 Oct 2024 17:00:33 -0700
Subject: [PATCH 8/9] Change compression methods & clean code

---
 sky/backends/cloud_vm_ray_backend.py |  11 +--
 sky/data/storage.py                  | 106 ++++++++++++++------------
 sky/task.py                          |  90 -----------------------
 sky/utils/controller_utils.py        |   2 +-
 4 files changed, 62 insertions(+), 147 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 88f316182fa..8c62cb178b9 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4574,15 +4574,16 @@ def _symlink_node(runner: command_runner.CommandRunner):
                          f'may already exist. Log: {log_path}')
 
         subprocess_utils.run_in_parallel(_symlink_node, runners)
-        #in case of compression, inflate all zip files
-        # can optimize further by going to specific filemounts & workdir only
+        # In case of compression, inflate all zip files
+        # Can optimize further by going to specific filemounts & workdir only
         def _decompress_filemount_zips(runner: command_runner.CommandRunner):
             zip_filename = (f'{constants.SKY_REMOTE_WORKDIR}'
-                            '/skypilot-filemounts.zip')
+                            '/skypilot-filemounts*.zip')
             decompress_command = (
                 f'[ -f {zip_filename} ] && '
-                f'(unzip {zip_filename} -d {constants.SKY_REMOTE_WORKDIR} '
-                f'&& rm {zip_filename}) || echo "Zip not on this node"')
+                f'(tar -xzf {zip_filename} -C {constants.SKY_REMOTE_WORKDIR} '
+                f'| cat && rm {zip_filename}) || '
+                'echo "Zipfile not found on this node"')
             returncode = runner.run(decompress_command, log_path=log_path)
             subprocess_utils.handle_returncode(
                 returncode, decompress_command,
diff --git a/sky/data/storage.py b/sky/data/storage.py
index 5b095d140a8..6700fcb2dff 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -5,11 +5,12 @@
 import re
 import shlex
 import subprocess
+import tempfile
 import time
 import typing
 from typing import Any, Dict, List, Optional, Tuple, Type, Union
 import urllib.parse
-import zipfile
+import uuid
 
 import colorama
@@ -860,6 +861,8 @@ def add_store(self,
             store_type: StoreType; Type of the storage [S3, GCS, AZURE, R2, IBM]
             region: str; Region to place the bucket in. Caller must ensure
                 that the region is valid for the chosen store_type.
+            compress_local: boolean; Decides whether we want to compress the
+                filemount before uploaidng to the bucket.
""" if isinstance(store_type, str): store_type = StoreType(store_type) @@ -925,7 +928,7 @@ def add_store(self, # Upload source to store if compress_local: - self._compress_sync_store(store) + self._maybe_compress_sync_store(store) else: self._sync_store(store) @@ -991,61 +994,62 @@ def sync_all_stores(self): for _, store in self.stores.items(): self._sync_store(store) - def _compress_sync_store(self, store: AbstractStore): + def _maybe_compress_sync_store(self, store: AbstractStore): """Same as sync_store, but compresses before uploading""" - if self.source is None: + # Only supports workdir, self.source should be type str + if self.source is None or not isinstance(self.source, str): self._sync_store(store) return - zip_filepath = os.path.join(os.getcwd(), f'{store.name}-compressed') - zip_filename = os.path.join(zip_filepath, 'skypilot-filemounts.zip') - filepaths = self.source - if isinstance(self.source, str): - filepaths = [self.source] - try: - if not os.path.exists(zip_filepath): - os.mkdir(zip_filepath) - with zipfile.ZipFile(os.path.join(zip_filepath, zip_filename), 'w', - zipfile.ZIP_DEFLATED) as zipf: - for filepath in filepaths: - if os.path.isdir(os.path.expanduser(filepath)): - for root, _, files in os.walk(filepath): - for file in files: - if file == 'skypilot-filemounts.zip': - continue - zipf.write( - os.path.join(root, file), - os.path.join(root.replace(filepath, '', 1), - file) + '/') - else: - logger.info(f'Filepath Zipping {filepath}') - zipf.write(filepath, filepath.replace(os.path.sep, '/')) - except: - if os.path.exists(zip_filename): - os.remove(zip_filename) - os.rmdir(zip_filepath) - raise + + def num_files(source, excluded_list): + count = 0 + find_cmd = [ + 'find', source, '-type', 'd', '-name', '.git', '-prune', '-o', + '-print' + ] + grep_cmd = ['cat'] + if len(excluded_list) > 0: + grep_cmd = ['grep', '-vE', f'"({"|".join(excluded_list)})"'] + all_files = subprocess.Popen(find_cmd, stdout=subprocess.PIPE) + relevant_files = subprocess.Popen(grep_cmd, + stdin=all_files.stdout, + stdout=subprocess.PIPE) + all_files.stdout.close() + num_files = subprocess.Popen(['wc', '-l'], + stdin=relevant_files.stdout, + stdout=subprocess.PIPE) + relevant_files.stdout.close() + count += int(num_files.communicate()[0]) + num_files.stdout.close() + return count + + excluded_list = storage_utils.\ + get_excluded_files_from_gitignore(self.source) + excluded_list.append('.git') + + if num_files(self.source, excluded_list) <= 10: + self._sync_store(store) + return + try: - # overwrite source with zip - store.source = zip_filepath - store.upload() - store.source = filepaths + with tempfile.TemporaryDirectory() as tmpdirname: + # uuid used to avoid collisions if compressing multiple sources + file_name = f'skypilot-filemounts-{uuid.uuid1()}.zip' + tmp_path = os.path.join(tmpdirname, file_name) + command = ['tar', '-czf', tmp_path] + for ignored_file in excluded_list: + command.append(f'--exclude=./{ignored_file}') + # TODO(warrickhe): Possibly extend compression to all filemounts + # Currently only supports workdirs + command.extend(['-C', self.source, '.']) + subprocess.Popen(command) + original_source = store.source + store.source = tmpdirname + self._sync_store(store) + store.source = original_source except exceptions.StorageUploadError: - if os.path.exists(zip_filename): - os.remove(zip_filename) - os.rmdir(zip_filepath) - self.source = filepaths - logger.error(f'Could not upload {self.source!r} to store ' - f'name {store.name!r}.') - if store.is_sky_managed: - 
-                global_user_state.set_storage_status(
-                    self.name, StorageStatus.UPLOAD_FAILED)
+            logger.error('Problem when trying to upload compressed file')
             raise
-        if os.path.exists(zip_filename):
-            os.remove(zip_filename)
-        os.rmdir(zip_filepath)
-        # Upload succeeded - update state
-        if store.is_sky_managed:
-            global_user_state.set_storage_status(self.name, StorageStatus.READY)
 
     def _sync_store(self, store: AbstractStore):
         """Runs the upload routine for the store and handles failures"""
diff --git a/sky/task.py b/sky/task.py
index 2c14232d9e7..a6a7713f93d 100644
--- a/sky/task.py
+++ b/sky/task.py
@@ -951,95 +951,6 @@ def _get_preferred_store(
 
     def sync_storage_mounts(self) -> None:
         """(INTERNAL) Eagerly syncs storage mounts to cloud storage.
 
-        After syncing up, COPY-mode storage mounts are translated into regular
-        file_mounts of the form ``{ /remote/path: {s3,gs,..}://<bucket path>
-        }``.
-        """
-        for storage in self.storage_mounts.values():
-            if len(storage.stores) == 0:
-                store_type, store_region = self._get_preferred_store()
-                self.storage_plans[storage] = store_type
-                storage.add_store(store_type, store_region)
-            else:
-                # We will download the first store that is added to remote.
-                self.storage_plans[storage] = list(storage.stores.keys())[0]
-
-        storage_mounts = self.storage_mounts
-        storage_plans = self.storage_plans
-        for mnt_path, storage in storage_mounts.items():
-            if storage.mode == storage_lib.StorageMode.COPY:
-                store_type = storage_plans[storage]
-                if store_type is storage_lib.StoreType.S3:
-                    # TODO: allow for Storage mounting of different clouds
-                    if isinstance(storage.source,
-                                  str) and storage.source.startswith('s3://'):
-                        blob_path = storage.source
-                    else:
-                        assert storage.name is not None, storage
-                        blob_path = 's3://' + storage.name
-                    self.update_file_mounts({
-                        mnt_path: blob_path,
-                    })
-                elif store_type is storage_lib.StoreType.GCS:
-                    if isinstance(storage.source,
-                                  str) and storage.source.startswith('gs://'):
-                        blob_path = storage.source
-                    else:
-                        assert storage.name is not None, storage
-                        blob_path = 'gs://' + storage.name
-                    self.update_file_mounts({
-                        mnt_path: blob_path,
-                    })
-                elif store_type is storage_lib.StoreType.AZURE:
-                    if (isinstance(storage.source, str) and
-                            data_utils.is_az_container_endpoint(
-                                storage.source)):
-                        blob_path = storage.source
-                    else:
-                        assert storage.name is not None, storage
-                        store_object = storage.stores[
-                            storage_lib.StoreType.AZURE]
-                        assert isinstance(store_object,
-                                          storage_lib.AzureBlobStore)
-                        storage_account_name = store_object.storage_account_name
-                        blob_path = data_utils.AZURE_CONTAINER_URL.format(
-                            storage_account_name=storage_account_name,
-                            container_name=storage.name)
-                    self.update_file_mounts({
-                        mnt_path: blob_path,
-                    })
-                elif store_type is storage_lib.StoreType.R2:
-                    if storage.source is not None and not isinstance(
-                            storage.source,
-                            list) and storage.source.startswith('r2://'):
-                        blob_path = storage.source
-                    else:
-                        blob_path = 'r2://' + storage.name
-                    self.update_file_mounts({
-                        mnt_path: blob_path,
-                    })
-                elif store_type is storage_lib.StoreType.IBM:
-                    if isinstance(storage.source,
-                                  str) and storage.source.startswith('cos://'):
-                        # source is a cos bucket's uri
-                        blob_path = storage.source
-                    else:
-                        # source is a bucket name.
-                        assert storage.name is not None, storage
-                        # extract region from rclone.conf
-                        cos_region = data_utils.Rclone.get_region_from_rclone(
-                            storage.name, data_utils.Rclone.RcloneClouds.IBM)
-                        blob_path = f'cos://{cos_region}/{storage.name}'
-                    self.update_file_mounts({mnt_path: blob_path})
-                else:
-                    with ux_utils.print_exception_no_traceback():
-                        raise ValueError(f'Storage Type {store_type} '
-                                         'does not exist!')
-
-    def compress_local_sync_storage_mounts(self) -> None:
-        """(INTERNAL) Eagerly syncs local storage mounts to cloud storage
-        after compression
 
         After syncing up, COPY-mode storage mounts are translated into regular
         file_mounts of the form ``{ /remote/path: {s3,gs,..}://<bucket path>
         }``.
         """
@@ -1060,7 +971,6 @@ def compress_local_sync_storage_mounts(self) -> None:
         if storage.mode == storage_lib.StorageMode.COPY:
             store_type = storage_plans[storage]
             if store_type is storage_lib.StoreType.S3:
-                # TODO: allow for Storage mounting of different clouds
                 if isinstance(storage.source,
                               str) and storage.source.startswith('s3://'):
                     blob_path = storage.source
diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py
index 44aa06e6353..aaebb72ad16 100644
--- a/sky/utils/controller_utils.py
+++ b/sky/utils/controller_utils.py
@@ -712,7 +712,7 @@ def maybe_translate_local_file_mounts_and_sync_up(task: 'task_lib.Task',
                 f'{colorama.Style.RESET_ALL} See: sky storage ls')
     try:
         # Optimize filemount translation
-        task.compress_local_sync_storage_mounts()
+        task.sync_storage_mounts()
     except ValueError as e:
         if 'No enabled cloud for storage' in str(e):
             data_src = None

From f0f9155e11dc7f747c4ef009d511de0504618806 Mon Sep 17 00:00:00 2001
From: Warrick He
Date: Sun, 20 Oct 2024 23:39:15 -0700
Subject: [PATCH 9/9] Polish Code

---
 sky/backends/cloud_vm_ray_backend.py |  8 +++++---
 sky/data/storage.py                  | 13 +++++++++----
 2 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 8c62cb178b9..d75e63764b9 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -4574,9 +4574,11 @@ def _symlink_node(runner: command_runner.CommandRunner):
                          f'may already exist. Log: {log_path}')
 
         subprocess_utils.run_in_parallel(_symlink_node, runners)
-        # In case of compression, inflate all zip files
+        # (3) In case of compression, inflate all zip files
+        # TODO(warrickhe): track filenames for compression edgecase
         # Can optimize further by going to specific filemounts & workdir only
-        def _decompress_filemount_zips(runner: command_runner.CommandRunner):
+        def _decompress_filemount_zips(
+                runner: command_runner.CommandRunner) -> None:
             zip_filename = (f'{constants.SKY_REMOTE_WORKDIR}'
                             '/skypilot-filemounts*.zip')
             decompress_command = (
@@ -4587,7 +4589,7 @@ def _decompress_filemount_zips(runner: command_runner.CommandRunner):
             returncode = runner.run(decompress_command, log_path=log_path)
             subprocess_utils.handle_returncode(
                 returncode, decompress_command,
-                'Failed to inflate or remove skypilot-filemounts.zip, '
+                'Failed to inflate or remove skypilot-filemounts-uuid.zip, '
                f'check permissions. Log: {log_path}')
diff --git a/sky/data/storage.py b/sky/data/storage.py
index 6700fcb2dff..6fb56fb3050 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -62,6 +62,8 @@
 # Maximum number of concurrent rsync upload processes
 _MAX_CONCURRENT_UPLOADS = 32
 
+_MIN_FILES_TO_COMPRESS = 10
+
 _BUCKET_FAIL_TO_CONNECT_MESSAGE = (
     'Failed to access existing bucket {name!r}. '
    'This is likely because it is a private bucket you do not have access to.\n'
@@ -862,7 +864,7 @@ def add_store(self,
             region: str; Region to place the bucket in. Caller must ensure
                 that the region is valid for the chosen store_type.
             compress_local: boolean; Decides whether we want to compress the
-                filemount before uploaidng to the bucket.
+                filemount before uploading to the bucket.
         """
         if isinstance(store_type, str):
             store_type = StoreType(store_type)
@@ -1023,11 +1025,14 @@ def num_files(source, excluded_list):
             num_files.stdout.close()
             return count
 
-        excluded_list = storage_utils.\
-            get_excluded_files_from_gitignore(self.source)
+        excluded_list = (storage_utils.get_excluded_files_from_gitignore(
+            self.source))
         excluded_list.append('.git')
 
-        if num_files(self.source, excluded_list) <= 10:
+        # Check the total number of files before compressing:
+        # rsync latency grows with the number of files transferred, and
+        # testing showed that compression is worthwhile at 10+ files.
+        if num_files(self.source, excluded_list) <= _MIN_FILES_TO_COMPRESS:
             self._sync_store(store)
             return
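
The flow the series converges on in PATCH 8/9 is: count the workdir's files (skipping .git and gitignored names), fall back to a plain sync at or below _MIN_FILES_TO_COMPRESS, and otherwise build a gzipped tar in a temporary directory and upload that directory in place of the raw source. The sketch below is a minimal stand-alone illustration of that flow, not the patch code itself: `upload` is a hypothetical callback standing in for Storage._sync_store, exclusion handling is simplified to name matching, and the stdlib tarfile module replaces the `tar -czf` subprocess.

    import os
    import tarfile
    import tempfile
    import uuid

    _MIN_FILES_TO_COMPRESS = 10  # threshold introduced in PATCH 9

    def _num_files(source, excluded):
        # Pure-Python equivalent of the find | grep -vE | wc -l pipeline in
        # num_files(): walk the tree, pruning excluded directories like .git.
        count = 0
        for root, dirs, files in os.walk(source):
            dirs[:] = [d for d in dirs if d not in excluded]
            count += sum(1 for name in files if name not in excluded)
        return count

    def maybe_compress_and_upload(source, upload, excluded=frozenset({'.git'})):
        # `upload(path)` is a hypothetical stand-in for Storage._sync_store.
        if _num_files(source, excluded) <= _MIN_FILES_TO_COMPRESS:
            upload(source)  # few files: a plain sync beats tar + remote inflate
            return
        with tempfile.TemporaryDirectory() as tmpdirname:
            # uuid1 avoids collisions when several sources are compressed. The
            # archive is a gzipped tar despite the .zip suffix, matching the
            # tar -czf / tar -xzf pair used by the patches.
            archive = os.path.join(tmpdirname,
                                   f'skypilot-filemounts-{uuid.uuid1()}.zip')
            with tarfile.open(archive, 'w:gz') as tar:
                tar.add(source, arcname='.',
                        filter=lambda info: None if os.path.basename(info.name)
                        in excluded else info)
            # The archive is fully written at this point; hand the temp dir to
            # the uploader in place of the raw source, as the patch does.
            upload(tmpdirname)

Usage: maybe_compress_and_upload(os.path.expanduser('~/my-workdir'), upload=lambda p: print('would sync', p)). One difference from the patches is deliberate: tarfile finishes writing the archive before upload starts, whereas PATCH 8's fire-and-forget subprocess.Popen(command) does not wait for tar to complete before _sync_store begins uploading.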