diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 2cdf54a9..a5523086 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,8 +1,13 @@
 exclude: '^$'
 fail_fast: false
 repos:
-- repo: https://github.com/pre-commit/pre-commit-hooks
+  - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v2.2.3
     hooks:
       - id: flake8
         additional_dependencies: [flake8-docstrings, flake8-debugger, flake8-bugbear, flake8-rst-docstrings]
+  - repo: https://github.com/pycqa/isort
+    rev: 5.12.0
+    hooks:
+      - id: isort
+        language_version: python3
\ No newline at end of file
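The new isort hook enforces the grouped, alphabetised import order seen throughout the Python hunks below. As a minimal sketch (not part of this changeset), isort's Python API can preview the same check locally; isort.code and isort.check_file are part of isort 5's public interface:

    # Sketch: preview isort's verdict without rewriting anything.
    import isort

    print(isort.code("import sys\nimport argparse\n"))  # returns the sorted source
    ok = isort.check_file("trollmoves/server.py", show_diff=True)  # False if changes are needed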
%s", filename) - - new_chains = read_config(filename) - - old_glob = [] - - config_changed = False - for key, val in new_chains.items(): - identical = True - if key in chains: - for key2, val2 in new_chains[key].items(): - if ((key2 not in ["notifier", "publisher"]) and - ((key2 not in chains[key]) or - (chains[key][key2] != val2))): - identical = False - config_changed = True - break - if identical: - continue - - chains[key]["notifier"].stop() - if "publisher" in chains[key]: - chains[key]["publisher"].stop() - - chains[key] = val - try: - chains[key]["publisher"] = NoisyPublisher("move_it_" + key, - val["publish_port"]) - except (KeyError, NameError): - pass - chains[key]["notifier"] = create_notifier(val) - # create logger too! - if "publisher" in chains[key]: - pub = chains[key]["publisher"].start() - chains[key]["notifier"].start() - old_glob.append(globify(val["origin"])) - - if "publisher" in chains[key]: - def copy_hook(pathname, dest, val=val, pub=pub): - fname = os.path.basename(pathname) - - destination = urlunparse((dest.scheme, - dest.hostname, - os.path.join(dest.path, fname), - dest.params, - dest.query, - dest.fragment)) - info = val.get("info", "") - if info: - info = dict((elt.strip().split('=') - for elt in info.split(";"))) - for infokey, infoval in info.items(): - if "," in infoval: - info[infokey] = infoval.split(",") - else: - info = {} - try: - info.update(parse(os.path.basename(val["origin"]), - os.path.basename(pathname))) - except ValueError: - info.update(parse(os.path.basename(os.path.splitext(val["origin"])[0]), - os.path.basename(pathname))) - - info['uri'] = destination - info['uid'] = fname - msg = Message(val["topic"], 'file', info) - pub.send(str(msg)) - LOGGER.debug("Message sent: %s", str(msg)) - - chains[key]["copy_hook"] = copy_hook - - def delete_hook(pathname, val=val, pub=pub): - fname = os.path.basename(pathname) - info = val.get("info", "") - if info: - info = dict((elt.strip().split('=') - for elt in info.split(";"))) - info['uri'] = pathname - info['uid'] = fname - msg = Message(val["topic"], 'del', info) - pub.send(str(msg)) - LOGGER.debug("Message sent: %s", str(msg)) - - chains[key]["delete_hook"] = delete_hook - - if not identical: - LOGGER.debug("Updated %s", key) - else: - LOGGER.debug("Added %s", key) - - for key in (set(chains.keys()) - set(new_chains.keys())): - chains[key]["notifier"].stop() - del chains[key] - LOGGER.debug("Removed %s", key) - - if config_changed: - LOGGER.debug("Reloaded config from %s", filename) - else: - LOGGER.debug("No changes to reload in %s", filename) - - if old_glob and not disable_backlog: - fnames = [] - for pattern in old_glob: - fnames += glob.glob(pattern) - if fnames: - time.sleep(3) - LOGGER.debug("Touching old files") - for fname in fnames: - if os.path.exists(fname): - fp_ = open(fname, "ab") - fp_.close() - old_glob = [] - LOGGER.info("Old files transferred") - -# Unpackers - -# xrit - - -def check_output(*popenargs, **kwargs): - """Copy from python 2.7, `subprocess.check_output`.""" - if 'stdout' in kwargs: - raise ValueError('stdout argument not allowed, it will be overridden.') - LOGGER.debug("Calling %s", str(popenargs)) - process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs) - output, unused_err = process.communicate() - del unused_err - retcode = process.poll() - if retcode: - cmd = kwargs.get("args") - if cmd is None: - cmd = popenargs[0] - raise RuntimeError(output) - return output - - -def xrit(pathname, destination=None, cmd="./xRITDecompress"): - """Unpack xrit 
data.""" - opath, ofile = os.path.split(pathname) - destination = destination or "/tmp/" - dest_url = urlparse(destination) - if dest_url.scheme in ("", "file"): - _ = check_output([cmd, pathname], cwd=(destination or opath)) - else: - LOGGER.exception("Can not extract file %s to %s, destination has to be local.", - pathname, destination) - LOGGER.info("Successfully extracted %s to %s", pathname, - destination) - return os.path.join((destination or opath), ofile[:-2] + "__") - - -# bzip - -BLOCK_SIZE = 1024 - - -def bzip(origin, destination=None): - """Unzip files.""" - ofile = os.path.split(origin)[1] - destfile = os.path.join(destination or "/tmp/", ofile[:-4]) - with open(destfile, "wb") as dest: - try: - orig = bz2.BZ2File(origin, "r") - while True: - block = orig.read(BLOCK_SIZE) - - if not block: - break - dest.write(block) - LOGGER.debug("Bunzipped %s to %s", origin, destfile) - finally: - orig.close() - return destfile - -# Mover - - -def move_it(pathname, destinations, hook=None): - """Check if the file pointed by *filename* is in the filelist, and move it if it is.""" - err = None - for dest in destinations: - LOGGER.debug("Copying to: %s", dest) - dest_url = urlparse(dest) - try: - mover = MOVERS[dest_url.scheme] - except KeyError: - LOGGER.error("Unsupported protocol '%s'. Could not copy %s to %s", - str(dest_url.scheme), pathname, str(dest)) - continue - try: - mover(pathname, dest_url).copy() - if hook: - hook(pathname, dest_url) - except Exception: - LOGGER.exception("Something went wrong during copy of %s to %s", - pathname, str(dest)) - continue - else: - LOGGER.info("Successfully copied %s to %s", pathname, - str(dest)) - - if err is not None: - raise err - - -# Generic event handler - -class EventHandler(pyinotify.ProcessEvent): - """Handle events with a generic *fun* function.""" - - def __init__(self, fun, *args, **kwargs): - """Initialize event handler.""" - pyinotify.ProcessEvent.__init__(self, *args, **kwargs) - self._fun = fun - - def process_IN_CLOSE_WRITE(self, event): - """Process on closing a writable file.""" - self._fun(event.pathname) - - def process_IN_CREATE(self, event): - """Process on closing after linking.""" - try: - if os.stat(event.pathname).st_nlink > 1: - self._fun(event.pathname) - except OSError: - return - - def process_IN_MOVED_TO(self, event): - """Process on closing after moving.""" - self._fun(event.pathname) - - -def create_notifier(attrs): - """Create a notifier from the specified configuration attributes *attrs*.""" - tmask = (pyinotify.IN_CLOSE_WRITE | - pyinotify.IN_MOVED_TO | - pyinotify.IN_CREATE) - - wm_ = pyinotify.WatchManager() - - opath, ofile = os.path.split(globify(attrs["origin"])) - - def fun(pathname): - """Execute unpacking and copying/moving of *pathname*.""" - efile = os.path.basename(pathname) - if fnmatch.fnmatch(efile, ofile): - LOGGER.info("We have a match: %s", str(pathname)) - if attrs["compression"]: - try: - unpack_fun = eval(attrs["compression"]) - if "prog" in attrs: - new_path = unpack_fun(pathname, - attrs["working_directory"], - attrs["prog"]) - else: - new_path = unpack_fun(pathname, - attrs["working_directory"]) - except Exception: - LOGGER.exception("Could not decompress %s", pathname) - return - - else: - new_path = pathname - try: - move_it(new_path, attrs["destinations"], - attrs.get("copy_hook", None)) - except Exception: - LOGGER.error("Something went wrong during copy of %s", - pathname) - else: - if attrs["delete"]: - try: - os.remove(pathname) - if attrs["delete_hook"]: - 
attrs["delete_hook"](pathname) - LOGGER.debug("Removed %s", pathname) - except OSError as e__: - if e__.errno == 2: - LOGGER.debug("Already deleted: %s", pathname) - else: - raise - - # delete temporary file - if pathname != new_path: - try: - os.remove(new_path) - except OSError as e__: - if e__.errno == 2: - pass - else: - raise - - tnotifier = pyinotify.ThreadedNotifier(wm_, EventHandler(fun)) - - wm_.add_watch(opath, tmask) - - return tnotifier - - -def terminate(chainss): - """Terminate transfer chains.""" - for chain in chainss.itervalues(): - chain["notifier"].stop() - if "publisher" in chain: - chain["publisher"].stop() - LOGGER.info("Shutting down.") - print("Thank you for using pytroll/move_it." - " See you soon on pytroll.org!") - time.sleep(1) - sys.exit(0) - - -def parse_args(): - """Parse commandline arguments.""" - parser = argparse.ArgumentParser() - parser.add_argument("config_file", - help="The configuration file to run on.") - parser.add_argument("-l", "--log", - help="The file to log to. stdout otherwise.") - parser.add_argument("-d", "--disable-backlog", default=False, - action="store_true", - help="Disable handling of backlog. Default: resend exising files.") - return parser.parse_args() - - -def setup_logging(cmd_args): - """Set up logging.""" - global LOGGER - LOGGER = logging.getLogger('move_it') - LOGGER.setLevel(logging.DEBUG) - - if cmd_args.log: - fh_ = logging.handlers.TimedRotatingFileHandler( - os.path.join(cmd_args.log), - "midnight", - backupCount=7) - else: - fh_ = logging.StreamHandler() - - formatter = logging.Formatter(LOG_FORMAT) - fh_.setFormatter(formatter) - - LOGGER.addHandler(fh_) - - pyinotify.log.handlers = [fh_] - - def main(): - """Run move_it.""" - cmd_args = parse_args() - setup_logging(cmd_args) - - LOGGER.info("Starting up.") - - mask = (pyinotify.IN_CLOSE_WRITE | - pyinotify.IN_MOVED_TO | - pyinotify.IN_CREATE) - watchman = pyinotify.WatchManager() - - notifier = pyinotify.Notifier(watchman, EventHandler(reload_config)) - watchman.add_watch(cmd_args.config_file, mask) - - def chains_stop(*args): - del args - terminate(chains) - - signal.signal(signal.SIGTERM, chains_stop) + """Start the server.""" + cmd_args = parse_args(default_port=None) + logger = setup_logging("move_it", cmd_args) + move_it_thread = MoveItSimple(cmd_args) try: - reload_config(cmd_args.config_file, - disable_backlog=cmd_args.disable_backlog) - notifier.loop() + move_it_thread.reload_cfg_file(cmd_args.config_file) + move_it_thread.run() except KeyboardInterrupt: - LOGGER.debug("Interrupting") + logger.debug("Stopping Move It") finally: - terminate(chains) + if move_it_thread.running: + move_it_thread.chains_stop() if __name__ == '__main__': diff --git a/trollmoves/mirror.py b/trollmoves/mirror.py index 16ed7a7b..0735ed64 100644 --- a/trollmoves/mirror.py +++ b/trollmoves/mirror.py @@ -22,21 +22,18 @@ """All you need for mirroring.""" import argparse -import os import logging - -from urllib.parse import urlparse, urlunparse +import os from threading import Lock, Timer +from urllib.parse import urlparse, urlunparse from posttroll.message import Message from posttroll.publisher import get_own_ip -from trollmoves.client import Listener -from trollmoves.client import request_push +from trollmoves.client import Listener, request_push from trollmoves.logging import add_logging_options_to_parser -from trollmoves.server import RequestManager, Deleter, AbstractMoveItServer from trollmoves.move_it_base import create_publisher - +from trollmoves.server import AbstractMoveItServer, 
diff --git a/trollmoves/move_it_base.py b/trollmoves/move_it_base.py
index 64785047..ef4a52f5 100644
--- a/trollmoves/move_it_base.py
+++ b/trollmoves/move_it_base.py
@@ -75,13 +75,9 @@ def terminate(self):
     def setup_watchers(self):
         """Set up watcher for the configuration file."""
         config_file = self.cmd_args.config_file
+        reload_function = self.reload_cfg_file
 
-        observer = Observer()
-        handler = WatchdogChangeHandler(self.reload_cfg_file)
-
-        observer.schedule(handler, config_file)
-
-        self.notifier = observer
+        self.notifier = create_notifier_for_file(config_file, reload_function)
 
     def run(self):
         """Start the transfer chains."""
@@ -107,9 +103,20 @@ def _run(self):
         raise NotImplementedError
 
 
+def create_notifier_for_file(file_to_watch, function_to_run_on_file):
+    """Create a notifier for a given file."""
+    observer = Observer()
+    handler = WatchdogChangeHandler(function_to_run_on_file)
+
+    observer.schedule(handler, file_to_watch)
+    return observer
+
+
 def create_publisher(port, publisher_name):
     """Create a publisher using port *port* and start it."""
     LOGGER.info("Starting publisher on port %s.", str(port))
+    if port is None:
+        return None
     publisher = Publisher("tcp://*:" + str(port), publisher_name)
     publisher.start()
     return publisher
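create_notifier_for_file() is what setup_watchers() now uses to watch the configuration file itself. A usage sketch, assuming (as reload_cfg_file() does) that WatchdogChangeHandler hands the changed file's path to the callback:

    from trollmoves.move_it_base import create_notifier_for_file

    def reload(path):
        print("configuration changed:", path)

    notifier = create_notifier_for_file("/etc/trollmoves/move_it.cfg", reload)
    notifier.start()  # a watchdog Observer; call .stop() and .join() on shutdown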
Could not copy %s to %s", + str(dest_url.scheme), pathname, str(dest)) + continue + try: + mover(pathname, dest_url).copy() + if hook: + hook(pathname, dest_url) + except Exception: + LOGGER.exception("Something went wrong during copy of %s to %s", + pathname, str(dest)) + continue + else: + LOGGER.info("Successfully copied %s to %s", pathname, + str(dest)) + + if err is not None: + raise err diff --git a/trollmoves/move_it_base.py b/trollmoves/move_it_base.py index 64785047..ef4a52f5 100644 --- a/trollmoves/move_it_base.py +++ b/trollmoves/move_it_base.py @@ -75,13 +75,9 @@ def terminate(self): def setup_watchers(self): """Set up watcher for the configuration file.""" config_file = self.cmd_args.config_file + reload_function = self.reload_cfg_file - observer = Observer() - handler = WatchdogChangeHandler(self.reload_cfg_file) - - observer.schedule(handler, config_file) - - self.notifier = observer + self.notifier = create_notifier_for_file(config_file, reload_function) def run(self): """Start the transfer chains.""" @@ -107,9 +103,20 @@ def _run(self): raise NotImplementedError +def create_notifier_for_file(file_to_watch, function_to_run_on_file): + """Create a notifier for a given file.""" + observer = Observer() + handler = WatchdogChangeHandler(function_to_run_on_file) + + observer.schedule(handler, file_to_watch) + return observer + + def create_publisher(port, publisher_name): """Create a publisher using port *port* and start it.""" LOGGER.info("Starting publisher on port %s.", str(port)) + if port is None: + return None publisher = Publisher("tcp://*:" + str(port), publisher_name) publisher.start() return publisher diff --git a/trollmoves/server.py b/trollmoves/server.py index 6d9f8e66..fa328021 100644 --- a/trollmoves/server.py +++ b/trollmoves/server.py @@ -23,6 +23,7 @@ """Classes and functions for Trollmoves server.""" import argparse +import bz2 import datetime import errno import fnmatch @@ -33,30 +34,29 @@ import tempfile import time from collections import deque -from threading import Lock, Thread from configparser import ConfigParser -from queue import Empty, Queue -from urllib.parse import urlparse from contextlib import suppress from functools import partial +from queue import Empty, Queue +from threading import Lock, Thread +from urllib.parse import urlparse -import bz2 -from zmq import NOBLOCK, POLLIN, PULL, PUSH, ROUTER, Poller, ZMQError -from watchdog.events import FileSystemEventHandler -from watchdog.observers.polling import PollingObserver -from watchdog.observers import Observer from posttroll import get_context from posttroll.message import Message, MessageError from posttroll.publisher import get_own_ip from posttroll.subscriber import Subscribe from trollsift import globify, parse +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer +from watchdog.observers.polling import PollingObserver +from zmq import NOBLOCK, POLLIN, PULL, PUSH, ROUTER, Poller, ZMQError from trollmoves.client import DEFAULT_REQ_TIMEOUT +from trollmoves.logging import add_logging_options_to_parser +from trollmoves.move_it_base import MoveItBase, create_publisher from trollmoves.movers import move_it from trollmoves.utils import (clean_url, gen_dict_contains, gen_dict_extract, is_file_local) -from trollmoves.move_it_base import MoveItBase, create_publisher -from trollmoves.logging import add_logging_options_to_parser LOGGER = logging.getLogger(__name__) @@ -317,14 +317,14 @@ def terminate(self, publisher=None): def reload_config(self, filename, 
@@ -627,7 +630,8 @@ def _verify_publish_port(conf):
         conf["publish_port"] = 0
 
 
-def _update_chains(chains, new_chain_configs, manager, use_watchdog, publisher, notifier_builder):
+def _update_chains(chains, new_chain_configs, manager, use_polling, notifier_builder,
+                   function_to_run_on_matching_files):
     old_glob = []
     for chain_name, chain_config in new_chain_configs.items():
         chain_updated = False
@@ -642,10 +646,11 @@ def _update_chains(chains, new_chain_configs, manager, use_watchdog, publisher,
         except ConfigError:
             continue
 
-        fun = chain.create_notifier_and_get_function(notifier_builder, use_watchdog, publisher)
+        chain.create_notifier(notifier_builder, use_polling, function_to_run_on_matching_files)
+        chain.start()
 
         if 'origin' in chain_config:
-            old_glob.append((globify(chain_config["origin"]), fun, chain_config))
+            old_glob.append((globify(chain_config["origin"]), chain.function_to_run, chain_config))
 
         if chain_updated:
             LOGGER.debug("Updated %s", chain_name)
@@ -656,14 +661,12 @@
 
 
 def _chains_are_identical(chains, new_chains, chain_name):
-    identical = True
     for config_key, config_value in new_chains[chain_name].items():
         if ((config_key not in ["notifier", "publisher"]) and
             ((config_key not in chains[chain_name].config) or
                 (chains[chain_name].config[config_key] != config_value))):
-            identical = False
-            break
-    return identical
+            return False
+    return True
 
 
 class Chain:
@@ -676,6 +679,7 @@ def __init__(self, name, config):
         self.request_manager = None
         self.notifier = None
         self.needs_manager = "request_port" in self.config
+        self.function_to_run = None
 
     def create_manager(self, manager):
         """Create a request manager."""
@@ -690,16 +694,24 @@
             LOGGER.error('Invalid config parameters in %s: %s', self.name, str(err))
             LOGGER.warning('Remove and skip %s', self.name)
             raise
-        self.request_manager.start()
 
-    def create_notifier_and_get_function(self, notifier_builder, use_watchdog, publisher):
+    def create_notifier(self, notifier_builder, use_polling, function_to_run_on_matching_files):
         """Create a notifier and get the function."""
         if notifier_builder is None:
-            notifier_builder = _get_notifier_builder(use_watchdog, self.config)
-        self.notifier, fun = notifier_builder(self.config, publisher)
-        self.notifier.start()
+            notifier_builder = _get_notifier_builder(use_polling, self.config)
+
+        self.function_to_run = partial(function_to_run_on_matching_files, chain_config=self.config)
+        pattern = globify(self.config["origin"])
+        timeout = float(self.config.get("watchdog_timeout", 1.))
+        LOGGER.debug("Watchdog timeout: %.1f", timeout)
 
-        return fun
+        self.notifier = notifier_builder(pattern, self.function_to_run, timeout)
+
+    def start(self):
+        """Start the chain."""
+        if self.request_manager is not None:
+            self.request_manager.start()
+        self.notifier.start()
 
     def stop(self):
         """Stop the chain."""
@@ -719,47 +731,44 @@ def _add_chain(chains, chain_name, chain_config, manager):
     return current_chain
 
 
-def _get_notifier_builder(use_watchdog, val):
-    if 'origin' in val:
-        if use_watchdog:
+def _get_notifier_builder(use_polling, chain_config):
+    if 'origin' in chain_config:
+        if use_polling:
             LOGGER.info("Using Watchdog notifier")
             notifier_builder = create_watchdog_polling_notifier
         else:
             LOGGER.info("Using os-based notifier")
             notifier_builder = create_watchdog_os_notifier
-    elif 'listen' in val:
+    elif 'listen' in chain_config:
         notifier_builder = create_posttroll_notifier
 
     return notifier_builder
 
 
-def create_watchdog_polling_notifier(attrs, publisher):
+def create_watchdog_polling_notifier(pattern, function_to_run_on_matching_files, timeout=1.0):
     """Create a notifier from the specified configuration attributes *attrs*."""
-    return create_watchdog_notifier(attrs, publisher, PollingObserver)
+    observer_class = partial(PollingObserver, timeout=timeout)
+    return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class)
 
 
-def create_watchdog_os_notifier(attrs, publisher):
+def create_watchdog_os_notifier(pattern, function_to_run_on_matching_files, timeout=1.0):
     """Create a notifier from the specified configuration attributes *attrs*."""
-    return create_watchdog_notifier(attrs, publisher, Observer)
+    observer_class = partial(Observer, timeout=timeout)
+    return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class)
 
 
-def create_watchdog_notifier(attrs, publisher, observer_class):
+def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class):
     """Create a watchdog notifier."""
-    pattern = globify(attrs["origin"])
     opath = os.path.dirname(pattern)
-
-    timeout = float(attrs.get("watchdog_timeout", 1.))
-    LOGGER.debug("Watchdog timeout: %.1f", timeout)
-    observer = observer_class(timeout=timeout)
-    partial_process_notify = partial(process_notify, publisher, attrs)
-    handler = WatchdogCreationHandler(partial_process_notify, pattern)
+    observer = observer_class()
+    handler = WatchdogCreationHandler(function_to_run_on_matching_files, pattern)
 
     observer.schedule(handler, opath)
 
-    return observer, partial_process_notify
+    return observer
 
 
-def process_notify(orig_pathname, publisher, attrs):
+def process_notify(orig_pathname, publisher, chain_config):
     """Publish what we have."""
     if os.stat(orig_pathname).st_size == 0:
         LOGGER.debug("Ignoring empty file: %s", orig_pathname)
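After this refactor the notifier builders take a ready-made glob pattern, the callback to run on matching files, and a polling timeout, instead of the whole config dict plus a publisher. Condensed usage, mirroring the reworked test_create_watchdog_notifier in the test changes below (the pattern is hypothetical):

    from trollsift import globify
    from trollmoves.server import create_watchdog_polling_notifier

    pattern = globify("/data/{start_time:%Y%m%d_%H%M}_{product}.tif")
    observer = create_watchdog_polling_notifier(pattern, print, timeout=0.5)
    observer.start()  # print() is called with the path of each matching file
    # ... let files arrive ...
    observer.stop()
    observer.join()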
"""Create a notifier and get the function.""" if notifier_builder is None: - notifier_builder = _get_notifier_builder(use_watchdog, self.config) - self.notifier, fun = notifier_builder(self.config, publisher) - self.notifier.start() + notifier_builder = _get_notifier_builder(use_polling, self.config) + + self.function_to_run = partial(function_to_run_on_matching_files, chain_config=self.config) + pattern = globify(self.config["origin"]) + timeout = float(self.config.get("watchdog_timeout", 1.)) + LOGGER.debug("Watchdog timeout: %.1f", timeout) - return fun + self.notifier = notifier_builder(pattern, self.function_to_run, timeout) + + def start(self): + """Start the chain.""" + if self.request_manager is not None: + self.request_manager.start() + self.notifier.start() def stop(self): """Stop the chain.""" @@ -719,47 +731,44 @@ def _add_chain(chains, chain_name, chain_config, manager): return current_chain -def _get_notifier_builder(use_watchdog, val): - if 'origin' in val: - if use_watchdog: +def _get_notifier_builder(use_polling, chain_config): + if 'origin' in chain_config: + if use_polling: LOGGER.info("Using Watchdog notifier") notifier_builder = create_watchdog_polling_notifier else: LOGGER.info("Using os-based notifier") notifier_builder = create_watchdog_os_notifier - elif 'listen' in val: + elif 'listen' in chain_config: notifier_builder = create_posttroll_notifier return notifier_builder -def create_watchdog_polling_notifier(attrs, publisher): +def create_watchdog_polling_notifier(pattern, function_to_run_on_matching_files, timeout=1.0): """Create a notifier from the specified configuration attributes *attrs*.""" - return create_watchdog_notifier(attrs, publisher, PollingObserver) + observer_class = partial(PollingObserver, timeout=timeout) + return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class) -def create_watchdog_os_notifier(attrs, publisher): +def create_watchdog_os_notifier(pattern, function_to_run_on_matching_files, timeout=1.0): """Create a notifier from the specified configuration attributes *attrs*.""" - return create_watchdog_notifier(attrs, publisher, Observer) + observer_class = partial(Observer, timeout=timeout) + return create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class) -def create_watchdog_notifier(attrs, publisher, observer_class): +def create_watchdog_notifier(pattern, function_to_run_on_matching_files, observer_class): """Create a watchdog notifier.""" - pattern = globify(attrs["origin"]) opath = os.path.dirname(pattern) - - timeout = float(attrs.get("watchdog_timeout", 1.)) - LOGGER.debug("Watchdog timeout: %.1f", timeout) - observer = observer_class(timeout=timeout) - partial_process_notify = partial(process_notify, publisher, attrs) - handler = WatchdogCreationHandler(partial_process_notify, pattern) + observer = observer_class() + handler = WatchdogCreationHandler(function_to_run_on_matching_files, pattern) observer.schedule(handler, opath) - return observer, partial_process_notify + return observer -def process_notify(orig_pathname, publisher, attrs): +def process_notify(orig_pathname, publisher, chain_config): """Publish what we have.""" if os.stat(orig_pathname).st_size == 0: LOGGER.debug("Ignoring empty file: %s", orig_pathname) @@ -767,11 +776,16 @@ def process_notify(orig_pathname, publisher, attrs): else: LOGGER.debug('We have a match: %s', orig_pathname) - pathname = unpack(orig_pathname, **attrs) + pathname = unpack(orig_pathname, **chain_config) + publish_file(orig_pathname, 
diff --git a/trollmoves/tests/test_move_it.py b/trollmoves/tests/test_move_it.py
new file mode 100644
index 00000000..f5b09a3c
--- /dev/null
+++ b/trollmoves/tests/test_move_it.py
@@ -0,0 +1,85 @@
+"""Tests for move_it."""
+
+import os
+import time
+
+from posttroll.testing import patched_publisher
+
+from trollmoves.move_it import MoveItSimple
+from trollmoves.server import parse_args
+
+move_it_config_template = """[eumetcast_hrit]
+delete=False
+topic=/HRIT/L0/dev
+info=sensors=seviri;stream=eumetcast
+"""
+
+
+def test_move_it_moves_files(tmp_path):
+    """Test that move it moves a file."""
+    input_dir = tmp_path / "in"
+    output_dir = tmp_path / "out"
+    os.mkdir(input_dir)
+    os.mkdir(output_dir)
+    origin = "origin=" + str(input_dir / "bla{number:1s}.txt")
+    destinations = "destinations=" + str(output_dir)
+    local_move_it_config = "\n".join([move_it_config_template, origin, destinations])
+    config_file = tmp_path / "move_it.cfg"
+    with open(config_file, "w") as fd:
+        fd.write(local_move_it_config)
+
+    cmd_args = parse_args([str(config_file)], default_port=None)
+    move_it_thread = MoveItSimple(cmd_args)
+
+    move_it_thread.reload_cfg_file(cmd_args.config_file)
+
+    from threading import Thread
+    thr = Thread(target=move_it_thread.run)
+    thr.start()
+
+    with open(input_dir / "bla1.txt", "w"):
+        pass
+    time.sleep(.1)
+    try:
+        assert move_it_thread.publisher is None
+        assert os.path.exists(output_dir / "bla1.txt")
+    finally:
+        move_it_thread.chains_stop()
+
+
+def test_move_it_published_a_message(tmp_path):
+    """Test that move it is publishing messages when provided a port."""
+    input_dir = tmp_path / "in"
+    output_dir = tmp_path / "out"
+    os.mkdir(input_dir)
+    os.mkdir(output_dir)
+    origin = "origin=" + str(input_dir / "bla{number:1s}.txt")
+    destinations = "destinations=" + str(output_dir)
+    local_move_it_config = "\n".join([move_it_config_template, origin, destinations])
+    config_file = tmp_path / "move_it.cfg"
+    with open(config_file, "w") as fd:
+        fd.write(local_move_it_config)
+
+    with patched_publisher() as message_list:
+        cmd_args = parse_args([str(config_file), "-p", "2022"])
+        move_it_thread = MoveItSimple(cmd_args)
+
+        move_it_thread.reload_cfg_file(cmd_args.config_file)
+
+        from threading import Thread
+        thr = Thread(target=move_it_thread.run)
+        thr.start()
+
+        with open(input_dir / "bla1.txt", "w"):
+            pass
+        time.sleep(.1)
+        try:
+            assert os.path.exists(output_dir / "bla1.txt")
+        finally:
+            move_it_thread.chains_stop()
+
+    assert len(message_list) == 1
+    message = message_list[0]
+    assert message.type == "file"
+    assert message.data == {"sensors": "seviri", "stream": "eumetcast", "number": "1",
+                            "uri": str(output_dir / "bla1.txt"), "uid": "bla1.txt"}
output_dir = tmp_path / "out" + os.mkdir(input_dir) + os.mkdir(output_dir) + origin = "origin=" + str(input_dir / "bla{number:1s}.txt") + destinations = "destinations=" + str(output_dir) + local_move_it_config = "\n".join([move_it_config_template, origin, destinations]) + config_file = tmp_path / "move_it.cfg" + with open(config_file, "w") as fd: + fd.write(local_move_it_config) + + with patched_publisher() as message_list: + cmd_args = parse_args([str(config_file), "-p", "2022"]) + move_it_thread = MoveItSimple(cmd_args) + + move_it_thread.reload_cfg_file(cmd_args.config_file) + + from threading import Thread + thr = Thread(target=move_it_thread.run) + thr.start() + + with open(input_dir / "bla1.txt", "w"): + pass + time.sleep(.1) + try: + assert os.path.exists(output_dir / "bla1.txt") + finally: + move_it_thread.chains_stop() + + assert len(message_list) == 1 + message = message_list[0] + assert message.type == "file" + assert message.data == {"sensors": "seviri", "stream": "eumetcast", "number": "1", + "uri": str(output_dir / "bla1.txt"), "uid": "bla1.txt"} diff --git a/trollmoves/tests/test_server.py b/trollmoves/tests/test_server.py index 6abda364..07a03ff2 100644 --- a/trollmoves/tests/test_server.py +++ b/trollmoves/tests/test_server.py @@ -22,51 +22,26 @@ """Test Trollmoves server.""" -from unittest.mock import MagicMock, patch -import unittest -from tempfile import TemporaryDirectory, NamedTemporaryFile +import datetime as dt import os -from collections import deque import time -import datetime as dt -import pytest +import unittest +from collections import deque +from tempfile import NamedTemporaryFile, TemporaryDirectory +from unittest.mock import MagicMock, patch +import pytest from trollsift import globify -from trollmoves.server import MoveItServer, parse_args - -@patch("trollmoves.server.process_notify") -def test_create_watchdog_notifier(process_notify): - """Test creating a watchdog notifier.""" - from trollmoves.server import create_watchdog_polling_notifier - - fname = "20200428_1000_foo.tif" - fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif" - publisher = "publisher" - with TemporaryDirectory() as tmpdir: - pattern_path = os.path.join(tmpdir, fname_pattern) - file_path = os.path.join(tmpdir, fname) - attrs = {"origin": pattern_path} - observer, fun = create_watchdog_polling_notifier(attrs, publisher) - observer.start() - - with open(os.path.join(file_path), "w") as fid: - fid.write('') - - # Wait for a while for the watchdog to register the event - time.sleep(2.0) - - observer.stop() - observer.join() - - fun.assert_called_with(file_path, publisher, globify(pattern_path), attrs) +from trollmoves.server import MoveItServer, parse_args def test_file_detected_with_inotify_is_published(tmp_path): """Test that a file detected with inotify is published.""" - from posttroll.testing import patched_publisher from threading import Thread + from posttroll.testing import patched_publisher + test_file_path = tmp_path / "my_file.hdf" config_file = f""" @@ -90,7 +65,7 @@ def test_file_detected_with_inotify_is_published(tmp_path): with open(test_file_path, "w") as fd: fd.write("hello!") - time.sleep(.1) + time.sleep(.2) try: assert len(message_list) == 1 assert str(test_file_path) in message_list[0] @@ -99,62 +74,58 @@ def test_file_detected_with_inotify_is_published(tmp_path): thr.join() -@patch("trollmoves.server.WatchdogHandler") -@patch("trollmoves.server.PollingObserver") -@patch("trollmoves.server.process_notify") -def 
@@ -99,62 +74,58 @@ def test_file_detected_with_inotify_is_published(tmp_path):
-@patch("trollmoves.server.WatchdogHandler")
-@patch("trollmoves.server.PollingObserver")
-@patch("trollmoves.server.process_notify")
-def test_create_watchdog_notifier_timeout_default(process_notify, PollingObserver, WatchdogHandler):
-    """Test creating a watchdog notifier with default settings."""
+def test_create_watchdog_notifier(tmp_path):
+    """Test creating a polling notifier."""
     from trollmoves.server import create_watchdog_polling_notifier
 
-    attrs = {"origin": "/tmp"}
-    publisher = ""
-    # No timeout, the default should be used
-    observer, fun = create_watchdog_polling_notifier(attrs, publisher)
-    PollingObserver.assert_called_with(timeout=1.0)
+    fname = "20200428_1000_foo.tif"
+    fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif"
+    pattern_path = os.path.join(tmp_path, fname_pattern)
+    file_path = os.path.join(tmp_path, fname)
+    function_to_run = MagicMock()
+    observer = create_watchdog_polling_notifier(globify(pattern_path), function_to_run, timeout=.1)
+    observer.start()
 
+    with open(os.path.join(file_path), "w") as fid:
+        fid.write('')
 
+    # Wait for a while for the watchdog to register the event
+    time.sleep(.2)
 
-@patch("trollmoves.server.WatchdogHandler")
-@patch("trollmoves.server.PollingObserver")
-@patch("trollmoves.server.process_notify")
-def test_create_watchdog_notifier_timeout_float_timeout(process_notify, PollingObserver, WatchdogHandler):
-    """Test creating a watchdog notifier with default settings."""
-    from trollmoves.server import create_watchdog_polling_notifier
+    observer.stop()
+    observer.join()
 
-    attrs = {"origin": "/tmp", "watchdog_timeout": 2.0}
-    publisher = ""
-    observer, fun = create_watchdog_polling_notifier(attrs, publisher)
-    PollingObserver.assert_called_with(timeout=2.0)
+    function_to_run.assert_called_with(file_path)
 
 
-@patch("trollmoves.server.WatchdogHandler")
+@pytest.mark.parametrize("config,expected_timeout",
+                         [({"origin": "/tmp"}, 1.0),
+                          ({"origin": "/tmp", "watchdog_timeout": 2.0}, 2.0),
+                          ({"origin": "/tmp", "watchdog_timeout": "3.0"}, 3.0),
+                          ])
 @patch("trollmoves.server.PollingObserver")
-@patch("trollmoves.server.process_notify")
-def test_create_watchdog_notifier_timeout_string_timeout(process_notify, PollingObserver, WatchdogHandler):
+def test_create_watchdog_notifier_timeout_default(PollingObserver, config, expected_timeout):
     """Test creating a watchdog notifier with default settings."""
-    from trollmoves.server import create_watchdog_polling_notifier
+    from trollmoves.server import Chain
 
-    attrs = {"origin": "/tmp", "watchdog_timeout": "3.0"}
-    publisher = ""
-    observer, fun = create_watchdog_polling_notifier(attrs, publisher)
-    PollingObserver.assert_called_with(timeout=3.0)
+    chain = Chain("some_chain", config)
+    function_to_run = MagicMock()
+    chain.create_notifier(notifier_builder=None, use_polling=True, function_to_run_on_matching_files=function_to_run)
+    PollingObserver.assert_called_with(timeout=expected_timeout)
 
 
+def test_handler_does_not_dispatch_files_not_matching_pattern():
+    """Test that the handler does not dispatch files that are not matching the pattern."""
+    from trollmoves.server import WatchdogCreationHandler
 
-@patch("trollmoves.server.file_cache", new_callable=deque)
-@patch("trollmoves.server.Message")
-def test_process_notify_not_matching_file(Message, file_cache):
-    """Test process_notify() with a file that doesn't match the configured pattern."""
-    from trollmoves.server import process_notify
-
-    publisher = MagicMock()
-    not_matching_pattern = "bar"
+    function_to_run = MagicMock()
 
-    _ = _run_process_notify(process_notify, publisher, not_matching_pattern)
+    handler = WatchdogCreationHandler(function_to_run, pattern="bar")
+    event = MagicMock()
+    event.dest_path = "foo"
+    event.is_directory = False
+    assert handler.dispatch(event) is None
 
-    publisher.assert_not_called()
-    assert len(file_cache) == 0
-
 
-def _run_process_notify(process_notify, publisher, pattern=None):
+def _run_process_notify(process_notify, publisher):
     fname = "20200428_1000_foo.tif"
     fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif"
event.dest_path = "foo" + event.is_directory = False + assert handler.dispatch(event) is None - publisher.assert_not_called() - assert len(file_cache) == 0 - -def _run_process_notify(process_notify, publisher, pattern=None): +def _run_process_notify(process_notify, publisher): fname = "20200428_1000_foo.tif" fname_pattern = "{start_time:%Y%m%d_%H%M}_{product}.tif" @@ -165,13 +136,11 @@ def _run_process_notify(process_notify, publisher, pattern=None): "request_address": "localhost", "request_port": "9001", "topic": "/topic"} - if pattern is None: - pattern = globify(matching_pattern) with open(os.path.join(pathname), "w") as fid: fid.write('foo') - process_notify(pathname, publisher, pattern, kwargs) + process_notify(pathname, publisher, kwargs) return pathname, fname, kwargs @@ -278,9 +247,10 @@ def test_requestmanager_run_valid_pytroll_message(patch_process_request, patch_poller, patch_get_context): """Test request manager run with valid address and payload.""" + from posttroll.message import _MAGICK from zmq import POLLIN + from trollmoves.server import RequestManager - from posttroll.message import _MAGICK payload = (_MAGICK + r'/test/1/2/3 info ras@hawaii 2008-04-11T22:13:22.123000 v1.01' + r' text/ascii "what' + r"'" + r's up doc"') @@ -308,9 +278,11 @@ def test_requestmanager_run_MessageError_exception(patch_validate_file_pattern, patch_get_context, caplog): """Test request manager run with invalid payload causing a MessageError exception.""" + import logging + from zmq import POLLIN + from trollmoves.server import RequestManager - import logging patch_get_address_and_payload.return_value = "address", "fake_payload" port = 9876 patch_poller.return_value = {'POLLIN': POLLIN}