From 95799f63786388f897a045f0e5c4e332762b7d69 Mon Sep 17 00:00:00 2001 From: yanghua Date: Thu, 12 Sep 2024 14:06:41 +0800 Subject: [PATCH] Infra: introduce retry func warpper --- tosfs/core.py | 83 +++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 20ffab0..8bc9eaf 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -675,58 +675,51 @@ def put_file( if "ContentType" not in kwargs: content_type, _ = mimetypes.guess_type(lpath) - bucket, key, _ = self._split_path(rpath) - - try: - if self.isfile(rpath): - self.makedirs(self._parent(rpath), exist_ok=True) + if self.isfile(rpath): + self.makedirs(self._parent(rpath), exist_ok=True) - if self.isdir(rpath): - rpath = os.path.join(rpath, os.path.basename(lpath)) + if self.isdir(rpath): + rpath = os.path.join(rpath, os.path.basename(lpath)) - bucket, key, _ = self._split_path(rpath) + bucket, key, _ = self._split_path(rpath) - with open(lpath, "rb") as f: - if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): - chunk = f.read() - self.tos_client.put_object( + with open(lpath, "rb") as f: + if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): + chunk = f.read() + self.tos_client.put_object( + bucket, + key, + content=chunk, + content_type=content_type, + ) + retryable_func( + lambda: self.tos_client.put_object( bucket, key, content=chunk, content_type=content_type, - ) - retryable_func( - lambda: self.tos_client.put_object( - bucket, - key, - content=chunk, - content_type=content_type, - ), - max_retry_num=self.max_retry_num, - ) - else: - mpu = retryable_func( - lambda: self.tos_client.create_multipart_upload( - bucket, key, content_type=content_type - ), - max_retry_num=self.max_retry_num, - ) - retryable_func( - lambda: self.tos_client.upload_part_from_file( - bucket, key, mpu.upload_id, file_path=lpath, part_number=1 - ), - max_retry_num=self.max_retry_num, - ) - retryable_func( - lambda: self.tos_client.complete_multipart_upload( - bucket, key, mpu.upload_id, complete_all=True - ), - max_retry_num=self.max_retry_num, - ) - except (TosClientError, TosServerError) as e: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + ), + max_retry_num=self.max_retry_num, + ) + else: + mpu = retryable_func( + lambda: self.tos_client.create_multipart_upload( + bucket, key, content_type=content_type + ), + max_retry_num=self.max_retry_num, + ) + retryable_func( + lambda: self.tos_client.upload_part_from_file( + bucket, key, mpu.upload_id, file_path=lpath, part_number=1 + ), + max_retry_num=self.max_retry_num, + ) + retryable_func( + lambda: self.tos_client.complete_multipart_upload( + bucket, key, mpu.upload_id, complete_all=True + ), + max_retry_num=self.max_retry_num, + ) def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None: """Get a file from the TOS filesystem and write to a local path.