Skip to content

Commit

Permalink
Infra: introduce retry func warpper
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 11, 2024
1 parent 53987fd commit e83a91e
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 64 deletions.
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pytest = "==8.1.1"
pytest-cov = "==5.0.0"
coverage = "==7.5.0"
ruff = "==0.6.0"
types-requests = "==2.32.0.20240907"

[tool.pydocstyle]
convention = "numpy"
Expand Down
15 changes: 4 additions & 11 deletions tosfs/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
"""The module contains constants for the tosfs package."""

# Tos server response codes
TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404

TOS_SERVER_RETRYABLE_ERROR_CODE_SET = {
"IncompleteBody",
"ExceedAccountQPSLimit",
"ExceedAccountRateLimit",
"ExceedBucketQPSLimit",
"ExceedBucketRateLimit",
"InternalError",
"ServiceUnavailable",
}
TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND = 404
# TOS_SERVER_RESPONSE_CODE_INTERNAL_SERVER_ERROR = 500
# TOS_SERVER_RESPONSE_CODE_TOO_MANY_REQUESTS = 429


MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30
MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20
Expand Down
34 changes: 19 additions & 15 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@
PART_MAX_SIZE,
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
RETRY_NUM,
TOS_SERVER_RESPONSE_CODE_NOT_FOUND,
TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND,
TOSFS_LOG_FORMAT,
)
from tosfs.exceptions import TosfsError
from tosfs.fsspec_utils import glob_translate
from tosfs.utils import find_bucket_key, get_brange, retryable_func_wrapper
from tosfs.stability import retryable_func
from tosfs.utils import find_bucket_key, get_brange

logger = logging.getLogger("tosfs")

Expand Down Expand Up @@ -386,12 +387,15 @@ def rmdir(self, path: str) -> None:
if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0:
raise TosfsError(f"Directory {path} is not empty.")

try:
self.tos_client.delete_object(bucket, key.rstrip("/") + "/")
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
def _call_delete_object() -> None:
try:
self.tos_client.delete_object(bucket, key.rstrip("/") + "/")
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

retryable_func(_call_delete_object)

def rm(
self, path: str, recursive: bool = False, maxdepth: Optional[int] = None
Expand Down Expand Up @@ -582,7 +586,7 @@ def isdir(self, path: str) -> bool:
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
return False
else:
raise e
Expand Down Expand Up @@ -617,7 +621,7 @@ def isfile(self, path: str) -> bool:
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
return False
raise e
except Exception as e:
Expand Down Expand Up @@ -1350,7 +1354,7 @@ def _bucket_info(self, bucket: str) -> dict:
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
raise FileNotFoundError(bucket) from e
else:
raise e
Expand Down Expand Up @@ -1409,7 +1413,7 @@ def _object_info(
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
pass
else:
raise e
Expand Down Expand Up @@ -1527,7 +1531,7 @@ def _exists_bucket(self, bucket: str) -> bool:
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
return False
else:
raise e
Expand Down Expand Up @@ -1579,7 +1583,7 @@ def _exists_object(
except TosClientError as e:
raise e
except TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESP_STATUS_CODE_NOT_FOUND:
return False
else:
raise e
Expand Down Expand Up @@ -2063,7 +2067,7 @@ def fetch() -> bytes:
bucket, key, version_id, range_start=start, range_end=end
).read()

return retryable_func_wrapper(fetch, retries=RETRY_NUM)
return retryable_func(fetch)

def commit(self) -> None:
"""Complete multipart upload or PUT."""
Expand Down
106 changes: 106 additions & 0 deletions tosfs/stability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# ByteDance Volcengine EMR, Copyright 2024.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The module contains utility functions for the tosfs stability."""

import time
from typing import Any, Optional

import requests
from requests.exceptions import (
ConnectTimeout,
HTTPError,
ProxyError,
ReadTimeout,
RetryError,
SSLError,
StreamConsumedError,
Timeout,
)
from tos.exceptions import TosClientError, TosError, TosServerError

from tosfs.exceptions import TosfsError

TOS_SERVER_RETRYABLE_CODES = {
"404", # NOT_FOUND
"409", # CONFLICT
"429", # TOO_MANY_REQUESTS
"500", # INTERNAL_SERVER_ERROR
}

TOS_CLIENT_RETRYABLE_EXCEPTIONS = {
HTTPError,
requests.ConnectionError,
ProxyError,
SSLError,
Timeout,
ConnectTimeout,
ReadTimeout,
StreamConsumedError,
RetryError,
InterruptedError,
}

MAX_RETRY_NUM = 20


def retryable_func(
func: Any,
*,
args: tuple[()] = (),
kwargs: Optional[Any] = None,
max_retry_num: int = MAX_RETRY_NUM,
) -> Any:
"""Retry a function in case of catch errors."""
if kwargs is None:
kwargs = {}

attempt = 0

while attempt < max_retry_num:
attempt += 1
try:
return func(*args, **kwargs)
except TosError as e:
from tosfs.core import logger

if attempt >= max_retry_num:
logger.error("Retry exhausted after %d times.", max_retry_num)
raise e

if is_retryable_exception(e):
logger.warn("Retry TOS request in the %d times, error: %s", attempt, e)
try:
time.sleep(min(1.7**attempt * 0.1, 15))
except InterruptedError as ie:
raise TosfsError(f"Request {func} interrupted.") from ie
else:
raise e
except Exception as e:
raise TosfsError(f"{e}") from e


def is_retryable_exception(e: TosError) -> bool:
"""Check if the exception is retryable."""
return _is_retryable_tos_server_exception(e) or _is_retryable_tos_client_exception(
e
)


def _is_retryable_tos_server_exception(e: TosError) -> bool:
return isinstance(e, TosServerError) and e.code in TOS_SERVER_RETRYABLE_CODES


def _is_retryable_tos_client_exception(e: TosError) -> bool:
return isinstance(e, TosClientError) and e.cause in TOS_CLIENT_RETRYABLE_EXCEPTIONS
38 changes: 1 addition & 37 deletions tosfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,7 @@
import re
import string
import tempfile
import time
from typing import Any, Generator, Optional, Tuple

from tos.exceptions import TosServerError

from tosfs.consts import TOS_SERVER_RETRYABLE_ERROR_CODE_SET
from typing import Generator, Tuple


def random_str(length: int = 5) -> str:
Expand Down Expand Up @@ -103,34 +98,3 @@ def get_brange(size: int, block: int) -> Generator[Tuple[int, int], None, None]:
"""
for offset in range(0, size, block):
yield offset, min(offset + block - 1, size - 1)


def retryable_func_wrapper(
func: Any, *, args: tuple[()] = (), kwargs: Optional[Any] = None, retries: int = 5
) -> Any:
"""Retry a function in case of server errors."""
if kwargs is None:
kwargs = {}

err = None

for i in range(retries):
try:
return func(*args, **kwargs)
except TosServerError as e:
err = e
from tosfs.core import logger

logger.debug("Server error (maybe retryable): %s", e)
if e.code in TOS_SERVER_RETRYABLE_ERROR_CODE_SET:
time.sleep(min(1.7**i * 0.1, 15))
else:
break
except Exception as e:
err = e
from tosfs.core import logger

logger.debug("Nonretryable error: %s", e)
break

raise err if err is not None else ""

0 comments on commit e83a91e

Please sign in to comment.