Core: Implement write API #42

Merged · 1 commit · Aug 30, 2024
296 changes: 292 additions & 4 deletions tosfs/core.py
@@ -13,6 +13,7 @@
# limitations under the License.

"""The core module of TOSFS."""
import io
import logging
import mimetypes
import os
@@ -21,9 +22,16 @@

import tos
from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile
from fsspec.utils import setup_logging as setup_logger
from tos.models import CommonPrefixInfo
from tos.models2 import (
CreateMultipartUploadOutput,
ListedObject,
ListedObjectVersion,
PartInfo,
UploadPartOutput,
)

from tosfs.consts import TOS_SERVER_RESPONSE_CODE_NOT_FOUND
from tosfs.exceptions import TosfsError
@@ -58,6 +66,7 @@ class TosFileSystem(AbstractFileSystem):
"""

retries = 5
default_block_size = 5 * 2**20

def __init__(
self,
@@ -67,7 +76,10 @@ def __init__(
region: Optional[str] = None,
version_aware: bool = False,
credentials_provider: Optional[object] = None,
default_block_size: Optional[int] = None,
default_fill_cache: bool = True,
default_cache_type: str = "readahead",
**kwargs: Any,
) -> None:
"""Initialise the TosFileSystem."""
self.tos_client = tos.TosClientV2(
@@ -78,8 +90,77 @@ def __init__(
credentials_provider=credentials_provider,
)
self.version_aware = version_aware
self.default_block_size = default_block_size or self.default_block_size
self.default_fill_cache = default_fill_cache
self.default_cache_type = default_cache_type

super().__init__(**kwargs)
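
A minimal construction sketch for reference (the endpoint and credential arguments are elided from this hunk, so they appear only as a comment; the region value and block-size override are illustrative assumptions):

fs = TosFileSystem(
    # ... endpoint/credential arguments elided from this hunk ...
    region="cn-beijing",           # assumed example region
    default_block_size=8 * 2**20,  # override the 5 MiB class default
)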

def _open(
self,
path: str,
mode: str = "rb",
block_size: Optional[int] = None,
version_id: Optional[str] = None,
fill_cache: Optional[bool] = None,
cache_type: Optional[str] = None,
autocommit: bool = True,
**kwargs: Any,
) -> AbstractBufferedFile:
"""Open a file for reading or writing.

Parameters
----------
path: string
Path of file on TOS
mode: string
One of 'r', 'w', 'a', 'rb', 'wb', or 'ab'. These have the same meaning
as they do for the built-in `open` function.
block_size: int
Size of read-ahead blocks when reading, and of upload parts when writing
fill_cache: bool
If seeking to a new part of the file beyond the current buffer,
with this True, the buffer will be filled between the sections to
best support random access. When reading only a few specific chunks
out of a file, performance may be better if False.
version_id : str
Explicit version of the object to open. This requires that the
filesystem is version aware and bucket versioning is enabled on the
relevant bucket.
cache_type : str
See fsspec's documentation for available cache_type values. Set to "none"
if no caching is desired. If None, defaults to ``self.default_cache_type``.
autocommit : bool
If True, writes will be committed to the filesystem on flush or close.
kwargs: dict-like
Additional parameters.

"""
if block_size is None:
block_size = self.default_block_size
if fill_cache is None:
fill_cache = self.default_fill_cache

if not self.version_aware and version_id:
raise ValueError(
"version_id cannot be specified if the filesystem "
"is not version aware"
)

if cache_type is None:
cache_type = self.default_cache_type

return TosFile(
self,
path,
mode,
block_size=block_size,
version_id=version_id,
fill_cache=fill_cache,
cache_type=cache_type,
autocommit=autocommit,
)
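
A usage sketch for the new write path (assumes the `fs` instance sketched above and an existing bucket; the path is a placeholder). fsspec routes `fs.open(...)` through `_open`, which returns the `TosFile` defined below:

with fs.open("tos://mybucket/path/to/file", "wb") as f:
    f.write(b"hello tosfs")  # buffered in memory until flush/close

# opening an explicit version requires version_aware=True on the filesystem
f = fs.open("tos://mybucket/path/to/file", "rb", version_id="some_version_id")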

def ls(
self,
path: str,
@@ -1116,10 +1197,10 @@ def _split_path(self, path: str) -> Tuple[str, str, Optional[str]]:

Examples
--------
>>> split_path("tos://mybucket/path/to/file")
>>> self._split_path("tos://mybucket/path/to/file")
['mybucket', 'path/to/file', None]
# pylint: disable=line-too-long
>>> split_path("tos://mybucket/path/to/versioned_file?versionId=some_version_id")
>>> self._split_path("tos://mybucket/path/to/versioned_file?versionId=some_version_id")
['mybucket', 'path/to/versioned_file', 'some_version_id']

"""
@@ -1175,3 +1256,210 @@ def _fill_bucket_info(bucket_name: str) -> dict:
"type": "directory",
"name": bucket_name,
}


class TosFile(AbstractBufferedFile):
"""File-like operations for TOS."""

retries = 5
part_min = 5 * 2**20
part_max = 5 * 2**30

def __init__(
self,
fs: TosFileSystem,
path: str,
mode: str = "rb",
block_size: Union[int, str] = "default",
autocommit: bool = True,
cache_type: str = "readahead",
**kwargs: Any,
):
"""Instantiate a TOS file."""
bucket, key, path_version_id = fs._split_path(path)
if not key:
raise ValueError("Attempt to open non key-like path: %s" % path)
super().__init__(
fs,
path,
mode,
block_size=block_size,
autocommit=autocommit,
cache_type=cache_type,
**kwargs,
)
self.fs = fs
self.bucket = bucket
self.key = key
self.path = path
self.mode = mode
self.autocommit = autocommit
self.mpu: Optional[CreateMultipartUploadOutput] = None
self.parts: Optional[list] = None
self.append_block = False
self.buffer: Optional[io.BytesIO] = io.BytesIO()

def _initiate_upload(self) -> None:
"""Create remote file/upload."""
if self.autocommit and not self.append_block and self.tell() < self.blocksize:
# only happens when closing a small file; use one-shot PUT
return
logger.debug("Initiate upload for %s", self)
self.parts = []

self.mpu = self.fs.tos_client.create_multipart_upload(self.bucket, self.key)

if self.append_block:
# when appending, reuse the existing object data as the first
# part, since it is at least one block in size
out = self.fs.tos_client.upload_part_copy(
bucket=self.bucket,
key=self.key,
part_number=1,
upload_id=self.mpu.upload_id,
)

self.parts.append({"PartNumber": out.part_number, "ETag": out.etag})

def _upload_chunk(self, final: bool = False) -> bool:
"""Write one part of a multi-block file upload.

Parameters
----------
final: bool
This is the last block, so should complete file, if
self.autocommit is True.

"""
bucket, key, _ = self.fs._split_path(self.path)
if self.buffer:
logger.debug(
"Upload for %s, final=%s, loc=%s, buffer loc=%s",
self,
final,
self.loc,
self.buffer.tell(),
)

if (
self.autocommit
and not self.append_block
and final
and self.tell() < self.blocksize
):
# only happens when closing small file, use one-shot PUT
pass
else:
self._upload_multiple_chunks(bucket, key)

if self.autocommit and final:
self.commit()

return not final
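
The branching above means the amount of data buffered at close time selects the strategy (a sketch using the 5 MiB default block size; paths are placeholders):

with fs.open("tos://mybucket/small.txt", "wb") as f:
    f.write(b"tiny")  # < blocksize at close: one-shot PUT in commit()

with fs.open("tos://mybucket/large.bin", "wb") as f:
    f.write(b"x" * (12 * 2**20))  # >= blocksize: multipart, one part per block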

def _upload_multiple_chunks(self, bucket: str, key: str) -> None:
if self.buffer:
self.buffer.seek(0)
current_chunk: Optional[bytes] = self.buffer.read(self.blocksize)

while current_chunk:
(previous_chunk, current_chunk) = (
current_chunk,
self.buffer.read(self.blocksize) if self.buffer else None,
)
current_chunk_size = len(current_chunk if current_chunk else b"")

# If the tail chunk is smaller than one block, merge it into the
# previous chunk so every part meets the minimum part size; split
# the merged chunk in half if it would exceed the maximum part size.
def handle_remainder(
    previous_chunk: bytes,
    current_chunk: Optional[bytes],
    part_max: int,
) -> Tuple[bytes, Optional[bytes]]:
    remainder = previous_chunk + (current_chunk or b"")
    if len(remainder) <= part_max:
        return remainder, None
    partition = len(remainder) // 2
    return remainder[:partition], remainder[partition:]

if 0 < current_chunk_size < self.blocksize:
    previous_chunk, current_chunk = handle_remainder(
        previous_chunk, current_chunk, self.part_max
    )

part = len(self.parts) + 1 if self.parts is not None else 1
logger.debug("Upload chunk %s, %s", self, part)

out: UploadPartOutput = self.fs.tos_client.upload_part(
bucket=bucket,
key=key,
part_number=part,
upload_id=self.mpu.upload_id,
content=previous_chunk,
)

if self.parts is not None:
    self.parts.append(
        PartInfo(
            part_number=part,
            etag=out.etag,
            part_size=len(previous_chunk),
            offset=None,
            hash_crc64_ecma=None,
            is_completed=None,
        )
    )
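
A worked example of the tail merging above (sizes assumed, using the 5 MiB default blocksize): a 12 MiB buffer is read as 5 MiB, 5 MiB, and 2 MiB chunks; the 2 MiB tail is below blocksize, so it is merged into the preceding chunk and the object is uploaded as parts of 5 MiB and 7 MiB. The halving branch only triggers when a merged chunk would exceed part_max (5 GiB):

MiB = 2**20
chunks = [5 * MiB, 5 * MiB, 2 * MiB]        # reads from a 12 MiB buffer
parts = [chunks[0], chunks[1] + chunks[2]]  # tail merged into previous chunk
assert parts == [5 * MiB, 7 * MiB]          # every part meets the 5 MiB minimum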

def commit(self) -> None:
"""Complete multipart upload or PUT."""
logger.debug("Commit %s", self)
if self.tell() == 0:
if self.buffer is not None:
logger.debug("Empty file committed %s", self)
self._abort_mpu()
self.fs.touch(self.path, **self.kwargs)
elif not self.parts:
if self.buffer is not None:
logger.debug("One-shot upload of %s", self)
self.buffer.seek(0)
data = self.buffer.read()
write_result = self.fs.tos_client.put_object(
self.bucket, self.key, content=data
)
else:
raise RuntimeError("Cannot commit a file whose buffer has been discarded")
else:
logger.debug("Complete multi-part upload for %s ", self)
write_result = self.fs.tos_client.complete_multipart_upload(
self.bucket, self.key, upload_id=self.mpu.upload_id, parts=self.parts
)

if self.fs.version_aware and write_result is not None:
    self.version_id = write_result.version_id

self.buffer = None

def discard(self) -> None:
"""Close the file without writing."""
self._abort_mpu()
self.buffer = None # file becomes unusable

def _abort_mpu(self) -> None:
if self.mpu:
self.fs.tos_client.abort_multipart_upload(
self.bucket, self.key, self.mpu.upload_id
)
self.mpu = None
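
Finally, a sketch of the non-autocommit path (hedged: autocommit=False and close() semantics come from fsspec's AbstractBufferedFile, not from code added in this PR):

f = fs.open("tos://mybucket/staging/out.bin", "wb", autocommit=False)
f.write(b"payload")
f.close()    # flushes parts but does not publish the object
f.commit()   # completes the multipart upload and publishes it
# ...or f.discard() to abort the in-flight upload instead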