Skip to content

Commit

Permalink
Core: Implement put_file API (#40)
Browse files Browse the repository at this point in the history
* Core: Implement put_file API

* Fix a doc mistake
  • Loading branch information
yanghua authored Aug 30, 2024
1 parent f942ea5 commit 4c20fa9
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 14 deletions.
89 changes: 89 additions & 0 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

"""The core module of TOSFS."""
import logging
import mimetypes
import os
from typing import Any, List, Optional, Tuple, Union

Expand Down Expand Up @@ -420,6 +421,94 @@ def isfile(self, path: str) -> bool:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def put_file(
self,
lpath: str,
rpath: str,
chunksize: int = 5 * 2**20,
**kwargs: Any,
) -> None:
"""Put a file from local to TOS.
Parameters
----------
lpath : str
The local path of the file to put.
rpath : str
The remote path of the file to put.
chunksize : int, optional
The size of the chunks to read from the file (default is 5 * 2**20).
**kwargs : Any, optional
Additional arguments.
Raises
------
FileNotFoundError
If the local file does not exist.
IsADirectoryError
If the local path is a directory.
TosClientError
If there is a client error while putting the file.
TosServerError
If there is a server error while putting the file.
TosfsError
If there is an unknown error while putting the file.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.put_file("localfile.txt", "tos://mybucket/remote.txt")
"""
if not os.path.exists(lpath):
raise FileNotFoundError(f"Local file {lpath} not found.")

if os.path.isdir(lpath):
raise IsADirectoryError(f"{lpath} is a directory.")

size = os.path.getsize(lpath)

content_type = None
if "ContentType" not in kwargs:
content_type, _ = mimetypes.guess_type(lpath)

bucket, key, _ = self._split_path(rpath)

try:
if self.isfile(rpath):
self.makedirs(self._parent(rpath), exist_ok=True)

if self.isdir(rpath):
rpath = os.path.join(rpath, os.path.basename(lpath))

bucket, key, _ = self._split_path(rpath)

with open(lpath, "rb") as f:
if size < min(5 * 2**30, 2 * chunksize):
chunk = f.read()
self.tos_client.put_object(
bucket,
key,
content=chunk,
content_type=content_type,
)
else:
mpu = self.tos_client.create_multipart_upload(
bucket, key, content_type=content_type
)
self.tos_client.upload_part_from_file(
bucket, key, mpu.upload_id, file_path=lpath, part_number=1
)
self.tos_client.complete_multipart_upload(
bucket, key, mpu.upload_id, complete_all=True
)
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _bucket_info(self, bucket: str) -> dict:
"""Get the information of a bucket.
Expand Down
4 changes: 2 additions & 2 deletions tosfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tos import EnvCredentialsProvider

from tosfs.core import TosFileSystem, logger
from tosfs.utils import random_path
from tosfs.utils import random_str


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -49,7 +49,7 @@ def bucket() -> str:
def temporary_workspace(
tosfs: TosFileSystem, bucket: str
) -> Generator[str, None, None]:
workspace = random_path()
workspace = random_str()
tosfs.mkdirs(f"{bucket}/{workspace}/")
yield workspace
try:
Expand Down
78 changes: 69 additions & 9 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from tosfs.core import TosFileSystem
from tosfs.exceptions import TosfsError
from tosfs.utils import random_path
from tosfs.utils import create_temp_dir, random_str


def test_ls_bucket(tosfs: TosFileSystem, bucket: str) -> None:
Expand Down Expand Up @@ -51,7 +51,7 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->


def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert f"{bucket}/{temporary_workspace}/{file_name}" in tosfs.ls(
f"{bucket}/{temporary_workspace}", detail=False
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_rmdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N
with pytest.raises(TosfsError):
tosfs.rmdir(bucket)

file_name = random_path()
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert f"{bucket}/{temporary_workspace}/{file_name}" in tosfs.ls(
f"{bucket}/{temporary_workspace}", detail=False
Expand All @@ -112,7 +112,7 @@ def test_rmdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N


def test_touch(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
file_name = random_str()
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
tosfs.touch(f"{bucket}/{temporary_workspace}/{file_name}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
Expand Down Expand Up @@ -144,7 +144,7 @@ def test_isdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N
assert not tosfs.isdir(f"{bucket}/{temporary_workspace}/nonexistent")
assert not tosfs.isdir(f"{bucket}/{temporary_workspace}/nonexistent/")

file_name = random_path()
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert not tosfs.isdir(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.isdir(f"{bucket}/{temporary_workspace}/{file_name}/")
Expand All @@ -153,7 +153,7 @@ def test_isdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N


def test_isfile(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert tosfs.isfile(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.isfile(f"{bucket}/{temporary_workspace}/{file_name}/")
Expand All @@ -176,7 +176,7 @@ def test_exists_bucket(
def test_exists_object(
tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
file_name = random_path()
file_name = random_str()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/")
Expand All @@ -189,7 +189,7 @@ def test_exists_object(


def test_mkdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
dir_name = random_path()
dir_name = random_str()

with pytest.raises(TosfsError):
tosfs.mkdir(f"{bucket}")
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_mkdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N


def test_makedirs(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
dir_name = random_path()
dir_name = random_str()

with pytest.raises(FileExistsError):
tosfs.makedirs(f"{bucket}/{temporary_workspace}", exist_ok=False)
Expand Down Expand Up @@ -260,3 +260,63 @@ def test_makedirs(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -
tosfs.rmdir(f"{bucket}/{temporary_workspace}/notexist")
tosfs.rmdir(f"{bucket}/{temporary_workspace}/{dir_name}")
tosfs.rmdir(f"{bucket}/{temporary_workspace}")


def test_put_file(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
temp_dir = create_temp_dir()
file_name = f"{random_str()}.txt"
lpath = f"{temp_dir}/{file_name}"
rpath = f"{bucket}/{temporary_workspace}/{file_name}"

with open(lpath, "w") as f:
f.write("test content")

assert not tosfs.exists(rpath)

tosfs.put_file(lpath, rpath)
assert tosfs.exists(rpath)

bucket, key, _ = tosfs._split_path(rpath)
assert (
tosfs.tos_client.get_object(bucket, key).content.read().decode()
== "test content"
)

with open(lpath, "w") as f:
f.write("hello world")

tosfs.put_file(lpath, rpath)
assert (
tosfs.tos_client.get_object(bucket, key).content.read().decode()
== "hello world"
)

tosfs.rm_file(rpath)
assert not tosfs.exists(rpath)

tosfs.put(lpath, f"{bucket}/{temporary_workspace}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
assert (
tosfs.tos_client.get_object(bucket, f"{temporary_workspace}/{file_name}")
.content.read()
.decode()
== "hello world"
)

with pytest.raises(IsADirectoryError):
tosfs.put_file(temp_dir, f"{bucket}/{temporary_workspace}")

with pytest.raises(FileNotFoundError):
tosfs.put_file(f"/notexist/{random_str()}", rpath)

with open(lpath, "wb") as f:
f.write(b"a" * 1024 * 1024 * 6)

# test mpu
tosfs.put_file(lpath, rpath, chunksize=2 * 1024 * 1024)
assert (
tosfs.tos_client.get_object(bucket, key).content.read()
== b"a" * 1024 * 1024 * 6
)

tosfs.rm_file(rpath)
18 changes: 15 additions & 3 deletions tosfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
import random
import re
import string
import tempfile
from typing import Tuple


def random_path(length: int = 5) -> str:
"""Generate a random path(dir or file) of the given length.
def random_str(length: int = 5) -> str:
"""Generate a random string of the given length.
Args:
----
Expand All @@ -32,7 +33,18 @@ def random_path(length: int = 5) -> str:
str: The random string.
"""
return "".join(random.choices(string.ascii_letters + string.digits, k=length))
return "".join(random.choices(string.ascii_letters, k=length))


def create_temp_dir() -> str:
"""Create a temporary directory for testing purposes.
Returns
-------
str: The path of the created temporary directory.
"""
return tempfile.mkdtemp()


def find_bucket_key(tos_path: str) -> Tuple[str, str]:
Expand Down

0 comments on commit 4c20fa9

Please sign in to comment.