Skip to content

Commit

Permalink
Core: Enhance retryability for TosFile
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 13, 2024
1 parent a1a25fb commit c9cf8c7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
66 changes: 47 additions & 19 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,10 @@ def __init__(
self.buffer: Optional[io.BytesIO] = io.BytesIO()

if "a" in mode and fs.exists(path):
head = self.fs.tos_client.head_object(bucket, key)
head = retryable_func_executor(
lambda: self.fs.tos_client.head_object(bucket, key),
max_retry_num=self.fs.max_retry_num,
)
loc = head.content_length

if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
Expand All @@ -2030,16 +2033,22 @@ def _initiate_upload(self) -> None:
logger.debug("Initiate upload for %s", self)
self.parts = []

self.mpu = self.fs.tos_client.create_multipart_upload(self.bucket, self.key)
self.mpu = retryable_func_executor(
lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key),
max_retry_num=self.fs.max_retry_num,
)

if self.append_block:
# use existing data in key when appending,
# and block is big enough
out = self.fs.tos_client.upload_part_copy(
bucket=self.bucket,
key=self.key,
part_number=1,
upload_id=self.mpu.upload_id,
out = retryable_func_executor(
lambda: self.fs.tos_client.upload_part_copy(
bucket=self.bucket,
key=self.key,
part_number=1,
upload_id=self.mpu.upload_id,
),
max_retry_num=self.fs.max_retry_num,
)

self.parts.append({"PartNumber": out.part_number, "ETag": out.etag})
Expand Down Expand Up @@ -2123,12 +2132,19 @@ def handle_remainder(
part = len(self.parts) + 1 if self.parts is not None else 1
logger.debug("Upload chunk %s, %s", self, part)

out: UploadPartOutput = self.fs.tos_client.upload_part(
bucket=bucket,
key=key,
part_number=part,
upload_id=self.mpu.upload_id,
content=previous_chunk,
def _call_upload_part(
part: int = part, previous_chunk: Optional[bytes] = previous_chunk
) -> UploadPartOutput:
self.fs.tos_client.upload_part(
bucket=bucket,
key=key,
part_number=part,
upload_id=self.mpu.upload_id,
content=previous_chunk,
)

out = retryable_func_executor(
_call_upload_part, max_retry_num=self.fs.max_retry_num
)

(
Expand Down Expand Up @@ -2179,15 +2195,24 @@ def commit(self) -> None:
logger.debug("One-shot upload of %s", self)
self.buffer.seek(0)
data = self.buffer.read()
write_result = self.fs.tos_client.put_object(
self.bucket, self.key, content=data
write_result = retryable_func_executor(
lambda: self.fs.tos_client.put_object(
self.bucket, self.key, content=data
),
max_retry_num=self.fs.max_retry_num,
)
else:
raise RuntimeError
else:
logger.debug("Complete multi-part upload for %s ", self)
write_result = self.fs.tos_client.complete_multipart_upload(
self.bucket, self.key, upload_id=self.mpu.upload_id, parts=self.parts
write_result = retryable_func_executor(
lambda: self.fs.tos_client.complete_multipart_upload(
self.bucket,
self.key,
upload_id=self.mpu.upload_id,
parts=self.parts,
),
max_retry_num=self.fs.max_retry_num,
)

if self.fs.version_aware:
Expand All @@ -2202,7 +2227,10 @@ def discard(self) -> None:

def _abort_mpu(self) -> None:
if self.mpu:
self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.mpu.upload_id
retryable_func_executor(
lambda: self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.mpu.upload_id
),
max_retry_num=self.fs.max_retry_num,
)
self.mpu = None
13 changes: 13 additions & 0 deletions tosfs/tests/test_stability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# ByteDance Volcengine EMR, Copyright 2024.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

0 comments on commit c9cf8c7

Please sign in to comment.