diff --git a/tosfs/consts.py b/tosfs/consts.py index 0ebf0ac..f83b0e5 100644 --- a/tosfs/consts.py +++ b/tosfs/consts.py @@ -17,6 +17,10 @@ # Tos server response status codes TOS_SERVER_STATUS_CODE_NOT_FOUND = 404 +# tos bucket type (hns, fns) +TOS_BUCKET_TYPE_HNS = "hns" +TOS_BUCKET_TYPE_FNS = "fns" + MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 diff --git a/tosfs/core.py b/tosfs/core.py index 5b75853..a8ed6fb 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -50,11 +50,14 @@ MPU_PART_SIZE_THRESHOLD, PART_MAX_SIZE, PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, + TOS_BUCKET_TYPE_FNS, + TOS_BUCKET_TYPE_HNS, TOS_SERVER_STATUS_CODE_NOT_FOUND, TOSFS_LOG_FORMAT, ) from tosfs.exceptions import TosfsError from tosfs.fsspec_utils import glob_translate +from tosfs.models import DeletingObject from tosfs.mpu import MultipartUploader from tosfs.retry import retryable_func_executor from tosfs.tag import BucketTagMgr @@ -541,16 +544,7 @@ def exists(self, path: str, **kwargs: Any) -> bool: ) except TosServerError as ex: if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: - resp = retryable_func_executor( - lambda: self.tos_client.list_objects_type2( - bucket, - key.rstrip("/") + "/", - start_after=key.rstrip("/") + "/", - max_keys=1, - ), - max_retry_num=self.max_retry_num, - ) - return len(resp.contents) > 0 + return self._prefix_search_for_exists(bucket, key) else: raise ex else: @@ -1314,54 +1308,97 @@ def _rm(self, path: str) -> None: ######################## private methods ######################## def _list_and_batch_delete_objs(self, bucket: str, key: str) -> None: + bucket_type = self.tos_client._get_bucket_type(bucket).lower() is_truncated = True continuation_token = "" all_results = [] - class DeletingObject: - def __init__(self, key: str, version_id: Optional[str] = None): - self.key = key - self.version_id = version_id - - def delete_objects(deleting_objects: List[DeletingObject]) -> None: - delete_resp = retryable_func_executor( - lambda: self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True + def list_objects(continuation_token: str = "") -> ListObjectType2Output: + return retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, ), max_retry_num=self.max_retry_num, ) - if delete_resp.error: - for d in delete_resp.error: - logger.warning("Deleted object: %s failed", d) + + if bucket_type == TOS_BUCKET_TYPE_FNS: + while is_truncated: + resp = list_objects(continuation_token) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token + all_results = resp.contents + + deleting_objects = [ + DeletingObject(o.key if hasattr(o, "key") else o.prefix) + for o in all_results + ] + + if deleting_objects: + self._delete_objects(bucket, deleting_objects) + elif bucket_type == TOS_BUCKET_TYPE_HNS: + all_results = self._list_and_collect_objects( + bucket, bucket_type, key.rstrip("/") + "/" + ) + if all_results: + self._delete_objects(bucket, all_results) + else: + raise ValueError(f"Unsupported bucket type: {bucket_type}") + + def _delete_objects( + self, bucket: str, deleting_objects: list[DeletingObject] + ) -> None: + delete_resp = retryable_func_executor( + lambda: self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ), + max_retry_num=self.max_retry_num, + ) + if delete_resp.error: + for d in delete_resp.error: + logger.warning("Deleted object: %s failed", d) + + def _list_and_collect_objects( + self, bucket: str, bucket_type: str, prefix: str + ) -> List[DeletingObject]: + collected_objects: list[DeletingObject] = [] + is_truncated = True + continuation_token = "" while is_truncated: def _call_list_objects_type2( - continuation_token: str = continuation_token, + continuation_token: str = continuation_token, prefix: str = prefix ) -> ListObjectType2Output: return self.tos_client.list_objects_type2( bucket, - prefix=key.rstrip("/") + "/", + prefix=prefix, max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, continuation_token=continuation_token, + delimiter="/" if bucket_type == TOS_BUCKET_TYPE_HNS else None, ) resp = retryable_func_executor( _call_list_objects_type2, - args=(continuation_token,), max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token - all_results = resp.contents - - deleting_objects = [ + collected_objects.extend( DeletingObject(o.key if hasattr(o, "key") else o.prefix) - for o in all_results - ] + for o in resp.contents + ) + if bucket_type == TOS_BUCKET_TYPE_HNS: + for common_prefix in resp.common_prefixes: + collected_objects.extend( + self._list_and_collect_objects( + bucket, bucket_type, common_prefix + ) + ) - if deleting_objects: - delete_objects(deleting_objects) + return collected_objects def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. @@ -1924,6 +1961,53 @@ def _call_list_objects_type2( return all_results + def _prefix_search_for_exists(self, bucket: str, key: str) -> bool: + bucket_type = self.tos_client._get_bucket_type(bucket).lower() + if bucket_type == TOS_BUCKET_TYPE_FNS: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + start_after=key.rstrip("/") + "/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + return len(resp.contents) > 0 + elif bucket_type == TOS_BUCKET_TYPE_HNS: + + def search_in_common_prefixes(bucket: str, prefix: str) -> bool: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix, + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + for common_prefix in resp.common_prefixes: + if search_in_common_prefixes(bucket, common_prefix): + return True + return False + + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + return search_in_common_prefixes(bucket, key.rstrip("/") + "/") + else: + raise ValueError(f"Unsupported bucket type {bucket_type}") + def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]: """Normalise tos path string into bucket and key. diff --git a/tosfs/models.py b/tosfs/models.py new file mode 100644 index 0000000..956f679 --- /dev/null +++ b/tosfs/models.py @@ -0,0 +1,25 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The module contains models for the tosfs package.""" +from typing import Optional + + +class DeletingObject: + """Object to be deleted.""" + + def __init__(self, key: str, version_id: Optional[str] = None): + """Initialize a DeletingObject for batch delete.""" + self.key = key + self.version_id = version_id