diff --git a/tosfs/consts.py b/tosfs/consts.py
new file mode 100644
index 0000000..aa1a4da
--- /dev/null
+++ b/tosfs/consts.py
@@ -0,0 +1,18 @@
+# ByteDance Volcengine EMR, Copyright 2024.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""The module contains constants for the tosfs package."""
+
+# Tos server response codes
+TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404
diff --git a/tosfs/core.py b/tosfs/core.py
index a7f6974..b6a65c1 100644
--- a/tosfs/core.py
+++ b/tosfs/core.py
@@ -23,15 +23,13 @@
 from tos.models import CommonPrefixInfo
 from tos.models2 import ListedObject, ListedObjectVersion
 
+from tosfs.consts import TOS_SERVER_RESPONSE_CODE_NOT_FOUND
 from tosfs.exceptions import TosfsError
 from tosfs.utils import find_bucket_key
 
 # environment variable names
 ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
 
-# constants
-SERVER_RESPONSE_CODE_NOT_FOUND = 404
-
 logger = logging.getLogger("tosfs")
 
 
@@ -255,7 +253,11 @@ def rmdir(self, path: str) -> None:
     def _info_from_cache(
         self, path: str, fullpath: str, version_id: Optional[str]
     ) -> dict:
-        out = self._ls_from_cache(fullpath)
+        try:
+            out = self._ls_from_cache(fullpath)
+        except FileNotFoundError:
+            return {}
+
         if out is not None:
             if self.version_aware and version_id is not None:
                 # If cached info does not match requested version_id,
@@ -316,7 +318,7 @@ def _bucket_info(self, bucket: str) -> dict:
         except tos.exceptions.TosClientError as e:
             raise e
         except tos.exceptions.TosServerError as e:
-            if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
+            if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
                 raise FileNotFoundError(bucket) from e
             else:
                 raise e
