From d7424701a1dcbe8dab0249cc685604d32d7aebc4 Mon Sep 17 00:00:00 2001 From: yanghua Date: Tue, 1 Oct 2024 09:52:39 +0800 Subject: [PATCH] Refactor code --- tosfs/core.py | 65 ++++++++++++++++++++++------------------- tosfs/tests/conftest.py | 2 +- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 5b75853..0cdd924 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -107,7 +107,7 @@ def __init__( region: Optional[str] = None, max_retry_num: int = 20, max_connections: int = 1024, - connection_time: int = 10, + connection_timeout: int = 10, socket_timeout: int = 30, high_latency_log_threshold: int = 100, version_aware: bool = False, @@ -139,7 +139,7 @@ def __init__( max_connections : int, optional The maximum number of HTTP connections that can be opened in the connection pool (default is 1024). - connection_time : int, optional + connection_timeout : int, optional The time to keep a connection open in seconds (default is 10). socket_timeout : int, optional The socket read and write timeout time for a single request after @@ -196,7 +196,7 @@ def __init__( region, max_retry_count=0, max_connections=max_connections, - connection_time=connection_time, + connection_time=connection_timeout, socket_timeout=socket_timeout, high_latency_log_threshold=high_latency_log_threshold, credentials_provider=credentials_provider, @@ -254,15 +254,15 @@ def _open( as they do for the built-in `open` function. block_size: int Size of data-node blocks if reading + version_id : str + Explicit version of the object to open. This requires that the tos + filesystem is version aware and bucket versioning is enabled on the + relevant bucket. fill_cache: bool If seeking to new a part of the file beyond the current buffer, with this True, the buffer will be filled between the sections to best support random access. When reading only a few specific chunks out of a file, performance may be better if False. - version_id : str - Explicit version of the object to open. This requires that the tos - filesystem is version aware and bucket versioning is enabled on the - relevant bucket. cache_type : str See fsspec's documentation for available cache_type values. Set to "none" if no caching is desired. If None, defaults to ``self.default_cache_type``. @@ -2211,29 +2211,24 @@ def __init__( self.append_block = False self.buffer: Optional[io.BytesIO] = io.BytesIO() - self.multipart_uploader = MultipartUploader( - fs=fs, - bucket=bucket, - key=key, - part_size=fs.multipart_size, - thread_pool_size=fs.multipart_thread_pool_size, - staging_buffer_size=fs.multipart_staging_buffer_size, - multipart_threshold=fs.multipart_threshold, - ) - - if "a" in mode and fs.exists(path): - head = retryable_func_executor( - lambda: self.fs.tos_client.head_object(bucket, key), - max_retry_num=self.fs.max_retry_num, - ) - loc = head.content_length - - if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD: - # existing file too small for multi-upload: download - self.write(self.fs.cat(self.path)) - else: - self.append_block = True - self.loc = loc + if "a" in mode: + try: + head = retryable_func_executor( + lambda: self.fs.tos_client.head_object(bucket, key), + max_retry_num=self.fs.max_retry_num, + ) + loc = head.content_length + if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD: + # existing file too small for multi-upload: download + self.write(self.fs.cat(self.path)) + else: + self.append_block = True + self.loc = loc + except TosServerError as e: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: + pass + else: + raise e if "w" in mode: # check the local staging dir if not exist, create it @@ -2241,6 +2236,16 @@ def __init__( if not os.path.exists(staging_dir): os.makedirs(staging_dir) + self.multipart_uploader = MultipartUploader( + fs=fs, + bucket=bucket, + key=key, + part_size=fs.multipart_size, + thread_pool_size=fs.multipart_thread_pool_size, + staging_buffer_size=fs.multipart_staging_buffer_size, + multipart_threshold=fs.multipart_threshold, + ) + def _initiate_upload(self) -> None: """Create remote file/upload.""" if self.autocommit and not self.append_block and self.tell() < self.blocksize: diff --git a/tosfs/tests/conftest.py b/tosfs/tests/conftest.py index 549e550..16ab5b6 100644 --- a/tosfs/tests/conftest.py +++ b/tosfs/tests/conftest.py @@ -39,7 +39,7 @@ def tosfs(_tosfs_env_prepare: None) -> TosFileSystem: region=os.environ.get("TOS_REGION"), credentials_provider=EnvCredentialsProvider(), max_retry_num=1000, - connection_time=300, + connection_timeout=300, socket_timeout=300, multipart_size=4 << 20, multipart_threshold=4 << 20,