Skip to content

Commit

Permalink
Core: Implement get_file API
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Aug 30, 2024
1 parent f1eec7f commit 6c4cbd1
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 1 deletion.
108 changes: 107 additions & 1 deletion tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"""The core module of TOSFS."""
import logging
import os
from typing import Any, List, Optional, Tuple, Union
import time
from typing import Any, BinaryIO, List, Optional, Tuple, Union

import tos
from fsspec import AbstractFileSystem
Expand Down Expand Up @@ -55,6 +56,8 @@ class TosFileSystem(AbstractFileSystem):
abstract super-class for pythonic file-systems.
"""

retries = 5

def __init__(
self,
endpoint_url: Optional[str] = None,
Expand Down Expand Up @@ -352,6 +355,109 @@ def isfile(self, path: str) -> bool:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None:
"""Get a file from the TOS filesystem.
Parameters
----------
rpath : str
The remote path of the file to get.
lpath : str
The local path to save the file.
**kwargs : Any, optional
Additional arguments.
Raises
------
tos.exceptions.TosClientError
If there is a client error while getting the file.
tos.exceptions.TosServerError
If there is a server error while getting the file.
TosfsError
If there is an unknown error while getting the file.
"""
if os.path.isdir(lpath):
return

if not self.exists(rpath):
raise FileNotFoundError(rpath)

bucket, key, version_id = self._split_path(rpath)

def _read_chunks(body: BinaryIO, f: BinaryIO) -> None:
failed_reads = 0
bytes_read = 0
while True:
try:
chunk = body.read(2**16)
except tos.exceptions.TosClientError as e:
failed_reads += 1
if failed_reads >= self.retries:
raise e
try:
body.close()
except Exception as e:
logger.error(
"Failed to close the body when calling "
"get_file from %s to %s : %s",
rpath,
lpath,
e,
)

time.sleep(min(1.7**failed_reads * 0.1, 15))
body, _ = self._open_remote_file(
bucket, key, version_id, bytes_read, **kwargs
)
continue
if not chunk:
break
bytes_read += len(chunk)
f.write(chunk)

body, content_length = self._open_remote_file(
bucket, key, version_id, range_start=0, **kwargs
)
try:
with open(lpath, "wb") as f:
_read_chunks(body, f)
finally:
try:
body.close()
except Exception as e:
logger.error(
"Failed to close the body when calling "
"get_file from %s to %s: %s",
rpath,
lpath,
e,
)

def _open_remote_file(
self,
bucket: str,
key: str,
version_id: Optional[str],
range_start: int,
**kwargs: Any,
) -> Tuple[BinaryIO, int]:
try:
resp = self.tos_client.get_object(
bucket,
key,
version_id=version_id,
range_start=range_start,
**kwargs,
)
return resp.content, resp.content_length
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _bucket_info(self, bucket: str) -> dict:
"""Get the information of a bucket.
Expand Down
22 changes: 22 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path
import tempfile

import pytest
from tos.exceptions import TosServerError
Expand Down Expand Up @@ -186,3 +188,23 @@ def test_exists_object(
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/nonexistent")
tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")


def test_get_file(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
file_content = "hello world"
rpath = f"{bucket}/{temporary_workspace}/{file_name}"
lpath = f"{tempfile.mkdtemp()}/{file_name}"
assert not os.path.exists(lpath)

bucket, key, _ = tosfs._split_path(rpath)
tosfs.tos_client.put_object(bucket=bucket, key=key, content=file_content)

tosfs.get_file(rpath, lpath)
with open(lpath, "r") as f:
assert f.read() == file_content

with pytest.raises(FileNotFoundError):
tosfs.get_file(f"{bucket}/{temporary_workspace}/nonexistent", lpath)

tosfs.rm_file(rpath)

0 comments on commit 6c4cbd1

Please sign in to comment.