From cc3c11d0130812c73f02deee9cfda8bd9a0cc19f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Mon, 22 Jan 2024 13:37:59 +0100 Subject: [PATCH] WIP: crawler refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Aurélien Bompard --- mirrormanager2/crawler/__init__.py | 0 mirrormanager2/crawler/cli.py | 221 +++++++++++++ mirrormanager2/crawler/connection_pool.py | 54 ++++ mirrormanager2/crawler/connector.py | 102 ++++++ mirrormanager2/crawler/constants.py | 11 + mirrormanager2/crawler/continents.py | 95 ++++++ mirrormanager2/crawler/crawler.py | 375 ++++++++++++++++++++++ mirrormanager2/crawler/ftp_connector.py | 117 +++++++ mirrormanager2/crawler/http.py | 182 +++++++++++ mirrormanager2/crawler/http_connector.py | 130 ++++++++ mirrormanager2/crawler/log.py | 84 +++++ mirrormanager2/crawler/notif.py | 41 +++ mirrormanager2/crawler/propagation.py | 130 ++++++++ mirrormanager2/crawler/reporter.py | 164 ++++++++++ mirrormanager2/crawler/rsync_connector.py | 117 +++++++ mirrormanager2/crawler/threads.py | 71 ++++ mirrormanager2/lib/__init__.py | 17 + pyproject.toml | 1 + 18 files changed, 1912 insertions(+) create mode 100644 mirrormanager2/crawler/__init__.py create mode 100755 mirrormanager2/crawler/cli.py create mode 100755 mirrormanager2/crawler/connection_pool.py create mode 100755 mirrormanager2/crawler/connector.py create mode 100755 mirrormanager2/crawler/constants.py create mode 100755 mirrormanager2/crawler/continents.py create mode 100755 mirrormanager2/crawler/crawler.py create mode 100755 mirrormanager2/crawler/ftp_connector.py create mode 100755 mirrormanager2/crawler/http.py create mode 100755 mirrormanager2/crawler/http_connector.py create mode 100755 mirrormanager2/crawler/log.py create mode 100755 mirrormanager2/crawler/notif.py create mode 100755 mirrormanager2/crawler/propagation.py create mode 100755 mirrormanager2/crawler/reporter.py create mode 100755 mirrormanager2/crawler/rsync_connector.py create mode 100755 mirrormanager2/crawler/threads.py diff --git a/mirrormanager2/crawler/__init__.py b/mirrormanager2/crawler/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mirrormanager2/crawler/cli.py b/mirrormanager2/crawler/cli.py new file mode 100755 index 000000000..7b08d488c --- /dev/null +++ b/mirrormanager2/crawler/cli.py @@ -0,0 +1,221 @@ +import logging +import os +import time +from functools import partial + +import click + +import mirrormanager2.lib +from mirrormanager2.lib.database import get_db_manager + +from ..utility.common import read_config +from .constants import CONTINENTS +from .crawler import worker +from .log import setup_logging +from .threads import run_in_threadpool + +logger = logging.getLogger("crawler") + +# def notify(options, topic, msg): +# if not options["fedmsg"]: +# return +# +# mirrormanager2.lib.notifications.fedmsg_publish( +# f"mirrormanager.crawler.{topic}", +# msg, +# ) + + +def validate_continents(ctx, param, value): + value = [c.upper() for c in value] + for item in value: + cont = item.lstrip("^") + if cont not in CONTINENTS: + raise click.BadParameter( + f"Unknown continent {cont}. Known continents: {', '.join(CONTINENTS)}" + ) + return value + + +@click.group() +@click.option( + "-c", + "--config", + envvar="MM2_CONFIG", + default="/etc/mirrormanager/mirrormanager2.cfg", + help="Configuration file to use", +) +@click.option( + "--include-private", + is_flag=True, + default=False, + help="Include hosts marked 'private' in the crawl", +) +@click.option( + "-t", + "--threads", + type=int, + default=10, + help="max threads to start in parallel", + # callback=set_global, +) +@click.option( + "--timeout-minutes", + "timeout", + type=int, + default=120, + help="per-host timeout, in minutes", + # callback=set_global, +) +@click.option( + "--startid", + type=int, + metavar="ID", + default=0, + help="Start crawling at host ID (default=0)", +) +@click.option( + "--stopid", + type=int, + metavar="ID", + default=0, + help="Stop crawling before host ID (default=no limit)", +) +@click.option( + "--category", + "categories", + multiple=True, + help="Category to scan (default=all), can be repeated", +) +@click.option( + "--disable-fedmsg", + "fedmsg", + is_flag=True, + default=True, + help="Disable fedora-messaging notifications at the beginning and end of crawl", +) +@click.option( + "--canary", + is_flag=True, + default=False, + help="Fast crawl by only checking if mirror can be reached", +) +@click.option( + "--repodata", + is_flag=True, + default=False, + help="Fast crawl by only checking if the repodata is up to date", +) +@click.option( + "--continent", + "continents", + multiple=True, + callback=validate_continents, + help="Limit crawling by continent. Exclude by prefixing with'^'", +) +@click.option( + "--debug", + "-d", + is_flag=True, + default=False, + help="enable printing of debug-level messages", +) +@click.pass_context +def main(ctx, config, debug, startid, stopid, **kwargs): + ctx.ensure_object(dict) + setup_logging(debug) + ctx.obj["options"] = ctx.params + config = read_config(config) + ctx.obj["config"] = config + db_manager = get_db_manager(config) + session = db_manager.Session() + + # Get *all* of the mirrors + hosts = mirrormanager2.lib.get_mirrors( + session, + private=False, + order_by_crawl_duration=True, + admin_active=True, + user_active=True, + site_private=False, + site_user_active=True, + site_admin_active=True, + ) + + # # Get a list of host names for fedmsg + # host_names = [ + # host.name + # for host in hosts + # if (not host.id < options["startid"] and not host.id >= options["stopid"]) + # ] + + # Limit our host list down to only the ones we really want to crawl + hosts = [host for host in hosts if (host.id >= startid and (not stopid or host.id < stopid))] + ctx.obj["hosts"] = hosts + + session.close() + + +@main.command() +@click.pass_context +def crawl(ctx): + options = ctx.obj["options"] + config = ctx.obj["config"] + starttime = time.monotonic() + host_ids = [host.id for host in ctx.obj["hosts"]] + + # And then, for debugging, only do one host + # hosts = [hosts.next()] + + # hostlist = [dict(id=id, host=host) for id, host in zip(hosts, host_names)] + # msg = dict(hosts=hostlist) + # msg["options"] = options + # notify(options, "start", msg) + + # Before we do work, chdir to /var/tmp/. mirrormanager1 did this and I'm + # not totally sure why... + os.chdir("/var/tmp") + + results = run_in_threadpool( + partial(worker, options, config), + host_ids, + max_threads=options["threads"], + timeout=options["timeout"], + ) + + # Put a bow on the results for fedmsg + results = [ + dict(result=result, host=host.name, id=host.id) + for result, host in zip(results, ctx.obj["hosts"]) + ] + # notify(options, "complete", dict(results=results)) + print(results) + + # if options["canary"]: + # mode = " in canary mode" + # elif options["repodata"]: + # mode = " in repodata mode" + # else: + # mode = "" + # logger.info("%d of %d hosts failed%s" % (hosts_failed, current_host, mode)) + logger.info("Crawler finished after %d seconds", (time.monotonic() - starttime)) + return results + + +@main.command() +@click.option( + "--prop-repo", + default="rawhide", + help="Repository prefix to use for propagation. Defaults to 'rawhide'.", +) +@click.pass_context +def propagation(ctx, prop_repo): + """Print out information about repomd.xml propagation. + + Defaults to development/rawhide/x86_64/os/repodata + Only the category 'Fedora Linux' is supported + """ + print(ctx.obj) + print(ctx.params) + # options = ctx.obj["options"] + pass diff --git a/mirrormanager2/crawler/connection_pool.py b/mirrormanager2/crawler/connection_pool.py new file mode 100755 index 000000000..8fe6afb1b --- /dev/null +++ b/mirrormanager2/crawler/connection_pool.py @@ -0,0 +1,54 @@ +import logging +from functools import partial +from urllib.parse import urlsplit + +from .ftp_connector import FTPConnector +from .http_connector import HTTPConnector, HTTPSConnector + +logger = logging.getLogger("crawler") + + +def _get_connection_class(scheme): + if scheme == "http": + return HTTPConnector + if scheme == "https": + return HTTPSConnector + if scheme == "ftp": + return FTPConnector + + +class ConnectionPool: + def __init__(self, config, debuglevel=0, timeout_minutes=120): + self._connections = {} + self.config = config + self.debuglevel = debuglevel + self.timeout_minutes = timeout_minutes + + def get(self, url): + scheme, netloc, path, query, fragment = urlsplit(url) + if netloc not in self._connections: + connection_class = _get_connection_class(scheme) + self._connections[netloc] = connection_class( + debuglevel=self.debuglevel, + timeout_minutes=self.timeout_minutes, + on_closed=partial(self._remove_connection, netloc), + ) + # self._connections[netloc] = self._connect(netloc) + # self._connections[netloc].set_debuglevel(self.debuglevel) + return self._connections[netloc] + + def close(self, url): + scheme, netloc, path, query, fragment = urlsplit(url) + try: + connection = self._connections[netloc] + except KeyError: + return + connection.close() + + def _remove_connection(self, netloc, connection): + del self._connections[netloc] + + def close_all(self): + for connection in list(self._connections.values()): + connection.close() + self._connections = {} diff --git a/mirrormanager2/crawler/connector.py b/mirrormanager2/crawler/connector.py new file mode 100755 index 000000000..7f6b3af4e --- /dev/null +++ b/mirrormanager2/crawler/connector.py @@ -0,0 +1,102 @@ +import hashlib +import logging +from urllib.parse import urlsplit + +logger = logging.getLogger("crawler") + + +class TryLater(Exception): + pass + + +class ForbiddenExpected(Exception): + pass + + +class Connector: + scheme = None + + def __init__(self, debuglevel, timeout_minutes, on_closed): + self.debuglevel = debuglevel + # ftplib and httplib take the timeout in seconds + self.timeout = timeout_minutes * 60 + self._connection = None + self._on_closed = on_closed + + def open(self, url): + scheme, netloc, path, query, fragment = urlsplit(url) + self._connection = self._connect(netloc) + return self._connection + + def close(self): + if self._connection is not None: + self._close() + self._on_closed(self) + + def _connect(self, url): + raise NotImplementedError + + def _close(self): + raise NotImplementedError + + def _get_file(self, url): + raise NotImplementedError + + # TODO: backoff on TryAgain with message + # f"Server load exceeded on {host!r} - try later ({try_later_delay} seconds)" + def check_dir(self, url, directory): + return self._check_dir(url, directory) + + def _check_dir(self, url, directory): + raise NotImplementedError + + def compare_sha256(self, directory, filename, graburl): + """looks for a FileDetails object that matches the given URL""" + found = False + contents = self._get_file(graburl) + sha256 = hashlib.sha256(contents).hexdigest() + for fd in list(directory.fileDetails): + if fd.filename == filename and fd.sha256 is not None: + if fd.sha256 == sha256: + found = True + break + return found + + def _get_dir_url(self, url, directory, category_prefix_length): + dirname = directory.name[category_prefix_length:] + return f"{url}/{dirname}" + + def check_category( + self, + url, + trydirs, + category_prefix_length, + timeout, + only_repodata, + ): + statuses = {} + for d in trydirs: + timeout.check() + + if not d.readable: + continue + + # d.files is a dict which contains the last (maybe 10) files + # of the current directory. umdl copies the pickled dict + # into the database. It is either a dict or nothing. + if not isinstance(d.files, dict): + continue + + if only_repodata and not d.name.endswith("/repodata"): + continue + + dir_url = self._get_dir_url(url, d, category_prefix_length) + + dir_status = self.check_dir(dir_url, d) + if dir_status is None: + # could be a dir with no files, or an unreadable dir. + # defer decision on this dir, let a child decide. + return + statuses[d] = dir_status + + return statuses diff --git a/mirrormanager2/crawler/constants.py b/mirrormanager2/crawler/constants.py new file mode 100755 index 000000000..4c8a9c40e --- /dev/null +++ b/mirrormanager2/crawler/constants.py @@ -0,0 +1,11 @@ +# hard coded list of continents; let's hope this does not change all the time +# this is according to GeoIP +CONTINENTS = ["AF", "AN", "AS", "EU", "NA", "OC", "SA", "--"] + +# propagation URLs +PROPAGATION_ARCH = "x86_64" + +# number of minutes to wait if a signal is received to shutdown the crawler +SHUTDOWN_TIMEOUT = 5 + +THREAD_TIMEOUT = 120 # minutes diff --git a/mirrormanager2/crawler/continents.py b/mirrormanager2/crawler/continents.py new file mode 100755 index 000000000..0bbe03c66 --- /dev/null +++ b/mirrormanager2/crawler/continents.py @@ -0,0 +1,95 @@ +import csv +import functools +import logging +import os +import socket +from functools import cache +from importlib import resources +from urllib.parse import urlparse + +import geoip2 + +import mirrormanager2.lib + +from .constants import CONTINENTS + +logger = logging.getLogger("crawler") + + +class WrongContinent(Exception): + pass + + +class BrokenBaseUrl(ValueError): + pass + + +def filter_continents(asked): + continents = [] + for continent in CONTINENTS: + if f"^{continent}" in asked: + continue + if asked and continent not in asked: + continue + continents.append(continent) + return continents + + +@functools.cache +def get_country_continents(session): + country_continent_csv = resources.files("mirrormanager2.crawler").joinpath( + "country_continent.csv" + ) + new_country_continents = {} + with country_continent_csv.open("r") as infile: + reader = csv.reader(infile) + new_country_continents = {rows[0]: rows[1] for rows in reader} + for c in mirrormanager2.lib.get_country_continent_redirect(session): + new_country_continents[c.country] = c.continent + return new_country_continents + + +@cache +def get_geoip(base_dir): + return geoip2.database.Reader(os.path.join(base_dir, "GeoLite2-Country.mmdb")) + + +def check_continent(config, options, session, categoryUrl): + gi = get_geoip(config["GEOIP_BASE"]) + continents = filter_continents(options["continents"]) + country_continents = get_country_continents(session) + # Before the first network access to the mirror let's + # check if continent mode is enabled and verfiy if + # the mirror is on the target continent. + # The continent check takes the first URL of the first category + # for the decision on which continent the mirror is. + try: + hostname = urlparse.urlsplit(categoryUrl)[1] + except Exception as e: + # Not being able the split the URL is strange. + # Something is broken. + raise BrokenBaseUrl() from e + + # The function urlsplit() does not remove ':' in case someone + # specified a port. Only look at the first element before ':' + hostname = hostname.split(":")[0] + + try: + hostname = socket.gethostbyname(hostname) + except Exception as e: + # Name resolution failed. This means + # that the base URL is broken. + raise BrokenBaseUrl() from e + + country = gi.country(hostname).country.iso_code + if not country: + # For hosts with no country in the GeoIP database + # the default is 'US' as that is where most of + # Fedora infrastructure systems are running + country = "US" + if country_continents[country] in continents: + return + # And another return value. '8' is used for mirrors on + # the wrong continent. The crawl should not be listed in + # the database at all. + raise WrongContinent diff --git a/mirrormanager2/crawler/crawler.py b/mirrormanager2/crawler/crawler.py new file mode 100755 index 000000000..7ec1beb0b --- /dev/null +++ b/mirrormanager2/crawler/crawler.py @@ -0,0 +1,375 @@ +import logging +import sys + +import mirrormanager2.lib +from mirrormanager2.lib.database import get_db_manager +from mirrormanager2.lib.model import HostCategoryDir + +from .connection_pool import ConnectionPool +from .continents import BrokenBaseUrl, WrongContinent, check_continent +from .reporter import Reporter +from .threads import ThreadTimeout, get_thread_id, on_thread_started + +logger = logging.getLogger("crawler") + + +class CrawlerError(Exception): + pass + + +class AllCategoriesFailed(CrawlerError): + pass + + +class NoCategory(CrawlerError): + pass + + +class CategoryNotAccessible(CrawlerError): + pass + + +def get_preferred_urls(host_category): + """return which of the hosts connection method should be used + rsync > http(s) > ftp""" + urls = [hcurl.url for hcurl in host_category.urls if hcurl.url is not None] + + def _preferred_method(url): + if url.startswith("rsync:"): + return 1 + elif url.startswith("ftp:"): + return 2 + elif url.startswith(("http:", "https:")): + return 3 + else: + return 4 + + urls.sort(key=_preferred_method) + return urls + + +class Crawler: + def __init__(self, config, session, options, host): + self.config = config + self.options = options + self.session = session + self.host = host + self.connection_pool = ConnectionPool( + config, debuglevel=2 if options["debug"] else 0, timeout_minutes=1 + ) + self.timeout = ThreadTimeout(options["timeout"]) + self.host_category_dirs = {} + + def _parent(self, directory): + parentDir = None + splitpath = directory.name.split("/") + if len(splitpath[:-1]) > 0: + parentPath = "/".join(splitpath[:-1]) + parentDir = mirrormanager2.lib.get_directory_by_name(self.session, parentPath) + return parentDir + + def add_parents(self, host_category_dirs, hc, d): + parentDir = self._parent(self.session, d) + if parentDir is not None: + if (hc, parentDir) not in host_category_dirs: + host_category_dirs[(hc, parentDir)] = None + if parentDir != hc.category.topdir: # stop at top of the category + return self.add_parents(host_category_dirs, hc, parentDir) + + return host_category_dirs + + def select_host_categories_to_scan(self, ignore_empty=False): + result = [] + if self.options["categories"]: + for category in self.options["categories"]: + hc = mirrormanager2.lib.get_host_category_by_hostid_category( + self.session, host_id=self.host.id, category=category + ) + for entry in hc: + result.append(entry) + else: + result = list(self.host.categories) + if not result and not ignore_empty: + # If the host has no categories do not auto-disable it. Just skip the host. + raise NoCategory + return result + + def crawl(self): + """This function scans all categories a host has defined. + If a RSYNC URL is available it tries to scan the host requiring + only single network connection. If this is not possible or fails + it tries to scan whole directories using FTP and if that also + fails it scans the hosts file by file using HTTP. + Canary mode only tries to determine if the mirror is up and + repodata mode only scans all the repodata/ directories.""" + self.timeout.start() + successful_categories = 0 + host_category_dirs = {} + + host_categories_to_scan = self.select_host_categories_to_scan() + + for hc in host_categories_to_scan: + self.timeout.check() + if hc.always_up2date: + successful_categories += 1 + continue + try: + result = self._scan_host_category(hc) + except CategoryNotAccessible: + result = None + else: + # Record that this host has at least one (or more) categories + # which is accessible via http or ftp + successful_categories += 1 + host_category_dirs.update(result or {}) + + self.connection_pool.close_all() + + if successful_categories == 0: + raise AllCategoriesFailed + + return host_category_dirs + + def check_for_base_dir(self, urls): + """Check if at least one of the given URL exists on the remote host. + This is used to detect mirrors which have completely dropped our content. + This is only looking at http and ftp URLs as those URLs are actually + relevant for normal access. If both tests fail the mirror will be marked + as failed during crawl. + """ + client_urls = [url for url in urls if url.startswith("http:")] + for url in client_urls: + connector = self.connection_pool.get(url) + try: + exists = connector.check_url(url) + except Exception: + exists = False + logger.exception("Could not get the base dir") + if not exists: + logger.warning("Base URL %s does not exist.", url) + continue + # The base http URL seems to work. Good! + return True + # Reaching this point means that no functional http/ftp has been + # found. This means that the mirror will not work for normal http + # and ftp users. + return False + + def _scan_host_category(self, hc): + category = hc.category + urls = get_preferred_urls(hc) + if not urls: + raise CategoryNotAccessible + + if self.options["continents"]: + # Only check for continent if something specified + # on the command-line + check_continent(self.config, self.options, self.session, urls[0]) + + if not self.check_for_base_dir(urls): + raise CategoryNotAccessible + if self.options["canary"]: + return + + category_prefix_length = len(category.topdir.name) + if category_prefix_length > 0: + category_prefix_length += 1 + + if self.options["canary"]: + logger.info("canary scanning category %s", category.name) + elif self.options["repodata"]: + logger.info("repodata scanning category %s", category.name) + else: + logger.info("scanning category %s", category.name) + + trydirs = list(hc.category.directories) + + host_category_dirs = {} + + while urls: + url = urls.pop(0, None) + if url.endswith("/"): + url = url[:-1] + + logger.info("Crawling with URL %s", url) + + # No rsync in repodata mode, we only retrive a small subset of + # existing files + if self.options["repodata"] and url.startswith("rsync:"): + continue + + connector = self.connection_pool.get(url) + dir_statuses = connector.check_category( + url, trydirs, category_prefix_length, self.timeout, self.options["repodata"] + ) + if dir_statuses is None: + continue # try next access method + for directory, dir_status in dir_statuses.items(): + host_category_dirs[(hc, directory)] = dir_status + if dir_status: + # make sure our parent dirs appear on the list too + host_category_dirs = self.add_parents(host_category_dirs, hc, directory) + else: + logger.warning("Not up2date: %s" % (directory.name)) + + # we know about the status of all files in this category + # no further checks necessary + # do the next category + return host_category_dirs + raise CategoryNotAccessible + + def sync_hcds(self, host_category_dirs): + stats = dict( + up2date=0, + not_up2date=0, + unchanged=0, + unreadable=0, + unknown=0, + newdir=0, + deleted_on_master=0, + duration=0, + ) + current_hcds = {} + stats["duration"] = self.timeout.elapsed() + keys = host_category_dirs.keys() + keys = sorted(keys, key=lambda t: t[1].name) + stats["numkeys"] = len(keys) + for hc, d in keys: + status = host_category_dirs[(hc, d)] + if status is None: + stats["unknown"] += 1 + continue + + topname = hc.category.topdir.name + toplen = len(topname) + if d.name.startswith("/"): + toplen += 1 + path = d.name[toplen:] + + hcd = mirrormanager2.lib.get_hostcategorydir_by_hostcategoryid_and_path( + self.session, host_category_id=hc.id, path=path + ) + if len(hcd) > 0: + hcd = hcd[0] + else: + # don't create HCDs for directories which aren't up2date on the + # mirror chances are the mirror is excluding that directory + if not status: + continue + hcd = HostCategoryDir(host_category_id=hc.id, path=path, directory_id=d.id) + stats["newdir"] += 1 + + if hcd.directory is None: + hcd.directory = d + if hcd.up2date != status: + hcd.up2date = status + self.session.add(hcd) + if status is False: + logger.info("Directory %s is not up-to-date on this host." % d.name) + stats["not_up2date"] += 1 + else: + logger.info(d.name) + stats["up2date"] += 1 + else: + stats["unchanged"] += 1 + + current_hcds[hcd] = True + + # In repodata mode we only want to update the files actually scanned. + # Do not mark files which have not been scanned as not being up to date. + if not self.options["repodata"]: + # now-historical HostCategoryDirs are not up2date + # we wait for a cascading Directory delete to delete this + host_categories_to_scan = self.select_host_categories_to_scan(ignore_empty=True) + for hc in host_categories_to_scan: + for hcd in list(hc.directories): + if hcd.directory is not None and not hcd.directory.readable: + stats["unreadable"] += 1 + continue + if hcd not in current_hcds: + if hcd.up2date is not False: + hcd.up2date = False + self.session.add(hcd) + stats["deleted_on_master"] += 1 + self.session.commit() + return stats + + +def crawl_and_report(options, crawler, reporter, host): + # Do not update last crawl duration in canary/repodata mode. + # This duration is completely different from the real time + # required to crawl the complete host so that it does not help + # to remember it. + record_duration = not options["repodata"] and not options["canary"] + + reporter.record_crawl_start() + try: + host_category_dirs = crawler.crawl() + except AllCategoriesFailed: + if options["canary"]: + # If running in canary mode do not auto disable mirrors + # if they have failed. + # Let's mark the complete mirror as not being up to date. + reporter.mark_not_up2date( + reason="Canary mode failed for all categories. Marking host as not up to date.", + ) + else: + # all categories have failed due to broken base URLs + # and that this host should me marked as failed during crawl + reporter.record_crawl_failure() + except TimeoutError: + reporter.mark_not_up2date( + reason="Crawler timed out before completing. Host is likely overloaded.", + ) + reporter.record_crawl_failure() + reporter.record_crawl_end(record_duration=True) + return + except WrongContinent: + logger.info("Skipping %r; wrong continent" % host) + return + except BrokenBaseUrl: + logger.info("Skipping %r; broken base URL" % host) + return + except NoCategory: + # no category to crawl found. This is to make sure, + # that host.crawl_failures is not reset to zero for crawling + # non existing categories on this host + logger.info("No categories to crawl on host %r" % host) + # No need to update the crawl duration. + record_duration = False + except Exception: + logger.exception("Unhandled exception raised.") + reporter.mark_not_up2date( + reason="Unhandled exception raised. This is a bug in the MM crawler.", + exc=sys.exc_info(), + ) + else: + # Resetting as this only counts consecutive crawl failures + reporter.reset_crawl_failures() + stats = crawler.sync_hcds(host_category_dirs) + reporter.report_stats(stats) + + reporter.record_crawl_end(record_duration=record_duration) + return "SUCCESS" + + +def worker(options, config, host_id): + db_manager = get_db_manager(config) + with db_manager.Session() as session: + host = mirrormanager2.lib.get_host(session, host_id) + on_thread_started(host_id=host_id, host_name=host.name) + if host.private and not options["include_private"]: + return + + logger.info(f"Worker {get_thread_id()!r} starting on host {host!r}") + + crawler = Crawler(config, session, options, host) + reporter = Reporter(config, session, host) + + try: + result = crawl_and_report(options, crawler, reporter, host) + except Exception: + logger.exception(f"Failure in thread {get_thread_id()!r}, host {host!r}") + session.commit() + logger.info(f"Ending crawl of {host!r}") + return result or "FAILURE" diff --git a/mirrormanager2/crawler/ftp_connector.py b/mirrormanager2/crawler/ftp_connector.py new file mode 100755 index 000000000..16033d4de --- /dev/null +++ b/mirrormanager2/crawler/ftp_connector.py @@ -0,0 +1,117 @@ +import ftplib +import logging +from contextlib import suppress +from ftplib import FTP +from urllib.parse import urlsplit + +from .connector import Connector, TryLater + +logger = logging.getLogger("crawler") + + +class FTPConnector(Connector): + def _connect(self, netloc): + conn = FTP(netloc, timeout=self.timeout) + conn.set_debuglevel(self.debuglevel) + conn.login() + return conn + + def _close(self): + with suppress(Exception): + self._connection.quit() + + def _ftp_dir(self, url): + try: + conn = self.open(url) + except Exception: + return None + scheme, netloc, path, query, fragment = urlsplit(url) + results = [] + + def _callback(line): + if self.debuglevel > 0: + logger.info(line) + results.append(line) + + conn.dir(path, _callback) + return results + + def get_ftp_dir(self, url, readable, i=0): + if i > 1: + raise TryLater() + + try: + listing = self._ftp_dir(url) + except ftplib.error_perm as e: + # Returned by Princeton University when directory does not exist + if str(e).startswith("550"): + return [] + # Returned by Princeton University when directory isn't readable + # (pre-bitflip) + if str(e).startswith("553"): + if readable: + return [] + else: + return None + # Returned by ftp2.surplux.net when cannot log in due to connection + # restrictions + if str(e).startswith("530"): + self.close_ftp(url) + return self.get_ftp_dir(url, readable, i + 1) + if str(e).startswith("500"): # Oops + raise TryLater() from e + else: + logger.error(f"unknown permanent error {e} on {url}") + raise + except ftplib.error_temp as e: + # Returned by Boston University when directory does not exist + if str(e).startswith("450"): + return [] + # Returned by Princeton University when cannot log in due to + # connection restrictions + if str(e).startswith("421"): + logger.info("Connections Exceeded %s" % url) + raise TryLater() from e + if str(e).startswith("425"): + logger.info("Failed to establish connection on %s" % url) + raise TryLater() from e + else: + logger.error(f"unknown error {e} on {url}") + raise + except (OSError, EOFError): + self.close(url) + return self.get_ftp_dir(url, readable, i + 1) + + results = {} + for line in listing: + if line.startswith("total"): + # some servers first include a line starting with the word 'total' + # that we can ignore + continue + fields = line.split() + try: + results[fields[8]] = {"size": fields[4]} + except IndexError: # line doesn't have 8 fields, it's not a dir line + pass + return results + + def compare_sha256(self, d, filename, graburl): + return True # Not implemented on FTP + + def _check_file(self, current_file_info, db_file_info): + try: + return float(current_file_info["size"]) == float(db_file_info["size"]) + except Exception: + return False + + def _check_dir(self, url, directory): + results = self.get_ftp_dir(url, directory.readable) + if results is None: + return None + + for filename in directory.files: + status = self._check_file(results[filename], directory.files[filename]) + if not status: + # Shortcut: we don't need to go over other files + return False + return True diff --git a/mirrormanager2/crawler/http.py b/mirrormanager2/crawler/http.py new file mode 100755 index 000000000..e96e22b8c --- /dev/null +++ b/mirrormanager2/crawler/http.py @@ -0,0 +1,182 @@ +import logging +from http.client import HTTPConnection, HTTPResponse, HTTPSConnection +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + + +class ForbiddenExpected(Exception): + pass + + +class HTTPUnknown(Exception): + pass + + +class HTTP500(Exception): + pass + + +################################################ +# overrides for http.client because we're +# handling keepalives ourself +################################################ +class myHTTPResponse(HTTPResponse): + def begin(self): + HTTPResponse.begin(self) + self.will_close = False + + def isclosed(self): + """This is a hack, because otherwise httplib will fail getresponse()""" + return True + + def keepalive_ok(self): + # HTTP/1.1 connections stay open until closed + if self.version == 11: + ka = self.msg.getheader("connection") + if ka and "close" in ka.lower(): + return False + else: + return True + + # other HTTP connections may have a connection: keep-alive header + ka = self.msg.getheader("connection") + if ka and "keep-alive" in ka.lower(): + return True + + try: + ka = self.msg.getheader("keep-alive") + if ka is not None: + maxidx = ka.index("max=") + maxval = ka[maxidx + 4 :] + if maxval == "1": + return False + return True + else: + ka = self.msg.getheader("connection") + if ka and "keep-alive" in ka.lower(): + return True + return False + except Exception: + return False + return False + + +class myHTTPConnection(HTTPConnection): + response_class = myHTTPResponse + + def end_request(self): + self.__response = None + + +class myHTTPSConnection(HTTPSConnection): + response_class = myHTTPResponse + + def end_request(self): + self.__response = None + + +################################################ +# the magic begins + + +def handle_redirect(hoststate, url, location, filedata, recursion, readable): + if recursion > 10: + raise HTTPUnknown() + if location.startswith("/"): + scheme, netloc, path, query, fragment = urlparse.urlsplit(url) + location = f"{scheme}:{netloc}{location}" + return check_head(hoststate, location, filedata, recursion + 1, readable) + + +def check_head(hoststate, url, filedata, recursion, readable, retry=0): + """Returns tuple: + True - URL exists + False - URL doesn't exist + None - we don't know + """ + + try: + conn = hoststate.open_http(url) + except Exception: + return None + + scheme, netloc, path, query, fragment = urlparse.urlsplit(url) + reqpath = path + if len(query) > 0: + reqpath += "?%s" % query + if len(fragment) > 0: + reqpath += "#%s" % fragment + + r = None + try: + conn.request( + "HEAD", + reqpath, + headers={ + "Connection": "Keep-Alive", + "Pragma": "no-cache", + "User-Agent": "mirrormanager-crawler/0.1 (+https://" + "github.com/fedora-infra/mirrormanager2/)", + }, + ) + r = conn.getresponse() + status = r.status + except Exception as e: + if retry == 0: + # If the above attempt to connect to the mirror fails, the crawler + # will retry once. One possible reason for a connection failure is + # that the connection, which is kept open to leverage keep-alive, + # has been closed by the remote end. Therefore we are closing + # the connection and are restarting the whole operation. + hoststate.close_http(url) + return check_head(hoststate, url, filedata, recursion, readable, retry=1) + else: + raise HTTPUnknown() from e + + conn.end_request() + keepalive_ok = r.keepalive_ok() + if keepalive_ok: + hoststate.keepalives_available = True + if not keepalive_ok: + hoststate.close_http(url) + + content_length = r.getheader("Content-Length") + # last_modified = r.getheader('Last-Modified') + + if status >= 200 and status < 300: + # lighttpd returns a Content-Length for directories + # apache and nginx do not + # For the basic check in check_for_base_dir() it is only + # relevant if the directory exists or not. Therefore + # passing None as filedata[]. This needs to be handled here. + if filedata is None: + # The file/directory seems to exist + return True + # fixme should check last_modified too + if float(filedata["size"]) == float(content_length) or content_length is None: + # handle no content-length header, streaming/chunked return + # or zero-length file + return True + else: + return False + if status >= 300 and status < 400: + return handle_redirect( + hoststate, url, r.getheader("Location"), filedata, recursion, readable + ) + elif status >= 400 and status < 500: + if status == 403: # forbidden + # may be a hidden dir still + if readable: + return False + else: + raise ForbiddenExpected() + elif status == 404 or status == 410: # not found / gone + return False + # we don't know + return None + elif status >= 500: + raise HTTP500() + + logger.info("status = %s" % status) + raise HTTPUnknown() diff --git a/mirrormanager2/crawler/http_connector.py b/mirrormanager2/crawler/http_connector.py new file mode 100755 index 000000000..098e2bf94 --- /dev/null +++ b/mirrormanager2/crawler/http_connector.py @@ -0,0 +1,130 @@ +import logging +from contextlib import suppress + +import requests + +from .connector import Connector, ForbiddenExpected +from .http import HTTP500, HTTPUnknown + +logger = logging.getLogger("crawler") + + +# TODO: rebase on requests + + +class HTTPConnector(Connector): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.keepalives_available = False + + def _connect(self, netloc): + session = requests.Session() + session.headers = { + "Connection": "Keep-Alive", + "Pragma": "no-cache", + "User-Agent": "mirrormanager-crawler/0.1 (+https://" + "github.com/fedora-infra/mirrormanager2/)", + } + # return myHTTPConnection(netloc, timeout=self.timeout) + + def _close(self): + self._connection.close() + + # def _get_redirect_url(self, url, location): + # if location.startswith("/"): + # scheme, netloc, path, query, fragment = urlsplit(url) + # location = f"{scheme}:{netloc}{location}" + # return location + + def check_url(self, url): + conn = self.open(url) + try: + result = self._get_content_length(conn, url, readable=True) + except (HTTPUnknown, HTTP500): + result = False + return False if (result is False or result is None) else True + + def _get_content_length(self, conn, url, readable, recursion=0, retry=0): + response = conn.head(url) + if response.ok: + return response.headers.get("Content-Length") + if response.status_code == 404 or response.status_code == 410: + # Not Found / Gone + return False + if response.status_code == 403: + # may be a hidden dir still + if readable: + return False + else: + raise ForbiddenExpected() + response.raise_for_status() + + def _check_file(self, conn, url, filedata, readable): + """Returns tuple: + True - URL exists + False - URL doesn't exist + None - we don't know + """ + try: + content_length = self._get_content_length(conn, url, readable) + except Exception: + return False + # lighttpd returns a Content-Length for directories + # apache and nginx do not + # For the basic check in check_for_base_dir() it is only + # relevant if the directory exists or not. Therefore + # passing None as filedata[]. This needs to be handled here. + if filedata is None: + # The file/directory seems to exist + return True + # fixme should check last_modified too + if content_length is not None and float(filedata["size"]) != float(content_length): + return False + + # handle no content-length header, streaming/chunked return + # or zero-length file + return True + + def _check_dir(self, url, directory): + try: + conn = self.open(url) + except Exception: + return None + for filename in directory.files: + file_url = f"{url}/{filename}" + exists = self._check_file(conn, file_url, directory.files[filename], directory.readable) + if filename == "repomd.xml" and exists: + # Additional optional check + with suppress(Exception): + exists = self.compare_sha256(directory, filename, file_url) + if exists is False or exists is None: + # Shortcut: we don't need to go over other files + return exists + return True + + def get_file(self, url): + conn = self.open(url) + try: + conn.request( + "GET", + url, + headers=self.headers, + ) + r = conn.getresponse() + status = r.status + except Exception: + return b"" + conn.end_request() + keepalive_ok = r.keepalive_ok() + if not keepalive_ok: + self.close(url) + + if status >= 300 and status < 400: + return self.get_file(r.getheader("Location")) + elif status < 200 or status >= 400: + return b"" + return r.read() + + +class HTTPSConnector(Connector): + pass diff --git a/mirrormanager2/crawler/log.py b/mirrormanager2/crawler/log.py new file mode 100755 index 000000000..a9a03f67c --- /dev/null +++ b/mirrormanager2/crawler/log.py @@ -0,0 +1,84 @@ +import logging +import os + +from .threads import get_thread_id, threadlocal + +logger = logging.getLogger("crawler") +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +master_formatter = ( + # "%(levelname)s:%(name)s:%(hosts)s:%(threads)s:%(hostid)s:%(hostname)s:%(message)s" + "%(levelname)s:%(name)s:%(hostid)s:%(hostname)s:%(message)s" +) + +current_host = 0 +all_hosts = 0 +threads = 0 +threads_active = 0 + + +# To insert information about the number of hosts and threads in each master +# log message this filter is necessary +class MasterFilter(logging.Filter): + def filter(self, record): + record.hosts = "Hosts(%d/%d)" % (current_host, all_hosts) + record.threads = "Threads(%d/%d)" % (threads_active, threads) + try: + record.hostid = threadlocal.host_id + record.hostname = threadlocal.host_name + except Exception: + record.hostid = 0 + record.hostname = "master" + return True + + +# This filter is necessary to enable logging per thread into a separate file +# Based on http://plumberjack.blogspot.de/2010/09/configuring-logging-for-web.html +class InjectingFilter(logging.Filter): + def __init__(self, thread_id): + self.thread_id = thread_id + + def filter(self, record): + try: + return threadlocal.thread_id == self.thread_id + except Exception: + return False + + +def setup_logging(debug): + logging.basicConfig(format=master_formatter) + f = MasterFilter() + logger.addFilter(f) + if debug: + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + return logger + + +def thread_file_logger(log_dir, host_id, debug): + # check if the directory exists + if log_dir is not None: + log_dir += "/crawler" + if not os.path.isdir(log_dir): + # MM_LOG_DIR/crawler seems to be configured but does not exist + # not logging + logger.warning("Directory " + log_dir + " does not exists." " Not logging per host") + log_dir = None + + log_file = None + fh = None + if log_dir is not None: + log_file = log_dir + "/" + str(host_id) + ".log" + fh = logging.FileHandler(log_file) + threadlocal.thread_id = get_thread_id() + f = InjectingFilter(get_thread_id()) + fh.addFilter(f) + + if debug: + fh.setLevel(logging.DEBUG) + else: + fh.setLevel(logging.INFO) + fh.setFormatter(formatter) + logger.addHandler(fh) + + return log_file, fh diff --git a/mirrormanager2/crawler/notif.py b/mirrormanager2/crawler/notif.py new file mode 100755 index 000000000..109872eee --- /dev/null +++ b/mirrormanager2/crawler/notif.py @@ -0,0 +1,41 @@ +import logging + +import mirrormanager2.lib + +logger = logging.getLogger("crawler") + + +def notify(options, topic, msg): + if not options["fedmsg"]: + return + + mirrormanager2.lib.notifications.fedmsg_publish( + f"mirrormanager.crawler.{topic}", + msg, + ) + + +def _get_host_names(hosts, options): + # Get a list of host names for fedmsg + return [ + host.name + for host in hosts + if (not host.id < options["startid"] and not host.id >= options["stopid"]) + ] + + +def notify_start(hosts, options): + host_names = _get_host_names(hosts, options) + hostlist = [dict(id=id, host=host) for id, host in zip(hosts, host_names)] + msg = dict(hosts=hostlist) + msg["options"] = options + notify(options, "start", msg) + + +def notify_complete(hosts, options, return_codes): + # Put a bow on the results for fedmsg + host_names = _get_host_names(hosts, options) + results = [ + dict(rc=rc, host=host, id=id) for rc, host, id in zip(return_codes, host_names, hosts) + ] + notify(options, "complete", dict(results=results)) diff --git a/mirrormanager2/crawler/propagation.py b/mirrormanager2/crawler/propagation.py new file mode 100755 index 000000000..d622e63e2 --- /dev/null +++ b/mirrormanager2/crawler/propagation.py @@ -0,0 +1,130 @@ +import hashlib +import logging + +import requests + +import mirrormanager2.lib +from mirrormanager2.lib.database import get_db_manager + +from .connection_pool import ConnectionPool +from .constants import PROPAGATION_ARCH +from .threads import get_thread_id + +logger = logging.getLogger("crawler") + + +class CrawlerError(Exception): + pass + + +class AllCategoriesFailed(CrawlerError): + pass + + +class NoCategory(CrawlerError): + pass + + +class Crawler: + def __init__(self, config, options, session): + self.config = config + self.options = options + self.session = session + self.connection_pool = ConnectionPool( + config, debuglevel=2 if options["debug"] else 0, timeout_minutes=1 + ) + + def select_host_categories_to_scan(self, host, ignore_empty=False): + result = [] + if self.options["categories"]: + for category in self.options["categories"]: + hc = mirrormanager2.lib.get_host_category_by_hostid_category( + self.session, host_id=host.id, category=category + ) + for entry in hc: + result.append(entry) + else: + result = list(host.categories) + if not result and not ignore_empty: + # If the host has no categories do not auto-disable it. Just skip the host. + raise NoCategory + return result + + def check_propagation(self, host): + repo = mirrormanager2.lib.get_repo_prefix_arch( + self.session, self.options["prop_repo"], PROPAGATION_ARCH + ) + if repo is None: + return + repo_dir = repo.directory + if repo_dir is None: + return + # now = datetime.now(tz=timezone.utc) + for hc in self.select_host_categories_to_scan(host): + # timeout_check() + category = hc.category + url = self._get_http_url(hc) + topdir = category.topdir.name + self._check_propagation(url, topdir, repo_dir) + + def _get_http_url(self, host_category): + for url in host_category.urls: + if url.startswith("http:"): + if not url.endswith("/"): + url += "/" + return url + + def _check_propagation(self, url, repo_dir, topdir): + # Print out information about the repomd.xml status + path = repo_dir.name + if repo_dir.name.startswith(topdir): + path = repo_dir.name[len(topdir) + 1 :] + fd = mirrormanager2.lib.get_file_detail( + self.session, "repomd.xml", repo_dir.id, reverse=True + ) + + url = f"{url}{path}/repodata/repomd.xml" + try: + contents = requests.get(url, timeout=30) + except requests.exceptions.ConnectionError: + logger.info( + "URL::{}::SHA256::{}::{}::{}::503::{}".format( + url, "NOSUM", "check_start", fd.sha256, path + ) + ) + return None + has = hashlib.sha256() + has.update(contents.content) + csum = has.hexdigest() + logger.info( + f"URL::{url}::SHA256::{csum}::check_start::{fd.sha256}::{contents.status_code}::{path}" + ) + + +def check_propagation(session, options, crawler, host): + crawler.check_propagation(host) + + +def worker(options, config, host_id): + db_manager = get_db_manager(config) + with db_manager.Session() as session: + host = mirrormanager2.lib.get_host(session, host_id) + if host.private and not options["include_private"]: + return 1 # TODO: use some sort of ScanResult enum + + logger.info(f"Worker {get_thread_id()!r} starting on host {host!r}") + + crawler = Crawler(config, options, session) + # reporter = Reporter(config, session, host) + + try: + crawler.check_propagation(host) + except Exception: + logger.exception(f"Failure in thread {get_thread_id()!r}, host {host!r}") + session.commit() + + logger.info(f"Ending propagation crawl of {host!r}") + # if fh: + # logger.removeHandler(fh) + # fh.close() + # gc.collect() diff --git a/mirrormanager2/crawler/reporter.py b/mirrormanager2/crawler/reporter.py new file mode 100755 index 000000000..bb56f780c --- /dev/null +++ b/mirrormanager2/crawler/reporter.py @@ -0,0 +1,164 @@ +import logging +import smtplib +import time +from datetime import datetime, timedelta, timezone + +from mirrormanager2.lib import get_file_details_with_checksum + +from .threads import threadlocal + +logger = logging.getLogger("crawler") + + +class Reporter: + def __init__(self, config, session, host): + self.config = config + self.session = session + self.host = host + self.host_failed = False + + def send_email(self, report_str, exc): + if not self.config.get("CRAWLER_SEND_EMAIL", False): + return + + SMTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z" + msg = """From: {} + To: {} + Subject: {} MirrorManager crawler report + Date: {} + + """.format( + self.config.get("EMAIL_FROM"), + self.config.get("ADMIN_EMAIL"), + self.host.name, + time.strftime(SMTP_DATE_FORMAT), + ) + + msg += report_str + "\n" + msg += "Log can be found at {}/{}.log\n".format( + self.config.get("crawler.logdir"), str(self.host.id) + ) + if exc is not None: + msg += f"Exception info: type {exc[0]}; value {exc[1]}\n" + msg += str(exc[2]) + try: + smtp = smtplib.SMTP(self.config.get("SMTP_SERVER")) + + username = self.config.get("SMTP_USERNAME") + password = self.config.get("SMTP_PASSWORD") + + if username and password: + smtp.login(username, password) + + smtp.sendmail(self.config.get("SMTP_SERVER"), self.config.get("ADMIN_EMAIL"), msg) + except Exception: + logger.exception("Error sending email") + logger.debug("Email message follows:") + logger.debug(msg) + + try: + smtp.quit() + except Exception: + pass + + def mark_not_up2date(self, reason="Unknown", exc=None): + """This function marks a complete host as not being up to date. + It usually is called if the scan of a single category has failed. + This is something the crawler does at multiple places: Failure + in the scan of a single category disables the complete host.""" + # Watch out: set_not_up2date(session) is commiting all changes + # in this thread to the database + self.host_failed = True + self.host.set_not_up2date(self.session) + msg = f"Host {self.host.id} marked not up2date: {reason}" + logger.warning(msg) + if exc is not None: + logger.debug(f"{exc[0]} {exc[1]} {exc[2]}") + self.send_email(msg, exc) + + def record_crawl_failure(self): + self.host_failed = True + try: + self.host.crawl_failures += 1 + except TypeError: + self.host.crawl_failures = 1 + + auto_disable = self.config.get("CRAWLER_AUTO_DISABLE", 4) + if self.host.crawl_failures >= auto_disable: + self.host.disable_reason = ( + "Host has been disabled (user_active) after %d" + " consecutive crawl failures" % auto_disable + ) + self.host.user_active = False + + def record_crawl_start(self): + threadlocal.starttime = time.monotonic() + + def record_crawl_end(self, record_duration=True): + self.host.last_crawled = datetime.now(tz=timezone.utc) + last_crawl_duration = time.monotonic() - threadlocal.starttime + if record_duration: + self.host.last_crawl_duration = last_crawl_duration + + def reset_crawl_failures(self): + self.host.crawl_failures = 0 + + def report_stats(self, stats): + logger.info("Crawl results for %s", self.host.name) + msg = "Crawl duration: %d seconds" % stats["duration"] + logger.info(msg) + msg = "Total directories: %d" % stats["numkeys"] + logger.info(msg) + msg = "Unreadable directories: %d" % stats["unreadable"] + logger.info(msg) + msg = "Changed to up2date: %d" % stats["up2date"] + logger.info(msg) + msg = "Changed to not up2date: %d" % stats["not_up2date"] + logger.info(msg) + msg = "Unchanged: %d" % stats["unchanged"] + logger.info(msg) + msg = "Unknown disposition: %d" % stats["unknown"] + logger.info(msg) + msg = "New HostCategoryDirs created: %d" % stats["newdir"] + logger.info(msg) + msg = ( + "HostCategoryDirs now deleted on the master, marked not " + "up2date: %d" % stats["deleted_on_master"] + ) + logger.info(msg) + + +class PropagationReporter: + def __init__(self, config, session, version): + self.config = config + self.session = session + self.version = version + self._propagation = { + "same_day": 0, + "one_day": 0, + "two_day": 0, + "older": 0, + "no_info": 0, + } + + def record_propagation(self, file_detail, checksum): + if checksum is None: + self._propagation["no_info"] += 1 + return + today = datetime.today() + age_threshold = today - timedelta(days=5) + previous_file_detail = get_file_details_with_checksum( + self.session, file_detail, checksum, age_threshold + ) + if previous_file_detail is None: + self._propagation["older"] += 1 + return + previous_ts = datetime.fromtimestamp(previous_file_detail.timestamp, tz=timezone.utc) + if today - previous_ts > timedelta(days=3): + self._propagation["older"] += 1 + elif today - previous_ts > timedelta(days=2): + self._propagation["two_day"] += 1 + elif today - previous_ts > timedelta(days=1): + self._propagation["one_day"] += 1 + else: + self._propagation["same_day"] += 1 diff --git a/mirrormanager2/crawler/rsync_connector.py b/mirrormanager2/crawler/rsync_connector.py new file mode 100755 index 000000000..6328ea31f --- /dev/null +++ b/mirrormanager2/crawler/rsync_connector.py @@ -0,0 +1,117 @@ +import logging +import os +import time + +from mirrormanager2.lib.sync import run_rsync + +from .connector import Connector + +logger = logging.getLogger("crawler") + + +class RsyncConnector(Connector): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._scan_result = None + + def _run(self, url): + if not url.endswith("/"): + url += "/" + rsync_start_time = time.monotonic() + try: + result, listing = run_rsync( + url, self.config["CRAWLER_RSYNC_PARAMETERS"], logger, int(self.timeout * 0.9) + ) + except Exception: + logger.exception("Failed to run rsync.", exc_info=True) + return False + rsync_stop_time = time.monotonic() + logger.info("rsync time: %s", rsync_stop_time - rsync_start_time) + if result == 10: + # no rsync content, fail! + logger.warning( + "Connection to %s Refused. Please check that the URL is " + "correct and that the host has an rsync module still available.", + url, + ) + return False + if result > 0: + logger.info("rsync returned exit code %d" % result) + + rsync = {} + # put the rsync listing in a dict for easy access + while True: + line = listing.readline() + if not line: + break + fields = line.split() + try: + rsync[fields[4]] = { + "mode": fields[0], + "size": fields[1], + "date": fields[2], + "time": fields[3], + } + except IndexError: + logger.debug("invalid rsync line: %s\n" % line) + + # run_rsync() returns a temporary file which needs to be closed + listing.close() + + logger.debug("rsync listing has %d lines" % len(rsync)) + return rsync + + def _check_file(self, current_file_info, db_file_info): + if current_file_info["mode"].startswith("l"): + # ignore symlink size differences + return True + + try: + return float(current_file_info["size"]) != float(db_file_info["size"]) + except ValueError: # one of the conversion to float() failed + logger.debug("Invalid size value for file %s", current_file_info) + return False + + def _check_dir(self, dirname, directory): + for filename in sorted(directory.files): + if len(dirname) == 0: + key = filename + else: + key = os.path.join(dirname, filename) + + logger.debug("trying with key %s", key) + try: + current_file_info = self._scan_result[filename] + except KeyError: # file is not in the rsync listing + logger.debug("Missing remote file %s", key) + return False + + try: + status = self._check_file(current_file_info, directory.files[filename]) + if not status: + # Shortcut: we don't need to go over other files + return False + except Exception: # something else went wrong + logger.exception("Exception caught when scanning %s", filename) + return False + + return True + + def _get_dir_url(self, url, directory, category_prefix_length): + # We don't need the whole URL, the scan has already been done + return directory.name[category_prefix_length:] + + def check_category( + self, + url, + trydirs, + category_prefix_length, + timeout, + only_repodata, + ): + # Scan once for the entire category + self._scan_result = self._run(url) + if not self._scan_result: + # no rsync content, fail! + return None + return super().check_category(url, trydirs, category_prefix_length, timeout, only_repodata) diff --git a/mirrormanager2/crawler/threads.py b/mirrormanager2/crawler/threads.py new file mode 100755 index 000000000..0515b6315 --- /dev/null +++ b/mirrormanager2/crawler/threads.py @@ -0,0 +1,71 @@ +import hashlib +import logging +import signal +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import partial + +from .constants import THREAD_TIMEOUT + +logger = logging.getLogger(__name__) + +# This is the global timeout variable, so that it can be +# decreased by the signal handler and thus change in all threads. +timeout = 120 + +# Variable used to coordinate graceful shutdown of all threads +shutdown = False + + +# This is a "thread local" object that allows us to store the start time of +# each worker thread (so they can measure and check if they should time out or +# not...) +threadlocal = threading.local() + + +def get_thread_id(): + """Silly util that returns a git-style short-hash id of the thread.""" + return hashlib.md5(str(threading.current_thread().ident).encode("ascii")).hexdigest()[:7] + + +def sigalrm_handler(threadpool, signal, stackframe): + logger.warning("Received SIGALRM. Shutting down thread pool.") + threadpool.shutdown(wait=False) + + +def on_thread_started(host_id, host_name): + threadlocal.host_id = host_id + threadlocal.host_name = host_name + + +# run_in_threadpool(partial(worker, options, config), hosts) +def run_in_threadpool(fn, iterable, max_threads, timeout): + # Then create a threadpool to handle as many at a time as we like + # threadpool = multiprocessing.pool.ThreadPool(processes=threads) + threadpool = ThreadPoolExecutor(max_workers=max_threads) + + signal.signal(signal.SIGALRM, partial(sigalrm_handler, threadpool)) + + results = [] + with threadpool: + futures = threadpool.map(fn, iterable, timeout=timeout) + for future in as_completed(futures): + results.append(future.result()) + return results + + +class ThreadTimeout: + def __init__(self, max_duration=THREAD_TIMEOUT): + self.max_duration = max_duration + + def start(self): + threadlocal.starttime = time.monotonic() + + def check(self): + elapsed = self.elapsed() + if elapsed > (THREAD_TIMEOUT * 60): + raise TimeoutError(f"Thread {get_thread_id()} timed out after {elapsed}s") + + def elapsed(self): + return time.monotonic() - threadlocal.starttime diff --git a/mirrormanager2/lib/__init__.py b/mirrormanager2/lib/__init__.py index e60b2e3da..86e2a084c 100644 --- a/mirrormanager2/lib/__init__.py +++ b/mirrormanager2/lib/__init__.py @@ -772,6 +772,23 @@ def get_file_details(session): return query.all() +def get_file_details_with_checksum(session, file_detail, checksum, age_threshold): + if len(checksum) != 64: + # Only SHA256 is supported yet. + return None + query = ( + session.query(model.FileDetail) + .filter( + model.FileDetail.directory_id == file_detail.directory_id, + model.FileDetail.filename == file_detail.filename, + model.FileDetail.sha256 == checksum, + model.FileDetail.timestamp > int(age_threshold.timestamp()), + ) + .order_by(model.FileDetail.timestamp) + ) + return query.last() + + def get_directories(session): """Return all Directory in the database. diff --git a/pyproject.toml b/pyproject.toml index 7eed7198f..0e04e84b9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ responses = "^0.23.3" report_mirror = { reference = "client/report_mirror", type = "file" } mirrorlist_statistics = { reference = "mirrorlist/mirrorlist_statistics", type = "file" } mm2_crawler = "mirrormanager2.utility.crawler:main" +mm2_crawler2 = "mirrormanager2.crawler.cli:main" mm2_emergency-expire-repo = "mirrormanager2.utility.emergency_expire_repo:main" mm2_generate-worldmap = "mirrormanager2.utility.generate_worldmap:main" mm2_get_global_netblocks = { reference = "utility/mm2_get_global_netblocks", type = "file" }