From 91c6300f6cbc3b7c7416ba69df38092e88d27029 Mon Sep 17 00:00:00 2001 From: vinoyang Date: Fri, 13 Sep 2024 19:40:03 +0800 Subject: [PATCH] Core: Enhance retryability for TosFile (#81) --- tosfs/core.py | 66 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 9584bb1..7de2b35 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -2012,7 +2012,10 @@ def __init__( self.buffer: Optional[io.BytesIO] = io.BytesIO() if "a" in mode and fs.exists(path): - head = self.fs.tos_client.head_object(bucket, key) + head = retryable_func_executor( + lambda: self.fs.tos_client.head_object(bucket, key), + max_retry_num=self.fs.max_retry_num, + ) loc = head.content_length if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD: @@ -2030,16 +2033,22 @@ def _initiate_upload(self) -> None: logger.debug("Initiate upload for %s", self) self.parts = [] - self.mpu = self.fs.tos_client.create_multipart_upload(self.bucket, self.key) + self.mpu = retryable_func_executor( + lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key), + max_retry_num=self.fs.max_retry_num, + ) if self.append_block: # use existing data in key when appending, # and block is big enough - out = self.fs.tos_client.upload_part_copy( - bucket=self.bucket, - key=self.key, - part_number=1, - upload_id=self.mpu.upload_id, + out = retryable_func_executor( + lambda: self.fs.tos_client.upload_part_copy( + bucket=self.bucket, + key=self.key, + part_number=1, + upload_id=self.mpu.upload_id, + ), + max_retry_num=self.fs.max_retry_num, ) self.parts.append({"PartNumber": out.part_number, "ETag": out.etag}) @@ -2123,12 +2132,19 @@ def handle_remainder( part = len(self.parts) + 1 if self.parts is not None else 1 logger.debug("Upload chunk %s, %s", self, part) - out: UploadPartOutput = self.fs.tos_client.upload_part( - bucket=bucket, - key=key, - part_number=part, - upload_id=self.mpu.upload_id, - content=previous_chunk, + def _call_upload_part( + part: int = part, previous_chunk: Optional[bytes] = previous_chunk + ) -> UploadPartOutput: + return self.fs.tos_client.upload_part( + bucket=bucket, + key=key, + part_number=part, + upload_id=self.mpu.upload_id, + content=previous_chunk, + ) + + out = retryable_func_executor( + _call_upload_part, max_retry_num=self.fs.max_retry_num ) ( @@ -2179,15 +2195,24 @@ def commit(self) -> None: logger.debug("One-shot upload of %s", self) self.buffer.seek(0) data = self.buffer.read() - write_result = self.fs.tos_client.put_object( - self.bucket, self.key, content=data + write_result = retryable_func_executor( + lambda: self.fs.tos_client.put_object( + self.bucket, self.key, content=data + ), + max_retry_num=self.fs.max_retry_num, ) else: raise RuntimeError else: logger.debug("Complete multi-part upload for %s ", self) - write_result = self.fs.tos_client.complete_multipart_upload( - self.bucket, self.key, upload_id=self.mpu.upload_id, parts=self.parts + write_result = retryable_func_executor( + lambda: self.fs.tos_client.complete_multipart_upload( + self.bucket, + self.key, + upload_id=self.mpu.upload_id, + parts=self.parts, + ), + max_retry_num=self.fs.max_retry_num, ) if self.fs.version_aware: @@ -2202,7 +2227,10 @@ def discard(self) -> None: def _abort_mpu(self) -> None: if self.mpu: - self.fs.tos_client.abort_multipart_upload( - self.bucket, self.key, self.mpu.upload_id + retryable_func_executor( + lambda: self.fs.tos_client.abort_multipart_upload( + self.bucket, self.key, self.mpu.upload_id + ), + max_retry_num=self.fs.max_retry_num, ) self.mpu = None