From 6c4cbd16a9fef7380716d1f765cedba41666bb19 Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 30 Aug 2024 12:49:49 +0800 Subject: [PATCH] Core: Implement get_file API --- tosfs/core.py | 108 +++++++++++++++++++++++++++++++++++++- tosfs/tests/test_tosfs.py | 22 ++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/tosfs/core.py b/tosfs/core.py index e20be5d..87085d9 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -15,7 +15,8 @@ """The core module of TOSFS.""" import logging import os -from typing import Any, List, Optional, Tuple, Union +import time +from typing import Any, BinaryIO, List, Optional, Tuple, Union import tos from fsspec import AbstractFileSystem @@ -55,6 +56,8 @@ class TosFileSystem(AbstractFileSystem): abstract super-class for pythonic file-systems. """ + retries = 5 + def __init__( self, endpoint_url: Optional[str] = None, @@ -352,6 +355,109 @@ def isfile(self, path: str) -> bool: except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None: + """Get a file from the TOS filesystem. + + Parameters + ---------- + rpath : str + The remote path of the file to get. + lpath : str + The local path to save the file. + **kwargs : Any, optional + Additional arguments. + + Raises + ------ + tos.exceptions.TosClientError + If there is a client error while getting the file. + tos.exceptions.TosServerError + If there is a server error while getting the file. + TosfsError + If there is an unknown error while getting the file. + + """ + if os.path.isdir(lpath): + return + + if not self.exists(rpath): + raise FileNotFoundError(rpath) + + bucket, key, version_id = self._split_path(rpath) + + def _read_chunks(body: BinaryIO, f: BinaryIO) -> None: + failed_reads = 0 + bytes_read = 0 + while True: + try: + chunk = body.read(2**16) + except tos.exceptions.TosClientError as e: + failed_reads += 1 + if failed_reads >= self.retries: + raise e + try: + body.close() + except Exception as e: + logger.error( + "Failed to close the body when calling " + "get_file from %s to %s : %s", + rpath, + lpath, + e, + ) + + time.sleep(min(1.7**failed_reads * 0.1, 15)) + body, _ = self._open_remote_file( + bucket, key, version_id, bytes_read, **kwargs + ) + continue + if not chunk: + break + bytes_read += len(chunk) + f.write(chunk) + + body, content_length = self._open_remote_file( + bucket, key, version_id, range_start=0, **kwargs + ) + try: + with open(lpath, "wb") as f: + _read_chunks(body, f) + finally: + try: + body.close() + except Exception as e: + logger.error( + "Failed to close the body when calling " + "get_file from %s to %s: %s", + rpath, + lpath, + e, + ) + + def _open_remote_file( + self, + bucket: str, + key: str, + version_id: Optional[str], + range_start: int, + **kwargs: Any, + ) -> Tuple[BinaryIO, int]: + try: + resp = self.tos_client.get_object( + bucket, + key, + version_id=version_id, + range_start=range_start, + **kwargs, + ) + return resp.content, resp.content_length + except tos.exceptions.TosClientError as e: + raise e + except tos.exceptions.TosServerError as e: + raise e + except Exception as e: + raise TosfsError(f"Tosfs failed with unknown error: {e}") from e + def _bucket_info(self, bucket: str) -> dict: """Get the information of a bucket. diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py index f6dc24c..1358adb 100644 --- a/tosfs/tests/test_tosfs.py +++ b/tosfs/tests/test_tosfs.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os.path +import tempfile import pytest from tos.exceptions import TosServerError @@ -186,3 +188,23 @@ def test_exists_object( assert not tosfs.exists(f"{bucket}/{temporary_workspace}/nonexistent") tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}") assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}") + + +def test_get_file(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None: + file_name = random_path() + file_content = "hello world" + rpath = f"{bucket}/{temporary_workspace}/{file_name}" + lpath = f"{tempfile.mkdtemp()}/{file_name}" + assert not os.path.exists(lpath) + + bucket, key, _ = tosfs._split_path(rpath) + tosfs.tos_client.put_object(bucket=bucket, key=key, content=file_content) + + tosfs.get_file(rpath, lpath) + with open(lpath, "r") as f: + assert f.read() == file_content + + with pytest.raises(FileNotFoundError): + tosfs.get_file(f"{bucket}/{temporary_workspace}/nonexistent", lpath) + + tosfs.rm_file(rpath)