Skip to content

Commit

Permalink
Core: Implement exists api
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Aug 21, 2024
1 parent a81701d commit 295af56
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 9 deletions.
18 changes: 18 additions & 0 deletions tosfs/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# ByteDance Volcengine EMR, Copyright 2024.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""The module contains constants for the tosfs package."""

# Tos server response codes
TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404
208 changes: 202 additions & 6 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
from tos.models import CommonPrefixInfo
from tos.models2 import ListedObject, ListedObjectVersion

from tosfs.consts import TOS_SERVER_RESPONSE_CODE_NOT_FOUND
from tosfs.exceptions import TosfsError
from tosfs.utils import find_bucket_key

# environment variable names
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"

# constants
SERVER_RESPONSE_CODE_NOT_FOUND = 404

logger = logging.getLogger("tosfs")


Expand Down Expand Up @@ -255,7 +253,11 @@ def rmdir(self, path: str) -> None:
def _info_from_cache(
self, path: str, fullpath: str, version_id: Optional[str]
) -> dict:
out = self._ls_from_cache(fullpath)
try:
out = self._ls_from_cache(fullpath)
except FileNotFoundError:
return {}

if out is not None:
if self.version_aware and version_id is not None:
# If cached info does not match requested version_id,
Expand Down Expand Up @@ -316,7 +318,7 @@ def _bucket_info(self, bucket: str) -> dict:
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
raise FileNotFoundError(bucket) from e
else:
raise e
Expand Down Expand Up @@ -375,7 +377,7 @@ def _object_info(
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
pass
else:
raise e
Expand Down Expand Up @@ -413,6 +415,200 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def exists(self, path: str, **kwargs: Union[str, bool, float, None]) -> bool:
"""Check if a path exists in the TOS file system.
Parameters
----------
path : str
The path to check for existence.
**kwargs : Union[str, bool, float, None], optional
Additional arguments if needed in the future.
Returns
-------
bool
True if the path exists, False otherwise.
Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the path.
tos.exceptions.TosServerError
If there is a server error while checking the path.
TosfsError
If there is an unknown error while checking the path.
Examples
--------
>>> fs = TosFileSystem()
>>> fs.exists("tos://bucket/to/file")
True
>>> fs.exists("tos://mybucket/nonexistentfile")
False
"""
if path in ["", "/"]:
# the root always exists, even if anon
return True

path = self._strip_protocol(path)
bucket, key, version_id = self._split_path(path)
# if the path is a bucket
if not key:
return self._exists_bucket(bucket)
else:
object_exists = self._exists_object(bucket, key, path, version_id)
if not object_exists:
return self._exists_object(
bucket, key.rstrip("/") + "/", path, version_id
)
return object_exists

def _exists_bucket(self, bucket: str) -> bool:
"""Check if a bucket exists in the TOS file system.
It will first check the dircache,
then check the bucket using the TOS client.
Parameters
----------
bucket : str
The name of the bucket to check for existence.
Returns
-------
bool
True if the bucket exists, False otherwise.
Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the bucket.
tos.exceptions.TosServerError
If there is a server error while checking the bucket.
TosfsError
If there is an unknown error while checking the bucket.
Examples
--------
>>> fs = TosFileSystem()
>>> fs._exists_bucket("mybucket")
True
>>> fs._exists_bucket("nonexistentbucket")
False
"""
if self.dircache.get(bucket, False):
return True
else:
try:
if self._ls_from_cache(bucket):
return True
except FileNotFoundError:
# might still be a bucket we can access but don't own
pass

try:
self.tos_client.head_bucket(bucket)
return True
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
return False
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _exists_object(
self, bucket: str, key: str, path: str, version_id: Optional[str] = None
) -> bool:
"""Check if an object exists in the TOS file system.
It will first check the dircache,
then check the object using the TOS client.
Parameters
----------
bucket : str
The name of the bucket.
key : str
The key of the object.
path : str
The full path of the object.
version_id : str, optional
The version ID of the object (default is None).
Returns
-------
bool
True if the object exists, False otherwise.
Raises
------
tos.exceptions.TosClientError
If there is a client error while checking the object.
tos.exceptions.TosServerError
If there is a server error while checking the object.
TosfsError
If there is an unknown error while checking the object.
Examples
--------
>>> fs = TosFileSystem()
>>> fs._exists_object("mybucket", "myfile", "tos://mybucket/myfile")
True
>>> fs._exists_object("mybucket", "nonexistentfile", "tos://mybucket/nonexistentfile")
False
"""
exists_in_cache = self._exists_in_cache(path, bucket, key, version_id)
if exists_in_cache:
return exists_in_cache

try:
self.tos_client.head_object(bucket, key)
return True
except tos.exceptions.TosClientError as e:
raise e
except tos.exceptions.TosServerError as e:
if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
return False
else:
raise e
except Exception as e:
raise TosfsError(f"Tosfs failed with unknown error: {e}") from e

def _exists_in_cache(
self,
path: str,
bucket: str,
key: str,
version_id: Optional[str] = None,
) -> Optional[bool]:
fullpath = "/".join((bucket, key))

try:
entries = self._ls_from_cache(fullpath)
except FileNotFoundError:
return False

if entries is None:
return False

if not self.version_aware or version_id is None:
return True

for entry in entries:
if entry["name"] == fullpath and entry.get("VersionId") == version_id:
return True

# dircache doesn't support multiple versions, so we really can't tell if
# the one we want exists.
return False

def _lsbuckets(self, refresh: bool = False) -> List[dict]:
"""List all buckets in the account.
Expand Down
4 changes: 2 additions & 2 deletions tosfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ def temporary_workspace(
yield workspace
try:
tosfs.rmdir(f"{bucket}/{workspace}/")
except Exception:
logger.error("Ignore exception.")
except Exception as e:
logger.error(f"Ignore exception {e}.")
assert not tosfs.exists(f"{bucket}/{workspace}/")
30 changes: 29 additions & 1 deletion tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->
assert tosfs.ls(f"{bucket}/{temporary_workspace}/nonexistent", detail=False) == []


def test_ls_cache(tosfs: TosFileSystem, bucket: str) -> None:
def test_ls_cache(tosfs: TosFileSystem, temporary_workspace: str, bucket: str) -> None:
with unittest.mock.patch.object(
tosfs.tos_client,
"list_objects_type2",
Expand All @@ -71,6 +71,10 @@ def test_ls_cache(tosfs: TosFileSystem, bucket: str) -> None:
# Verify that list_objects_type2 was called only once
assert tosfs.tos_client.list_objects_type2.call_count == 1

unittest.mock.patch.stopall()
tosfs.invalidate_cache("")
tosfs.ls(bucket, detail=False, refresh=True)


def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
file_name = random_path()
Expand Down Expand Up @@ -131,3 +135,27 @@ def test_rmdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N
assert f"{bucket}/{temporary_workspace}" not in tosfs.ls(
bucket, detail=False, refresh=True
)


def test_exists_bucket(
tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
assert tosfs.exists("")
assert tosfs.exists("/")
assert tosfs.exists(bucket)
assert not tosfs.exists("nonexistent")


def test_exists_object(
tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
file_name = random_path()
tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/")
assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/nonexistent")
assert not tosfs.exists(f"{bucket}/nonexistent")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/nonexistent")
tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")

0 comments on commit 295af56

Please sign in to comment.