Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove field _is_sky_managed for intermediate bucket #4545

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
deduce is_sky_managed
  • Loading branch information
zpoint committed Jan 8, 2025
commit ce4500173864d333da2d43919f264b39ced7dfb0
62 changes: 37 additions & 25 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
_STORAGE_LOG_FILE_NAME = 'storage_sync.log'


def _is_sky_managed_intermediate_bucket(bucket_name: str) -> bool:
return re.match(r'skypilot-filemounts-.+-[a-f0-9]{8}',
bucket_name) is not None
Comment on lines +81 to +83
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basing this on the bucket name is not a robust solution. Can't we instead manually set force_delete accordingly in maybe_translate...?

Copy link
Collaborator Author

@zpoint zpoint Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_force_delete is already True in our cases. The controller will call delete anyway; it just doesn't know whether the bucket is sky-managed. Should it delete the entire bucket or just the sub_path?



def get_cached_enabled_storage_clouds_or_refresh(
raise_if_no_cloud_access: bool = False) -> List[str]:
# This is a temporary solution until https://github.com/skypilot-org/skypilot/issues/1943 # pylint: disable=line-too-long
Expand Down Expand Up @@ -392,7 +397,7 @@ def upload(self) -> None:
"""
raise NotImplementedError

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
"""Removes the Storage from the cloud."""
raise NotImplementedError

Expand Down Expand Up @@ -1086,7 +1091,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
else:
global_user_state.set_storage_handle(self.name, self.handle)
elif self.force_delete:
store.delete(force_delete_bucket=True)
store.delete()
# Remove store from bookkeeping
del self.stores[store_type]
else:
Expand All @@ -1095,7 +1100,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
self.handle.remove_store(store)
store.delete()
elif self.force_delete:
store.delete(force_delete_bucket=True)
store.delete()
self.stores = {}
# Remove storage from global_user_state if present
global_user_state.remove_storage(self.name)
Expand Down Expand Up @@ -1364,7 +1369,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))
Comment on lines +1371 to +1372
Copy link
Collaborator

@romilbhardwaj romilbhardwaj Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_sky_managed should not be inferred on bucket name, it depends on whether the bucket is created by sky or not. Can we instead pass this information to the controller using force_delete, since this field is used mainly for deletion logic?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_force_delete is already True in our cases. The controller will call delete anyway; it just doesn't know whether the bucket is sky-managed. Should it delete the entire bucket or just the sub_path?


def upload(self):
"""Uploads source to store bucket.
Expand Down Expand Up @@ -1395,9 +1402,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_s3_bucket(self.name)
Expand Down Expand Up @@ -1848,7 +1854,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
Expand Down Expand Up @@ -1881,9 +1889,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_gcs_bucket(self.name)
Expand Down Expand Up @@ -2424,7 +2431,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def _update_storage_account_name_and_resource(self):
self.storage_account_name, self.resource_group_name = (
Expand Down Expand Up @@ -2712,10 +2721,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
"""Deletes the storage."""
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_az_bucket(self.name)
Expand Down Expand Up @@ -3135,7 +3143,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
Expand Down Expand Up @@ -3166,9 +3176,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

deleted_by_skypilot = self._delete_r2_bucket(self.name)
Expand Down Expand Up @@ -3615,7 +3624,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads files from local machine to bucket.
Expand Down Expand Up @@ -3649,9 +3660,8 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
return self._delete_sub_path()

self._delete_cos_bucket()
Expand Down Expand Up @@ -4060,7 +4070,9 @@ def initialize(self):
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.is_sky_managed = (is_new_bucket or
_is_sky_managed_intermediate_bucket(
self.name))

def upload(self):
"""Uploads source to store bucket.
Expand All @@ -4085,7 +4097,7 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self, force_delete_bucket: bool = False) -> None:
def delete(self) -> None:
deleted_by_skypilot = self._delete_oci_bucket(self.name)
if deleted_by_skypilot:
msg_str = f'Deleted OCI bucket {self.name}.'
Expand Down
12 changes: 2 additions & 10 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
# we store all these files in same bucket from config.
bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None)
store_kwargs: Dict[str, Any] = {}
# Controllers don't have the knowledge of whether the bucket is managed by
# sky or not, By default we consider the sky create and managed the
# intermediate bucket so we let controller delete the buckets after job
# finishes.
force_delete = True
if bucket_wth_prefix is None:
store_type = store_cls = sub_path = None
storage_account_name = region = None
Expand All @@ -718,8 +713,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
store_kwargs['storage_account_name'] = storage_account_name
if region is not None:
store_kwargs['region'] = region
# If the bucket is not managed by sky, we should not force delete it.
force_delete = False

# Step 1: Translate the workdir to SkyPilot storage.
new_storage_mounts = {}
if task.workdir is not None:
Expand Down Expand Up @@ -751,7 +745,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[constants.SKY_REMOTE_WORKDIR] = storage_obj
# Check of the existence of the workdir in file_mounts is done in
# the task construction.
Expand Down Expand Up @@ -789,7 +782,6 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[dst] = storage_obj
logger.info(f' {colorama.Style.DIM}Folder : {src!r} '
f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}')
Expand Down Expand Up @@ -829,7 +821,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.MOUNT,
stores=stores,
_bucket_sub_path=file_mounts_tmp_subpath)
storage_obj.force_delete = force_delete

new_storage_mounts[file_mount_remote_tmp_dir] = storage_obj
if file_mount_remote_tmp_dir in original_storage_mounts:
with ux_utils.print_exception_no_traceback():
Expand Down
Loading