Skip to content

Commit

Permalink
Merge branch 'main' into fix-utcnow
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud authored May 27, 2024
2 parents 2834106 + 00f0750 commit 80716be
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 47 deletions.
96 changes: 55 additions & 41 deletions trollmoves/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def __init__(self, cmd_args):
publisher = create_publisher(cmd_args.port, self.name)
super().__init__(cmd_args, publisher=publisher)
self.request_manager = RequestManager
self.function_to_run_on_matching_files = partial(process_notify, publisher=self.publisher)
self.function_to_run_on_matching_files = partial(process_notification, publisher=self.publisher)

def reload_cfg_file(self, filename):
"""Reload configuration file."""
Expand Down Expand Up @@ -481,11 +481,11 @@ def _sanitize_message_destination(message):
class Listener(Thread):
"""A message listener for the server."""

def __init__(self, attrs, publisher):
def __init__(self, function_to_run_on_message, config):
"""Initialize the listener."""
super(Listener, self).__init__()
self.attrs = attrs
self.publisher = publisher
self.attrs = config
self.function_to_run_on_message = function_to_run_on_message
self.loop = True

def run(self):
Expand All @@ -509,27 +509,7 @@ def _run(self, sub):
continue
if not _files_in_message_are_local(msg):
break
self._send_message(msg)

def _send_message(self, msg):
LOGGER.debug('We have a match: %s', str(msg))
info = self._collect_message_info(msg)
msg = Message(self.attrs["topic"], msg.type, info)
self.publisher.send(str(msg))
self._add_files_to_cache(msg)
LOGGER.debug("Message sent: %s", str(msg))

def _collect_message_info(self, msg):
info = _collect_attribute_info(self.attrs)
info.update(msg.data)
info['request_address'] = self.attrs.get(
"request_address", get_own_ip()) + ":" + self.attrs["request_port"]
return info

def _add_files_to_cache(self, msg):
with file_cache_lock:
for filename in gen_dict_extract(msg.data, 'uid'):
file_cache.appendleft(self.attrs["topic"] + '/' + filename)
self.function_to_run_on_message(msg)

