Skip to content

Commit

Permalink
Merge pull request #205 from pnuu/fsspec-caching-and-cleaning
Browse files Browse the repository at this point in the history
Add plugins for fsspec caching and cache cleaning
  • Loading branch information
mraspaud authored Sep 11, 2024
2 parents decdd9c + e7ae67c commit 8e7d1b4
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 0 deletions.
30 changes: 30 additions & 0 deletions doc/source/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,33 @@ Settings:
The S3 connection options are handled by the
`fsspec <https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_
configuration mechanism.

Caching remotely read data
**************************

The ``use_fsspec_cache`` plugin can be used to turn on caching for remotely read files. See below for
cache cleaning.

Settings used from the product list:
- ``fsspec_cache`` - dictionary of the following options
- ``type`` - name of the cache implementation. Can be one of ``blockcache``, ``filecache`` or
``simplecache``
- ``options`` - keyword arguments passed to the select cache ``type``

To use ``simplecache`` and store the caches in ``/tmp/caches`` use this:

.. code-block::
fsspec_cache:
type: simplecache
options:
cache_storage: /tmp/caches
For other options, see `fsspec API <https://filesystem-spec.readthedocs.io/en/latest/api.html>`_
documentation for ``SimpleCacheFileSystem``, ``WholeFileCacheFileSystem`` and ``BlockCache``, respecively.

Cleaning file cache
*******************

If remotely read files are cached, this plugin can be used to automatically clean the caches.
No additional settings are available.
26 changes: 26 additions & 0 deletions trollflow2/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,29 @@ def callback_close(obj, targs, job, fmat_config):
for targ in targs:
targ.close()
return obj


def use_fsspec_cache(job):
"""Use the caching from fsspec for (remote) files."""
import fsspec
from satpy.readers import FSFile

cache = job["product_list"]["fsspec_cache"]["type"]
cache_options = job["product_list"]["fsspec_cache"].get("options")
filenames = job["input_filenames"]
cached_filenames = [f"{cache}::{f}" for f in filenames]
kwargs = {}
if cache_options:
kwargs[cache] = cache_options
open_files = fsspec.open_files(cached_filenames, **kwargs)
fs_files = [FSFile(open_file) for open_file in open_files]
job["input_filenames"] = fs_files


def clear_fsspec_cache(job):
"""Clear all files in fsspec cache directory."""
filenames = job["input_filenames"]

for f in filenames:
if hasattr(f, "fs"):
f.fs.clear_cache()
80 changes: 80 additions & 0 deletions trollflow2/tests/test_trollflow2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import pathlib
import queue
import unittest
from contextlib import suppress
from functools import partial
from unittest import mock

Expand Down Expand Up @@ -2300,5 +2301,84 @@ def test_format_decoration_plain_text():
assert format_decoration(fmat, fmat_config) == fmat_config


@pytest.fixture
def local_test_file(tmp_path):
"""Create a local test file for fsspec tests."""
fname = tmp_path / "file.nc"
with open(fname, "w") as fid:
fid.write("42\n")
return fname


def _get_fsspec_job(tmp_path, test_file, fsspec_cache, use_cache_dir=True):
input_filenames = [f"file://{os.fspath(test_file)}"]
product_list = {"fsspec_cache": {"type": fsspec_cache}}
if use_cache_dir:
cache_dir = os.fspath(tmp_path / fsspec_cache)
product_list["fsspec_cache"]["options"] = {"cache_storage": cache_dir}
job = {
"product_list": product_list,
"input_filenames": input_filenames,
}
return job


def test_use_fsspec_cache(local_test_file, tmp_path):
"""Test that the configured cache method is applied to the given input files."""
import fsspec
from satpy.readers import FSFile

from trollflow2.plugins import use_fsspec_cache

job = _get_fsspec_job(tmp_path, local_test_file, "simplecache", use_cache_dir=False)

use_fsspec_cache(job)

for f in job["input_filenames"]:
assert isinstance(f, FSFile)
assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem)


def _access_fsspec_file(fname):
# For blockcache we need to ignore the AttributeError
with suppress(AttributeError):
with fname.open("r") as fid:
_ = fid.read()


@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache"))
def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache):
"""Test that the configured cache directory is used."""
from trollflow2.plugins import use_fsspec_cache

job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache)

use_fsspec_cache(job)

# Access the file and check the data has been cached
_access_fsspec_file(job["input_filenames"][0])

assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"])


@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache"))
def test_clear_fsspec_cache(tmp_path, local_test_file, fsspec_cache):
"""Test clearing fsspec created caches."""
from trollflow2.plugins import clear_fsspec_cache, use_fsspec_cache

# Access some data and use fsspec caching
job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache)
use_fsspec_cache(job)
_access_fsspec_file(job["input_filenames"][0])
# Make sure the caches exist
assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"])

clear_fsspec_cache(job)

# simplecache cleaning removes the whole cache directory so we need to account for that
with suppress(FileNotFoundError):
assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"]) == []


if __name__ == '__main__':
unittest.main()

0 comments on commit 8e7d1b4

Please sign in to comment.