Skip to content

Commit

Permalink
Performance: Read optimize (#88)
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored Sep 19, 2024
1 parent 6f0d353 commit acd4bd5
Showing 1 changed file with 39 additions and 65 deletions.
104 changes: 39 additions & 65 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def __init__(
socket_timeout=socket_timeout,
high_latency_log_threshold=high_latency_log_threshold,
credentials_provider=credentials_provider,
enable_crc=False,
enable_verify_ssl=False,
disable_encoding_meta=True,
)
self.version_aware = version_aware
self.default_block_size = (
Expand Down Expand Up @@ -1561,10 +1564,31 @@ def exists(self, path: str, **kwargs: Any) -> bool:
# if the path is a bucket
if not key:
return self._exists_bucket(bucket)
elif self.isfile(path):
return self._exists_object(bucket, key, path, version_id)
else:
return self._exists_object(bucket, key.rstrip("/") + "/", path, version_id)

try:
return retryable_func_executor(
lambda: self.tos_client.head_object(bucket, key) or True,
max_retry_num=self.max_retry_num,
)
except TosServerError as e:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
try:
return retryable_func_executor(
lambda: self.tos_client.head_object(
bucket, key.rstrip("/") + "/"
)
or True,
max_retry_num=self.max_retry_num,
)
except TosServerError as ex:
if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND:
return False
else:
raise ex
else:
raise e
except Exception as ex:
raise TosfsError(f"Tosfs failed with unknown error: {ex}") from ex

def _exists_bucket(self, bucket: str) -> bool:
"""Check if a bucket exists in the TOS.
Expand Down Expand Up @@ -1613,60 +1637,6 @@ def _exists_bucket(self, bucket: str) -> bool:
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _exists_object(
    self, bucket: str, key: str, path: str, version_id: Optional[str] = None
) -> bool:
    """Report whether an object is present in TOS.

    Issues a HEAD request for ``key`` in ``bucket`` through the retry
    helper and maps a "not found" server response to ``False``.

    Parameters
    ----------
    bucket : str
        The name of the bucket.
    key : str
        The key of the object.
    path : str
        The full path of the object. Not read by this method.
    version_id : str, optional
        The version ID of the object (default is None). Not read by this
        method.

    Returns
    -------
    bool
        A truthy value (the HEAD response, or ``True`` when that response
        is falsy) if the object exists, ``False`` if the server answers
        with the not-found status code.

    Raises
    ------
    tos.exceptions.TosClientError
        Propagated unchanged when the client fails.
    tos.exceptions.TosServerError
        Propagated unchanged for any status other than not-found.
    TosfsError
        Wraps any other exception.

    Examples
    --------
    >>> fs = TosFileSystem()
    >>> fs._exists_object("mybucket", "myfile", "tos://mybucket/myfile")
    True
    >>> fs._exists_object("mybucket", "nonexistentfile", "tos://mybucket/nonexistentfile")
    False

    """

    def head() -> bool:
        # `or True` guarantees a truthy result even if the response is falsy.
        return self.tos_client.head_object(bucket, key) or True

    try:
        return retryable_func_executor(head, max_retry_num=self.max_retry_num)
    except TosClientError:
        # Listed explicitly so the generic handler below does not wrap it.
        raise
    except TosServerError as e:
        if e.status_code != TOS_SERVER_STATUS_CODE_NOT_FOUND:
            raise
        return False
    except Exception as e:
        raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _lsbuckets(self) -> List[dict]:
"""List all buckets in the account.
Expand Down Expand Up @@ -2010,6 +1980,7 @@ def __init__(
self.fs = fs
self.bucket = bucket
self.key = key
self.version_id = path_version_id
self.path = path
self.mode = mode
self.autocommit = autocommit
Expand Down Expand Up @@ -2170,22 +2141,25 @@ def _call_upload_part(
)

def _fetch_range(self, start: int, end: int) -> bytes:
    """Fetch a byte range of this file's object from TOS.

    Uses the bucket, key and version id stored on the file object rather
    than re-splitting ``self.path`` on every read.

    Parameters
    ----------
    start : int
        Passed to ``get_object`` as ``range_start``.
    end : int
        Passed to ``get_object`` as ``range_end``.

    Returns
    -------
    bytes
        The fetched data, or ``b""`` without any request when
        ``start == end``.

    """
    if start == end:
        # Nothing to read; avoid a round trip to the server.
        logger.debug(
            "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
            self.bucket,
            self.key,
            start,
            end,
        )
        return b""
    logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)

    def fetch() -> bytes:
        # Iterating the response yields the payload in chunks; joining them
        # assembles the result without an intermediate buffer copy.
        return b"".join(
            self.fs.tos_client.get_object(
                self.bucket,
                self.key,
                self.version_id,
                range_start=start,
                range_end=end,
            )
        )

    return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)

Expand Down

0 comments on commit acd4bd5

Please sign in to comment.