diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index db13723..23708d4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -36,6 +36,7 @@ jobs:
       matrix:
         python-version: ["3.9", "3.10", "3.11", "3.12"]
         fsspec-version: ["2023.5.0", "2024.9.0"]
+        bucket-name: ["proton-ci", "proton-ci-hns"]
     steps:
       - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
@@ -59,5 +60,6 @@ jobs:
          echo "TOS_REGION=${{ vars.TOS_REGION }}" >> $GITHUB_ENV
          echo "TOS_ENDPOINT=${{ vars.TOS_ENDPOINT }}" >> $GITHUB_ENV
          echo "TOSFS_LOGGING_LEVEL=${{ vars.TOSFS_LOGGING_LEVEL }}" >> $GITHUB_ENV
+         echo "TOS_BUCKET=${{ matrix.bucket-name }}" >> $GITHUB_ENV
      - name: Run tests
        run: make test
diff --git a/tosfs/consts.py b/tosfs/consts.py
index 398f3bf..4f405b8 100644
--- a/tosfs/consts.py
+++ b/tosfs/consts.py
@@ -17,8 +17,12 @@
 # Tos server response status codes
 TOS_SERVER_STATUS_CODE_NOT_FOUND = 404
 
-MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30  # 5GB
-MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20  # 5MB
+# tos bucket type (hns, fns)
+TOS_BUCKET_TYPE_HNS = "hns"
+TOS_BUCKET_TYPE_FNS = "fns"
+
+MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30
+MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20
 
 RETRY_NUM = 5
 PART_MIN_SIZE = 5 * 2**20
diff --git a/tosfs/core.py b/tosfs/core.py
index 5dad01a..81c866f 100644
--- a/tosfs/core.py
+++ b/tosfs/core.py
@@ -48,13 +48,16 @@
     MPU_PART_SIZE_THRESHOLD,
     PART_MAX_SIZE,
     PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
+    TOS_BUCKET_TYPE_FNS,
+    TOS_BUCKET_TYPE_HNS,
     TOS_SERVER_STATUS_CODE_NOT_FOUND,
     TOSFS_LOG_FORMAT,
 )
 from tosfs.exceptions import TosfsError
 from tosfs.fsspec_utils import glob_translate
+from tosfs.models import DeletingObject
 from tosfs.mpu import MultipartUploader
-from tosfs.retry import retryable_func_executor
+from tosfs.retry import CONFLICT_CODE, retryable_func_executor
 from tosfs.tag import BucketTagMgr
 from tosfs.utils import find_bucket_key, get_brange
@@ -519,10 +522,23 @@ def info(
         if not key:
             return self._bucket_info(bucket)
 
-        if info := self._object_info(bucket, key, version_id):
-            return info
+        bucket_type = self._get_bucket_type(bucket)
+        if bucket_type == TOS_BUCKET_TYPE_FNS:
+            result = self._object_info(bucket, key, version_id)
 
-        return self._get_dir_info(bucket, key, path, fullpath)
+            if not result:
+                result = self._get_dir_info(bucket, key, fullpath)
+        else:
+            # Priority is given to judging dir, followed by file.
+            result = self._get_dir_info(bucket, key, fullpath)
+
+            if not result:
+                result = self._object_info(bucket, key, version_id)
+
+        if not result:
+            raise FileNotFoundError(f"Can not get information for path: {path}")
+
+        return result
 
     def exists(self, path: str, **kwargs: Any) -> bool:
         """Check if a path exists in the TOS.
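Note on the `info()` hunk above: the two bucket flavors probe in opposite orders. For a flat-namespace (FNS) bucket an object HEAD is the cheap common case, so `_object_info` runs first with `_get_dir_info` as the fallback; for a hierarchical-namespace (HNS) bucket directories are first-class, so the directory probe runs first. A minimal sketch of that resolution order, with `object_probe`/`dir_probe` as illustrative stand-ins for `_object_info`/`_get_dir_info`, not the real methods:

from typing import Callable, Optional


def resolve_info(
    bucket_type: str,
    object_probe: Callable[[], Optional[dict]],
    dir_probe: Callable[[], Optional[dict]],
    path: str,
) -> dict:
    # FNS: the object HEAD is the common case, so try it first.
    # HNS: the path is more likely a real directory, so probe that first.
    if bucket_type == "fns":
        result = object_probe() or dir_probe()
    else:
        result = dir_probe() or object_probe()
    if not result:
        raise FileNotFoundError(f"Can not get information for path: {path}")
    return result


# Example: an HNS lookup where only the directory probe succeeds.
info = resolve_info("hns", lambda: None, lambda: {"type": "directory"}, "b/k")
assert info["type"] == "directory"

Either way, the fallback chain now raises `FileNotFoundError` itself instead of returning an empty dict, which is what allows `_object_info` and `_get_dir_info` further down to return `None`.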
@@ -584,16 +600,7 @@ def exists(self, path: str, **kwargs: Any) -> bool:
             )
         except TosServerError as ex:
             if ex.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
-                resp = retryable_func_executor(
-                    lambda: self.tos_client.list_objects_type2(
-                        bucket,
-                        key.rstrip("/") + "/",
-                        start_after=key.rstrip("/") + "/",
-                        max_keys=1,
-                    ),
-                    max_retry_num=self.max_retry_num,
-                )
-                return len(resp.contents) > 0
+                return self._prefix_search_for_exists(bucket, key)
             else:
                 raise ex
         else:
@@ -837,14 +844,24 @@ def isdir(self, path: str) -> bool:
         key = key.rstrip("/") + "/"
 
         try:
-            return retryable_func_executor(
-                lambda: self.tos_client.head_object(bucket, key) and True,
+            resp = retryable_func_executor(
+                lambda: self.tos_client.head_object(bucket, key),
                 max_retry_num=self.max_retry_num,
             )
+            if self._is_fns_bucket(bucket):
+                return True
+            else:
+                if "x-tos-directory" not in resp.header._store:
+                    return False
+                return resp.header._store["x-tos-directory"][1].lower() == "true"
         except TosClientError as e:
             raise e
         except TosServerError as e:
-            if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
+            if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND or (
+                self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS
+                and e.status_code == CONFLICT_CODE
+                and e.header._store["x-tos-ec"][1] == "0026-00000020"
+            ):
                 return False
             else:
                 raise e
@@ -873,10 +890,16 @@ def isfile(self, path: str) -> bool:
             return False
 
         try:
-            return retryable_func_executor(
-                lambda: self.tos_client.head_object(bucket, key) and True,
+            resp = retryable_func_executor(
+                lambda: self.tos_client.head_object(bucket, key),
                 max_retry_num=self.max_retry_num,
             )
+            if self._is_fns_bucket(bucket):
+                return True
+            else:
+                if "x-tos-directory" not in resp.header._store:
+                    return True
+                return resp.header._store["x-tos-directory"][1].lower() != "true"
         except TosClientError as e:
             raise e
         except TosServerError as e:
@@ -1245,14 +1268,21 @@ def cp_file(
         If there is an unknown error while copying the file.
""" + path1 = self._strip_protocol(path1) + path2 = self._strip_protocol(path2) if path1 == path2: logger.warning("Source and destination are the same: %s", path1) return - path1 = self._strip_protocol(path1) + if self.isdir(path1) and self.isdir(path2): + return + bucket, key, vers = self._split_path(path1) info = self.info(path1, bucket, key, version_id=vers) + if not info: + raise FileNotFoundError(f"Can not get information for path: {path1}") + if info["type"] == "directory": logger.warning("Do not support copy directory %s.", path1) return @@ -1373,16 +1403,53 @@ def _rm(self, path: str) -> None: ######################## private methods ######################## def _list_and_batch_delete_objs(self, bucket: str, key: str) -> None: + bucket_type = self._get_bucket_type(bucket) is_truncated = True continuation_token = "" all_results = [] - class DeletingObject: - def __init__(self, key: str, version_id: Optional[str] = None): - self.key = key - self.version_id = version_id + if bucket_type == TOS_BUCKET_TYPE_FNS: + + def _call_list_objects( + continuation_token: str = "", + ) -> ListObjectType2Output: + return retryable_func_executor( + lambda: self.tos_client.list_objects_type2( + bucket, + prefix=key.rstrip("/") + "/", + max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, + continuation_token=continuation_token, + ), + max_retry_num=self.max_retry_num, + ) + + while is_truncated: + resp = _call_list_objects(continuation_token) + is_truncated = resp.is_truncated + continuation_token = resp.next_continuation_token + all_results = resp.contents + + deleting_objects = [ + DeletingObject(o.key if hasattr(o, "key") else o.prefix) + for o in all_results + ] + + if deleting_objects: + self._delete_objects(bucket, deleting_objects) + elif bucket_type == TOS_BUCKET_TYPE_HNS: + all_results = self._list_and_collect_objects( + bucket, bucket_type, key.rstrip("/") + "/" + ) + if all_results: + self._delete_objects(bucket, all_results) + else: + raise ValueError(f"Unsupported bucket type: {bucket_type}") - def delete_objects(deleting_objects: List[DeletingObject]) -> None: + def _delete_objects( + self, bucket: str, deleting_objects: list[DeletingObject] + ) -> None: + bucket_type = self._get_bucket_type(bucket) + if bucket_type == TOS_BUCKET_TYPE_FNS: delete_resp = retryable_func_executor( lambda: self.tos_client.delete_multi_objects( bucket, deleting_objects, quiet=True @@ -1392,35 +1459,71 @@ def delete_objects(deleting_objects: List[DeletingObject]) -> None: if delete_resp.error: for d in delete_resp.error: logger.warning("Deleted object: %s failed", d) + else: + + def _call_delete_object(obj: DeletingObject) -> None: + retryable_func_executor( + lambda: self.tos_client.delete_object(bucket, obj.key), + max_retry_num=self.max_retry_num, + ) + + # Preferentially delete subpaths with longer keys + for obj in sorted(deleting_objects, key=lambda x: len(x.key), reverse=True): + _call_delete_object(obj) + + def _list_and_collect_objects( + self, + bucket: str, + bucket_type: str, + prefix: str, + collected_objects: Optional[List[DeletingObject]] = None, + ) -> List[DeletingObject]: + + if collected_objects is None: + collected_objects = [] + + collected_keys = {obj.key for obj in collected_objects} + + is_truncated = True + continuation_token = "" while is_truncated: def _call_list_objects_type2( - continuation_token: str = continuation_token, + continuation_token: str = continuation_token, prefix: str = prefix ) -> ListObjectType2Output: return self.tos_client.list_objects_type2( bucket, - prefix=key.rstrip("/") 
+ "/", + prefix=prefix, max_keys=LS_OPERATION_DEFAULT_MAX_ITEMS, continuation_token=continuation_token, + delimiter="/" if bucket_type == TOS_BUCKET_TYPE_HNS else None, ) resp = retryable_func_executor( _call_list_objects_type2, - args=(continuation_token,), max_retry_num=self.max_retry_num, ) is_truncated = resp.is_truncated continuation_token = resp.next_continuation_token - all_results = resp.contents - deleting_objects = [ - DeletingObject(o.key if hasattr(o, "key") else o.prefix) - for o in all_results - ] + for obj in resp.contents: + key = obj.key if hasattr(obj, "key") else obj.prefix + if key not in collected_keys: + collected_objects.append(DeletingObject(key=key)) + collected_keys.add(key) + + for common_prefix in resp.common_prefixes: + key = common_prefix.prefix + if key not in collected_keys: + collected_objects.append(DeletingObject(key=key)) + collected_keys.add(key) + if bucket_type == TOS_BUCKET_TYPE_HNS: + self._list_and_collect_objects( + bucket, bucket_type, common_prefix.prefix, collected_objects + ) - if deleting_objects: - delete_objects(deleting_objects) + return collected_objects def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: """Copy file between locations on tos. @@ -1600,13 +1703,18 @@ def _find_file_dir( self, key: str, path: str, prefix: str, withdirs: bool, kwargs: Any ) -> List[dict]: out = self._ls_dirs_and_files( - path, delimiter="", include_self=True, prefix=prefix, **kwargs + path, + delimiter="", + include_self=True, + prefix=prefix, + recursive=True, ) if not out and key: try: out = [self.info(path)] except FileNotFoundError: out = [] + dirs = { self._parent(o["name"]): { "Key": self._parent(o["name"]).rstrip("/"), @@ -1699,7 +1807,7 @@ def _bucket_info(self, bucket: str) -> dict: def _object_info( self, bucket: str, key: str, version_id: Optional[str] = None - ) -> dict: + ) -> Optional[dict]: """Get the information of an object. Parameters @@ -1752,16 +1860,20 @@ def _object_info( except TosClientError as e: raise e except TosServerError as e: - if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND or ( + self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS + and e.status_code == CONFLICT_CODE + and e.header._store["x-tos-ec"][1] == "0026-00000020" + ): pass else: raise e except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e - return {} + return None - def _get_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict: + def _get_dir_info(self, bucket: str, key: str, fullpath: str) -> Optional[dict]: try: # We check to see if the path is a directory by attempting to list its # contexts. 
@@ -1784,8 +1896,8 @@ def _get_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict
                     "type": "directory",
                 }
 
-            raise FileNotFoundError(path)
-        except (TosClientError, TosServerError, FileNotFoundError) as e:
+            return None
+        except (TosClientError, TosServerError) as e:
             raise e
         except Exception as e:
             raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
@@ -1881,6 +1993,7 @@ def _ls_dirs_and_files(
         prefix: str = "",
         include_self: bool = False,
         versions: bool = False,
+        recursive: bool = False,
     ) -> List[dict]:
         bucket, key, _ = self._split_path(path)
         if not prefix:
@@ -1891,6 +2004,8 @@ def _ls_dirs_and_files(
         logger.debug("Get directory listing for %s", path)
         dirs = []
         files = []
+        seen_names = set()
+
         for obj in self._ls_objects(
             bucket,
             max_items=max_items,
@@ -1898,13 +2013,26 @@ def _ls_dirs_and_files(
             prefix=prefix,
             include_self=include_self,
             versions=versions,
+            recursive=recursive,
         ):
-            if isinstance(obj, CommonPrefixInfo) and delimiter == "/":
-                dirs.append(self._fill_dir_info(bucket, obj))
+            if isinstance(obj, CommonPrefixInfo):
+                dir_info = self._fill_dir_info(bucket, obj)
+                dir_name = dir_info["name"]
+                if dir_name not in seen_names:
+                    dirs.append(dir_info)
+                    seen_names.add(dir_name)
             elif obj.key.endswith("/"):
-                dirs.append(self._fill_dir_info(bucket, None, obj.key))
+                dir_info = self._fill_dir_info(bucket, None, obj.key)
+                dir_name = dir_info["name"]
+                if dir_name not in seen_names:
+                    dirs.append(dir_info)
+                    seen_names.add(dir_name)
             else:
-                files.append(self._fill_file_info(obj, bucket, versions))
+                file_info = self._fill_file_info(obj, bucket, versions)
+                file_name = file_info["name"]
+                if file_name not in seen_names:
+                    files.append(file_info)
+                    seen_names.add(file_name)
 
         files += dirs
         return files
@@ -1917,6 +2045,7 @@ def _ls_objects(
         prefix: str = "",
         include_self: bool = False,
         versions: bool = False,
+        recursive: bool = False,
     ) -> List[Union[CommonPrefixInfo, ListedObject, ListedObjectVersion]]:
         if versions:
             raise ValueError(
                 "versions cannot be specified if the filesystem is "
                 "not version aware."
             )
 
+        bucket_type = self._get_bucket_type(bucket)
         all_results = []
-        is_truncated = True
-        continuation_token = ""
-        while is_truncated:
+        if recursive and bucket_type == TOS_BUCKET_TYPE_HNS:
 
-            def _call_list_objects_type2(
-                continuation_token: str = continuation_token,
-            ) -> ListObjectType2Output:
-                return self.tos_client.list_objects_type2(
-                    bucket,
-                    prefix,
-                    start_after=prefix if not include_self else None,
-                    delimiter=delimiter,
-                    max_keys=max_items,
-                    continuation_token=continuation_token,
+            def _recursive_list(bucket: str, prefix: str) -> None:
+                resp = retryable_func_executor(
+                    lambda: self.tos_client.list_objects_type2(
+                        bucket,
+                        prefix=prefix,
+                        delimiter="/",
+                        max_keys=max_items,
+                    ),
+                    max_retry_num=self.max_retry_num,
                 )
+
+                all_results.extend(resp.contents + resp.common_prefixes)
+                for common_prefix in resp.common_prefixes:
+                    _recursive_list(bucket, common_prefix.prefix)
+
+            _recursive_list(bucket, prefix)
+        else:
+            is_truncated = True
+            continuation_token = ""
+            while is_truncated:
+
+                def _call_list_objects_type2(
+                    continuation_token: str = continuation_token,
+                ) -> ListObjectType2Output:
+                    return self.tos_client.list_objects_type2(
+                        bucket,
+                        prefix,
+                        start_after=prefix if not include_self else None,
+                        delimiter=delimiter,
+                        max_keys=max_items,
+                        continuation_token=continuation_token,
+                    )
+
+                resp = retryable_func_executor(
+                    _call_list_objects_type2,
+                    args=(continuation_token,),
+                    max_retry_num=self.max_retry_num,
+                )
+                is_truncated = resp.is_truncated
+                continuation_token = resp.next_continuation_token
+
+                all_results.extend(resp.contents + resp.common_prefixes)
+
+        return all_results
+
+    def _prefix_search_for_exists(self, bucket: str, key: str) -> bool:
+        bucket_type = self._get_bucket_type(bucket)
+
+        if bucket_type == TOS_BUCKET_TYPE_FNS:
             resp = retryable_func_executor(
-                _call_list_objects_type2,
-                args=(continuation_token,),
+                lambda: self.tos_client.list_objects_type2(
+                    bucket,
+                    key.rstrip("/") + "/",
+                    start_after=key.rstrip("/") + "/",
+                    max_keys=1,
+                ),
                 max_retry_num=self.max_retry_num,
             )
-            is_truncated = resp.is_truncated
-            continuation_token = resp.next_continuation_token
+            return len(resp.contents) > 0
+        elif bucket_type == TOS_BUCKET_TYPE_HNS:
 
-            all_results.extend(resp.contents + resp.common_prefixes)
+            def search_in_common_prefixes(bucket: str, prefix: str) -> bool:
+                resp = retryable_func_executor(
+                    lambda: self.tos_client.list_objects_type2(
+                        bucket,
+                        prefix,
+                        delimiter="/",
+                        max_keys=1,
+                    ),
+                    max_retry_num=self.max_retry_num,
+                )
+                if len(resp.contents) > 0:
+                    return True
+                for common_prefix in resp.common_prefixes:
+                    if search_in_common_prefixes(bucket, common_prefix.prefix):
+                        return True
+                return False
 
-        return all_results
+            resp = retryable_func_executor(
+                lambda: self.tos_client.list_objects_type2(
+                    bucket,
+                    key.rstrip("/") + "/",
+                    delimiter="/",
+                    max_keys=1,
+                ),
+                max_retry_num=self.max_retry_num,
+            )
+            if len(resp.contents) > 0:
+                return True
+            return search_in_common_prefixes(bucket, key.rstrip("/") + "/")
+        else:
+            raise ValueError(f"Unsupported bucket type {bucket_type}")
 
     def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]:
         """Normalise tos path string into bucket and key.
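The HNS branch of `_prefix_search_for_exists` above is a depth-first probe: a prefix "exists" when some page under it has contents, and otherwise each common prefix is searched recursively. A self-contained sketch of the same traversal against a stubbed listing (the `Page` type and `pages` dict are illustrative stand-ins for `list_objects_type2` responses, not the TOS SDK):

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Page:
    contents: List[str] = field(default_factory=list)
    common_prefixes: List[str] = field(default_factory=list)


def prefix_exists(pages: Dict[str, Page], prefix: str) -> bool:
    # A prefix exists if it directly holds objects, or if any sub-prefix
    # (searched depth-first, like search_in_common_prefixes) eventually does.
    page = pages.get(prefix, Page())
    if page.contents:
        return True
    return any(prefix_exists(pages, p) for p in page.common_prefixes)


# "a/" holds no objects directly, but its child prefix "a/b/" does.
pages = {"a/": Page(common_prefixes=["a/b/"]), "a/b/": Page(contents=["a/b/x"])}
assert prefix_exists(pages, "a/")
assert not prefix_exists(pages, "c/")

The `max_keys=1` in the real code keeps each probe cheap: a single object or a single common prefix per request is enough to answer the existence question.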
@@ -1988,6 +2194,19 @@ def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]:
             version_id if self.version_aware and version_id else None,
         )
 
+    def _get_bucket_type(self, bucket: str) -> str:
+        bucket_type = self.tos_client._get_bucket_type(bucket)
+        if not bucket_type:
+            return TOS_BUCKET_TYPE_FNS
+
+        return bucket_type
+
+    def _is_hns_bucket(self, bucket: str) -> bool:
+        return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_HNS
+
+    def _is_fns_bucket(self, bucket: str) -> bool:
+        return self._get_bucket_type(bucket) == TOS_BUCKET_TYPE_FNS
+
     def _init_tag_manager(self) -> None:
         auth = self.tos_client.auth
         if isinstance(auth, CredentialProviderAuth):
diff --git a/tosfs/models.py b/tosfs/models.py
new file mode 100644
index 0000000..956f679
--- /dev/null
+++ b/tosfs/models.py
@@ -0,0 +1,25 @@
+# ByteDance Volcengine EMR, Copyright 2024.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""The module contains models for the tosfs package."""
+from typing import Optional
+
+
+class DeletingObject:
+    """Object to be deleted."""
+
+    def __init__(self, key: str, version_id: Optional[str] = None):
+        """Initialize a DeletingObject for batch delete."""
+        self.key = key
+        self.version_id = version_id
diff --git a/tosfs/retry.py b/tosfs/retry.py
index 4d695ee..4603f8d 100644
--- a/tosfs/retry.py
+++ b/tosfs/retry.py
@@ -138,7 +138,14 @@ def _is_retryable_tos_server_exception(e: TosError) -> bool:
 
     # not all conflict errors are retryable
     if e.status_code == CONFLICT_CODE:
-        return e.ec not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES
+        return (
+            e.ec not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES
+            and
+            # TODO: currently, hack for supporting hns,
+            # need to refactor when tos python sdk GA
+            e.header._store["x-tos-ec"][1]
+            not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES
+        )
 
     return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES
diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py
index 8b72379..6432109 100644
--- a/tosfs/tests/test_tosfs.py
+++ b/tosfs/tests/test_tosfs.py
@@ -114,10 +114,10 @@ def test_info(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> No
     assert tosfs.info("") == {"name": "", "size": 0, "type": "directory"}
     assert tosfs.info("/") == {"name": "/", "size": 0, "type": "directory"}
     assert tosfs.info(bucket) == {
-        "Key": "proton-ci",
+        "Key": bucket,
         "Size": 0,
         "StorageClass": "BUCKET",
-        "name": "proton-ci",
+        "name": bucket,
         "size": 0,
         "type": "directory",
     }
@@ -819,11 +832,24 @@ def test_file_write_append(
     content = "hello world"
     with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f:
         f.write(content)
-    with pytest.raises(TosServerError):
+
+    if tosfs._is_fns_bucket(bucket):
+        with pytest.raises(TosServerError):
+            with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f:
+                f.write(content)
+    else:
         with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f:
             f.write(content)
+        assert tosfs.info(f"{bucket}/{temporary_workspace}/{file_name}")[
+            "size"
+        ] == 2 * len(content)
+        with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f:
tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + assert f.read() == content + content another_file = random_str() + if tosfs._is_hns_bucket(bucket): + tosfs.touch(f"{bucket}/{temporary_workspace}/{another_file}") + with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: f.write(content) with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: @@ -844,11 +857,20 @@ def test_big_file_append( f.write(content) append_content = "a" * 1024 * 1024 - with pytest.raises(TosServerError): + if tosfs._is_fns_bucket(bucket): + with pytest.raises(TosServerError): + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: + f.write(append_content) + else: with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "a") as f: f.write(append_content) + with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f: + assert f.read() == content + append_content + another_file = random_str() + if tosfs._is_hns_bucket(bucket): + tosfs.touch(f"{bucket}/{temporary_workspace}/{another_file}") with tosfs.open(f"{bucket}/{temporary_workspace}/{another_file}", "a") as f: f.write(content)