Introducing limit option for ls (#70)
* Introducing limit option for ls

* checklist updates

* document updates

* updated integration tests
vminfant authored Jun 23, 2023
1 parent 95a7b2c commit 3ca24f9
Showing 8 changed files with 272 additions and 126 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -103,3 +103,12 @@ Changes are grouped as follows
- Dependency updates for docs & source code.

[unreleased]:


## [0.2.5] - 2023-06-22

### Fixed
- Added `limit` option for the `ls` method. Useful for file I/O operations.
- Dependency updates for docs & source code.

[unreleased]:
2 changes: 1 addition & 1 deletion cognite/cdffs/__init__.py
@@ -3,7 +3,7 @@

from .spec import CdfFileSystem

__version__ = "0.2.4"
__version__ = "0.2.5"
__all__ = ["CdfFileSystem"]

fsspec.register_implementation(CdfFileSystem.protocol, CdfFileSystem)
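
As a minimal sketch of how this registration is typically consumed (assuming a valid CDF client configuration; `client_config` is an illustrative placeholder):

import fsspec

# Because CdfFileSystem is registered under the "cdffs" protocol above,
# fsspec can resolve the "cdffs" protocol name to a CdfFileSystem instance.
fs = fsspec.filesystem("cdffs", connection_config=client_config)
print(type(fs).__name__)  # CdfFileSystem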
46 changes: 41 additions & 5 deletions cognite/cdffs/spec.py
@@ -200,7 +200,30 @@ def _add_dirs_to_cache(self, directories: Dict[str, Dict[str, Any]]) -> None:
else:
self.dircache[parent_path].append(dir_val)

def _ls(self, root_dir: str, external_id_prefix: str) -> None:
def _invalidate_dircache(self, inp_path: str) -> None:
# Why we need to invalidate the cache:
# Invalidating the cache allows subsequent requests to hit CDF to get the list of files instead of serving
# them from the cache, as the user may use a different limit value for each request.
# Caching results fetched with a limit would produce an incorrect file list for the user.
# Possible scenarios.
# Scenario #1
# User may call the `ls` method with no limit at first. We will cache the results.
# User may call the `ls` method with no limit again. Results will be returned from the cache.
# Scenario #2
# User may call the `ls` method with no limit at first. We will cache the results.
# User may call the `ls` method with a limit. CDF will be queried to get the results.
# The cache will be invalidated at this point. Any subsequent queries will hit CDF.
# Scenario #3
# User may call the `ls` method with a limit at first. We don't cache the results.
# The cache will be invalidated at this point. Any subsequent queries will hit CDF.
# User may call the `ls` method with a limit again. CDF will still be queried to get the results.
# The cache will be invalidated at this point. Any subsequent queries will hit CDF.
# User may call the `ls` method with no limit. We will cache the results.
# Any subsequent queries will be served from the cache.
if inp_path in self.dircache:
del self.dircache[inp_path]

def _ls(self, root_dir: str, external_id_prefix: str, limit: int = -1) -> None:
"""List the files based on the directory & external Id prefixes extracted from path.
Args:
@@ -223,7 +246,7 @@ def _ls(self, root_dir: str, external_id_prefix: str) -> None:

# Get all the files that were previously cached when writing. (if applicable)
_file_write_cache = {d_info["name"]: True for d_path in self.dircache for d_info in self.dircache[d_path]}
for file_met in self.cognite_client.files.list(**list_query, limit=-1):
for file_met in self.cognite_client.files.list(**list_query, limit=limit):
if not file_met.external_id:
# Files are expected to have a valid external id.
continue
@@ -266,19 +289,26 @@ def ls(self, path: str, detail: bool = False, **kwargs: Optional[Any]) -> Union[
FileNotFoundError: When there are no files matching the path given.
"""
root_dir, external_id_prefix, _ = self.split_path(path, validate_suffix=False)

# Invalidate the cache when a limit is used.
if (limit := kwargs.get("limit", -1)) != -1:
self._invalidate_dircache(root_dir.strip("/"))
self._invalidate_dircache(path.strip("/"))

inp_key = str(Path(root_dir, external_id_prefix)).lstrip("/")
if not (
if limit != -1 or not (
inp_key in self.dircache
and inp_key in self.cdf_list_cache
and time.time() - self.cdf_list_cache[inp_key] < self.cdf_list_expiry_time
):
self._ls(root_dir, external_id_prefix)
self._ls(root_dir, external_id_prefix, limit=limit) # type: ignore

inp_path = path.strip("/")
file_list = self.dircache.get(inp_path, [])
if not file_list:
# It is possible that the requested path is absolute.
file_list = [x for x in self.dircache.get(root_dir.strip("/"), []) if x["name"] == inp_path]

if file_list:
return file_list if detail else [x["name"] for x in file_list]

@@ -289,7 +319,13 @@ def ls(self, path: str, detail: bool = False, **kwargs: Optional[Any]) -> Union[
elif not [x for x in self.dircache[inp_path] if x["name"] == inp_path]:
self.dircache[inp_path].append({"type": "directory", "name": inp_path})

return self.dircache[inp_path] if detail else [x["name"] for x in self.dircache[inp_path]]
out_list = self.dircache[inp_path] if detail else [x["name"] for x in self.dircache[inp_path]]

# Invalidate the cache if a limit was used when listing files. Same reasoning as above.
if limit != -1 and inp_path in self.dircache:
del self.dircache[inp_path]

return out_list

def makedirs(self, path: str, exist_ok: bool = True) -> None:
"""Create a directory at a path given.
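
The cache-invalidation scenarios documented in `_invalidate_dircache` translate into a short usage sketch (illustrative only; `fs` is assumed to be a configured `CdfFileSystem` instance and the path is hypothetical):

files = fs.ls("sample_data/")              # no limit: CDF is queried, results are cached
files = fs.ls("sample_data/")              # served from the directory cache
subset = fs.ls("sample_data/", limit=10)   # limit used: CDF is queried, cache is invalidated
files = fs.ls("sample_data/")              # cache was invalidated: CDF is queried again and re-cached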
2 changes: 1 addition & 1 deletion docs/requirements.txt
@@ -1,5 +1,5 @@
toml==0.10.2
fsspec==2023.6.0
cognite-sdk==6.4.0
cognite-sdk==6.4.7
requests==2.31.0
pydantic==1.10.9
12 changes: 9 additions & 3 deletions docs/source/guidelines.rst
@@ -26,15 +26,15 @@ Guidelines
| `cdffs://ocean/production_data/atlantic_prod.csv`
- VALID

* User must use a valid file extension when working with CDF Files.

* `cdffs://sample_data/test.zarr`

* `cdffs://sample_data/test.csv`

* `cdffs://sample_data/test.parquet`

| Note: It is still recommended to use a valid file extension even when the data is split into multiple files. For example, dask might partition the data into multiple part files (0.part, 1.part, etc.), and an extension is still recommended for those. Optionally, if you wish to avoid using file extensions, especially when the data is split into multiple files, you can explicitly specify the directory field in the metadata to pass information about the directory prefix. Example,
.. code-block:: python
@@ -51,9 +51,15 @@ Guidelines
ds = xarray.open_zarr("cdffs://sample_data/test.zarr", storage_options={"connection_config": client_cnf, "cache_type": "all"})
* Users can choose to use file-specific metadata when opening a file for write using the file-like API.

.. code-block:: python
with fs.open("test_data/test/workfile.csv", mode="wb",file_metadata=FileMetadata(source="test")) as f:
f.write("test_data".encode("utf8"))
* Users can also choose to use `limit` when working with `ls`. It may be useful when using the file-like API.

.. code-block:: python
fs.ls("test_data/test/", detail=True, limit=100)
