Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix posttroll-based notifier for move it server #195

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions trollmoves/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@
publisher = create_publisher(cmd_args.port, self.name)
super().__init__(cmd_args, publisher=publisher)
self.request_manager = RequestManager
self.function_to_run_on_matching_files = partial(process_notify, publisher=self.publisher)
self.function_to_run_on_matching_files = partial(process_notification, publisher=self.publisher)

def reload_cfg_file(self, filename):
"""Reload configuration file."""
Expand Down Expand Up @@ -481,11 +481,11 @@
class Listener(Thread):
"""A message listener for the server."""

def __init__(self, attrs, publisher):
def __init__(self, function_to_run_on_message, config):
"""Initialize the listener."""
super(Listener, self).__init__()
self.attrs = attrs
self.publisher = publisher
self.attrs = config
self.function_to_run_on_message = function_to_run_on_message
self.loop = True

def run(self):
Expand All @@ -509,27 +509,7 @@
continue
if not _files_in_message_are_local(msg):
break
self._send_message(msg)

def _send_message(self, msg):
LOGGER.debug('We have a match: %s', str(msg))
info = self._collect_message_info(msg)
msg = Message(self.attrs["topic"], msg.type, info)
self.publisher.send(str(msg))
self._add_files_to_cache(msg)
LOGGER.debug("Message sent: %s", str(msg))

def _collect_message_info(self, msg):
info = _collect_attribute_info(self.attrs)
info.update(msg.data)
info['request_address'] = self.attrs.get(
"request_address", get_own_ip()) + ":" + self.attrs["request_port"]
return info

def _add_files_to_cache(self, msg):
with file_cache_lock:
for filename in gen_dict_extract(msg.data, 'uid'):
file_cache.appendleft(self.attrs["topic"] + '/' + filename)
self.function_to_run_on_message(msg)

Check warning on line 512 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L512

Added line #L512 was not covered by tests

def stop(self):
"""Stop the listener."""
Expand Down Expand Up @@ -649,7 +629,7 @@
if key in CONNECTION_CONFIG_ITEMS:
warnings.warn(
f"Consider using connection_parameters__{key} instead of {key}.",
category=UserWarning)
category=UserWarning, stacklevel=2)
res["connection_parameters"][key] = original[key]
del res[key]
return res
Expand Down Expand Up @@ -750,11 +730,8 @@
notifier_builder = _get_notifier_builder(use_polling, self.config)

self.function_to_run = partial(function_to_run_on_matching_files, chain_config=self.config)
pattern = globify(self.config["origin"])
timeout = float(self.config.get("watchdog_timeout", 1.))
LOGGER.debug("Watchdog timeout: %.1f", timeout)

self.notifier = notifier_builder(pattern, self.function_to_run, timeout)
self.notifier = notifier_builder(self.function_to_run)

def start(self):
"""Start the chain."""
Expand Down Expand Up @@ -782,14 +759,17 @@

def _get_notifier_builder(use_polling, chain_config):
if 'origin' in chain_config:
pattern = globify(chain_config["origin"])
timeout = float(chain_config.get("watchdog_timeout", 1.))
LOGGER.debug("Watchdog timeout: %.1f", timeout)
if use_polling:
LOGGER.info("Using Watchdog notifier")
notifier_builder = create_watchdog_polling_notifier
notifier_builder = partial(create_watchdog_polling_notifier, pattern, timeout=timeout)
else:
LOGGER.info("Using os-based notifier")
notifier_builder = create_watchdog_os_notifier
notifier_builder = partial(create_watchdog_os_notifier, pattern, timeout=timeout)
elif 'listen' in chain_config:
notifier_builder = create_posttroll_notifier
notifier_builder = partial(create_posttroll_notifier, config=chain_config)

return notifier_builder

Expand Down Expand Up @@ -819,16 +799,46 @@
return observer


def process_notify(orig_pathname, publisher, chain_config):
def process_notification(notification, publisher, chain_config):
"""Publish what we have."""
if os.stat(orig_pathname).st_size == 0:
LOGGER.debug("Ignoring empty file: %s", orig_pathname)
return
if isinstance(notification, Message):
process_message(chain_config, notification, publisher)

Check warning on line 805 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L805

