Skip to content

Commit

Permalink
Infra: introduce retry func warpper
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 12, 2024
1 parent 44035aa commit 95799f6
Showing 1 changed file with 38 additions and 45 deletions.
83 changes: 38 additions & 45 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,58 +675,51 @@ def put_file(
if "ContentType" not in kwargs:
content_type, _ = mimetypes.guess_type(lpath)

bucket, key, _ = self._split_path(rpath)

try:
if self.isfile(rpath):
self.makedirs(self._parent(rpath), exist_ok=True)
if self.isfile(rpath):
self.makedirs(self._parent(rpath), exist_ok=True)

if self.isdir(rpath):
rpath = os.path.join(rpath, os.path.basename(lpath))
if self.isdir(rpath):
rpath = os.path.join(rpath, os.path.basename(lpath))

bucket, key, _ = self._split_path(rpath)
bucket, key, _ = self._split_path(rpath)

with open(lpath, "rb") as f:
if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
chunk = f.read()
self.tos_client.put_object(
with open(lpath, "rb") as f:
if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
chunk = f.read()
self.tos_client.put_object(
bucket,
key,
content=chunk,
content_type=content_type,
)
retryable_func(
lambda: self.tos_client.put_object(
bucket,
key,
content=chunk,
content_type=content_type,
)
retryable_func(
lambda: self.tos_client.put_object(
bucket,
key,
content=chunk,
content_type=content_type,
),
max_retry_num=self.max_retry_num,
)
else:
mpu = retryable_func(
lambda: self.tos_client.create_multipart_upload(
bucket, key, content_type=content_type
),
max_retry_num=self.max_retry_num,
)
retryable_func(
lambda: self.tos_client.upload_part_from_file(
bucket, key, mpu.upload_id, file_path=lpath, part_number=1
),
max_retry_num=self.max_retry_num,
)
retryable_func(
lambda: self.tos_client.complete_multipart_upload(
bucket, key, mpu.upload_id, complete_all=True
),
max_retry_num=self.max_retry_num,
)
except (TosClientError, TosServerError) as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
),
max_retry_num=self.max_retry_num,
)
else:
mpu = retryable_func(
lambda: self.tos_client.create_multipart_upload(
bucket, key, content_type=content_type
),
max_retry_num=self.max_retry_num,
)
retryable_func(
lambda: self.tos_client.upload_part_from_file(
bucket, key, mpu.upload_id, file_path=lpath, part_number=1
),
max_retry_num=self.max_retry_num,
)
retryable_func(
lambda: self.tos_client.complete_multipart_upload(
bucket, key, mpu.upload_id, complete_all=True
),
max_retry_num=self.max_retry_num,
)

def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
"""Get a file from the TOS filesystem and write to a local path.
Expand Down

0 comments on commit 95799f6

Please sign in to comment.