diff --git a/pyproject.toml b/pyproject.toml
index f68ada4..777ac42 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -67,6 +67,7 @@ ignore = [
     "D213", # multi-line-summary-second-line
     "PLR0913", # Too many arguments in function definition
     "SIM105", # Use `contextlib.suppress(IOError)` instead of `try`-`except`-`pass`
+    "PERF203", # `try`-`except` within a loop incurs performance overhead
 ]
 
 [tool.ruff.lint.per-file-ignores]
diff --git a/tosfs/consts.py b/tosfs/consts.py
index aa1a4da..01cc726 100644
--- a/tosfs/consts.py
+++ b/tosfs/consts.py
@@ -16,3 +16,13 @@
 
 # Tos server response codes
 TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404
+
+TOS_SERVER_RETRYABLE_ERROR_CODE_SET = {
+    "IncompleteBody",
+    "ExceedAccountQPSLimit",
+    "ExceedAccountRateLimit",
+    "ExceedBucketQPSLimit",
+    "ExceedBucketRateLimit",
+    "InternalError",
+    "ServiceUnavailable",
+}
diff --git a/tosfs/core.py b/tosfs/core.py
index 8d9d0c9..710c6e3 100644
--- a/tosfs/core.py
+++ b/tosfs/core.py
@@ -35,7 +35,7 @@
 
 from tosfs.consts import TOS_SERVER_RESPONSE_CODE_NOT_FOUND
 from tosfs.exceptions import TosfsError
-from tosfs.utils import find_bucket_key
+from tosfs.utils import find_bucket_key, retryable_func_wrapper
 
 # environment variable names
 ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
@@ -1556,6 +1556,27 @@ def handle_remainder(
                 else None
             )
 
+    def _fetch_range(self, start: int, end: int) -> bytes:
+        """Fetch a byte range of the object, retrying retryable server errors."""
+        bucket, key, version_id = self.fs._split_path(self.path)
+        if start == end:
+            logger.debug(
+                "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
+                bucket,
+                key,
+                start,
+                end,
+            )
+            return b""
+        logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
+
+        def fetch() -> bytes:
+            return self.fs.tos_client.get_object(
+                bucket, key, version_id, range_start=start, range_end=end
+            ).read()
+
+        return retryable_func_wrapper(fetch, retries=self.fs.retries)
+
     def commit(self) -> None:
         """Complete multipart upload or PUT."""
         logger.debug("Commit %s", self)
diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py
index 1e16c9a..bac1f88 100644
--- a/tosfs/tests/test_tosfs.py
+++ b/tosfs/tests/test_tosfs.py
@@ -575,3 +575,75 @@ def test_file_write_mpu(
     )
 
     tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+
+
+def test_file_read(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
+    file_name = random_str()
+    content = "hello world"
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f:
+        f.write(content)
+
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f:
+        assert f.read() == content
+
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "rb") as f:
+        assert f.read().decode() == content
+
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+
+
+def test_file_read_encdec(
+    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
+) -> None:
+    file_name = random_str()
+    content = "你好"
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "wb") as f:
+        f.write(content.encode("gbk"))
+
+    with tosfs.open(
+        f"{bucket}/{temporary_workspace}/{file_name}", "r", encoding="gbk"
+    ) as f:
+        assert f.read() == content
+
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+
+    content = "\u00af\\_(\u30c4)_/\u00af"
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "wb") as f:
+        f.write(content.encode("utf-16-le"))
+
+    with tosfs.open(
+        f"{bucket}/{temporary_workspace}/{file_name}", "r", encoding="utf-16-le"
+    ) as f:
+        assert f.read() == content
+
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+
+    content = "Hello, World!"
+    with tosfs.open(
+        f"{bucket}/{temporary_workspace}/{file_name}", "w", encoding="ibm500"
+    ) as f:
+        f.write(content)
+
+    with tosfs.open(
+        f"{bucket}/{temporary_workspace}/{file_name}", "r", encoding="ibm500"
+    ) as f:
+        assert f.read() == content
+
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+
+
+def test_file_readlines(
+    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
+) -> None:
+    file_name = random_str()
+    content = "hello\nworld"
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "w") as f:
+        f.write(content)
+
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "r") as f:
+        assert f.readlines() == ["hello\n", "world"]
+
+    with tosfs.open(f"{bucket}/{temporary_workspace}/{file_name}", "rb") as f:
+        assert f.readlines() == [b"hello\n", b"world"]
+
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
diff --git a/tosfs/utils.py b/tosfs/utils.py
index e9669dd..43d8074 100644
--- a/tosfs/utils.py
+++ b/tosfs/utils.py
@@ -18,7 +18,12 @@
 import re
 import string
 import tempfile
-from typing import Tuple
+import time
+from typing import Any, Optional, Tuple
+
+import tos
+
+from tosfs.consts import TOS_SERVER_RETRYABLE_ERROR_CODE_SET
 
 
 def random_str(length: int = 5) -> str:
@@ -84,3 +89,45 @@ def find_bucket_key(tos_path: str) -> Tuple[str, str]:
     if len(tos_components) > 1:
         tos_key = tos_components[1]
     return bucket, tos_key
+
+
+def retryable_func_wrapper(
+    func: Any,
+    *,
+    args: Tuple[Any, ...] = (),
+    kwargs: Optional[Any] = None,
+    retries: int = 5,
+) -> Any:
+    """Call ``func`` and retry it while TOS reports a retryable server error.
+
+    ``func`` is invoked as ``func(*args, **kwargs)``. A
+    ``tos.exceptions.TosServerError`` whose ``code`` is listed in
+    ``TOS_SERVER_RETRYABLE_ERROR_CODE_SET`` triggers another attempt after an
+    exponential backoff (0.1s * 1.7**attempt, capped at 15s). Any other
+    exception, or a retryable one on the final attempt, is re-raised unchanged.
+
+    ``retries`` is the maximum number of attempts; values below 1 are treated
+    as 1 so that ``func`` is always called at least once.
+    """
+    if kwargs is None:
+        kwargs = {}
+
+    # Imported lazily: tosfs.core imports this module, so importing it at
+    # module level would be circular.
+    from tosfs.core import logger
+
+    attempts = max(retries, 1)
+
+    for attempt in range(attempts):
+        try:
+            return func(*args, **kwargs)
+        except tos.exceptions.TosServerError as e:
+            logger.debug("Server error (maybe retryable): %s", e)
+            is_last_attempt = attempt == attempts - 1
+            if e.code not in TOS_SERVER_RETRYABLE_ERROR_CODE_SET or is_last_attempt:
+                raise
+            # Back off before the next attempt; never sleep after the last one.
+            time.sleep(min(1.7**attempt * 0.1, 15))
+        except Exception as e:
+            logger.debug("Nonretryable error: %s", e)
+            raise