Skip to content

Commit

Permalink
Add optional file-based listings caching
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Mar 17, 2024
1 parent c2ab54d commit 4dad114
Show file tree
Hide file tree
Showing 12 changed files with 248 additions and 57 deletions.
2 changes: 2 additions & 0 deletions ci/environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ dependencies:
- nomkl
- s3fs
- tqdm
- diskcache
- platformdirs
8 changes: 6 additions & 2 deletions docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ Base Classes
fsspec.core.OpenFiles
fsspec.core.get_fs_token_paths
fsspec.core.url_to_fs
fsspec.dircache.DirCache
fsspec.dircache.MemoryDirCache
fsspec.dircache.FileDirCache
fsspec.FSMap
fsspec.generic.GenericFileSystem
fsspec.registry.register_implementation
Expand Down Expand Up @@ -82,7 +83,10 @@ Base Classes

.. autofunction:: fsspec.core.url_to_fs

.. autoclass:: fsspec.dircache.DirCache
.. autoclass:: fsspec.dircache.MemoryDirCache
:members: __init__

.. autoclass:: fsspec.dircache.FileDirCache
:members: __init__

.. autoclass:: fsspec.FSMap
Expand Down
7 changes: 7 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

Dev
---------

Enhancements

- add optional file-based listings cache using ``diskcache``

2024.2.0
--------

Expand Down
21 changes: 12 additions & 9 deletions docs/source/features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,18 @@ Listings Caching
----------------

For some implementations, getting file listings (i.e., ``ls`` and anything that
depends on it) is expensive. These implementations use dict-like instances of
:class:`fsspec.dircache.DirCache` to manage the listings.

The cache allows for time-based expiry of entries with the ``listings_expiry_time``
parameter, or LRU expiry with the ``max_paths`` parameter. These can be
set on any implementation instance that uses listings caching; or to skip the
caching altogether, use ``use_listings_cache=False``. That would be appropriate
when the target location is known to be volatile because it is being written
to from other sources.
depends on it) is expensive. These implementations use either dict-like instances of
:class:`fsspec.dircache.MemoryDirCache` or file-based caching with instances of
:class:`fsspec.dircache.FileDirCache` to manage the listings.

The type of cache that is used can be controlled via the keyword ``listings_cache_type``,
which has to be one of ``memory`` or ``file``. The cache allows for time-based expiry
of entries with the ``listings_expiry_time`` parameter, or LRU expiry with the ``max_paths``
parameter. These can be set on any implementation instance that uses listings caching; or to
skip the caching altogether, use ``use_listings_cache=False``. That would be appropriate
when the target location is known to be volatile because it is being written to from other
sources. If you want to use the file-based caching, you can also provide the argument
``listings_cache_location`` to determine where the cache file is stored.

When the ``fsspec`` instance writes to the backend, the method ``invalidate_cache``
is called, so that subsequent listing of the given paths will force a refresh. In
Expand Down
4 changes: 2 additions & 2 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ class AsyncFileSystem(AbstractFileSystem):
mirror_sync_methods = True
disable_throttling = False

def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, listings_cache_options=None, **kwargs):
self.asynchronous = asynchronous
self._pid = os.getpid()
if not asynchronous:
self._loop = loop or get_loop()
else:
self._loop = None
self.batch_size = batch_size
super().__init__(*args, **kwargs)
super().__init__(listings_cache_options, *args, **kwargs)

@property
def loop(self):
Expand Down
137 changes: 121 additions & 16 deletions fsspec/dircache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging
import time
from collections.abc import MutableMapping
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Union, Optional

logger = logging.getLogger(__name__)

class DirCache(MutableMapping):

