Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Oct 1, 2024
1 parent 4099b15 commit d742470
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 31 deletions.
65 changes: 35 additions & 30 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __init__(
region: Optional[str] = None,
max_retry_num: int = 20,
max_connections: int = 1024,
connection_time: int = 10,
connection_timeout: int = 10,
socket_timeout: int = 30,
high_latency_log_threshold: int = 100,
version_aware: bool = False,
Expand Down Expand Up @@ -139,7 +139,7 @@ def __init__(
max_connections : int, optional
The maximum number of HTTP connections that can be opened in the
connection pool (default is 1024).
connection_time : int, optional
connection_timeout : int, optional
The time to keep a connection open in seconds (default is 10).
socket_timeout : int, optional
The socket read and write timeout time for a single request after
Expand Down Expand Up @@ -196,7 +196,7 @@ def __init__(
region,
max_retry_count=0,
max_connections=max_connections,
connection_time=connection_time,
connection_time=connection_timeout,
socket_timeout=socket_timeout,
high_latency_log_threshold=high_latency_log_threshold,
credentials_provider=credentials_provider,
Expand Down Expand Up @@ -254,15 +254,15 @@ def _open(
as they do for the built-in `open` function.
block_size: int
Size of data-node blocks if reading
version_id : str
Explicit version of the object to open. This requires that the tos
filesystem is version aware and bucket versioning is enabled on the
relevant bucket.
fill_cache: bool
If seeking to a new part of the file beyond the current buffer,
with this True, the buffer will be filled between the sections to
best support random access. When reading only a few specific chunks
out of a file, performance may be better if False.
version_id : str
Explicit version of the object to open. This requires that the tos
filesystem is version aware and bucket versioning is enabled on the
relevant bucket.
cache_type : str
See fsspec's documentation for available cache_type values. Set to "none"
if no caching is desired. If None, defaults to ``self.default_cache_type``.
Expand Down Expand Up @@ -2211,36 +2211,41 @@ def __init__(
self.append_block = False
self.buffer: Optional[io.BytesIO] = io.BytesIO()

self.multipart_uploader = MultipartUploader(
fs=fs,
bucket=bucket,
key=key,
part_size=fs.multipart_size,
thread_pool_size=fs.multipart_thread_pool_size,
staging_buffer_size=fs.multipart_staging_buffer_size,
multipart_threshold=fs.multipart_threshold,
)

if "a" in mode and fs.exists(path):
head = retryable_func_executor(
lambda: self.fs.tos_client.head_object(bucket, key),
max_retry_num=self.fs.max_retry_num,
)
loc = head.content_length

if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
# existing file too small for multi-upload: download
self.write(self.fs.cat(self.path))
else:
self.append_block = True
self.loc = loc
if "a" in mode:
try:
head = retryable_func_executor(
lambda: self.fs.tos_client.head_object(bucket, key),
max_retry_num=self.fs.max_retry_num,
)
loc = head.content_length
if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
# existing file too small for multi-upload: download
self.write(self.fs.cat(self.path))
else:
self.append_block = True
self.loc = loc
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
pass
else:
raise e

if "w" in mode:
# create each local staging dir if it does not already exist
for staging_dir in fs.multipart_staging_dirs:
if not os.path.exists(staging_dir):
os.makedirs(staging_dir)

self.multipart_uploader = MultipartUploader(
fs=fs,
bucket=bucket,
key=key,
part_size=fs.multipart_size,
thread_pool_size=fs.multipart_thread_pool_size,
staging_buffer_size=fs.multipart_staging_buffer_size,
multipart_threshold=fs.multipart_threshold,
)

def _initiate_upload(self) -> None:
"""Create remote file/upload."""
if self.autocommit and not self.append_block and self.tell() < self.blocksize:
Expand Down
2 changes: 1 addition & 1 deletion tosfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def tosfs(_tosfs_env_prepare: None) -> TosFileSystem:
region=os.environ.get("TOS_REGION"),
credentials_provider=EnvCredentialsProvider(),
max_retry_num=1000,
connection_time=300,
connection_timeout=300,
socket_timeout=300,
multipart_size=4 << 20,
multipart_threshold=4 << 20,
Expand Down

0 comments on commit d742470

Please sign in to comment.