diff --git a/ci/environment-win.yml b/ci/environment-win.yml
index 1582f746b..bb90734c3 100644
--- a/ci/environment-win.yml
+++ b/ci/environment-win.yml
@@ -25,3 +25,5 @@ dependencies:
   - nomkl
   - s3fs
   - tqdm
+  - diskcache
+  - platformdirs
diff --git a/docs/source/api.rst b/docs/source/api.rst
index cb14fe7e1..78b9c3594 100644
--- a/docs/source/api.rst
+++ b/docs/source/api.rst
@@ -47,7 +47,8 @@ Base Classes
     fsspec.core.OpenFiles
     fsspec.core.get_fs_token_paths
     fsspec.core.url_to_fs
-    fsspec.dircache.DirCache
+    fsspec.dircache.MemoryDirCache
+    fsspec.dircache.FileDirCache
     fsspec.FSMap
     fsspec.generic.GenericFileSystem
     fsspec.registry.register_implementation
@@ -82,7 +83,10 @@ Base Classes
 
 .. autofunction:: fsspec.core.url_to_fs
 
-.. autoclass:: fsspec.dircache.DirCache
+.. autoclass:: fsspec.dircache.MemoryDirCache
+   :members: __init__
+
+.. autoclass:: fsspec.dircache.FileDirCache
    :members: __init__
 
 .. autoclass:: fsspec.FSMap
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index 8ad6fc093..b07f26c4d 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,6 +1,13 @@
 Changelog
 =========
 
+Dev
+---------
+
+Enhancements
+
+- add file-based listing cache using diskcache
+
 2024.2.0
 --------
 
diff --git a/docs/source/features.rst b/docs/source/features.rst
index 907084e0d..5e9aaef5b 100644
--- a/docs/source/features.rst
+++ b/docs/source/features.rst
@@ -181,15 +181,18 @@ Listings Caching
 ----------------
 
 For some implementations, getting file listings (i.e., ``ls`` and anything that
-depends on it) is expensive. These implementations use dict-like instances of
-:class:`fsspec.dircache.DirCache` to manage the listings.
-
-The cache allows for time-based expiry of entries with the ``listings_expiry_time``
-parameter, or LRU expiry with the ``max_paths`` parameter. These can be
-set on any implementation instance that uses listings caching; or to skip the
-caching altogether, use ``use_listings_cache=False``. That would be appropriate
-when the target location is known to be volatile because it is being written
-to from other sources.
+depends on it) is expensive. These implementations use either dict-like instances of
+:class:`fsspec.dircache.MemoryDirCache` or file-backed instances of
+:class:`fsspec.dircache.FileDirCache` to manage the listings.
+
+The type of cache used can be controlled via the keyword ``listings_cache_type``,
+which has to be one of ``memdircache`` or ``filedircache``. The cache allows for time-based
+expiry of entries with the ``listings_expiry_time`` parameter, or LRU expiry with the
+``max_paths`` parameter. These can be set on any implementation instance that uses listings
+caching; or to skip the caching altogether, use ``use_listings_cache=False``. That would be
+appropriate when the target location is known to be volatile because it is being written to
+from other sources. If you want to use file-based caching, you can also provide the argument
+``listings_cache_location`` to determine where the cache file is stored.
 
 When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
 is called, so that subsequent listing of the given paths will force a refresh.
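To make the documented keywords concrete, the snippet below sketches how a user would opt into each cache flavour, following the wording in ``features.rst`` and the updated tests (``use_listings_cache``, ``listings_cache_type``, ``listings_expiry_time``, ``listings_cache_location``). The exact spelling of these keywords may still change while this PR evolves, and the cache location shown is only illustrative.

```python
import fsspec

# In-memory listings cache; entries expire after 60 seconds
fs_mem = fsspec.filesystem(
    "http",
    use_listings_cache=True,
    listings_cache_type="memdircache",
    listings_expiry_time=60,
)

# File-based listings cache persisted on disk (path here is illustrative)
fs_file = fsspec.filesystem(
    "http",
    use_listings_cache=True,
    listings_cache_type="filedircache",
    listings_expiry_time=60,
    listings_cache_location="/tmp/fsspec-dircache",
)

# Skip listings caching entirely, e.g. for volatile target locations
fs_live = fsspec.filesystem("http", use_listings_cache=False)
```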
diff --git a/fsspec/asyn.py b/fsspec/asyn.py
index fb4e05e74..cee4b5d93 100644
--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -297,7 +297,7 @@ class AsyncFileSystem(AbstractFileSystem):
     mirror_sync_methods = True
     disable_throttling = False
 
-    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
+    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, listings_cache_options=None, **kwargs):
         self.asynchronous = asynchronous
         self._pid = os.getpid()
         if not asynchronous:
@@ -305,7 +305,7 @@ def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwar
         else:
             self._loop = None
         self.batch_size = batch_size
-        super().__init__(*args, **kwargs)
+        super().__init__(listings_cache_options, *args, **kwargs)
 
     @property
     def loop(self):
diff --git a/fsspec/dircache.py b/fsspec/dircache.py
index eca19566b..f6e29faf8 100644
--- a/fsspec/dircache.py
+++ b/fsspec/dircache.py
@@ -1,9 +1,15 @@
+import logging
 import time
 from collections.abc import MutableMapping
+from enum import Enum
 from functools import lru_cache
+from pathlib import Path
+from typing import Optional, Union
 
+logger = logging.getLogger(__name__)
 
-class DirCache(MutableMapping):
+
+class MemoryDirCache(MutableMapping):
     """
     Caching of directory listings, in a structure like::
 
@@ -26,8 +32,7 @@ class DirCache(MutableMapping):
 
     def __init__(
         self,
-        use_listings_cache=True,
-        listings_expiry_time=None,
+        expiry_time=None,
         max_paths=None,
         **kwargs,
     ):
@@ -35,10 +40,7 @@ def __init__(
         """
         Parameters
         ----------
-        use_listings_cache: bool
-            If False, this cache never returns items, but always reports KeyError,
-            and setting items has no effect
-        listings_expiry_time: int or float (optional)
+        expiry_time: int or float (optional)
             Time in seconds that a listing is considered valid. If None,
             listings do not expire.
         max_paths: int (optional)
@@ -49,13 +51,12 @@ def __init__(
         self._times = {}
         if max_paths:
            self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
-        self.use_listings_cache = use_listings_cache
-        self.listings_expiry_time = listings_expiry_time
+        self.expiry_time = expiry_time
         self.max_paths = max_paths
 
     def __getitem__(self, item):
-        if self.listings_expiry_time is not None:
-            if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
+        if self.expiry_time is not None:
+            if self._times.get(item, 0) - time.time() < -self.expiry_time:
                 del self._cache[item]
         if self.max_paths:
             self._q(item)
@@ -75,12 +76,10 @@ def __contains__(self, item):
             return False
 
     def __setitem__(self, key, value):
-        if not self.use_listings_cache:
-            return
         if self.max_paths:
             self._q(key)
         self._cache[key] = value
-        if self.listings_expiry_time is not None:
+        if self.expiry_time is not None:
             self._times[key] = time.time()
 
     def __delitem__(self, key):
@@ -93,6 +92,112 @@ def __iter__(self):
 
     def __reduce__(self):
         return (
-            DirCache,
-            (self.use_listings_cache, self.listings_expiry_time, self.max_paths),
+            MemoryDirCache,
+            (self.expiry_time, self.max_paths),
+        )
+
+
+class FileDirCache(MutableMapping):
+    def __init__(
+        self,
+        expiry_time=None,
+        directory=None,
+        **kwargs,
+    ):
+        """
+
+        Parameters
+        ----------
+        expiry_time: int or float (optional)
+            Time in seconds that a listing is considered valid. If None,
+            listings do not expire.
+        directory: str (optional)
+            Directory path at which the listings cache file is stored. If None,
+            an autogenerated path inside the user cache folder is used.
+        """
+        import platformdirs
+        from diskcache import Cache
+
+        if not directory:
+            directory = platformdirs.user_cache_dir(appname="fsspec")
+        directory = Path(directory) / "dircache" / str(expiry_time)
+
+        try:
+            directory.mkdir(exist_ok=True, parents=True)
+        except OSError as e:
+            logger.error(
+                f"folder for dircache could not be created at {directory}"
+            )
+            raise e
+        else:
+            logger.info(f"Dircache located at {directory}")
+
+        self.directory = directory
+        self._cache = Cache(directory=str(directory))
+        self.expiry_time = expiry_time
+
+    def __getitem__(self, item):
+        """Draw item as fileobject from cache, retry if timeout occurs"""
+        return self._cache.get(key=item, read=True, retry=True)
+
+    def clear(self):
+        self._cache.clear()
+
+    def __len__(self):
+        return len(list(self._cache.iterkeys()))
+
+    def __contains__(self, item):
+        value = self._cache.get(item, retry=True)  # None, if expired
+        if value:
+            return True
+        return False
+
+    def __setitem__(self, key, value):
+        self._cache.set(
+            key=key, value=value, expire=self.expiry_time, retry=True
+        )
+
+    def __delitem__(self, key):
+        del self._cache[key]
+
+    def __iter__(self):
+        return (k for k in self._cache.iterkeys() if k in self)
+
+    def __reduce__(self):
+        return (
+            FileDirCache,
+            (self.expiry_time, self.directory),
         )
+
+
+class CacheType(Enum):
+    MEMORY = MemoryDirCache
+    FILE = FileDirCache
+
+
+def create_dircache(
+    cache_type: Optional[Union[str, CacheType]] = None,
+    expiry_time: Optional[Union[int, float]] = None,
+    **kwargs,
+) -> Optional[Union[MemoryDirCache, FileDirCache]]:
+    if not cache_type:
+        return None
+    cache_map = {
+        CacheType.MEMORY: MemoryDirCache,
+        CacheType.FILE: FileDirCache,
+    }
+    if isinstance(cache_type, str):
+        try:
+            cache_type = CacheType[cache_type.upper()]
+        except KeyError as e:
+            raise ValueError(
+                f"Cache type must be one of {', '.join(ct.name.lower() for ct in CacheType)}"
+            ) from e
+    expiry_time = expiry_time and float(expiry_time)
+    if expiry_time == 0.0:
+        return None
+    return cache_map[cache_type](expiry_time, **kwargs)
+
+
+if __name__ == "__main__":
+    d = create_dircache(cache_type="memory")
+    print(d)
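The new cache classes can also be exercised directly. A minimal sketch based on the definitions added above; ``FileDirCache`` requires the optional ``diskcache`` and ``platformdirs`` dependencies, and the ``/tmp`` paths are only illustrative.

```python
from fsspec.dircache import FileDirCache, MemoryDirCache, create_dircache

# In-memory cache: LRU-limited to 100 paths, entries valid for 60 seconds
mem = MemoryDirCache(expiry_time=60, max_paths=100)
mem["s3://bucket/prefix/"] = [{"name": "s3://bucket/prefix/key", "type": "file"}]
assert "s3://bucket/prefix/" in mem

# File-based cache backed by diskcache; with directory=None it would pick a
# platformdirs user cache folder automatically (path below is illustrative)
fd = FileDirCache(expiry_time=60, directory="/tmp/fsspec-dircache")
fd["memory://afolder/"] = [{"name": "memory://afolder/afile", "type": "file"}]

# Factory used by the filesystems: accepts a CacheType member or its name,
# and returns None when no cache type is requested or expiry_time == 0
cache = create_dircache(cache_type="file", expiry_time=60, directory="/tmp/fsspec-dircache")
assert create_dircache(cache_type=None) is None
```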
+ + """ + import platformdirs + from diskcache import Cache + + if not directory: + directory = platformdirs.user_cache_dir(appname="fsspec") + directory = Path(directory) / "dircache" / str(expiry_time) + + try: + directory.mkdir(exist_ok=True, parents=True) + except OSError as e: + logger.error( + f"folder for dircache could not be created at {directory}" + ) + raise e + else: + logger.info(f"Dircache located at {directory}") + + self.directory = directory + self._cache = Cache(directory=str(directory)) + self.expiry_time = expiry_time + + def __getitem__(self, item): + """Draw item as fileobject from cache, retry if timeout occurs""" + return self._cache.get(key=item, read=True, retry=True) + + def clear(self): + self._cache.clear() + + def __len__(self): + return len(list(self._cache.iterkeys())) + + def __contains__(self, item): + value = self._cache.get(item, retry=True) # None, if expired + if value: + return True + return False + + def __setitem__(self, key, value): + self._cache.set( + key=key, value=value, expire=self.expiry_time, retry=True + ) + + def __delitem__(self, key): + del self._cache[key] + + def __iter__(self): + return (k for k in self._cache.iterkeys() if k in self) + + def __reduce__(self): + return ( + FileDirCache, + (self.expiry_time, self.directory), ) + + +class CacheType(Enum): + MEMORY = MemoryDirCache + FILE = FileDirCache + + +def create_dircache( + cache_type: Union[str, CacheType] = None, + expiry_time: Optional[Union[int, float]] = None, + **kwargs, +) -> Optional[Union[MemoryDirCache, FileDirCache]]: + if not cache_type: + return + cache_map = { + CacheType.MEMORY: MemoryDirCache, + CacheType.FILE: FileDirCache, + } + if isinstance(cache_type, str): + try: + cache_type = CacheType[cache_type.upper()] + except KeyError as e: + raise ValueError(f"Cache type must be one of {', '.join(ct.name.lower() for ct in CacheType)}") from e + expiry_time = expiry_time and float(expiry_time) + if expiry_time == 0.0: + return + return cache_map[cache_type](expiry_time, **kwargs) + + +if __name__ == "__main__": + d = create_dircache(cache_type="memory") + print(d) diff --git a/fsspec/fuse.py b/fsspec/fuse.py index 6ca8c973c..11b31d2d1 100644 --- a/fsspec/fuse.py +++ b/fsspec/fuse.py @@ -224,7 +224,7 @@ def main(args): /historic/packages/RPMS /tmp/ftp \\ -o 'simplecache-cache_storage=/tmp/simplecache' \\ -o 'simplecache-check_files=false[bool]' \\ - -o 'ftp-listings_expiry_time=60[int]' \\ + -o 'ftp-expiry_time=60[int]' \\ -o 'ftp-username=anonymous' \\ -o 'ftp-password=xieyanbo' """ diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index aa36332b6..163874e7f 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -22,6 +22,7 @@ ) from ..caching import AllBytes +from ..dircache import CacheType # https://stackoverflow.com/a/15926317/3821154 ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P[^"']+)""") @@ -58,6 +59,7 @@ def __init__( client_kwargs=None, get_client=get_client, encoded=False, + listings_cache_options=None, **storage_options, ): """ @@ -83,11 +85,13 @@ def __init__( A callable which takes keyword arguments and constructs an aiohttp.ClientSession. It's state will be managed by the HTTPFileSystem class. + listings_cache_options: dict + Options for the listings cache. 
diff --git a/fsspec/implementations/tests/test_http.py b/fsspec/implementations/tests/test_http.py
index af1164480..4cd9cbbc9 100644
--- a/fsspec/implementations/tests/test_http.py
+++ b/fsspec/implementations/tests/test_http.py
@@ -26,14 +26,30 @@ def test_list_invalid_args(server):
         h.glob(server + "/index/*")
 
 
-def test_list_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True)
+@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
+def test_list_cache(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http", use_listings_cache=True, listings_cache_type=listings_cache_type
+    )
+
+    h.dircache.clear()  # Needed for filedircache
+
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
 
+    h.dircache.clear()  # clean up
+
+
+@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
+def test_list_cache_with_expiry_time_cached(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        listings_expiry_time=30,
+        listings_cache_type=listings_cache_type,
+    )
 
-def test_list_cache_with_expiry_time_cached(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=30)
+    h.dircache.clear()  # Needed for filedircache
 
     # First, the directory cache is not initialized.
     assert not h.dircache
@@ -49,9 +65,19 @@ def test_list_cache_with_expiry_time_cached(server):
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
 
+    h.dircache.clear()  # clean up
+
+
+@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
+def test_list_cache_with_expiry_time_purged(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        listings_expiry_time=0.3,
+        listings_cache_type=listings_cache_type,
+    )
 
-def test_list_cache_with_expiry_time_purged(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=0.3)
+    h.dircache.clear()  # Needed for filedircache
 
     # First, the directory cache is not initialized.
     assert not h.dircache
@@ -80,9 +106,20 @@ def test_list_cache_with_expiry_time_purged(server):
     cached_items = h.dircache.get(server + "/index/")
     assert len(cached_items) == 1
 
+    h.dircache.clear()  # clean up
 
-def test_list_cache_reuse(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
+
+@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
+def test_list_cache_reuse(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        listings_expiry_time=5,
+        listings_cache_type=listings_cache_type,
+    )
+
+    # Needed for filedircache
+    h.dircache.clear()
 
     # First, the directory cache is not initialized.
     assert not h.dircache
@@ -101,14 +138,26 @@ def test_list_cache_reuse(server):
 
     # Verify that yet another new instance, with caching enabled,
     # will see the same cache content again.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        listings_expiry_time=5,
+        listings_cache_type=listings_cache_type,
+    )
     assert len(h.dircache) == 1
 
     # However, yet another instance with a different expiry time will also not have
     # any valid cache content.
-    h = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=666)
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        listings_expiry_time=666,
+        listings_cache_type=listings_cache_type,
+    )
     assert len(h.dircache) == 0
 
+    h.dircache.clear()  # clean up
+
 
 def test_ls_raises_filenotfound(server):
     h = fsspec.filesystem("http")
@@ -123,8 +172,14 @@ def test_list_cache_with_max_paths(server):
     assert out == [server + "/index/realfile"]
 
 
-def test_list_cache_with_skip_instance_cache(server):
-    h = fsspec.filesystem("http", use_listings_cache=True, skip_instance_cache=True)
+@pytest.mark.parametrize("listings_cache_type", ["memdircache", "filedircache"])
+def test_list_cache_with_skip_instance_cache(server, listings_cache_type):
+    h = fsspec.filesystem(
+        "http",
+        use_listings_cache=True,
+        skip_instance_cache=True,
+        listings_cache_type=listings_cache_type,
+    )
     out = h.glob(server + "/index/*")
     assert out == [server + "/index/realfile"]
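For context on the ``clear()`` calls and repeated instantiations in these tests: fsspec caches filesystem instances keyed on their storage options, so constructing a filesystem twice with identical arguments returns the same object and therefore the same listings cache, while ``skip_instance_cache=True`` (or different options) yields a fresh instance. Roughly, as a sketch rather than part of this PR:

```python
import fsspec

h1 = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
h2 = fsspec.filesystem("http", use_listings_cache=True, listings_expiry_time=5)
assert h1 is h2  # same cached instance, shared dircache

h3 = fsspec.filesystem(
    "http", use_listings_cache=True, listings_expiry_time=5, skip_instance_cache=True
)
assert h3 is not h1  # fresh instance with its own (initially empty) dircache
```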
diff --git a/fsspec/spec.py b/fsspec/spec.py
index 39ee91a16..88f8f84f0 100644
--- a/fsspec/spec.py
+++ b/fsspec/spec.py
@@ -9,11 +9,11 @@
 from errno import ESPIPE
 from glob import has_magic
 from hashlib import sha256
-from typing import ClassVar
+from typing import ClassVar, Optional, Union
 
 from .callbacks import DEFAULT_CALLBACK
 from .config import apply_config, conf
-from .dircache import DirCache
+from .dircache import CacheType, FileDirCache, MemoryDirCache, create_dircache
 from .transaction import Transaction
 from .utils import (
     _unstrip_protocol,
@@ -115,7 +115,7 @@ class AbstractFileSystem(metaclass=_Cached):
     #: Extra *class attributes* that should be considered when hashing.
     _extra_tokenize_attributes = ()
 
-    def __init__(self, *args, **storage_options):
+    def __init__(self, listings_cache_options: Optional[Union[bool, dict]] = None, *args, **storage_options):
         """Create and configure file-system instance
 
         Instances may be cachable, so if similar enough arguments are seen
@@ -128,10 +128,11 @@ def __init__(self, *args, **storage_options):
 
         Parameters
         ----------
-        use_listings_cache, listings_expiry_time, max_paths:
-            passed to ``DirCache``, if the implementation supports
-            directory listing caching. Pass use_listings_cache=False
-            to disable such caching.
+        listings_cache_options: bool or dict
+            If True, a default MemoryDirCache cache is created.
+            If a dict of arguments, it is used to create a directory cache via the
+            arguments cache_type ("memory" or "file"), expiry_time, and any other
+            arguments passed on to ``MemoryDirCache`` or ``FileDirCache``.
         skip_instance_cache: bool
             If this is a cachable implementation, pass True here to force
             creating a new instance even if a matching instance exists, and prevent
@@ -146,7 +147,12 @@ def __init__(self, *args, **storage_options):
         self._intrans = False
         self._transaction = None
         self._invalidated_caches_in_transaction = []
-        self.dircache = DirCache(**storage_options)
+        if not listings_cache_options:
+            listings_cache_options = {}
+        elif listings_cache_options is True:
+            listings_cache_options = {"cache_type": CacheType.MEMORY}
+        self.listings_cache_options = listings_cache_options
+        self.dircache = create_dircache(**listings_cache_options)
 
         if storage_options.pop("add_docs", None):
             warnings.warn("add_docs is no longer supported.", FutureWarning)
diff --git a/setup.py b/setup.py
index e511faf2f..7471f1784 100644
--- a/setup.py
+++ b/setup.py
@@ -14,6 +14,7 @@
     "abfs": ["adlfs"],
     "adl": ["adlfs"],
     "dask": ["dask", "distributed"],
+    "dircache": ["diskcache", "platformdirs"],
     "dropbox": ["dropboxdrivefs", "requests", "dropbox"],
     "gcs": ["gcsfs"],
     "git": ["pygit2"],
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 000000000..e69de29bb
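Finally, ``AbstractFileSystem.__init__`` in ``fsspec/spec.py`` normalises ``listings_cache_options`` before handing it to ``create_dircache``. The helper below restates that logic in isolation to make the accepted values explicit (an illustrative sketch, not code from this PR); note that the file-based backend relies on the new optional ``dircache`` extra (``diskcache`` + ``platformdirs``) added in ``setup.py``.

```python
from fsspec.dircache import CacheType, create_dircache

def resolve_dircache(listings_cache_options=None):
    # False/None/{} -> no cache; True -> default in-memory cache;
    # dict -> forwarded to create_dircache (cache_type, expiry_time, ...)
    if not listings_cache_options:
        listings_cache_options = {}
    elif listings_cache_options is True:
        listings_cache_options = {"cache_type": CacheType.MEMORY}
    return create_dircache(**listings_cache_options)

assert resolve_dircache(False) is None
assert type(resolve_dircache(True)).__name__ == "MemoryDirCache"
# expiry_time == 0 also disables caching inside create_dircache
assert resolve_dircache({"cache_type": "memory", "expiry_time": 0}) is None
```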