@@ -375,7 +377,7 @@ def _object_info(
         except tos.exceptions.TosClientError as e:
             raise e
         except tos.exceptions.TosServerError as e:
-            if e.status_code == SERVER_RESPONSE_CODE_NOT_FOUND:
+            if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
                 pass
             else:
                 raise e
@@ -413,6 +415,225 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict
         except Exception as e:
             raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
 
+    def exists(self, path: str, **kwargs: Union[str, bool, float, None]) -> bool:
+        """Check if a path exists in the TOS file system.
+
+        Parameters
+        ----------
+        path : str
+            The path to check for existence.
+        **kwargs : Union[str, bool, float, None], optional
+            Additional arguments if needed in the future.
+
+        Returns
+        -------
+        bool
+            True if the path exists, False otherwise.
+
+        Raises
+        ------
+        tos.exceptions.TosClientError
+            If there is a client error while checking the path.
+        tos.exceptions.TosServerError
+            If there is a server error while checking the path.
+        TosfsError
+            If there is an unknown error while checking the path.
+
+        Examples
+        --------
+        >>> fs = TosFileSystem()
+        >>> fs.exists("tos://bucket/to/file")
+        True
+        >>> fs.exists("tos://mybucket/nonexistentfile")
+        False
+
+        """
+        if path in ["", "/"]:
+            # the root always exists, even if anon
+            return True
+
+        path = self._strip_protocol(path)
+        bucket, key, version_id = self._split_path(path)
+        # if the path is a bucket
+        if not key:
+            return self._exists_bucket(bucket)
+
+        if self._exists_object(bucket, key, path, version_id):
+            return True
+
+        # Retry with the directory-marker form of the key; skip the retry when
+        # the key already has that form, since it would repeat the same lookup.
+        dir_key = key.rstrip("/") + "/"
+        if dir_key == key:
+            return False
+        return self._exists_object(bucket, dir_key, path, version_id)
+
+    def _exists_bucket(self, bucket: str) -> bool:
+        """Check if a bucket exists in the TOS file system.
+
+        It will first check the dircache,
+        then check the bucket using the TOS client.
+
+        Parameters
+        ----------
+        bucket : str
+            The name of the bucket to check for existence.
+
+        Returns
+        -------
+        bool
+            True if the bucket exists, False otherwise.
+
+        Raises
+        ------
+        tos.exceptions.TosClientError
+            If there is a client error while checking the bucket.
+        tos.exceptions.TosServerError
+            If there is a server error while checking the bucket.
+        TosfsError
+            If there is an unknown error while checking the bucket.
+
+        Examples
+        --------
+        >>> fs = TosFileSystem()
+        >>> fs._exists_bucket("mybucket")
+        True
+        >>> fs._exists_bucket("nonexistentbucket")
+        False
+
+        """
+        if self.dircache.get(bucket, False):
+            return True
+        else:
+            try:
+                if self._ls_from_cache(bucket):
+                    return True
+            except FileNotFoundError:
+                # might still be a bucket we can access but don't own
+                pass
+
+        try:
+            self.tos_client.head_bucket(bucket)
+            return True
+        except tos.exceptions.TosClientError as e:
+            raise e
+        except tos.exceptions.TosServerError as e:
+            if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
+                return False
+            else:
+                raise e
+        except Exception as e:
+            raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
+
+    def _exists_object(
+        self, bucket: str, key: str, path: str, version_id: Optional[str] = None
+    ) -> bool:
+        """Check if an object exists in the TOS file system.
+
+        It will first check the dircache,
+        then check the object using the TOS client.
+
+        Parameters
+        ----------
+        bucket : str
+            The name of the bucket.
+        key : str
+            The key of the object.
+        path : str
+            The full path of the object.
+        version_id : str, optional
+            The version ID of the object (default is None).
+
+        Returns
+        -------
+        bool
+            True if the object exists, False otherwise.
+
+        Raises
+        ------
+        tos.exceptions.TosClientError
+            If there is a client error while checking the object.
+        tos.exceptions.TosServerError
+            If there is a server error while checking the object.
+        TosfsError
+            If there is an unknown error while checking the object.
+
+        Examples
+        --------
+        >>> fs = TosFileSystem()
+        >>> fs._exists_object("mybucket", "myfile", "tos://mybucket/myfile")
+        True
+        >>> fs._exists_object("mybucket", "nonexistentfile", "tos://mybucket/nonexistentfile")
+        False
+
+        """
+        if self._exists_in_cache(path, bucket, key, version_id):
+            return True
+
+        try:
+            # Forward version_id so that a versioned path is checked against
+            # that exact version rather than the latest one.
+            self.tos_client.head_object(bucket, key, version_id=version_id)
+            return True
+        except tos.exceptions.TosClientError as e:
+            raise e
+        except tos.exceptions.TosServerError as e:
+            if e.status_code == TOS_SERVER_RESPONSE_CODE_NOT_FOUND:
+                return False
+            else:
+                raise e
+        except Exception as e:
+            raise TosfsError(f"Tosfs failed with unknown error: {e}") from e
+
+    def _exists_in_cache(
+        self,
+        path: str,
+        bucket: str,
+        key: str,
+        version_id: Optional[str] = None,
+    ) -> bool:
+        """Check whether the dircache already proves that an object exists.
+
+        Parameters
+        ----------
+        path : str
+            The full path of the object (currently unused).
+        bucket : str
+            The name of the bucket.
+        key : str
+            The key of the object.
+        version_id : str, optional
+            The version ID of the object (default is None).
+
+        Returns
+        -------
+        bool
+            True if a cached listing confirms the object (and, when the file
+            system is version aware, the requested version). False means the
+            cache cannot tell, so the caller should ask the server.
+
+        """
+        fullpath = "/".join((bucket, key))
+
+        try:
+            entries = self._ls_from_cache(fullpath)
+        except FileNotFoundError:
+            return False
+
+        if entries is None:
+            return False
+
+        if not self.version_aware or version_id is None:
+            return True
+
+        for entry in entries:
+            if entry["name"] == fullpath and entry.get("VersionId") == version_id:
+                return True
+
+        # dircache doesn't support multiple versions, so we really can't tell if
+        # the one we want exists.
+        return False
+
     def _lsbuckets(self, refresh: bool = False) -> List[dict]:
         """List all buckets in the account.
 
diff --git a/tosfs/tests/conftest.py b/tosfs/tests/conftest.py
index 9833631..b62bb9d 100644
--- a/tosfs/tests/conftest.py
+++ b/tosfs/tests/conftest.py
@@ -56,6 +56,6 @@ def temporary_workspace(
     yield workspace
     try:
         tosfs.rmdir(f"{bucket}/{workspace}/")
-    except Exception:
-        logger.error("Ignore exception.")
+    except Exception as e:
+        logger.error(f"Ignore exception {e}.")
     assert not tosfs.exists(f"{bucket}/{workspace}/")
diff --git a/tosfs/tests/test_tosfs.py b/tosfs/tests/test_tosfs.py
index 725862a..6dbbf35 100644
--- a/tosfs/tests/test_tosfs.py
+++ b/tosfs/tests/test_tosfs.py
@@ -52,7 +52,7 @@ def test_ls_dir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) ->
     assert tosfs.ls(f"{bucket}/{temporary_workspace}/nonexistent", detail=False) == []
 
 
-def test_ls_cache(tosfs: TosFileSystem, bucket: str) -> None:
+def test_ls_cache(tosfs: TosFileSystem, temporary_workspace: str, bucket: str) -> None:
     with unittest.mock.patch.object(
         tosfs.tos_client,
         "list_objects_type2",
@@ -71,6 +71,10 @@ def test_ls_cache(tosfs: TosFileSystem, bucket: str) -> None:
         # Verify that list_objects_type2 was called only once
         assert tosfs.tos_client.list_objects_type2.call_count == 1
 
+    unittest.mock.patch.stopall()
+    tosfs.invalidate_cache("")
+    tosfs.ls(bucket, detail=False, refresh=True)
+
 
 def test_inner_rm(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
     file_name = random_path()
@@ -131,3 +135,26 @@ def test_rmdir(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> N
     assert f"{bucket}/{temporary_workspace}" not in tosfs.ls(
         bucket, detail=False, refresh=True
     )
+
+
+def test_exists_bucket(
+    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
+) -> None:
+    assert tosfs.exists("")
+    assert tosfs.exists("/")
+    assert tosfs.exists(bucket)
+    assert not tosfs.exists("nonexistent")
+
+
+def test_exists_object(
+    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
+) -> None:
+    file_name = random_path()
+    tosfs.tos_client.put_object(bucket=bucket, key=f"{temporary_workspace}/{file_name}")
+    assert tosfs.exists(f"{bucket}/{temporary_workspace}")
+    assert tosfs.exists(f"{bucket}/{temporary_workspace}/")
+    assert tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")
+    assert not tosfs.exists(f"{bucket}/{temporary_workspace}/nonexistent")
+    assert not tosfs.exists(f"{bucket}/nonexistent")
+    tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")
+    assert not tosfs.exists(f"{bucket}/{temporary_workspace}/{file_name}")