From eba36e127668bbd689ddc2db11cbb0372f8d65e7 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 27 Sep 2024 19:35:11 +0800 Subject: [PATCH] Run test cases on hns --- .github/workflows/ci.yml | 2 + tosfs/consts.py | 8 +- tosfs/core.py | 156 +++++++++++++++++++++++++++++++-------- tosfs/models.py | 25 +++++++ 4 files changed, 157 insertions(+), 34 deletions(-) create mode 100644 tosfs/models.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index db13723..23708d4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: matrix: python-version: ["3.9", "3.10", "3.11", "3.12"] fsspec-version: ["2023.5.0", "2024.9.0"] + bucket-name: ["proton-ci", "proton-ci-hns"] steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} @@ -59,5 +60,6 @@ jobs: echo "TOS_REGION=${{ vars.TOS_REGION }}" >> $GITHUB_ENV echo "TOS_ENDPOINT=${{ vars.TOS_ENDPOINT }}" >> $GITHUB_ENV echo "TOSFS_LOGGING_LEVEL=${{ vars.TOSFS_LOGGING_LEVEL }}" >> $GITHUB_ENV + echo "TOS_BUCKET=${{ matrix.bucket-name }}" >> $GITHUB_ENV - name: Run tests run: make test diff --git a/tosfs/consts.py b/tosfs/consts.py index ae2aeed..f83b0e5 100644 --- a/tosfs/consts.py +++ b/tosfs/consts.py @@ -17,8 +17,12 @@ # Tos server response status codes TOS_SERVER_STATUS_CODE_NOT_FOUND = 404 -MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 # 5GB -MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 # 5MB +# tos bucket type (hns, fns) +TOS_BUCKET_TYPE_HNS = "hns" +TOS_BUCKET_TYPE_FNS = "fns" + +MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30 +MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20 RETRY_NUM = 5 PART_MIN_SIZE = 5 * 2**20 diff --git a/tosfs/core.py b/tosfs/core.py index f34f95d..47b3ca8 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -48,11 +48,14 @@ MPU_PART_SIZE_THRESHOLD, PART_MAX_SIZE, PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, + TOS_BUCKET_TYPE_FNS, + TOS_BUCKET_TYPE_HNS, TOS_SERVER_STATUS_CODE_NOT_FOUND, TOSFS_LOG_FORMAT, ) from tosfs.exceptions import TosfsError from tosfs.fsspec_utils import glob_translate +from tosfs.models import DeletingObject from tosfs.mpu import MultipartUploader from tosfs.retry import retryable_func_executor from tosfs.tag import BucketTagMgr @@ -583,16 +586,7 @@ def exists(self, path: str, **kwargs: Any) -> bool: ) except TosServerError as ex: if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: - resp = retryable_func_executor( - lambda: self.tos_client.list_objects_type2( - bucket, - key.rstrip("/") + "/", - start_after=key.rstrip("/") + "/", - max_keys=1, - ), - max_retry_num=self.max_retry_num, - ) - return len(resp.contents) > 0 + return self._prefix_search_for_exists(bucket, key) else: raise ex else: @@ -1363,54 +1357,97 @@ def _rm(self, path: str) -> None: ######################## private methods ######################## def _list_and_batch_delete_objs(self, bucket: str, key: str) -> None: + bucket_type = self._get_bucket_type(bucket) is_truncated = True continuation_token = "" all_results = [] - class DeletingObject: - def __init__(self, key: str, version_id: Optional[str] = None): - self.key = key - self.version_id = version_id - - def delete_objects(deleting_objects: List[DeletingObject]) -> None: - delete_resp = retryable_func_executor( - lambda: self.tos_client.delete_multi_objects( - bucket, deleting_objects, quiet=True + def list_objects(continuation_token: str = "") -> ListObjectType2Output: + return retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, ), max_retry_num=self.max_retry_num, ) - if delete_resp.error: - for d in delete_resp.error: - logger.warning("Deleted object: %s failed", d) + + if bucket_type == TOS_BUCKET_TYPE_FNS: + while is_truncated: + resp = list_objects(continuation_token) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token + all_results = resp.contents + + deleting_objects = [ + DeletingObject(o.key if hasattr(o, "key") else o.prefix) + for o in all_results + ] + + if deleting_objects: + self._delete_objects(bucket, deleting_objects) + elif bucket_type == TOS_BUCKET_TYPE_HNS: + all_results = self._list_and_collect_objects( + bucket, bucket_type, key.rstrip("/") + "/" + ) + if all_results: + self._delete_objects(bucket, all_results) + else: + raise ValueError(f"Unsupported bucket type: {bucket_type}") + + def _delete_objects( + self, bucket: str, deleting_objects: list[DeletingObject] + ) -> None: + delete_resp = retryable_func_executor( + lambda: self.tos_client.delete_multi_objects( + bucket, deleting_objects, quiet=True + ), + max_retry_num=self.max_retry_num, + ) + if delete_resp.error: + for d in delete_resp.error: + logger.warning("Deleted object: %s failed", d) + + def _list_and_collect_objects( + self, bucket: str, bucket_type: str, prefix: str + ) -> List[DeletingObject]: + collected_objects: list[DeletingObject] = [] + is_truncated = True + continuation_token = "" while is_truncated: def _call_list_objects_type2( - continuation_token: str = continuation_token, + continuation_token: str = continuation_token, prefix: str = prefix ) -> ListObjectType2Output: return self.tos_client.list_objects_type2( bucket, - prefix=key.rstrip("/") + "/", + prefix=prefix, max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, continuation_token=continuation_token, + delimiter="/" if bucket_type == TOS_BUCKET_TYPE_HNS else None, ) resp = retryable_func_executor( _call_list_objects_type2, - args=(continuation_token,), max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token - all_results = resp.contents - - deleting_objects = [ + collected_objects.extend( DeletingObject(o.key if hasattr(o, "key") else o.prefix) - for o in all_results - ] + for o in resp.contents + ) + if bucket_type == TOS_BUCKET_TYPE_HNS: + for common_prefix in resp.common_prefixes: + collected_objects.extend( + self._list_and_collect_objects( + bucket, bucket_type, common_prefix + ) + ) - if deleting_objects: - delete_objects(deleting_objects) + return collected_objects def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. @@ -1944,6 +1981,54 @@ def _call_list_objects_type2( return all_results + def _prefix_search_for_exists(self, bucket: str, key: str) -> bool: + bucket_type = self._get_bucket_type(bucket) + + if bucket_type == TOS_BUCKET_TYPE_FNS: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + start_after=key.rstrip("/") + "/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + return len(resp.contents) > 0 + elif bucket_type == TOS_BUCKET_TYPE_HNS: + + def search_in_common_prefixes(bucket: str, prefix: str) -> bool: + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix, + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + for common_prefix in resp.common_prefixes: + if search_in_common_prefixes(bucket, common_prefix): + return True + return False + + resp = retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + key.rstrip("/") + "/", + delimiter="/", + max_keys=1, + ), + max_retry_num=self.max_retry_num, + ) + if len(resp.contents) > 0: + return True + return search_in_common_prefixes(bucket, key.rstrip("/") + "/") + else: + raise ValueError(f"Unsupported bucket type {bucket_type}") + def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]: """Normalise tos path string into bucket and key. @@ -1978,6 +2063,13 @@ def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]: version_id if self.version_aware and version_id else None, ) + def _get_bucket_type(self, bucket: str) -> str: + bucket_type = self.tos_client._get_bucket_type(bucket) + if not bucket_type: + return TOS_BUCKET_TYPE_FNS + + return bucket_type + def _init_tag_manager(self) -> None: auth = self.tos_client.auth if isinstance(auth, CredentialProviderAuth): diff --git a/tosfs/models.py b/tosfs/models.py new file mode 100644 index 0000000..956f679 --- /dev/null +++ b/tosfs/models.py @@ -0,0 +1,25 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The module contains models for the tosfs package.""" +from typing import Optional + + +class DeletingObject: + """Object to be deleted.""" + + def __init__(self, key: str, version_id: Optional[str] = None): + """Initialize a DeletingObject for batch delete.""" + self.key = key + self.version_id = version_id