Skip to content

Commit

Permalink
Infra: introduce retry func wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 12, 2024
1 parent 6f86cc1 commit 69900a0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
66 changes: 33 additions & 33 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
)
from tosfs.exceptions import TosfsError
from tosfs.fsspec_utils import glob_translate
from tosfs.stability import retryable_func
from tosfs.stability import retryable_func_executor
from tosfs.utils import find_bucket_key, get_brange

logger = logging.getLogger("tosfs")
Expand Down Expand Up @@ -391,7 +391,7 @@ def rmdir(self, path: str) -> None:
if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0:
raise TosfsError(f"Directory {path} is not empty.")

retryable_func(
retryable_func_executor(
lambda: self.tos_client.delete_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -465,7 +465,7 @@ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
# here we need to create the parent directory recursively
self.mkdir(parent, create_parents=True)

retryable_func(
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
Expand All @@ -474,7 +474,7 @@ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
if not self.exists(parent):
raise FileNotFoundError(f"Parent directory {parent} does not exist.")
else:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -537,7 +537,7 @@ def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None:
if not truncate and self.exists(path):
raise FileExistsError(f"File {path} already exists.")

retryable_func(
retryable_func_executor(
lambda: self.tos_client.put_object(bucket, key),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -578,7 +578,7 @@ def isdir(self, path: str) -> bool:
key = key.rstrip("/") + "/"

try:
return retryable_func(
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -614,7 +614,7 @@ def isfile(self, path: str) -> bool:
return False

try:
return retryable_func(
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -689,7 +689,7 @@ def put_file(
with open(lpath, "rb") as f:
if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
chunk = f.read()
retryable_func(
retryable_func_executor(
lambda: self.tos_client.put_object(
bucket,
key,
Expand All @@ -699,19 +699,19 @@ def put_file(
max_retry_num=self.max_retry_num,
)
else:
mpu = retryable_func(
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(
bucket, key, content_type=content_type
),
max_retry_num=self.max_retry_num,
)
retryable_func(
retryable_func_executor(
lambda: self.tos_client.upload_part_from_file(
bucket, key, mpu.upload_id, file_path=lpath, part_number=1
),
max_retry_num=self.max_retry_num,
)
retryable_func(
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket, key, mpu.upload_id, complete_all=True
),
Expand Down Expand Up @@ -1129,7 +1129,7 @@ def _call_list_objects_type2(
continuation_token=continuation_token,
)

resp = retryable_func(
resp = retryable_func_executor(
_call_list_objects_type2,
args=(continuation_token,),
max_retry_num=self.max_retry_num,
Expand All @@ -1144,7 +1144,7 @@ def _call_list_objects_type2(
]

if deleting_objects:
delete_resp = retryable_func(
delete_resp = retryable_func_executor(
lambda: self.tos_client.delete_multi_objects(
bucket, deleting_objects, quiet=True
),
Expand All @@ -1164,7 +1164,7 @@ def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None:
if ver2:
raise ValueError("Cannot copy to a versioned file!")

retryable_func(
retryable_func_executor(
lambda: self.tos_client.copy_object(
bucket=buc2,
key=key2,
Expand All @@ -1185,7 +1185,7 @@ def _copy_etag_preserved(
upload_id = None

try:
mpu = retryable_func(
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(bucket2, key2),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -1216,7 +1216,7 @@ def _call_upload_part_copy(
copy_source_range_end=brange_last,
)

part = retryable_func(
part = retryable_func_executor(
_call_upload_part_copy,
args=(i, brange_first, brange_last),
max_retry_num=self.max_retry_num,
Expand All @@ -1233,14 +1233,14 @@ def _call_upload_part_copy(
)
brange_first += part_size

retryable_func(
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket2, key2, upload_id, parts
),
max_retry_num=self.max_retry_num,
)
except Exception as e:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.abort_multipart_upload(
bucket2, key2, upload_id
),
Expand Down Expand Up @@ -1272,7 +1272,7 @@ def _copy_managed(
upload_id = None

try:
mpu = retryable_func(
mpu = retryable_func_executor(
lambda: self.tos_client.create_multipart_upload(bucket2, key2),
max_retry_num=self.max_retry_num,
)
Expand All @@ -1293,7 +1293,7 @@ def _call_upload_part_copy(
)

out = [
retryable_func(
retryable_func_executor(
_call_upload_part_copy,
args=(i, brange_first, brange_last),
max_retry_num=self.max_retry_num,
Expand All @@ -1313,14 +1313,14 @@ def _call_upload_part_copy(
for i, o in enumerate(out)
]

retryable_func(
retryable_func_executor(
lambda: self.tos_client.complete_multipart_upload(
bucket2, key2, upload_id, parts
),
max_retry_num=self.max_retry_num,
)
except Exception as e:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.abort_multipart_upload(
bucket2, key2, upload_id
),
Expand Down Expand Up @@ -1367,7 +1367,7 @@ def _open_remote_file(
range_start: int,
**kwargs: Any,
) -> Tuple[BinaryIO, int]:
resp = retryable_func(
resp = retryable_func_executor(
lambda: self.tos_client.get_object(
bucket,
key,
Expand Down Expand Up @@ -1411,7 +1411,7 @@ def _bucket_info(self, bucket: str) -> dict:
"""
try:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.head_bucket(bucket),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -1464,7 +1464,7 @@ def _object_info(
"""
try:
out = retryable_func(
out = retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key, version_id=version_id),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -1494,7 +1494,7 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict
try:
# We check to see if the path is a directory by attempting to list its
# contexts. If anything is found, it is indeed a directory
out = retryable_func(
out = retryable_func_executor(
lambda: self.tos_client.list_objects_type2(
bucket,
prefix=key.rstrip("/") + "/" if key else "",
Expand Down Expand Up @@ -1597,7 +1597,7 @@ def _exists_bucket(self, bucket: str) -> bool:
"""
try:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.head_bucket(bucket),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -1652,7 +1652,7 @@ def _exists_object(
"""
try:
return retryable_func(
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -1692,7 +1692,7 @@ def _lsbuckets(self) -> List[dict]:
"""
try:
resp = retryable_func(
resp = retryable_func_executor(
lambda: self.tos_client.list_buckets(), max_retry_num=self.max_retry_num
)
except (TosClientError, TosServerError) as e:
Expand Down Expand Up @@ -1843,7 +1843,7 @@ def _call_list_object_versions(
version_id_marker=version_id_marker,
)

resp = retryable_func(
resp = retryable_func_executor(
_call_list_object_versions,
args=(key_marker, version_id_marker),
max_retry_num=self.max_retry_num,
Expand Down Expand Up @@ -1872,7 +1872,7 @@ def _call_list_objects_type2(
continuation_token=continuation_token,
)

resp = retryable_func(
resp = retryable_func_executor(
_call_list_objects_type2,
args=(continuation_token,),
max_retry_num=self.max_retry_num,
Expand All @@ -1892,7 +1892,7 @@ def _rm(self, path: str) -> None:
key = key.rstrip("/") + "/"

try:
retryable_func(
retryable_func_executor(
lambda: self.tos_client.delete_object(bucket, key),
max_retry_num=self.max_retry_num,
)
Expand Down Expand Up @@ -2164,7 +2164,7 @@ def fetch() -> bytes:
bucket, key, version_id, range_start=start, range_end=end
).read()

return retryable_func(fetch, max_retry_num=self.fs.max_retry_num)
return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)

def commit(self) -> None:
"""Complete multipart upload or PUT."""
Expand Down
2 changes: 1 addition & 1 deletion tosfs/stability.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
MAX_RETRY_NUM = 20


def retryable_func(
def retryable_func_executor(
func: Any,
*,
args: Tuple[Any, ...] = (),
Expand Down

0 comments on commit 69900a0

Please sign in to comment.