diff --git a/trollmoves/server.py b/trollmoves/server.py index eafc769..cf0be32 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -353,7 +353,7 @@ def __init__(self, cmd_args): publisher = create_publisher(cmd_args.port, self.name) super().__init__(cmd_args, publisher=publisher) self.request_manager = RequestManager - self.function_to_run_on_matching_files = partial(process_notify, publisher=self.publisher) + self.function_to_run_on_matching_files = partial(process_notification, publisher=self.publisher) def reload_cfg_file(self, filename): """Reload configuration file.""" @@ -481,11 +481,11 @@ def _sanitize_message_destination(message): class Listener(Thread): """A message listener for the server.""" - def __init__(self, attrs, publisher): + def __init__(self, function_to_run_on_message, config): """Initialize the listener.""" super(Listener, self).__init__() - self.attrs = attrs - self.publisher = publisher + self.attrs = config + self.function_to_run_on_message = function_to_run_on_message self.loop = True def run(self): @@ -509,27 +509,7 @@ def _run(self, sub): continue if not _files_in_message_are_local(msg): break - self._send_message(msg) - - def _send_message(self, msg): - LOGGER.debug('We have a match: %s', str(msg)) - info = self._collect_message_info(msg) - msg = Message(self.attrs["topic"], msg.type, info) - self.publisher.send(str(msg)) - self._add_files_to_cache(msg) - LOGGER.debug("Message sent: %s", str(msg)) - - def _collect_message_info(self, msg): - info = _collect_attribute_info(self.attrs) - info.update(msg.data) - info['request_address'] = self.attrs.get( - "request_address", get_own_ip()) + ":" + self.attrs["request_port"] - return info - - def _add_files_to_cache(self, msg): - with file_cache_lock: - for filename in gen_dict_extract(msg.data, 'uid'): - file_cache.appendleft(self.attrs["topic"] + '/' + filename) + self.function_to_run_on_message(msg) def stop(self): """Stop the listener.""" @@ -649,7 +629,7 @@ def _form_connection_parameters_dict(original): if key in CONNECTION_CONFIG_ITEMS: warnings.warn( f"Consider using connection_parameters__{key} instead of {key}.", - category=UserWarning) + category=UserWarning, stacklevel=2) res["connection_parameters"][key] = original[key] del res[key] return res @@ -750,11 +730,8 @@ def create_notifier(self, notifier_builder, use_polling, function_to_run_on_matc notifier_builder = _get_notifier_builder(use_polling, self.config) self.function_to_run = partial(function_to_run_on_matching_files, chain_config=self.config) - pattern = globify(self.config["origin"]) - timeout = float(self.config.get("watchdog_timeout", 1.)) - LOGGER.debug("Watchdog timeout: %.1f", timeout) - self.notifier = notifier_builder(pattern, self.function_to_run, timeout) + self.notifier = notifier_builder(self.function_to_run) def start(self): """Start the chain.""" @@ -782,14 +759,17 @@ def _add_chain(chains, chain_name, chain_config, manager): def _get_notifier_builder(use_polling, chain_config): if 'origin' in chain_config: + pattern = globify(chain_config["origin"]) + timeout = float(chain_config.get("watchdog_timeout", 1.)) + LOGGER.debug("Watchdog timeout: %.1f", timeout) if use_polling: LOGGER.info("Using Watchdog notifier") - notifier_builder = create_watchdog_polling_notifier + notifier_builder = partial(create_watchdog_polling_notifier, pattern, timeout=timeout) else: LOGGER.info("Using os-based notifier") - notifier_builder = create_watchdog_os_notifier + notifier_builder = partial(create_watchdog_os_notifier, pattern, timeout=timeout) elif 'listen' in chain_config: - notifier_builder = create_posttroll_notifier + notifier_builder = partial(create_posttroll_notifier, config=chain_config) return notifier_builder @@ -819,16 +799,46 @@ def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observe return observer -def process_notify(orig_pathname, publisher, chain_config): +def process_notification(notification, publisher, chain_config): """Publish what we have.""" - if os.stat(orig_pathname).st_size == 0: - LOGGER.debug("Ignoring empty file: %s", orig_pathname) - return + if isinstance(notification, Message): + process_message(chain_config, notification, publisher) else: - LOGGER.debug('We have a match: %s', orig_pathname) + process_path(chain_config, notification, publisher) + + +def process_message(chain_config, msg, publisher): + """Modify and publish a message.""" + LOGGER.debug('We have a match: %s', str(msg)) + info = _collect_message_info(msg) + msg = Message(chain_config["topic"], msg.type, info) + publisher.send(str(msg)) + _add_files_to_cache(msg, chain_config) + LOGGER.debug("Message sent: %s", str(msg)) + - pathname = unpack(orig_pathname, **chain_config) - publish_file(orig_pathname, publisher, chain_config, pathname) +def _collect_message_info(msg, config): + info = _collect_attribute_info(config) + info.update(msg.data) + info['request_address'] = config.get( + "request_address", get_own_ip()) + ":" + config["request_port"] + return info + + +def _add_files_to_cache(msg, config): + with file_cache_lock: + for filename in gen_dict_extract(msg.data, 'uid'): + file_cache.appendleft(config["topic"] + '/' + filename) + + +def process_path(chain_config, path, publisher): + """Create a message and publish a file.""" + if os.stat(path).st_size == 0: + LOGGER.debug("Ignoring empty file: %s", path) + else: + LOGGER.debug('We have a match: %s', path) + pathname = unpack(path, **chain_config) + publish_file(path, publisher, chain_config, pathname) def publish_file(orig_pathname, publisher, attrs, unpacked_pathname): @@ -872,9 +882,9 @@ def _get_notify_message_info(attrs, orig_pathname, pathname): return info -def create_posttroll_notifier(attrs, publisher): +def create_posttroll_notifier(function_to_run, config): """Create a notifier listening to posttroll messages from *attrs*.""" - listener = Listener(attrs, publisher) + listener = Listener(function_to_run, config) return listener, None diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index 9035708..bff61c7 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -118,6 +118,16 @@ def test_create_watchdog_notifier_timeout_default(PollingObserver, config, expec PollingObserver.assert_called_with(timeout=expected_timeout) +def test_create_posttroll_notifier(): + """Test creating a posttroll notifier.""" + from trollmoves.server import Chain + config = {"listen": "some_topic"} + chain = Chain("some_chain", config) + function_to_run = MagicMock() + # assert no crash + chain.create_notifier(notifier_builder=None, use_polling=True, function_to_run_on_matching_files=function_to_run) + + def test_handler_does_not_dispatch_files_not_matching_pattern(): """Test that the handle does not dispatch files that are not matching the pattern.""" from trollmoves.server import WatchdogCreationHandler @@ -152,14 +162,15 @@ def _run_process_notify(process_notify, publisher): @patch("trollmoves.server.file_cache", new_callable=deque) -@patch("trollmoves.server.Message") -def test_process_notify_matching_file(Message, file_cache): +def test_process_notify_matching_file(file_cache): """Test process_notify() with a file matching the configured pattern.""" - from trollmoves.server import process_notify + from posttroll.message import Message + + from trollmoves.server import process_notification publisher = MagicMock() - pathname, fname, kwargs = _run_process_notify(process_notify, publisher) + pathname, fname, kwargs = _run_process_notify(process_notification, publisher) # Check that the message was formed correctly message_info = {'start_time': dt.datetime(2020, 4, 28, 10, 0), @@ -167,8 +178,12 @@ def test_process_notify_matching_file(Message, file_cache): 'uri': pathname, 'uid': fname, 'request_address': 'localhost:9001'} - Message.assert_called_with(kwargs['topic'], 'file', message_info) - publisher.send.assert_called_with(str(Message.return_value)) + + message = Message(rawstr=publisher.send.mock_calls[0][1][0]) + assert message.subject == kwargs["topic"] + assert message.type == "file" + assert message.data == message_info + assert "/topic/20200428_1000_foo.tif" in file_cache assert len(file_cache) == 1