Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove field _is_sky_managed for intermediate bucket #4545

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
remove field _is_sky_managed
  • Loading branch information
zpoint committed Jan 8, 2025
commit 3f9525c73c75513d5d719d46767beb481f1c7b58
55 changes: 20 additions & 35 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,11 @@ def upload(self) -> None:
"""
raise NotImplementedError

def delete(self) -> None:
def delete(self, force_delete_bucket: bool = False) -> None:
"""Removes the Storage from the cloud."""
raise NotImplementedError

def _delete_sub_path(self) -> None:
def delete_sub_path(self) -> None:
"""Removes objects from the sub path in the bucket."""
raise NotImplementedError

Expand Down Expand Up @@ -550,8 +550,6 @@ def __init__(
mode: StorageMode = StorageMode.MOUNT,
sync_on_reconstruction: bool = True,
# pylint: disable=invalid-name
_is_sky_managed: Optional[bool] = None,
# pylint: disable=invalid-name
_bucket_sub_path: Optional[str] = None
) -> None:
"""Initializes a Storage object.
Expand Down Expand Up @@ -591,16 +589,6 @@ def __init__(
there. This is set to false when the Storage object is created not
for direct use, e.g. for 'sky storage delete', or the storage is
being re-used, e.g., for `sky start` on a stopped cluster.
_is_sky_managed: Optional[bool]; Indicates if the storage is managed
by Sky. Without this argument, the controller's behavior differs
from the local machine. For example, if a bucket does not exist:
Local Machine (is_sky_managed=True) →
Controller (is_sky_managed=False).
With this argument, the controller aligns with the local machine,
ensuring it retains the is_sky_managed information from the YAML.
During teardown, if is_sky_managed is True, the controller should
delete the bucket. Otherwise, it might mistakenly delete only the
sub-path, assuming is_sky_managed is False.
_bucket_sub_path: Optional[str]; The subdirectory to use for the
storage object.
"""
Expand All @@ -610,7 +598,6 @@ def __init__(
self.mode = mode
assert mode in StorageMode
self.sync_on_reconstruction = sync_on_reconstruction
self._is_sky_managed = _is_sky_managed
self._bucket_sub_path = _bucket_sub_path

# TODO(romilb, zhwu): This is a workaround to support storage deletion
Expand Down Expand Up @@ -1024,7 +1011,6 @@ def add_store(self,
source=self.source,
region=region,
sync_on_reconstruction=self.sync_on_reconstruction,
is_sky_managed=self._is_sky_managed,
_bucket_sub_path=self._bucket_sub_path)
except exceptions.StorageBucketCreateError:
# Creation failed, so this must be sky managed store. Add failure
Expand Down Expand Up @@ -1100,7 +1086,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
else:
global_user_state.set_storage_handle(self.name, self.handle)
elif self.force_delete:
store.delete()
store.delete(force_delete_bucket=True)
# Remove store from bookkeeping
del self.stores[store_type]
else:
Expand All @@ -1109,7 +1095,7 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
self.handle.remove_store(store)
store.delete()
elif self.force_delete:
store.delete()
store.delete(force_delete_bucket=True)
self.stores = {}
# Remove storage from global_user_state if present
global_user_state.remove_storage(self.name)
Expand Down Expand Up @@ -1157,8 +1143,6 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':
mode_str = config.pop('mode', None)
force_delete = config.pop('_force_delete', None)
# pylint: disable=invalid-name
_is_sky_managed = config.pop('_is_sky_managed', None)
# pylint: disable=invalid-name
_bucket_sub_path = config.pop('_bucket_sub_path', None)
if force_delete is None:
force_delete = False
Expand All @@ -1180,7 +1164,6 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':
source=source,
persistent=persistent,
mode=mode,
_is_sky_managed=_is_sky_managed,
_bucket_sub_path=_bucket_sub_path)
if store is not None:
storage_obj.add_store(StoreType(store.upper()))
Expand All @@ -1205,12 +1188,9 @@ def add_if_not_none(key: str, value: Optional[Any]):
add_if_not_none('source', self.source)

stores = None
is_sky_managed = self._is_sky_managed
if self.stores:
stores = ','.join([store.value for store in self.stores])
is_sky_managed = list(self.stores.values())[0].is_sky_managed
add_if_not_none('store', stores)
add_if_not_none('_is_sky_managed', is_sky_managed)
add_if_not_none('persistent', self.persistent)
add_if_not_none('mode', self.mode.value)
if self.force_delete:
Expand Down Expand Up @@ -1415,8 +1395,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
return self._delete_sub_path()

deleted_by_skypilot = self._delete_s3_bucket(self.name)
Expand Down Expand Up @@ -1900,8 +1881,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
return self._delete_sub_path()

deleted_by_skypilot = self._delete_gcs_bucket(self.name)
Expand Down Expand Up @@ -2730,9 +2712,10 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
def delete(self, force_delete_bucket: bool = False) -> None:
"""Deletes the storage."""
if self._bucket_sub_path is not None and not self.is_sky_managed:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
return self._delete_sub_path()

deleted_by_skypilot = self._delete_az_bucket(self.name)
Expand Down Expand Up @@ -3183,8 +3166,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
return self._delete_sub_path()

deleted_by_skypilot = self._delete_r2_bucket(self.name)
Expand Down Expand Up @@ -3665,8 +3649,9 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
if self._bucket_sub_path is not None and not self.is_sky_managed:
def delete(self, force_delete_bucket: bool = False) -> None:
if (not force_delete_bucket and self._bucket_sub_path is not None and
not self.is_sky_managed):
return self._delete_sub_path()

self._delete_cos_bucket()
Expand Down Expand Up @@ -4100,7 +4085,7 @@ def upload(self):
raise exceptions.StorageUploadError(
f'Upload failed for store {self.name}') from e

def delete(self) -> None:
def delete(self, force_delete_bucket: bool = False) -> None:
deleted_by_skypilot = self._delete_oci_bucket(self.name)
if deleted_by_skypilot:
msg_str = f'Deleted OCI bucket {self.name}.'
Expand Down
12 changes: 10 additions & 2 deletions sky/utils/controller_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
# we store all these files in same bucket from config.
bucket_wth_prefix = skypilot_config.get_nested(('jobs', 'bucket'), None)
store_kwargs: Dict[str, Any] = {}
# Controllers do not know whether the bucket is managed by sky or not.
# By default, we assume that sky created and manages the intermediate
# bucket, so we let the controller delete the bucket after the job
# finishes.
force_delete = True
if bucket_wth_prefix is None:
store_type = store_cls = sub_path = None
storage_account_name = region = None
Expand All @@ -713,7 +718,8 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
store_kwargs['storage_account_name'] = storage_account_name
if region is not None:
store_kwargs['region'] = region

# If the bucket is not managed by sky, we should not force delete it.
force_delete = False
# Step 1: Translate the workdir to SkyPilot storage.
new_storage_mounts = {}
if task.workdir is not None:
Expand Down Expand Up @@ -745,6 +751,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[constants.SKY_REMOTE_WORKDIR] = storage_obj
# Check of the existence of the workdir in file_mounts is done in
# the task construction.
Expand Down Expand Up @@ -782,6 +789,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.COPY,
stores=stores,
_bucket_sub_path=bucket_sub_path)
storage_obj.force_delete = force_delete
new_storage_mounts[dst] = storage_obj
logger.info(f' {colorama.Style.DIM}Folder : {src!r} '
f'-> storage: {bucket_name!r}.{colorama.Style.RESET_ALL}')
Expand Down Expand Up @@ -821,7 +829,7 @@ def assert_no_bucket_creation(store: storage_lib.AbstractStore) -> None:
mode=storage_lib.StorageMode.MOUNT,
stores=stores,
_bucket_sub_path=file_mounts_tmp_subpath)

storage_obj.force_delete = force_delete
new_storage_mounts[file_mount_remote_tmp_dir] = storage_obj
if file_mount_remote_tmp_dir in original_storage_mounts:
with ux_utils.print_exception_no_traceback():
Expand Down
3 changes: 0 additions & 3 deletions sky/utils/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ def get_storage_schema():
mode.value for mode in storage.StorageMode
]
},
'_is_sky_managed': {
'type': 'boolean',
},
'_bucket_sub_path': {
'type': 'string',
},
Expand Down