From a69d5de9df56f64f0fbcaa75d8258f4b4154f679 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Fri, 2 Jun 2023 09:25:08 +0200 Subject: [PATCH 1/8] Rename get_dask_client --- .pre-commit-config.yaml | 2 +- trollflow2/launcher.py | 7 ++----- trollflow2/tests/test_launcher.py | 26 +++++++++++++------------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1ccbf8bf..6091c6f2 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: flake8 additional_dependencies: [flake8-docstrings, flake8-debugger, flake8-bugbear] - repo: https://github.com/pycqa/isort - rev: 5.11.4 + rev: 5.12.0 hooks: - id: isort language_version: python3 diff --git a/trollflow2/launcher.py b/trollflow2/launcher.py index 8d6f0427..6514e038 100644 --- a/trollflow2/launcher.py +++ b/trollflow2/launcher.py @@ -322,7 +322,7 @@ def expand(yml): return yml -def get_dask_client(config): +def get_dask_distributed_client(config): """Create Dask client if configured.""" client = None @@ -375,10 +375,7 @@ def print_traces(signum, frame): def process(msg, prod_list, produced_files): """Process a message.""" config = read_config(prod_list, Loader=UnsafeLoader) - - # Get distributed client - client = get_dask_client(config) - + client = get_dask_distributed_client(config) try: config = expand(config) jobs = message_to_jobs(msg, config) diff --git a/trollflow2/tests/test_launcher.py b/trollflow2/tests/test_launcher.py index c2cf9701..ea0807d9 100644 --- a/trollflow2/tests/test_launcher.py +++ b/trollflow2/tests/test_launcher.py @@ -523,7 +523,7 @@ def setUp(self): yaml=mock.DEFAULT, message_to_jobs=mock.DEFAULT, open=mock.DEFAULT, - get_dask_client=mock.DEFAULT) + get_dask_distributed_client=mock.DEFAULT) mocks = self.patcher.start() self.traceback = mocks['traceback'] @@ -538,11 +538,11 @@ def setUp(self): # Is this necessary? 
self.yaml.YAMLError = YAMLError

-        self.get_dask_client = mocks['get_dask_client']
+        self.get_dask_distributed_client = mocks['get_dask_distributed_client']
         # Make a client that has no `.close()` method (for coverage)
         self.client = mock.MagicMock()
         self.client.close.side_effect = AttributeError
-        self.get_dask_client.return_value = self.client
+        self.get_dask_distributed_client.return_value = self.client

         self.expand = mocks['expand']
         self.fake_plugin = mock.MagicMock()
@@ -587,7 +587,7 @@ def test_plugin_is_used(self):
     def test_dask_client_is_used(self):
         """Test that the dask client is used."""
         process("msg", "prod_list", self.queue)
-        self.get_dask_client.assert_called_once()
+        self.get_dask_distributed_client.assert_called_once()

     def test_dask_client_is_closed(self):
         """Test that the dask client is closed."""
@@ -660,8 +660,8 @@ def test_workers_initialized():
     tmp_file.close()

     try:
-        with mock.patch("trollflow2.launcher.get_dask_client") as gdc:
-            # `get_dask_client()` is called just after config reading, so if we get there loading worked
+        with mock.patch("trollflow2.launcher.get_dask_distributed_client") as gdc:
+            # `get_dask_distributed_client()` is called just after config reading, so if we get there loading worked
             gdc.side_effect = StopIteration
             try:
                 process("msg", fname, queue)
@@ -671,9 +671,9 @@ def test_workers_initialized():
         os.remove(fname)


-def test_get_dask_client(caplog):
+def test_get_dask_distributed_client(caplog):
     """Test getting dask client."""
-    from trollflow2.launcher import get_dask_client
+    from trollflow2.launcher import get_dask_distributed_client

     ncores = mock.MagicMock()
     ncores.return_value = {}
@@ -684,7 +684,7 @@
     # No client configured
     config = {}
     with caplog.at_level(logging.DEBUG):
-        res = get_dask_client(config)
+        res = get_dask_distributed_client(config)
         assert "Distributed processing not configured" in caplog.text
         caplog.clear()
     assert res is None

     config = {
         "dask_distributed": {
         }
     }
     with caplog.at_level(logging.WARNING):
-        res = get_dask_client(config)
+        res = get_dask_distributed_client(config)
         assert "No workers available, reverting to default scheduler" in caplog.text
         caplog.clear()
     assert res is None
@@ -705,13 +705,13 @@
     # The scheduler had no workers, the client doesn't have `.close()`
     client.close.side_effect = AttributeError
     with caplog.at_level(logging.WARNING):
-        res = get_dask_client(config)
+        res = get_dask_distributed_client(config)
     assert res is None

     # Config is valid, scheduler has workers
     ncores.return_value = {'a': 1, 'b': 1}
     with caplog.at_level(logging.DEBUG):
-        res = get_dask_client(config)
+        res = get_dask_distributed_client(config)
         assert "Using dask distributed client" in caplog.text
         caplog.clear()
     assert res is client
@@ -720,7 +720,7 @@
     # Scheduler couldn't connect to workers
     client_class.side_effect = OSError
     with caplog.at_level(logging.ERROR):
-        res = get_dask_client(config)
+        res = get_dask_distributed_client(config)
         assert "Scheduler not found, reverting to default scheduler" in caplog.text
         caplog.clear()
     assert res is None

From 978df297f25c111e70db5175bc5c12b444ef7174 Mon Sep 17 00:00:00 2001
From: Martin Raspaud
Date: Fri, 2 Jun 2023 11:00:18 +0200
Subject: [PATCH 2/8] Refactor launcher for easier use by the cli

---
 trollflow2/launcher.py            | 79 +++++++++++++++++++------------
 trollflow2/tests/test_launcher.py | 38 ++++++++-------
 2 files changed, 69 insertions(+), 48 deletions(-)

diff --git a/trollflow2/launcher.py b/trollflow2/launcher.py
index 6514e038..5e1536a8 100644
--- a/trollflow2/launcher.py
+++ b/trollflow2/launcher.py
@@ -257,6 +257,13 @@ def get_area_priorities(product_list):

 def message_to_jobs(msg, product_list):
     """Convert a posttroll message *msg* to a list of jobs given a *product_list*."""
+    input_filenames = _extract_filenames(msg)
+    input_mda = msg.data
+    return file_list_to_jobs(input_filenames, product_list, input_mda)
+
+
+def file_list_to_jobs(input_filenames, product_list, input_mda):
+    """Convert a file list to jobs."""
     formats = product_list['product_list'].get('formats', None)
     for _product, pconfig in plist_iter(product_list['product_list'], level='product'):
         if 'formats' not in pconfig and formats is not None:
@@ -264,11 +271,10 @@
     jobs = OrderedDict()
     priorities = get_area_priorities(product_list)
     # TODO: check the uri is accessible from the current host.
-    input_filenames = _extract_filenames(msg)
     for prio, areas in priorities.items():
         jobs[prio] = OrderedDict()
         jobs[prio]['input_filenames'] = input_filenames.copy()
-        jobs[prio]['input_mda'] = msg.data.copy()
+        jobs[prio]['input_mda'] = input_mda.copy()
         jobs[prio]['product_list'] = {}
         for section in product_list:
             if section == 'product_list':
@@ -374,35 +380,20 @@ def print_traces(signum, frame):

 def process(msg, prod_list, produced_files):
     """Process a message."""
+    input_filenames = _extract_filenames(msg)
+    input_mda = msg.data
+    process_files(input_filenames, input_mda, prod_list, produced_files)
+
+
+def process_files(input_filenames, input_mda, prod_list, produced_files):
+    """Process files."""
     config = read_config(prod_list, Loader=UnsafeLoader)
     client = get_dask_distributed_client(config)
     try:
         config = expand(config)
-        jobs = message_to_jobs(msg, config)
-        for prio in sorted(jobs.keys()):
-            job = jobs[prio]
-            job['processing_priority'] = prio
-            job['produced_files'] = produced_files
-            try:
-                for wrk in config['workers']:
-                    cwrk = wrk.copy()
-                    if "timeout" in cwrk:
-
-                        def _timeout_handler(signum, frame, wrk=wrk):
-                            raise TimeoutError(
-                                f"Timeout for {wrk['fun']!s} expired "
-                                f"after {wrk['timeout']:.1f} seconds, "
-                                "giving up")
-                        signal.signal(signal.SIGALRM, _timeout_handler)
-                        # using setitimer because it accepts floats,
-                        # unlike signal.alarm
-                        signal.setitimer(signal.ITIMER_REAL,
-                                         cwrk.pop("timeout"))
-                    cwrk.pop('fun')(job, **cwrk)
-                    if "timeout" in cwrk:
-                        signal.alarm(0)  # cancel the alarm
-            except AbortProcessing as err:
-                logger.warning(str(err))
+        jobs = file_list_to_jobs(input_filenames, config, input_mda)
+        process_jobs(config["workers"], jobs, produced_files)
     except Exception:
         logger.exception("Process crashed")
         if "crash_handlers" in config:
@@ -420,13 +411,39 @@ def _timeout_handler(signum, frame, wrk=wrk):
             except AttributeError:
                 continue
         del config
-    try:
+    with suppress(AttributeError):
         client.close()
-    except AttributeError:
-        pass
     gc.collect()


+def process_jobs(workers, jobs, produced_files):
+    """Process the jobs."""
+    for prio in sorted(jobs.keys()):
+        job = jobs[prio]
+        job['processing_priority'] = prio
+        job['produced_files'] = produced_files
+        try:
+            for wrk in workers:
+                cwrk = wrk.copy()
+                if "timeout" in cwrk:
+                    def _timeout_handler(signum, frame, wrk=wrk):
+                        raise TimeoutError(
+                            f"Timeout for {wrk['fun']!s} expired "
+                            f"after {wrk['timeout']:.1f} seconds, "
+                            "giving up")
+
+                    signal.signal(signal.SIGALRM, _timeout_handler)
+                    # using 
setitimer because it accepts floats, + # unlike signal.alarm + signal.setitimer(signal.ITIMER_REAL, + cwrk.pop("timeout")) + cwrk.pop('fun')(job, **cwrk) + if "timeout" in cwrk: + signal.alarm(0) # cancel the alarm + except AbortProcessing as err: + logger.warning(str(err)) + + def read_config(fname=None, raw_string=None, Loader=SafeLoader): """Read the configuration file.""" try: @@ -436,6 +453,8 @@ def read_config(fname=None, raw_string=None, Loader=SafeLoader): raw_config = fid.read() elif raw_string: raw_config = raw_string + if not raw_config: + raise IOError config = yaml.load(_remove_null_keys(raw_config), Loader=Loader) except (IOError, yaml.YAMLError): # Either open() or yaml.load() failed diff --git a/trollflow2/tests/test_launcher.py b/trollflow2/tests/test_launcher.py index ea0807d9..e16958ed 100644 --- a/trollflow2/tests/test_launcher.py +++ b/trollflow2/tests/test_launcher.py @@ -521,7 +521,7 @@ def setUp(self): sendmail=mock.DEFAULT, expand=mock.DEFAULT, yaml=mock.DEFAULT, - message_to_jobs=mock.DEFAULT, + file_list_to_jobs=mock.DEFAULT, open=mock.DEFAULT, get_dask_distributed_client=mock.DEFAULT) mocks = self.patcher.start() @@ -549,11 +549,13 @@ def setUp(self): # Return something resembling a config self.expand.return_value = {"workers": [{"fun": self.fake_plugin}]} - self.message_to_jobs = mocks['message_to_jobs'] - self.message_to_jobs.return_value = {1: {"job1": dict([])}} + self.file_list_to_jobs = mocks['file_list_to_jobs'] + self.file_list_to_jobs.return_value = {1: {"job1": dict([])}} self.queue = mock.MagicMock() + self.msg = mock.MagicMock() + def tearDown(self): """Tear down the test case.""" self.patcher.stop() @@ -561,49 +563,49 @@ def tearDown(self): def test_plugin_is_stopped_after_processing(self): """Test plugin is stopped after processing.""" self.fake_plugin.stop.assert_not_called() - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.fake_plugin.stop.assert_called_once() def test_product_list_is_opened(self): """Test product list is opened.""" - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.open.assert_called_with("prod_list") def test_yaml_config_is_read_only_once(self): """Test that the yaml config is read only once.""" - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.yaml.load.assert_called_once() def test_workers_config_is_passed_down(self): """Test that the workers config is used.""" - process("msg", "prod_list", self.queue) - self.message_to_jobs.assert_called_with("msg", {"workers": [{"fun": self.fake_plugin}]}) + process(self.msg, "prod_list", self.queue) + self.file_list_to_jobs.assert_called_with([], {"workers": [{"fun": self.fake_plugin}]}, self.msg.data) def test_plugin_is_used(self): """Test that the plugin is being used.""" - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.fake_plugin.assert_called_with({'job1': {}, 'processing_priority': 1, 'produced_files': self.queue}) def test_dask_client_is_used(self): """Test that the dask client is used.""" - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.get_dask_distributed_client.assert_called_once() def test_dask_client_is_closed(self): """Test that the dask client is closed.""" - process("msg", "prod_list", self.queue) + process(self.msg, "prod_list", self.queue) self.client.close.assert_called_once() def test_plugin_with_no_stop_work(self): """Test that plugins with no `stop` method 
(like regular functions) can be used."""
         self.fake_plugin.stop = mock.MagicMock(side_effect=AttributeError('boo'))
-        process("msg", "prod_list", self.queue)
+        process(self.msg, "prod_list", self.queue)

     def test_error_propagation(self):
         """Test that errors are propagated."""
         self.fake_plugin.side_effect = KeyboardInterrupt
         with pytest.raises(KeyboardInterrupt):
-            process("msg", "prod_list", self.queue)
+            process(self.msg, "prod_list", self.queue)

     def test_crash_handler_call(self):
         """Test crash handler call.
@@ -615,7 +617,7 @@ def test_crash_handler_call(self):
                            "handlers": [{"fun": self.sendmail}]}}
         self.expand.return_value = crash_handlers
         with pytest.raises(KeyError):
-            process("msg", "prod_list", self.queue)
+            process(self.msg, "prod_list", self.queue)
         config = crash_handlers['crash_handlers']['config']
         self.sendmail.assert_called_once_with(config, 'baz')
@@ -623,13 +625,13 @@ def test_open_missing_file(self):
         """Test failure in open() due to a missing config file."""
         self.open.side_effect = IOError
         with pytest.raises(IOError):
-            process("msg", "prod_list", self.queue)
+            process(self.msg, "prod_list", self.queue)

     def test_open_bad_yaml(self):
         """Test failure in yaml.load(), e.g. bad formatting."""
         self.open.side_effect = YAMLError
         with pytest.raises(YAMLError):
-            process("msg", "prod_list", self.queue)
+            process(self.msg, "prod_list", self.queue)

     def test_timeout_in_running_job(self):
         """Test timeout in running job."""
@@ -642,7 +644,7 @@ def wait(job):
         self.expand.return_value = {"workers": [{"fun": self.fake_plugin, "timeout": 0.05}]}
         with pytest.raises(TimeoutError, match="Timeout for .* expired "
                                                "after 0.1 seconds"):
-            process("msg", "prod_list", self.queue)
+            process(self.msg, "prod_list", self.queue)
         # wait a little to ensure alarm is not raised later
         time.sleep(0.11)
@@ -664,7 +666,7 @@ def test_workers_initialized():
             # `get_dask_distributed_client()` is called just after config reading, so if we get there loading worked
             gdc.side_effect = StopIteration
             try:
-                process("msg", fname, queue)
+                process(mock.MagicMock(), fname, queue)
             except StopIteration:
                 pass
     finally:

From a9d5439377681dc1b2ee9eb4722aee95c5fd5874 Mon Sep 17 00:00:00 2001
From: Martin Raspaud
Date: Fri, 2 Jun 2023 11:04:46 +0200
Subject: [PATCH 3/8] Add the cli

---
 trollflow2/cli.py            |  52 +++++++++++++++++
 trollflow2/tests/test_cli.py | 106 +++++++++++++++++++++++++++++++++++
 2 files changed, 158 insertions(+)
 create mode 100644 trollflow2/cli.py
 create mode 100644 trollflow2/tests/test_cli.py

diff --git a/trollflow2/cli.py b/trollflow2/cli.py
new file mode 100644
index 00000000..c06f4e01
--- /dev/null
+++ b/trollflow2/cli.py
@@ -0,0 +1,52 @@
+"""Trollflow2 command line interface."""
+
+import argparse
+import json
+import logging
+
+import yaml
+
+from trollflow2.launcher import logging_on, process_files
+
+logger = logging.getLogger(__name__)
+
+
+def parse_args(args=None):
+    """Parse commandline arguments."""
+    parser = argparse.ArgumentParser(
+        description='Launch trollflow2 processing with Satpy on the provided files then quit.')
+    parser.add_argument("files", nargs='*',
+                        help="Data files to run on",
+                        type=str)
+    parser.add_argument("-p", "--product-list",
+                        help="The yaml file with the product list",
+                        type=str,
+                        required=True)
+    parser.add_argument("-m", "--metadata",
+                        help="Metadata (json) to pass on",
+                        type=str, required=False, default="{}")
+    parser.add_argument("-c", "--log-config",
+                        help="Log config file (yaml) to use",
+                        type=str, required=False, default=None)
+    return parser.parse_args(args)
+
+def cli(args=None): + """Command line interface.""" + args = parse_args(args) + + log_config = _read_log_config(args) + + with logging_on(log_config): + logger.info("Starting Satpy.") + produced_files = [] + process_files(args.files, json.loads(args.metadata), args.product_list, produced_files) + + +def _read_log_config(args): + """Read the config.""" + log_config = args.log_config + if log_config is not None: + with open(log_config) as fd: + log_config = yaml.safe_load(fd.read()) + return log_config diff --git a/trollflow2/tests/test_cli.py b/trollflow2/tests/test_cli.py new file mode 100644 index 00000000..89f46f61 --- /dev/null +++ b/trollflow2/tests/test_cli.py @@ -0,0 +1,106 @@ +"""Tests for the CLI.""" + +import json +import os +from unittest import mock + +import pytest + +from trollflow2.cli import cli, parse_args +from trollflow2.tests.test_launcher import pnuus_log_config + +yaml_test_noop = """ +product_list: + output_dir: &output_dir + /mnt/output/ + publish_topic: /MSG_0deg/L3 + reader: seviri_l1b_hrit + fname_pattern: + "{start_time:%Y%m%d_%H%M}_{platform_name}_{areaname}_{productname}.{format}" + formats: + - format: tif + writer: geotiff + areas: + euro4: + areaname: euro4 + products: + overview: + productname: overview + airmass: + productname: airmass + natural_color: + productname: natural_color + night_fog: + productname: night_fog + +workers: [] +""" + + +def test_arg_parsing(): + """Test parsing args.""" + product_list = "my_product_list.yaml" + log_config = "my_log_config.yaml" + files = ["file1", "file2"] + res = parse_args(["-p", product_list, *files, "-c", log_config]) + assert res.product_list == product_list + assert res.files == files + assert res.log_config == log_config + + +def test_arg_parsing_fails_without_product_list(): + """Test args parsing fails without a product list.""" + log_config = "my_log_config.yaml" + files = ["file1", "file2"] + with pytest.raises(SystemExit): + parse_args([*files, "-c", log_config]) + + +def test_cli_logs_starting(tmp_path, caplog, empty_product_list): + """Test that the cli logs satpy starting.""" + product_list = empty_product_list + log_config = "my_log_config.yaml" + log_config_filename = os.fspath(tmp_path / log_config) + with open(log_config_filename, mode="w") as fd: + fd.write(pnuus_log_config) + files = ["file1", "file2"] + with pytest.raises(IOError): + cli(["-p", os.fspath(product_list), *files, "-c", log_config_filename]) + assert "Starting Satpy." 
in caplog.text
+
+
+@pytest.fixture
+def empty_product_list(tmp_path):
+    """Create an empty product list."""
+    product_list = "my_product_list.yaml"
+    product_file = tmp_path / product_list
+    with open(product_file, "w"):
+        pass
+    return product_file
+
+
+def test_cli_raises_an_error_when_product_list_is_empty(tmp_path, caplog, empty_product_list):
+    """Test that the cli raises an error when the product list is empty."""
+    product_list = empty_product_list
+    files = ["file1", "file2"]
+    product_list_filename = os.fspath(tmp_path / product_list)
+    with open(product_list_filename, "w"):
+        pass
+    with pytest.raises(IOError):
+        cli(["-p", os.fspath(product_list), *files])
+    assert "check YAML file" in caplog.text
+
+
+def test_cli_starts_processing_when_files_are_provided(tmp_path):
+    """Test that the cli starts processing when files are provided."""
+    product_list = "my_product_list.yaml"
+    files = ["file1", "file2"]
+    product_list_filename = os.fspath(tmp_path / product_list)
+    with open(product_list_filename, "w") as fd:
+        fd.write(yaml_test_noop)
+    from trollflow2.launcher import process_files
+    new_process = mock.Mock(wraps=process_files)
+    mda = {"dish": "pizza"}
+    with mock.patch("trollflow2.cli.process_files", new=new_process):
+        cli(["-p", os.fspath(product_list_filename), "-m", json.dumps(mda), *files])
+    new_process.assert_called_once_with(files, mda, product_list_filename, [])

From 2bfef9bf9013663c9fe2f9828814e963b3cc58e2 Mon Sep 17 00:00:00 2001
From: Martin Raspaud
Date: Fri, 2 Jun 2023 11:31:46 +0200
Subject: [PATCH 4/8] Add entry point for the cli

---
 setup.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 81c6a6ea..5e3e31d4 100644
--- a/setup.py
+++ b/setup.py
@@ -65,5 +65,7 @@
       tests_require=['pytest', 'mock', 'rasterio'],
       python_requires='>=3.9',
       test_suite='trollflow2.tests.suite',
-      use_scm_version=True
+      use_scm_version=True,
+      entry_points={
+          'console_scripts': ['satpy_cli = trollflow2.cli:cli',]}
       )

From cd44eb6648b3ede9f8ebb0b7c3d319f690c787ac Mon Sep 17 00:00:00 2001
From: Martin Raspaud
Date: Fri, 2 Jun 2023 11:32:31 +0200
Subject: [PATCH 5/8] Fix produced files type

---
 trollflow2/cli.py            | 3 ++-
 trollflow2/tests/test_cli.py | 5 +++--
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/trollflow2/cli.py b/trollflow2/cli.py
index c06f4e01..13e13b6f 100644
--- a/trollflow2/cli.py
+++ b/trollflow2/cli.py
@@ -3,6 +3,7 @@
 import argparse
 import json
 import logging
+from queue import Queue

 import yaml

@@ -39,7 +40,7 @@ def cli(args=None):

     with logging_on(log_config):
         logger.info("Starting Satpy.")
-        produced_files = []
+        produced_files = Queue()
         process_files(args.files, json.loads(args.metadata), args.product_list, produced_files)

diff --git a/trollflow2/tests/test_cli.py b/trollflow2/tests/test_cli.py
index 89f46f61..a0b760d9 100644
--- a/trollflow2/tests/test_cli.py
+++ b/trollflow2/tests/test_cli.py
@@ -102,5 +102,6 @@ def test_cli_starts_processing_when_files_are_provided(tmp_path):
     new_process = mock.Mock(wraps=process_files)
     mda = {"dish": "pizza"}
     with mock.patch("trollflow2.cli.process_files", new=new_process):
-        cli(["-p", os.fspath(product_list_filename), "-m", json.dumps(mda), *files])
-    new_process.assert_called_once_with(files, mda, product_list_filename, [])
+        with mock.patch("trollflow2.cli.Queue") as q_mock:
+            cli(["-p", os.fspath(product_list_filename), "-m", json.dumps(mda), *files])
+    new_process.assert_called_once_with(files, mda, product_list_filename, q_mock.return_value)

From 
13f9285074c2ffd5b42672dd6a272b3310e8e904 Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Fri, 2 Jun 2023 12:47:05 +0200 Subject: [PATCH 6/8] Remove stickler config --- .stickler.yml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .stickler.yml diff --git a/.stickler.yml b/.stickler.yml deleted file mode 100644 index e5e6fe74..00000000 --- a/.stickler.yml +++ /dev/null @@ -1,7 +0,0 @@ -linters: - flake8: - fixer: true - python: 3 - config: setup.cfg -fixers: - enable: true From 96610b162a6e42b3541cc0481aa59c9ea2aa2e1e Mon Sep 17 00:00:00 2001 From: Martin Raspaud Date: Thu, 14 Dec 2023 13:04:51 +0100 Subject: [PATCH 7/8] Allow parsing datetimes --- trollflow2/cli.py | 26 ++++++++++++- trollflow2/tests/test_cli.py | 73 ++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/trollflow2/cli.py b/trollflow2/cli.py index 13e13b6f..c1d7dc30 100644 --- a/trollflow2/cli.py +++ b/trollflow2/cli.py @@ -3,6 +3,7 @@ import argparse import json import logging +from datetime import datetime from queue import Queue import yaml @@ -41,7 +42,8 @@ def cli(args=None): with logging_on(log_config): logger.info("Starting Satpy.") produced_files = Queue() - process_files(args.files, json.loads(args.metadata), args.product_list, produced_files) + process_files(args.files, json.loads(args.metadata, object_hook=datetime_decoder), + args.product_list, produced_files) def _read_log_config(args): @@ -51,3 +53,25 @@ def _read_log_config(args): with open(log_config) as fd: log_config = yaml.safe_load(fd.read()) return log_config + + +def datetime_decoder(dct): + """Decode datetimes to python objects.""" + if isinstance(dct, list): + pairs = enumerate(dct) + elif isinstance(dct, dict): + pairs = dct.items() + result = [] + for key, val in pairs: + if isinstance(val, str): + try: + val = datetime.fromisoformat(val) + except ValueError: + pass + elif isinstance(val, (dict, list)): + val = datetime_decoder(val) + result.append((key, val)) + if isinstance(dct, list): + return [x[1] for x in result] + elif isinstance(dct, dict): + return dict(result) diff --git a/trollflow2/tests/test_cli.py b/trollflow2/tests/test_cli.py index a0b760d9..4021eaaa 100644 --- a/trollflow2/tests/test_cli.py +++ b/trollflow2/tests/test_cli.py @@ -2,9 +2,11 @@ import json import os +from datetime import datetime from unittest import mock import pytest +from trollsift import compose from trollflow2.cli import cli, parse_args from trollflow2.tests.test_launcher import pnuus_log_config @@ -37,6 +39,30 @@ """ +yaml_test_load_save = """ +product_list: + output_dir: &output_dir + {output_dir} + publish_topic: /MSG_0deg/L3 + reader: satpy_cf_nc + fname_pattern: + "{file_pattern}" + formats: + - format: tif + writer: geotiff + areas: + null: + products: + chanel_5: {{}} + +workers: + - fun: !!python/name:trollflow2.plugins.create_scene + - fun: !!python/name:trollflow2.plugins.load_composites + - fun: !!python/name:trollflow2.plugins.resample + - fun: !!python/name:trollflow2.plugins.save_datasets +""" + + def test_arg_parsing(): """Test parsing args.""" product_list = "my_product_list.yaml" @@ -105,3 +131,50 @@ def test_cli_starts_processing_when_files_are_provided(tmp_path): with mock.patch("trollflow2.cli.Queue") as q_mock: cli(["-p", os.fspath(product_list_filename), "-m", json.dumps(mda), *files]) new_process.assert_called_once_with(files, mda, product_list_filename, q_mock.return_value) + + +def test_full_chain_cli_is_creating_output_file(tmp_path): + """Test that the full chain cli is 
creating an output file.""" + start_time = datetime(2022, 2, 2, 11, 22) + end_time = datetime(2022, 2, 2, 11, 23) + attrs = dict(start_time=start_time, platform_name="sat1", sensor="nose", end_time=end_time) + + data_filename = create_cf_data_file(attrs, tmp_path) + + product_list = "my_product_list.yaml" + files = [data_filename] + product_list_filename = os.fspath(tmp_path / product_list) + output_file_pattern = "{start_time:%Y%m%d_%H%M}_{platform_name}_{product}.{format}" + with open(product_list_filename, "w") as fd: + fd.write(yaml_test_load_save.format(file_pattern=output_file_pattern, + output_dir=tmp_path)) + cli(["-p", os.fspath(product_list_filename), "-m", json.dumps(attrs, default=datetime_encoder), *files]) + + attrs.update({"product": "chanel_5", "format": "tif"}) + + expected_filename = tmp_path / compose(output_file_pattern, attrs) + + assert os.path.exists(expected_filename) + + +def create_cf_data_file(attrs, tmp_path): + """Create a data file for satpy to read.""" + import xarray as xr + from satpy import Scene + scn = Scene() + + scn["chanel_5"] = xr.DataArray([[0, 1, 2], [3, 4, 5]], dims=["y", "x"], attrs=attrs) + + data_filepattern = str(tmp_path / "{platform_name}-{sensor}-{start_time:%Y%m%d%H%M%S}-{end_time:%Y%m%d%H%M%S}.nc") + + scn.save_dataset("chanel_5", filename=data_filepattern, writer="cf") + + return compose(data_filepattern, attrs) + + +def datetime_encoder(obj): + """Encode datetimes into iso format.""" + try: + return obj.isoformat() + except AttributeError: + raise TypeError(repr(obj) + " is not JSON serializable") From fcd1b7876d7fa2742330443aeb1a6627b0a68053 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Tue, 22 Oct 2024 14:26:19 +0300 Subject: [PATCH 8/8] Rename variable dct to datetimes --- trollflow2/cli.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/trollflow2/cli.py b/trollflow2/cli.py index c1d7dc30..aca3bc91 100644 --- a/trollflow2/cli.py +++ b/trollflow2/cli.py @@ -55,12 +55,12 @@ def _read_log_config(args): return log_config -def datetime_decoder(dct): +def datetime_decoder(datetimes): """Decode datetimes to python objects.""" - if isinstance(dct, list): - pairs = enumerate(dct) - elif isinstance(dct, dict): - pairs = dct.items() + if isinstance(datetimes, list): + pairs = enumerate(datetimes) + elif isinstance(datetimes, dict): + pairs = datetimes.items() result = [] for key, val in pairs: if isinstance(val, str): @@ -71,7 +71,7 @@ def datetime_decoder(dct): elif isinstance(val, (dict, list)): val = datetime_decoder(val) result.append((key, val)) - if isinstance(dct, list): + if isinstance(datetimes, list): return [x[1] for x in result] - elif isinstance(dct, dict): + elif isinstance(datetimes, dict): return dict(result)
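
Usage note for the series: a minimal sketch of driving the new CLI programmatically. The file names, product list, and metadata values below are invented for illustration; the flags come from parse_args in PATCH 3, and the ISO-datetime handling from datetime_decoder in PATCH 7.

    from trollflow2.cli import cli

    # Hypothetical product list and input files. The "-m" payload is parsed with
    # json.loads(..., object_hook=datetime_decoder), so ISO 8601 strings such as
    # "start_time" below reach process_files as datetime objects.
    cli([
        "-p", "my_product_list.yaml",
        "-m", '{"platform_name": "sat1", "start_time": "2022-02-02T11:22:00"}',
        "file1.nc", "file2.nc",
    ])

The same invocation is available from the shell through the satpy_cli console script registered in PATCH 4.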