Skip to content

Commit

Permalink
Refactor get_file API to reuse retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 21, 2024
1 parent 239e39d commit e4449cb
Showing 1 changed file with 20 additions and 41 deletions.
61 changes: 20 additions & 41 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import mimetypes
import os
import tempfile
import time
from glob import has_magic
from typing import Any, BinaryIO, Collection, Generator, List, Optional, Tuple, Union

Expand Down Expand Up @@ -48,7 +47,6 @@
MPU_PART_SIZE_THRESHOLD,
PART_MAX_SIZE,
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
RETRY_NUM,
TOS_SERVER_STATUS_CODE_NOT_FOUND,
TOSFS_LOG_FORMAT,
)
Expand Down Expand Up @@ -873,53 +871,34 @@ def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
bucket, key, version_id = self._split_path(rpath)

def _read_chunks(body: BinaryIO, f: BinaryIO) -> None:
failed_reads = 0
bytes_read = 0
while True:
try:
chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
except TosClientError as e:
failed_reads += 1
if failed_reads >= RETRY_NUM:
raise e
try:
body.close()
except Exception as e:
logger.error(
"Failed to close the body when calling "
"get_file from %s to %s : %s",
rpath,
lpath,
e,
)

time.sleep(min(1.7**failed_reads * 0.1, 15))
body, _ = self._open_remote_file(
bucket, key, version_id, bytes_read, **kwargs
)
continue
chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
if not chunk:
break
bytes_read += len(chunk)
f.write(chunk)

body, content_length = self._open_remote_file(
bucket, key, version_id, range_start=0, **kwargs
)
try:
with open(lpath, "wb") as f:
_read_chunks(body, f)
finally:
def download_file() -> None:
body, content_length = self._open_remote_file(
bucket, key, version_id, range_start=0, **kwargs
)
try:
body.close()
except Exception as e:
logger.error(
"Failed to close the body when calling "
"get_file from %s to %s: %s",
rpath,
lpath,
e,
)
with open(lpath, "wb") as f:
retryable_func_executor(_read_chunks, args=(body, f))
finally:
try:
body.close()
except Exception as e:
logger.error(
"Failed to close the body when calling "
"get_file from %s to %s: %s",
rpath,
lpath,
e,
)

retryable_func_executor(download_file)

def walk(
self,
Expand Down

0 comments on commit e4449cb

Please sign in to comment.