From ddb284166c9c8ef05532c565f3086af33583aa75 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 11:56:26 +0300 Subject: [PATCH 01/12] Use input_filenames instead of input metadata --- trollflow2/plugins/__init__.py | 12 +++++++++++ trollflow2/tests/test_trollflow2.py | 31 +++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 9bd3b35a..77f77f68 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1114,3 +1114,15 @@ def callback_close(obj, targs, job, fmat_config): for targ in targs: targ.close() return obj + + +def use_fsspec_cache(job): + """Use the caching from fsspec for (remote) files.""" + import fsspec + from satpy.readers import FSFile + + cache = job["product_list"]["fsspec_cache"] + filenames = job["input_filenames"] + open_files = fsspec.open_files([f"{cache}::{f}" for f in filenames]) + fs_files = [FSFile(open_file) for open_file in open_files] + job["input_filenames"] = fs_files diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index e17788c0..1b0010a2 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2298,5 +2298,36 @@ def test_format_decoration_plain_text(): assert format_decoration(fmat, fmat_config) == fmat_config +@pytest.fixture +def local_test_file(tmp_path): + """Create a local test file for fsspec tests.""" + fname = tmp_path / "file.nc" + with open(fname, "w") as fid: + fid.write("42\n") + return fname + + +def test_fsspec_cache_method_file(local_test_file): + """Test that the configured cache method is applied to the URI of a single file.""" + import fsspec + from satpy.readers import FSFile + + from trollflow2.plugins import use_fsspec_cache + + input_filenames = [f"file://{os.fspath(local_test_file)}"] + job = { + "product_list": { + "fsspec_cache": "simplecache", + "fsspec_cache_dir": "/tmp/simplecache", + }, + "input_filenames": input_filenames, + } + use_fsspec_cache(job) + + for f in job["input_filenames"]: + assert isinstance(f, FSFile) + assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem) + + if __name__ == '__main__': unittest.main() From 146a272a6ce44b5e057256566bbdb2ba3d72ce2c Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 11:57:44 +0300 Subject: [PATCH 02/12] Fix function name and docstring --- trollflow2/tests/test_trollflow2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 1b0010a2..bd9687c9 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2307,8 +2307,8 @@ def local_test_file(tmp_path): return fname -def test_fsspec_cache_method_file(local_test_file): - """Test that the configured cache method is applied to the URI of a single file.""" +def test_use_fsspec_cache(local_test_file): + """Test that the configured cache method is applied to the given input files.""" import fsspec from satpy.readers import FSFile From 7ee1c7b4eaf7ff762ff62930d4d4bd150309d363 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 13:08:36 +0300 Subject: [PATCH 03/12] Add cache directory option --- trollflow2/plugins/__init__.py | 14 +++++++++++++- trollflow2/tests/test_trollflow2.py | 26 +++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 77f77f68..0de3697c 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1122,7 +1122,19 @@ def use_fsspec_cache(job): from satpy.readers import FSFile cache = job["product_list"]["fsspec_cache"] + cache_dir = job["product_list"].get("fsspec_cache_dir") filenames = job["input_filenames"] - open_files = fsspec.open_files([f"{cache}::{f}" for f in filenames]) + cached_filenames = [f"{cache}::{f}" for f in filenames] + if cache == "blockcache": + open_files = fsspec.open_files(cached_filenames, + blockcache={"cache_storage": cache_dir}) + elif cache == "filecache": + open_files = fsspec.open_files(cached_filenames, + filecache={"cache_storage": cache_dir}) + elif cache == "simplecache": + open_files = fsspec.open_files(cached_filenames, + simplecache={"cache_storage": cache_dir}) + else: + open_files = fsspec.open_files(cached_filenames) fs_files = [FSFile(open_file) for open_file in open_files] job["input_filenames"] = fs_files diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index bd9687c9..132768f1 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -29,6 +29,7 @@ import pathlib import queue import unittest +from contextlib import suppress from functools import partial from unittest import mock @@ -2318,7 +2319,6 @@ def test_use_fsspec_cache(local_test_file): job = { "product_list": { "fsspec_cache": "simplecache", - "fsspec_cache_dir": "/tmp/simplecache", }, "input_filenames": input_filenames, } @@ -2329,5 +2329,29 @@ def test_use_fsspec_cache(local_test_file): assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem) +@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache")) +def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache): + """Test that the configured cache directory is used.""" + from trollflow2.plugins import use_fsspec_cache + + input_filenames = [f"file://{os.fspath(local_test_file)}"] + cache_dir = os.fspath(tmp_path / "cache") + job = { + "product_list": { + "fsspec_cache": fsspec_cache, + "fsspec_cache_dir": cache_dir, + }, + "input_filenames": input_filenames, + } + use_fsspec_cache(job) + + # Read the file and chech the cache directory to see the cache is used + # For blockcache we need to ignore the AttributeError + with suppress(AttributeError): + with job["input_filenames"][0].open("r") as fid: + _ = fid.read() + assert os.listdir(cache_dir) + + if __name__ == '__main__': unittest.main() From 7932b76091730be7fd43231cfb3d04bc0d5744a6 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 14:28:28 +0300 Subject: [PATCH 04/12] Add cache cleaning and refactor --- trollflow2/plugins/__init__.py | 9 +++++ trollflow2/tests/test_trollflow2.py | 52 +++++++++++++++++++++++------ 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 0de3697c..089b7bb6 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1138,3 +1138,12 @@ def use_fsspec_cache(job): open_files = fsspec.open_files(cached_filenames) fs_files = [FSFile(open_file) for open_file in open_files] job["input_filenames"] = fs_files + + +def clear_fsspec_cache(job): + """Clear all files in fsspec cache directory.""" + filenames = job["input_filenames"] + + for f in filenames: + if hasattr(f, "_fs"): + f._fs.clear_cache() diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 132768f1..9b9541ac 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2329,13 +2329,9 @@ def test_use_fsspec_cache(local_test_file): assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem) -@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache")) -def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache): - """Test that the configured cache directory is used.""" - from trollflow2.plugins import use_fsspec_cache - - input_filenames = [f"file://{os.fspath(local_test_file)}"] - cache_dir = os.fspath(tmp_path / "cache") +def _get_fsspec_job(tmp_path, test_file, fsspec_cache): + input_filenames = [f"file://{os.fspath(test_file)}"] + cache_dir = os.fspath(tmp_path / fsspec_cache) job = { "product_list": { "fsspec_cache": fsspec_cache, @@ -2343,14 +2339,48 @@ def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache): }, "input_filenames": input_filenames, } - use_fsspec_cache(job) + return job + - # Read the file and chech the cache directory to see the cache is used +def _access_fsspec_file(fname): # For blockcache we need to ignore the AttributeError with suppress(AttributeError): - with job["input_filenames"][0].open("r") as fid: + with fname.open("r") as fid: _ = fid.read() - assert os.listdir(cache_dir) + + +@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache")) +def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache): + """Test that the configured cache directory is used.""" + from trollflow2.plugins import use_fsspec_cache + + job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache) + + use_fsspec_cache(job) + + # Access the file and check the data has been cached + _access_fsspec_file(job["input_filenames"][0]) + + assert os.listdir(job["product_list"]["fsspec_cache_dir"]) + + +@pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache")) +def test_clear_fsspec_cache(tmp_path, local_test_file, fsspec_cache): + """Test clearing fsspec created caches.""" + from trollflow2.plugins import clear_fsspec_cache, use_fsspec_cache + + # Access some data and use fsspec caching + job = _get_fsspec_job(tmp_path, local_test_file, fsspec_cache) + use_fsspec_cache(job) + _access_fsspec_file(job["input_filenames"][0]) + # Make sure the caches exist + assert os.listdir(job["product_list"]["fsspec_cache_dir"]) + + clear_fsspec_cache(job) + + # simplecache cleaning removes the whole cache directory so we need to account for that + with suppress(FileNotFoundError): + assert os.listdir(job["product_list"]["fsspec_cache_dir"]) == [] if __name__ == '__main__': From 5c261621f57d9960cb813f17d429fd5b461afad6 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 15:01:25 +0300 Subject: [PATCH 05/12] Refactor test_use_fsspec_cache --- trollflow2/plugins/__init__.py | 19 ++++++++------- trollflow2/tests/test_trollflow2.py | 37 +++++++++++++---------------- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 089b7bb6..96acbba3 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1125,15 +1125,16 @@ def use_fsspec_cache(job): cache_dir = job["product_list"].get("fsspec_cache_dir") filenames = job["input_filenames"] cached_filenames = [f"{cache}::{f}" for f in filenames] - if cache == "blockcache": - open_files = fsspec.open_files(cached_filenames, - blockcache={"cache_storage": cache_dir}) - elif cache == "filecache": - open_files = fsspec.open_files(cached_filenames, - filecache={"cache_storage": cache_dir}) - elif cache == "simplecache": - open_files = fsspec.open_files(cached_filenames, - simplecache={"cache_storage": cache_dir}) + if cache_dir: + if cache == "blockcache": + open_files = fsspec.open_files(cached_filenames, + blockcache={"cache_storage": cache_dir}) + elif cache == "filecache": + open_files = fsspec.open_files(cached_filenames, + filecache={"cache_storage": cache_dir}) + elif cache == "simplecache": + open_files = fsspec.open_files(cached_filenames, + simplecache={"cache_storage": cache_dir}) else: open_files = fsspec.open_files(cached_filenames) fs_files = [FSFile(open_file) for open_file in open_files] diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 9b9541ac..7247bb61 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2308,20 +2308,28 @@ def local_test_file(tmp_path): return fname -def test_use_fsspec_cache(local_test_file): +def _get_fsspec_job(tmp_path, test_file, fsspec_cache, use_cache_dir=True): + input_filenames = [f"file://{os.fspath(test_file)}"] + product_list = {"fsspec_cache": fsspec_cache} + if use_cache_dir: + cache_dir = os.fspath(tmp_path / fsspec_cache) + product_list["fsspec_cache_dir"] = cache_dir + job = { + "product_list": product_list, + "input_filenames": input_filenames, + } + return job + + +def test_use_fsspec_cache(local_test_file, tmp_path): """Test that the configured cache method is applied to the given input files.""" import fsspec from satpy.readers import FSFile from trollflow2.plugins import use_fsspec_cache - input_filenames = [f"file://{os.fspath(local_test_file)}"] - job = { - "product_list": { - "fsspec_cache": "simplecache", - }, - "input_filenames": input_filenames, - } + job = _get_fsspec_job(tmp_path, local_test_file, "simplecache", use_cache_dir=False) + use_fsspec_cache(job) for f in job["input_filenames"]: @@ -2329,19 +2337,6 @@ def test_use_fsspec_cache(local_test_file): assert isinstance(f._fs, fsspec.implementations.cached.SimpleCacheFileSystem) -def _get_fsspec_job(tmp_path, test_file, fsspec_cache): - input_filenames = [f"file://{os.fspath(test_file)}"] - cache_dir = os.fspath(tmp_path / fsspec_cache) - job = { - "product_list": { - "fsspec_cache": fsspec_cache, - "fsspec_cache_dir": cache_dir, - }, - "input_filenames": input_filenames, - } - return job - - def _access_fsspec_file(fname): # For blockcache we need to ignore the AttributeError with suppress(AttributeError): From 3c89f77d14f21f62e1a83ed58f1e498c911527a2 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 15:08:26 +0300 Subject: [PATCH 06/12] Add documentation --- doc/source/plugins.rst | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/doc/source/plugins.rst b/doc/source/plugins.rst index cb7770fb..1a17ad07 100644 --- a/doc/source/plugins.rst +++ b/doc/source/plugins.rst @@ -359,3 +359,20 @@ Settings: The S3 connection options are handled by the `fsspec `_ configuration mechanism. + +Caching remotely read data +************************** + +The ``use_fsspec_cache`` plugin can be used to turn on cachcing for remotely read files. See below for +cache cleaning. + +Settings used from the product list: + - ``fsspec_cache`` - name of the cache implementation. Can be one of ``blockcache``, ``filecache`` or + ``simplecache`` + - ``fsspec_cache_dir`` - location where the caching is done. If it does not exist, it will be created + +Cleaning file cache +******************* + +If remotely read files are cached, this plugin can be used to automatically clean the caches. +No additional settings are available. From 9e0942f1bf941c0a24eda5995d528ee876df94e4 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Thu, 13 Jun 2024 17:37:03 +0300 Subject: [PATCH 07/12] Fix a too rigid mock'd test --- trollflow2/tests/test_trollflow2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 7247bb61..1920ecda 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2019,7 +2019,8 @@ def test_filepublisher_kwargs_direct_instance_no_nameserver(self): _ = FilePublisher(port=40000, nameservers=False) NoisyPublisher.assert_not_called() - Publisher.assert_called_once_with('tcp://*:40000', name='l2processor', min_port=None, max_port=None) + assert "tcp://*:40000" in Publisher.mock_calls[0].args + assert Publisher.mock_calls[0].kwargs["name"] == "l2processor" def test_filepublisher_kwargs(self): """Test filepublisher keyword argument usage. From 535f55153d3f4c8e33a79e3c696b32aca7d27ee5 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 14 Jun 2024 10:51:26 +0300 Subject: [PATCH 08/12] Update doc/source/plugins.rst Co-authored-by: Martin Raspaud --- doc/source/plugins.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/plugins.rst b/doc/source/plugins.rst index 1a17ad07..5546d74b 100644 --- a/doc/source/plugins.rst +++ b/doc/source/plugins.rst @@ -363,7 +363,7 @@ configuration mechanism. Caching remotely read data ************************** -The ``use_fsspec_cache`` plugin can be used to turn on cachcing for remotely read files. See below for +The ``use_fsspec_cache`` plugin can be used to turn on caching for remotely read files. See below for cache cleaning. Settings used from the product list: From f2701dbf0119780768b780fa721222b166f14d35 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 14 Jun 2024 11:02:20 +0300 Subject: [PATCH 09/12] Simplify cache storage option passing --- trollflow2/plugins/__init__.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 96acbba3..4abc0f17 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1125,18 +1125,10 @@ def use_fsspec_cache(job): cache_dir = job["product_list"].get("fsspec_cache_dir") filenames = job["input_filenames"] cached_filenames = [f"{cache}::{f}" for f in filenames] + kwargs = {} if cache_dir: - if cache == "blockcache": - open_files = fsspec.open_files(cached_filenames, - blockcache={"cache_storage": cache_dir}) - elif cache == "filecache": - open_files = fsspec.open_files(cached_filenames, - filecache={"cache_storage": cache_dir}) - elif cache == "simplecache": - open_files = fsspec.open_files(cached_filenames, - simplecache={"cache_storage": cache_dir}) - else: - open_files = fsspec.open_files(cached_filenames) + kwargs[cache] = {"cache_storage": cache_dir} + open_files = fsspec.open_files(cached_filenames, **kwargs) fs_files = [FSFile(open_file) for open_file in open_files] job["input_filenames"] = fs_files From 1c2844e9249f375897490a1f85d8540413327c1c Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 14 Jun 2024 11:36:59 +0300 Subject: [PATCH 10/12] Put fsspec cache configuration in a single dictionary --- doc/source/plugins.rst | 17 +++++++++++++++-- trollflow2/plugins/__init__.py | 8 ++++---- trollflow2/tests/test_trollflow2.py | 10 +++++----- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/doc/source/plugins.rst b/doc/source/plugins.rst index 5546d74b..85c75899 100644 --- a/doc/source/plugins.rst +++ b/doc/source/plugins.rst @@ -367,9 +367,22 @@ The ``use_fsspec_cache`` plugin can be used to turn on caching for remotely read cache cleaning. Settings used from the product list: - - ``fsspec_cache`` - name of the cache implementation. Can be one of ``blockcache``, ``filecache`` or + - ``fsspec_cache`` - dictionary of the following options + - ``type`` - name of the cache implementation. Can be one of ``blockcache``, ``filecache`` or ``simplecache`` - - ``fsspec_cache_dir`` - location where the caching is done. If it does not exist, it will be created + - ``options`` - keyword arguments passed to the select cache ``type`` + +To use ``simplecache`` and store the caches in ``/tmp/caches`` use this: + +.. code-block:: + + fsspec_cache: + type: simplecache + options: + cache_storage: /tmp/caches + +For other options, see `fsspec API `_ +documentation for ``SimpleCacheFileSystem``, ``WholeFileCacheFileSystem`` and ``BlockCache``, respecively. Cleaning file cache ******************* diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 4abc0f17..35ee9c4a 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1121,13 +1121,13 @@ def use_fsspec_cache(job): import fsspec from satpy.readers import FSFile - cache = job["product_list"]["fsspec_cache"] - cache_dir = job["product_list"].get("fsspec_cache_dir") + cache = job["product_list"]["fsspec_cache"]["type"] + cache_options = job["product_list"]["fsspec_cache"].get("options") filenames = job["input_filenames"] cached_filenames = [f"{cache}::{f}" for f in filenames] kwargs = {} - if cache_dir: - kwargs[cache] = {"cache_storage": cache_dir} + if cache_options: + kwargs[cache] = cache_options open_files = fsspec.open_files(cached_filenames, **kwargs) fs_files = [FSFile(open_file) for open_file in open_files] job["input_filenames"] = fs_files diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 1920ecda..0c82995b 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2311,10 +2311,10 @@ def local_test_file(tmp_path): def _get_fsspec_job(tmp_path, test_file, fsspec_cache, use_cache_dir=True): input_filenames = [f"file://{os.fspath(test_file)}"] - product_list = {"fsspec_cache": fsspec_cache} + product_list = {"fsspec_cache": {"type": fsspec_cache}} if use_cache_dir: cache_dir = os.fspath(tmp_path / fsspec_cache) - product_list["fsspec_cache_dir"] = cache_dir + product_list["fsspec_cache"]["options"] = {"cache_storage": cache_dir} job = { "product_list": product_list, "input_filenames": input_filenames, @@ -2357,7 +2357,7 @@ def test_use_fsspec_cache_dir(local_test_file, tmp_path, fsspec_cache): # Access the file and check the data has been cached _access_fsspec_file(job["input_filenames"][0]) - assert os.listdir(job["product_list"]["fsspec_cache_dir"]) + assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"]) @pytest.mark.parametrize("fsspec_cache", ("blockcache", "filecache", "simplecache")) @@ -2370,13 +2370,13 @@ def test_clear_fsspec_cache(tmp_path, local_test_file, fsspec_cache): use_fsspec_cache(job) _access_fsspec_file(job["input_filenames"][0]) # Make sure the caches exist - assert os.listdir(job["product_list"]["fsspec_cache_dir"]) + assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"]) clear_fsspec_cache(job) # simplecache cleaning removes the whole cache directory so we need to account for that with suppress(FileNotFoundError): - assert os.listdir(job["product_list"]["fsspec_cache_dir"]) == [] + assert os.listdir(job["product_list"]["fsspec_cache"]["options"]["cache_storage"]) == [] if __name__ == '__main__': From 76de724eb4fe201653a40836dee4b279b73b1c5d Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 14 Jun 2024 11:39:03 +0300 Subject: [PATCH 11/12] Check the publisher is called only once --- trollflow2/tests/test_trollflow2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trollflow2/tests/test_trollflow2.py b/trollflow2/tests/test_trollflow2.py index 0c82995b..a95ee47a 100644 --- a/trollflow2/tests/test_trollflow2.py +++ b/trollflow2/tests/test_trollflow2.py @@ -2019,6 +2019,7 @@ def test_filepublisher_kwargs_direct_instance_no_nameserver(self): _ = FilePublisher(port=40000, nameservers=False) NoisyPublisher.assert_not_called() + Publisher.assert_called_once() assert "tcp://*:40000" in Publisher.mock_calls[0].args assert Publisher.mock_calls[0].kwargs["name"] == "l2processor" From bab0378b5d45f3feb102bc85348a850b3b6e2a7b Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 28 Jun 2024 13:41:28 +0300 Subject: [PATCH 12/12] Use public .fs property to clear the cache --- trollflow2/plugins/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trollflow2/plugins/__init__.py b/trollflow2/plugins/__init__.py index 35ee9c4a..7ff862de 100644 --- a/trollflow2/plugins/__init__.py +++ b/trollflow2/plugins/__init__.py @@ -1138,5 +1138,5 @@ def clear_fsspec_cache(job): filenames = job["input_filenames"] for f in filenames: - if hasattr(f, "_fs"): - f._fs.clear_cache() + if hasattr(f, "fs"): + f.fs.clear_cache()