def stop(self):
"""Stop the listener."""
Expand Down Expand Up @@ -649,8 +629,12 @@ def _form_connection_parameters_dict(original):
if key in CONNECTION_CONFIG_ITEMS:
warnings.warn(
f"Consider using connection_parameters__{key} instead of {key}.",
<<<<<<< fix-utcnow
category=UserWarning,
stacklevel=2)
=======
category=UserWarning, stacklevel=2)
>>>>>>> main
res["connection_parameters"][key] = original[key]
del res[key]
return res
Expand Down Expand Up @@ -751,11 +735,8 @@ def create_notifier(self, notifier_builder, use_polling, function_to_run_on_matc
notifier_builder = _get_notifier_builder(use_polling, self.config)

self.function_to_run = partial(function_to_run_on_matching_files, chain_config=self.config)
pattern = globify(self.config["origin"])
timeout = float(self.config.get("watchdog_timeout", 1.))
LOGGER.debug("Watchdog timeout: %.1f", timeout)

self.notifier = notifier_builder(pattern, self.function_to_run, timeout)
self.notifier = notifier_builder(self.function_to_run)

def start(self):
"""Start the chain."""
Expand Down Expand Up @@ -783,14 +764,17 @@ def _add_chain(chains, chain_name, chain_config, manager):

def _get_notifier_builder(use_polling, chain_config):
if 'origin' in chain_config:
pattern = globify(chain_config["origin"])
timeout = float(chain_config.get("watchdog_timeout", 1.))
LOGGER.debug("Watchdog timeout: %.1f", timeout)
if use_polling:
LOGGER.info("Using Watchdog notifier")
notifier_builder = create_watchdog_polling_notifier
notifier_builder = partial(create_watchdog_polling_notifier, pattern, timeout=timeout)
else:
LOGGER.info("Using os-based notifier")
notifier_builder = create_watchdog_os_notifier
notifier_builder = partial(create_watchdog_os_notifier, pattern, timeout=timeout)
elif 'listen' in chain_config:
notifier_builder = create_posttroll_notifier
notifier_builder = partial(create_posttroll_notifier, config=chain_config)

return notifier_builder

Expand Down Expand Up @@ -820,16 +804,46 @@ def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observe
return observer


def process_notify(orig_pathname, publisher, chain_config):
def process_notification(notification, publisher, chain_config):
"""Publish what we have."""
if os.stat(orig_pathname).st_size == 0:
LOGGER.debug("Ignoring empty file: %s", orig_pathname)
return
if isinstance(notification, Message):
process_message(chain_config, notification, publisher)
else:
LOGGER.debug('We have a match: %s', orig_pathname)
process_path(chain_config, notification, publisher)


def process_message(chain_config, msg, publisher):
"""Modify and publish a message."""
LOGGER.debug('We have a match: %s', str(msg))
info = _collect_message_info(msg)
msg = Message(chain_config["topic"], msg.type, info)
publisher.send(str(msg))
_add_files_to_cache(msg, chain_config)
LOGGER.debug("Message sent: %s", str(msg))


pathname = unpack(orig_pathname, **chain_config)
publish_file(orig_pathname, publisher, chain_config, pathname)
def _collect_message_info(msg, config):
info = _collect_attribute_info(config)
info.update(msg.data)
info['request_address'] = config.get(
"request_address", get_own_ip()) + ":" + config["request_port"]
return info


def _add_files_to_cache(msg, config):
with file_cache_lock:
for filename in gen_dict_extract(msg.data, 'uid'):
file_cache.appendleft(config["topic"] + '/' + filename)


def process_path(chain_config, path, publisher):
"""Create a message and publish a file."""
if os.stat(path).st_size == 0:
LOGGER.debug("Ignoring empty file: %s", path)
else:
LOGGER.debug('We have a match: %s', path)
pathname = unpack(path, **chain_config)
publish_file(path, publisher, chain_config, pathname)


def publish_file(orig_pathname, publisher, attrs, unpacked_pathname):
Expand Down Expand Up @@ -873,9 +887,9 @@ def _get_notify_message_info(attrs, orig_pathname, pathname):
return info


def create_posttroll_notifier(attrs, publisher):
def create_posttroll_notifier(function_to_run, config):
"""Create a notifier listening to posttroll messages from *attrs*."""
listener = Listener(attrs, publisher)
listener = Listener(function_to_run, config)

return listener, None

Expand Down
27 changes: 21 additions & 6 deletions trollmoves/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ def test_create_watchdog_notifier_timeout_default(PollingObserver, config, expec
PollingObserver.assert_called_with(timeout=expected_timeout)


def test_create_posttroll_notifier():
"""Test creating a posttroll notifier."""
from trollmoves.server import Chain
config = {"listen": "some_topic"}
chain = Chain("some_chain", config)
function_to_run = MagicMock()
# assert no crash
chain.create_notifier(notifier_builder=None, use_polling=True, function_to_run_on_matching_files=function_to_run)


def test_handler_does_not_dispatch_files_not_matching_pattern():
"""Test that the handle does not dispatch files that are not matching the pattern."""
from trollmoves.server import WatchdogCreationHandler
Expand Down Expand Up @@ -152,23 +162,28 @@ def _run_process_notify(process_notify, publisher):


@patch("trollmoves.server.file_cache", new_callable=deque)
@patch("trollmoves.server.Message")
def test_process_notify_matching_file(Message, file_cache):
def test_process_notify_matching_file(file_cache):
"""Test process_notify() with a file matching the configured pattern."""
from trollmoves.server import process_notify
from posttroll.message import Message

from trollmoves.server import process_notification

publisher = MagicMock()

pathname, fname, kwargs = _run_process_notify(process_notify, publisher)
pathname, fname, kwargs = _run_process_notify(process_notification, publisher)

# Check that the message was formed correctly
message_info = {'start_time': dt.datetime(2020, 4, 28, 10, 0),
'product': 'foo',
'uri': pathname,
'uid': fname,
'request_address': 'localhost:9001'}
Message.assert_called_with(kwargs['topic'], 'file', message_info)
publisher.send.assert_called_with(str(Message.return_value))

message = Message(rawstr=publisher.send.mock_calls[0][1][0])
assert message.subject == kwargs["topic"]
assert message.type == "file"
assert message.data == message_info

assert "/topic/20200428_1000_foo.tif" in file_cache
assert len(file_cache) == 1

Expand Down

0 comments on commit 80716be

Please sign in to comment.