Added line #L805 was not covered by tests
else:
LOGGER.debug('We have a match: %s', orig_pathname)
process_path(chain_config, notification, publisher)


def process_message(chain_config, msg, publisher):
"""Modify and publish a message."""
LOGGER.debug('We have a match: %s', str(msg))
info = _collect_message_info(msg)
msg = Message(chain_config["topic"], msg.type, info)
publisher.send(str(msg))
_add_files_to_cache(msg, chain_config)
LOGGER.debug("Message sent: %s", str(msg))

Check warning on line 817 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L812-L817

Added lines #L812 - L817 were not covered by tests


pathname = unpack(orig_pathname, **chain_config)
publish_file(orig_pathname, publisher, chain_config, pathname)
def _collect_message_info(msg, config):
info = _collect_attribute_info(config)
info.update(msg.data)
info['request_address'] = config.get(

Check warning on line 823 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L821-L823

Added lines #L821 - L823 were not covered by tests
"request_address", get_own_ip()) + ":" + config["request_port"]
return info

Check warning on line 825 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L825

Added line #L825 was not covered by tests


def _add_files_to_cache(msg, config):
with file_cache_lock:
for filename in gen_dict_extract(msg.data, 'uid'):
file_cache.appendleft(config["topic"] + '/' + filename)

Check warning on line 831 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L829-L831

Added lines #L829 - L831 were not covered by tests


def process_path(chain_config, path, publisher):
"""Create a message and publish a file."""
if os.stat(path).st_size == 0:
LOGGER.debug("Ignoring empty file: %s", path)

Check warning on line 837 in trollmoves/server.py

View check run for this annotation

Codecov / codecov/patch

trollmoves/server.py#L837

Added line #L837 was not covered by tests
else:
LOGGER.debug('We have a match: %s', path)
pathname = unpack(path, **chain_config)
publish_file(path, publisher, chain_config, pathname)


def publish_file(orig_pathname, publisher, attrs, unpacked_pathname):
Expand Down Expand Up @@ -872,9 +882,9 @@
return info


def create_posttroll_notifier(attrs, publisher):
def create_posttroll_notifier(function_to_run, config):
"""Create a notifier listening to posttroll messages from *attrs*."""
listener = Listener(attrs, publisher)
listener = Listener(function_to_run, config)

return listener, None

Expand Down
27 changes: 21 additions & 6 deletions trollmoves/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def test_create_watchdog_notifier_timeout_default(PollingObserver, config, expec
PollingObserver.assert_called_with(timeout=expected_timeout)


def test_create_posttroll_notifier():
"""Test creating a posttroll notifier."""
from trollmoves.server import Chain
config = {"listen": "some_topic"}
chain = Chain("some_chain", config)
function_to_run = MagicMock()
# assert no crash
chain.create_notifier(notifier_builder=None, use_polling=True, function_to_run_on_matching_files=function_to_run)


def test_handler_does_not_dispatch_files_not_matching_pattern():
"""Test that the handle does not dispatch files that are not matching the pattern."""
from trollmoves.server import WatchdogCreationHandler
Expand Down Expand Up @@ -152,23 +162,28 @@ def _run_process_notify(process_notify, publisher):


@patch("trollmoves.server.file_cache", new_callable=deque)
@patch("trollmoves.server.Message")
def test_process_notify_matching_file(Message, file_cache):
def test_process_notify_matching_file(file_cache):
"""Test process_notify() with a file matching the configured pattern."""
from trollmoves.server import process_notify
from posttroll.message import Message

from trollmoves.server import process_notification

publisher = MagicMock()

pathname, fname, kwargs = _run_process_notify(process_notify, publisher)
pathname, fname, kwargs = _run_process_notify(process_notification, publisher)

# Check that the message was formed correctly
message_info = {'start_time': dt.datetime(2020, 4, 28, 10, 0),
'product': 'foo',
'uri': pathname,
'uid': fname,
'request_address': 'localhost:9001'}
Message.assert_called_with(kwargs['topic'], 'file', message_info)
publisher.send.assert_called_with(str(Message.return_value))

message = Message(rawstr=publisher.send.mock_calls[0][1][0])
assert message.subject == kwargs["topic"]
assert message.type == "file"
assert message.data == message_info

assert "/topic/20200428_1000_foo.tif" in file_cache
assert len(file_cache) == 1

Expand Down