class MemoryDirCache(MutableMapping):
"""
Caching of directory listings, in a structure like::
Expand All @@ -26,19 +32,15 @@ class DirCache(MutableMapping):

def __init__(
self,
use_listings_cache=True,
listings_expiry_time=None,
expiry_time=None,
max_paths=None,
**kwargs,
):
"""
Parameters
----------
use_listings_cache: bool
If False, this cache never returns items, but always reports KeyError,
and setting items has no effect
listings_expiry_time: int or float (optional)
expiry_time: int or float (optional)
Time in seconds that a listing is considered valid. If None,
listings do not expire.
max_paths: int (optional)
Expand All @@ -49,13 +51,12 @@ def __init__(
self._times = {}
if max_paths:
self._q = lru_cache(max_paths + 1)(lambda key: self._cache.pop(key, None))
self.use_listings_cache = use_listings_cache
self.listings_expiry_time = listings_expiry_time
self.expiry_time = expiry_time
self.max_paths = max_paths

def __getitem__(self, item):
if self.listings_expiry_time is not None:
if self._times.get(item, 0) - time.time() < -self.listings_expiry_time:
if self.expiry_time is not None:
if self._times.get(item, 0) - time.time() < -self.expiry_time:
del self._cache[item]
if self.max_paths:
self._q(item)
Expand All @@ -75,12 +76,10 @@ def __contains__(self, item):
return False

def __setitem__(self, key, value):
if not self.use_listings_cache:
return
if self.max_paths:
self._q(key)
self._cache[key] = value
if self.listings_expiry_time is not None:
if self.expiry_time is not None:
self._times[key] = time.time()

def __delitem__(self, key):
Expand All @@ -93,6 +92,112 @@ def __iter__(self):

def __reduce__(self):
return (
DirCache,
(self.use_listings_cache, self.listings_expiry_time, self.max_paths),
MemoryDirCache,
(self.expiry_time, self.max_paths),
)


class FileDirCache(MutableMapping):
    """
    File-based caching of directory listings, persisted on disk via ``diskcache``.

    Behaves as a mutable mapping from path to listing, mirroring the interface
    of the in-memory listings cache, but entries survive process restarts and
    can be shared between processes that point at the same cache directory.
    """

    # Sentinel distinguishing "key absent or expired" from a stored falsy
    # value such as an empty directory listing [].
    _MISSING = object()

    def __init__(
        self,
        expiry_time=None,
        directory=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        expiry_time: int or float (optional)
            Time in seconds that a listing is considered valid. If None,
            listings do not expire.
        directory: str (optional)
            Directory path at which the listings cache file is stored. If None,
            an autogenerated path at the user folder is created.
        """
        import platformdirs
        from diskcache import Cache

        if not directory:
            directory = platformdirs.user_cache_dir(appname="fsspec")
        # Use a separate cache directory per expiry time so instances with
        # different expiry settings never read each other's entries.
        directory = Path(directory) / "dircache" / str(expiry_time)

        try:
            directory.mkdir(exist_ok=True, parents=True)
        except OSError:
            logger.error("folder for dircache could not be created at %s", directory)
            raise
        logger.info("Dircache located at %s", directory)

        self.directory = directory
        self._cache = Cache(directory=str(directory))
        self.expiry_time = expiry_time

    def __getitem__(self, item):
        """Fetch a listing from the cache, retrying if a timeout occurs.

        Raises
        ------
        KeyError
            If the key is absent or its entry has expired. Raising (rather
            than returning None) honours the MutableMapping contract so that
            mixin methods such as ``.get()`` and ``.pop()`` behave correctly.
        """
        value = self._cache.get(key=item, default=self._MISSING, read=True, retry=True)
        if value is self._MISSING:
            raise KeyError(item)
        return value

    def clear(self):
        self._cache.clear()

    def __len__(self):
        return len(list(self._cache.iterkeys()))

    def __contains__(self, item):
        # Compare against a sentinel default: a stored falsy value (e.g. the
        # empty listing []) is still "contained"; only a missing or expired
        # entry is not.
        value = self._cache.get(item, default=self._MISSING, retry=True)
        return value is not self._MISSING

    def __setitem__(self, key, value):
        # expire=None means the entry never expires, matching the semantics
        # of the in-memory listings cache.
        self._cache.set(key=key, value=value, expire=self.expiry_time, retry=True)

    def __delitem__(self, key):
        del self._cache[key]

    def __iter__(self):
        # Filter through __contains__ so expired entries are not yielded.
        return (k for k in self._cache.iterkeys() if k in self)

    def __reduce__(self):
        # Pickle support: rebuild pointing at the same on-disk directory.
        return (
            FileDirCache,
            (self.expiry_time, self.directory),
        )


class CacheType(Enum):
    """Enumeration of the available listings-cache implementations.

    The enum *value* is the cache class itself, so the class can be obtained
    directly from a member (e.g. ``CacheType.MEMORY.value``).
    """

    MEMORY = MemoryDirCache  # in-process, dict-backed cache
    FILE = FileDirCache  # on-disk cache backed by ``diskcache``


def create_dircache(
    cache_type: "Optional[Union[str, CacheType]]" = None,
    expiry_time: Optional[Union[int, float]] = None,
    **kwargs,
) -> "Optional[Union[MemoryDirCache, FileDirCache]]":
    """Create a listings cache instance, or None when caching is disabled.

    Parameters
    ----------
    cache_type: str or CacheType (optional)
        Which implementation to build: ``"memory"``/``CacheType.MEMORY`` or
        ``"file"``/``CacheType.FILE`` (strings are case-insensitive). A falsy
        value disables caching and returns None.
    expiry_time: int or float (optional)
        Time in seconds that a listing is considered valid; None means no
        expiry. A value of 0 disables caching entirely (returns None).
    kwargs:
        Passed on to the cache class constructor.

    Raises
    ------
    ValueError
        If ``cache_type`` is a string that does not name a CacheType member.
    """
    if not cache_type:
        return None
    if isinstance(cache_type, str):
        try:
            cache_type = CacheType[cache_type.upper()]
        except KeyError as e:
            valid = ", ".join(ct.name.lower() for ct in CacheType)
            raise ValueError(f"Cache type must be one of {valid}") from e
    expiry_time = expiry_time and float(expiry_time)
    if expiry_time == 0.0:
        # An expiry of zero means nothing is ever valid: disable caching.
        return None
    # Enum values are the cache classes themselves, so no separate
    # type-to-class mapping is needed.
    return cache_type.value(expiry_time, **kwargs)


if __name__ == "__main__":
    # NOTE(review): ad-hoc manual smoke test — looks like leftover debug
    # code; consider removing it or moving it into the test suite.
    d = create_dircache(cache_type="memory")
    print(d)
2 changes: 1 addition & 1 deletion fsspec/fuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def main(args):
/historic/packages/RPMS /tmp/ftp \\
-o 'simplecache-cache_storage=/tmp/simplecache' \\
-o 'simplecache-check_files=false[bool]' \\
-o 'ftp-listings_expiry_time=60[int]' \\
-o 'ftp-expiry_time=60[int]' \\
-o 'ftp-username=anonymous' \\
-o 'ftp-password=xieyanbo'
"""
Expand Down
22 changes: 15 additions & 7 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)

from ..caching import AllBytes
from ..dircache import CacheType

# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
client_kwargs=None,
get_client=get_client,
encoded=False,
listings_cache_options=None,
**storage_options,
):
"""
Expand All @@ -83,11 +85,13 @@ def __init__(
A callable which takes keyword arguments and constructs
an aiohttp.ClientSession. It's state will be managed by
the HTTPFileSystem class.
listings_cache_options: dict
Options for the listings cache.
storage_options: key-value
Any other parameters passed on to requests
cache_type, cache_options: defaults used in open
"""
super().__init__(self, asynchronous=asynchronous, loop=loop, **storage_options)
super().__init__(self, asynchronous=asynchronous, loop=loop, listings_cache_options=listings_cache_options, **storage_options)
self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
self.simple_links = simple_links
self.same_schema = same_scheme
Expand All @@ -104,10 +108,12 @@ def __init__(
# TODO: Maybe rename `self.kwargs` to `self.request_options` to make
# it clearer.
request_options = copy(storage_options)
self.use_listings_cache = request_options.pop("use_listings_cache", False)
request_options.pop("listings_expiry_time", None)
request_options.pop("max_paths", None)
request_options.pop("skip_instance_cache", None)
# self.use_listings_cache = request_options.pop("use_listings_cache", False)
# request_options.pop("expiry_time", None)
# request_options.pop("max_paths", None)
# request_options.pop("skip_instance_cache", None)
# request_options.pop("listings_cache_type", None)
# request_options.pop("listings_cache_location", None)
self.kwargs = request_options

@property
Expand Down Expand Up @@ -201,11 +207,13 @@ async def _ls_real(self, url, detail=True, **kwargs):
return sorted(out)

async def _ls(self, url, detail=True, **kwargs):
if self.use_listings_cache and url in self.dircache:
listings_cache_disabled = self.dircache is None
if not listings_cache_disabled and url in self.dircache:
out = self.dircache[url]
else:
out = await self._ls_real(url, detail=detail, **kwargs)
self.dircache[url] = out
if not listings_cache_disabled:
self.dircache[url] = out
return out

ls = sync_wrapper(_ls)
Expand Down
Loading

0 comments on commit 4dad114

Please sign in to comment.