From dc6ed6799ead7f0773efe4d1d7288fafd0806e3a Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 16 Feb 2024 08:50:48 +0200 Subject: [PATCH 1/5] Pass full config from dispatcher, get connection parameters in S3Mover --- trollmoves/dispatcher.py | 3 +-- trollmoves/movers.py | 3 ++- trollmoves/tests/test_dispatcher.py | 6 +++--- trollmoves/tests/test_movers.py | 12 ++++++------ 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/trollmoves/dispatcher.py b/trollmoves/dispatcher.py index 74cf3402..467c5909 100644 --- a/trollmoves/dispatcher.py +++ b/trollmoves/dispatcher.py @@ -277,7 +277,6 @@ def create_dest_url(self, msg, client, conf): config = self.config[client].copy() _verify_filepattern(config, msg) config.update(conf) - connection_parameters = config.get('connection_parameters') host = config['host'] @@ -289,7 +288,7 @@ def create_dest_url(self, msg, client, conf): metadata) parts = urlsplit(host) host_path = urlunsplit((parts.scheme, parts.netloc, path, parts.query, parts.fragment)) - return host_path, connection_parameters, client + return host_path, config, client def _get_file_messages_from_dataset_message(self, msg): """From a dataset type message create individual messages for each file in the dataset.""" diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 6c91c871..33e8821b 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -489,7 +489,8 @@ def copy(self): """Copy the file to a bucket.""" if S3FileSystem is None: raise ImportError("S3Mover requires 's3fs' to be installed.") - s3 = S3FileSystem(**self.attrs) + connection_parameters = self.attrs.get("connection_parameters", {}) + s3 = S3FileSystem(**connection_parameters) destination_file_path = self._get_destination() LOGGER.debug('destination_file_path = %s', destination_file_path) _create_s3_destination_path(s3, destination_file_path) diff --git a/trollmoves/tests/test_dispatcher.py b/trollmoves/tests/test_dispatcher.py index 6eec8216..a961a986 100644 --- 
a/trollmoves/tests/test_dispatcher.py +++ b/trollmoves/tests/test_dispatcher.py @@ -454,7 +454,7 @@ def test_get_destinations_single_destination(viirs_green_snow_message, dispatche assert len(res) == 1 url, attrs, client = res[0] assert url == expected_url - assert attrs == expected_attrs + assert attrs["connection_parameters"] == expected_attrs assert client == "target1" @@ -481,7 +481,7 @@ def _assert_get_destinations_res(res, expected_length, expected_url, expected_at assert len(res) == expected_length for i, (url, attrs, client) in enumerate(res): assert url == expected_url[i] - assert attrs == expected_attrs[i] + assert attrs["connection_parameters"] == expected_attrs[i] assert client == expected_client[i] @@ -684,7 +684,7 @@ def test_create_dest_url_ssh_no_username(create_dest_url_message): expected_url = "ssh://server.target2.com/satellite/viirs/sat_201909190919_NOAA-20.tif" assert url == expected_url - assert params == {'ssh_key_filename': '~/.ssh/rsa_id.pub'} + assert params["connection_parameters"] == {'ssh_key_filename': '~/.ssh/rsa_id.pub'} assert client == "target2" finally: if dispatcher is not None: diff --git a/trollmoves/tests/test_movers.py b/trollmoves/tests/test_movers.py index 49d511a1..5440a7a0 100644 --- a/trollmoves/tests/test_movers.py +++ b/trollmoves/tests/test_movers.py @@ -173,14 +173,14 @@ def test_s3_copy_file_to_base_using_connection_parameters(S3FileSystem): """Test copying to base of S3 bucket.""" # Get the connection parameters: config = yaml.safe_load(test_yaml_s3_connection_params) - attrs = config['target-s3-example1']['connection_parameters'] + attrs = config['target-s3-example1'] s3_mover = _get_s3_mover(ORIGIN, "s3://data-bucket/", **attrs) - assert s3_mover.attrs['client_kwargs'] == {'endpoint_url': 'https://minio-server.mydomain.se:9000', - 'verify': False} - assert s3_mover.attrs['secret'] == 'my-super-secret-key' - assert s3_mover.attrs['key'] == 'my-access-key' - assert s3_mover.attrs['use_ssl'] is True + assert 
s3_mover.attrs['connection_parameters']['client_kwargs'] == { + 'endpoint_url': 'https://minio-server.mydomain.se:9000', 'verify': False} + assert s3_mover.attrs['connection_parameters']['secret'] == 'my-super-secret-key' + assert s3_mover.attrs['connection_parameters']['key'] == 'my-access-key' + assert s3_mover.attrs['connection_parameters']['use_ssl'] is True s3_mover.copy() From 85ea67f9b92b7ab533370847ceb1770cd5fd9978 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 16 Feb 2024 11:04:33 +0200 Subject: [PATCH 2/5] Add dict config item handling to Server --- trollmoves/movers.py | 21 +++++++++++++++++++++ trollmoves/server.py | 29 +++++++++++++++++++++++++++++ trollmoves/tests/test_server.py | 31 +++++++++++++++++++++++++++---- 3 files changed, 77 insertions(+), 4 deletions(-) diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 33e8821b..7ca70a88 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -483,6 +483,27 @@ class S3Mover(Mover): changing the filename. The new destination filename will be the last part of the provided destination following the last slash ('/'). + In the Trollmoves Server config, which is in .ini format, the connection parameters + and other dictionary-like items can be defined with double underscore format:: + + connection_parameters__secret = secret + connection_parameters__client_kwargs__endpoint_url = https://endpoint.url + connection_parameters__client_kwargs__verify = false + + will result in a nested dictionary item:: + + { + 'connection_parameters': { + 'secret': 'secret', + 'client_kwargs': { + 'endpoint_url': 'https://endpoint.url', + 'verify': False + } + } + } + + Note that boolean values are converted. No handling for numeric values have been implemented! 
+ """ def copy(self): diff --git a/trollmoves/server.py b/trollmoves/server.py index 2d65a21b..f5b53580 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -584,6 +584,7 @@ def _read_ini_config(filename): _parse_nameserver(res[section], cp_[section]) _parse_addresses(res[section]) _parse_delete(res[section], cp_[section]) + res[section] = _create_config_sub_dicts(res[section]) if not _check_origin_and_listen(res, section): continue if not _check_topic(res, section): @@ -622,6 +623,34 @@ def _parse_delete(conf, raw_conf): conf["delete"] = val +def _create_config_sub_dicts(original): + # Take a copy so we can modify the values if necessary + res = dict(original.items()) + for key in original.keys(): + parts = key.split("__") + if len(parts) > 1: + _create_dicts(res, parts, original[key]) + del res[key] + return res + + +def _create_dicts(res, parts, val): + cur = res + for part in parts[:-1]: + if part not in cur: + cur[part] = {} + cur = cur[part] + cur[parts[-1]] = _check_bool(val) + + +def _check_bool(val): + if val.lower() in ["0", "false"]: + return False + elif val.lower() in ["1", "true"]: + return True + return val + + def _check_origin_and_listen(res, section): if ("origin" not in res[section]) and ('listen' not in res[section]): LOGGER.warning("Incomplete section %s: add an 'origin' or 'listen' item.", section) diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index e41a12af..5974bc4a 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -170,7 +170,7 @@ def test_empty_init_arguments_does_not_crash_add(self): Deleter(dict()).add('bla') -config_file = b""" +CONFIG_INI = b""" [eumetcast-hrit-0deg] origin = /local_disk/tellicast/received/MSGHRIT/H-000-{nominal_time:%Y%m%d%H%M}-{compressed:_<2s} request_port = 9094 @@ -178,9 +178,32 @@ def test_empty_init_arguments_does_not_crash_add(self): info = sensor=seviri;variant=0DEG topic = /1b/hrit-segment/0deg delete = False 
+connection_parameters__secret = secret +connection_parameters__client_kwargs__endpoint_url = https://endpoint.url +connection_parameters__client_kwargs__verify = false """ +def test_read_config_ini_with_dicts(): + """Test reading a config in ini format when dictionary values should be created.""" + from trollmoves.server import read_config + + with NamedTemporaryFile(suffix=".ini") as config_file: + config_file.write(CONFIG_INI) + config_file.flush() + config = read_config(config_file.name) + eumetcast = config["eumetcast-hrit-0deg"] + assert "origin" in eumetcast + assert "request_port" in eumetcast + assert "publisher_port" in eumetcast + assert "info" in eumetcast + assert "topic" in eumetcast + assert "delete" in eumetcast + assert eumetcast["connection_parameters"]["secret"] == "secret" + assert eumetcast["connection_parameters"]["client_kwargs"]["endpoint_url"] == "https://endpoint.url" + assert eumetcast["connection_parameters"]["client_kwargs"]["verify"] is False + + class TestMoveItServer: """Test the move it server.""" @@ -195,7 +218,7 @@ def test_reloads_config_crashes_when_config_file_does_not_exist(self): def test_reloads_config_on_example_config(self, fake_publisher): """Test that config can be reloaded with basic example.""" with NamedTemporaryFile() as temporary_config_file: - temporary_config_file.write(config_file) + temporary_config_file.write(CONFIG_INI) config_filename = temporary_config_file.name cmd_args = parse_args(["--port", "9999", config_filename]) server = MoveItServer(cmd_args) @@ -206,7 +229,7 @@ def test_reloads_config_on_example_config(self, fake_publisher): def test_reloads_config_calls_reload_config(self, mock_reload_config, mock_publisher): """Test that config file can be reloaded.""" with NamedTemporaryFile() as temporary_config_file: - temporary_config_file.write(config_file) + temporary_config_file.write(CONFIG_INI) config_filename = temporary_config_file.name cmd_args = parse_args(["--port", "9999", config_filename]) server 
= MoveItServer(cmd_args) @@ -218,7 +241,7 @@ def test_reloads_config_calls_reload_config(self, mock_reload_config, mock_publi def test_signal_reloads_config_calls_reload_config(self, mock_reload_config, mock_publisher): """Test that config file can be reloaded through signal.""" with NamedTemporaryFile() as temporary_config_file: - temporary_config_file.write(config_file) + temporary_config_file.write(CONFIG_INI) config_filename = temporary_config_file.name cmd_args = parse_args([config_filename]) client = MoveItServer(cmd_args) From b0570dcde2960a6d0ebb9a1a3d156e5de2795d97 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 16 Feb 2024 14:28:15 +0200 Subject: [PATCH 3/5] Pass only connection parameters to movers --- trollmoves/dispatcher.py | 3 ++- trollmoves/movers.py | 5 ++--- trollmoves/server.py | 18 +++++++++++++++++- trollmoves/tests/test_dispatcher.py | 6 +++--- trollmoves/tests/test_server.py | 20 +++++++++++++++++--- 5 files changed, 41 insertions(+), 11 deletions(-) diff --git a/trollmoves/dispatcher.py b/trollmoves/dispatcher.py index 467c5909..74cf3402 100644 --- a/trollmoves/dispatcher.py +++ b/trollmoves/dispatcher.py @@ -277,6 +277,7 @@ def create_dest_url(self, msg, client, conf): config = self.config[client].copy() _verify_filepattern(config, msg) config.update(conf) + connection_parameters = config.get('connection_parameters') host = config['host'] @@ -288,7 +289,7 @@ def create_dest_url(self, msg, client, conf): metadata) parts = urlsplit(host) host_path = urlunsplit((parts.scheme, parts.netloc, path, parts.query, parts.fragment)) - return host_path, config, client + return host_path, connection_parameters, client def _get_file_messages_from_dataset_message(self, msg): """From a dataset type message create individual messages for each file in the dataset.""" diff --git a/trollmoves/movers.py b/trollmoves/movers.py index 7ca70a88..ce97fd8f 100644 --- a/trollmoves/movers.py +++ b/trollmoves/movers.py @@ -502,7 +502,7 @@ class S3Mover(Mover): } 
} - Note that boolean values are converted. No handling for numeric values have been implemented! + Note that boolean values are converted. Numeric values are handled where they are used. """ @@ -510,8 +510,7 @@ def copy(self): """Copy the file to a bucket.""" if S3FileSystem is None: raise ImportError("S3Mover requires 's3fs' to be installed.") - connection_parameters = self.attrs.get("connection_parameters", {}) - s3 = S3FileSystem(**connection_parameters) + s3 = S3FileSystem(**self.attrs) destination_file_path = self._get_destination() LOGGER.debug('destination_file_path = %s', destination_file_path) _create_s3_destination_path(s3, destination_file_path) diff --git a/trollmoves/server.py b/trollmoves/server.py index f5b53580..94930289 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -64,6 +64,8 @@ file_cache_lock = Lock() START_TIME = datetime.datetime.utcnow() +CONNECTION_CONFIG_ITEMS = ["connection_uptime", "ssh_key_filename", "ssh_connection_timeout", "ssh_private_key_file"] + class RequestManager(Thread): """Manage requests.""" @@ -165,7 +167,8 @@ def _move_file(self, pathname, message, rel_path): return_message = None try: destination = move_it(pathname, message.data['destination'], - self._attrs, rel_path=rel_path, + self._attrs["connection_parameters"], + rel_path=rel_path, backup_targets=message.data.get('backup_targets', None)) message.data['destination'] = destination except Exception as err: @@ -585,6 +588,7 @@ def _read_ini_config(filename): _parse_addresses(res[section]) _parse_delete(res[section], cp_[section]) res[section] = _create_config_sub_dicts(res[section]) + res[section] = _form_connection_parameters_dict(res[section]) if not _check_origin_and_listen(res, section): continue if not _check_topic(res, section): @@ -651,6 +655,18 @@ def _check_bool(val): return val +def _form_connection_parameters_dict(original): + # Take a copy so we can modify the values if necessary + res = dict(original.items()) + if "connection_parameters" 
not in res: + res["connection_parameters"] = {} + for key in original.keys(): + if key in CONNECTION_CONFIG_ITEMS: + res["connection_parameters"][key] = original[key] + del res[key] + return res + + def _check_origin_and_listen(res, section): if ("origin" not in res[section]) and ('listen' not in res[section]): LOGGER.warning("Incomplete section %s: add an 'origin' or 'listen' item.", section) diff --git a/trollmoves/tests/test_dispatcher.py b/trollmoves/tests/test_dispatcher.py index a961a986..6eec8216 100644 --- a/trollmoves/tests/test_dispatcher.py +++ b/trollmoves/tests/test_dispatcher.py @@ -454,7 +454,7 @@ def test_get_destinations_single_destination(viirs_green_snow_message, dispatche assert len(res) == 1 url, attrs, client = res[0] assert url == expected_url - assert attrs["connection_parameters"] == expected_attrs + assert attrs == expected_attrs assert client == "target1" @@ -481,7 +481,7 @@ def _assert_get_destinations_res(res, expected_length, expected_url, expected_at assert len(res) == expected_length for i, (url, attrs, client) in enumerate(res): assert url == expected_url[i] - assert attrs["connection_parameters"] == expected_attrs[i] + assert attrs == expected_attrs[i] assert client == expected_client[i] @@ -684,7 +684,7 @@ def test_create_dest_url_ssh_no_username(create_dest_url_message): expected_url = "ssh://server.target2.com/satellite/viirs/sat_201909190919_NOAA-20.tif" assert url == expected_url - assert params["connection_parameters"] == {'ssh_key_filename': '~/.ssh/rsa_id.pub'} + assert params == {'ssh_key_filename': '~/.ssh/rsa_id.pub'} assert client == "target2" finally: if dispatcher is not None: diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index 5974bc4a..7018ec13 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -178,6 +178,11 @@ def test_empty_init_arguments_does_not_crash_add(self): info = sensor=seviri;variant=0DEG topic = /1b/hrit-segment/0deg delete = False +# 
Everything below this should end up in connection_parameters dict +connection_uptime = 30 +ssh_key_filename = id_rsa.pub +ssh_private_key_file = id_rsa +ssh_connection_timeout = 30 connection_parameters__secret = secret connection_parameters__client_kwargs__endpoint_url = https://endpoint.url connection_parameters__client_kwargs__verify = false @@ -199,9 +204,18 @@ def test_read_config_ini_with_dicts(): assert "info" in eumetcast assert "topic" in eumetcast assert "delete" in eumetcast - assert eumetcast["connection_parameters"]["secret"] == "secret" - assert eumetcast["connection_parameters"]["client_kwargs"]["endpoint_url"] == "https://endpoint.url" - assert eumetcast["connection_parameters"]["client_kwargs"]["verify"] is False + expected_conn_params = { + "secret": "secret", + "client_kwargs": { + "endpoint_url": "https://endpoint.url", + "verify": False, + }, + "connection_uptime": "30", + "ssh_key_filename": "id_rsa.pub", + "ssh_private_key_file": "id_rsa", + "ssh_connection_timeout": "30", + } + assert eumetcast["connection_parameters"] == expected_conn_params class TestMoveItServer: From 53a4bc34a390a1a980adbefc19c17b03e5e9fa4a Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 16 Feb 2024 14:51:56 +0200 Subject: [PATCH 4/5] Pass only connection parameters in S3Mover test --- trollmoves/tests/test_movers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/trollmoves/tests/test_movers.py b/trollmoves/tests/test_movers.py index 5440a7a0..2ebf28e2 100644 --- a/trollmoves/tests/test_movers.py +++ b/trollmoves/tests/test_movers.py @@ -173,14 +173,14 @@ def test_s3_copy_file_to_base_using_connection_parameters(S3FileSystem): """Test copying to base of S3 bucket.""" # Get the connection parameters: config = yaml.safe_load(test_yaml_s3_connection_params) - attrs = config['target-s3-example1'] + attrs = config['target-s3-example1']['connection_parameters'] s3_mover = _get_s3_mover(ORIGIN, "s3://data-bucket/", **attrs) - assert 
s3_mover.attrs['connection_parameters']['client_kwargs'] == { + assert s3_mover.attrs['client_kwargs'] == { 'endpoint_url': 'https://minio-server.mydomain.se:9000', 'verify': False} - assert s3_mover.attrs['connection_parameters']['secret'] == 'my-super-secret-key' - assert s3_mover.attrs['connection_parameters']['key'] == 'my-access-key' - assert s3_mover.attrs['connection_parameters']['use_ssl'] is True + assert s3_mover.attrs['secret'] == 'my-super-secret-key' + assert s3_mover.attrs['key'] == 'my-access-key' + assert s3_mover.attrs['use_ssl'] is True s3_mover.copy() From e158f3a1fc2fc7a15a6e3872cd0ca45589b90080 Mon Sep 17 00:00:00 2001 From: Panu Lahtinen Date: Fri, 16 Feb 2024 15:04:52 +0200 Subject: [PATCH 5/5] Warn if any of the connection options are not prefixed with connection_parameters__ --- examples/move_it_server.ini | 6 ++++-- trollmoves/server.py | 4 ++++ trollmoves/tests/test_server.py | 3 ++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/examples/move_it_server.ini b/examples/move_it_server.ini index 4d94b6f4..10577780 100644 --- a/examples/move_it_server.ini +++ b/examples/move_it_server.ini @@ -6,7 +6,8 @@ # The values defined in individual sections will override these settings. # Path to SSH _private_ key for key-based identification for ssh transfers -ssh_key_filename = /home/username/.ssh/id_rsa +# Put this in connection_parameters dictionary by adding the prefix +connection_parameters__ssh_key_filename = /home/username/.ssh/id_rsa # Set watchdog polling timeout (interval) in seconds. 
# Only effective if "-w" commandline argument is given @@ -34,7 +35,8 @@ topic = /1b/hrit-segment/0deg # Do not delete the compressed file delete = False # Path to SSH key _private_ key used for transfers -# ssh_key_filename = /home/user/.ssh/id_rsa +# Put this in connection_parameters dictionary by adding the prefix +# connection_parameters__ssh_key_filename = /home/user/.ssh/id_rsa [aapp-data-ears-pps-process] diff --git a/trollmoves/server.py b/trollmoves/server.py index 94930289..d38b0d2f 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -32,6 +32,7 @@ import subprocess import tempfile import time +import warnings from collections import deque from threading import Lock, Thread from configparser import ConfigParser @@ -662,6 +663,9 @@ def _form_connection_parameters_dict(original): res["connection_parameters"] = {} for key in original.keys(): if key in CONNECTION_CONFIG_ITEMS: + warnings.warn( + f"Consider using connection_parameters__{key} instead of {key}.", + category=UserWarning) res["connection_parameters"][key] = original[key] del res[key] return res diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index 7018ec13..cac0a6b2 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -196,7 +196,8 @@ def test_read_config_ini_with_dicts(): with NamedTemporaryFile(suffix=".ini") as config_file: config_file.write(CONFIG_INI) config_file.flush() - config = read_config(config_file.name) + with pytest.warns(UserWarning, match="Consider using connection_parameters__"): + config = read_config(config_file.name) eumetcast = config["eumetcast-hrit-0deg"] assert "origin" in eumetcast assert "request_port" in eumetcast