WIP: crawler refactoring
Signed-off-by: Aurélien Bompard <[email protected]>
abompard committed Jan 31, 2024
1 parent c9576c9 commit 37c0bff
Showing 18 changed files with 1,912 additions and 0 deletions.
221 changes: 221 additions & 0 deletions mirrormanager2/crawler/cli.py
@@ -0,0 +1,221 @@
import logging
import os
import time
from functools import partial

import click

import mirrormanager2.lib
from mirrormanager2.lib.database import get_db_manager

from ..utility.common import read_config
from .constants import CONTINENTS
from .crawler import worker
from .log import setup_logging
from .threads import run_in_threadpool

logger = logging.getLogger("crawler")

# def notify(options, topic, msg):
# if not options["fedmsg"]:
# return
#
# mirrormanager2.lib.notifications.fedmsg_publish(
# f"mirrormanager.crawler.{topic}",
# msg,
# )


def validate_continents(ctx, param, value):
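    """Normalize and validate the --continent values.

    Values are upper-cased; a leading "^" marks a continent to exclude,
    e.g. --continent EU --continent ^NA.
    """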
value = [c.upper() for c in value]
for item in value:
cont = item.lstrip("^")
if cont not in CONTINENTS:
raise click.BadParameter(
f"Unknown continent {cont}. Known continents: {', '.join(CONTINENTS)}"
)
return value


@click.group()
@click.option(
"-c",
"--config",
envvar="MM2_CONFIG",
default="/etc/mirrormanager/mirrormanager2.cfg",
help="Configuration file to use",
)
@click.option(
"--include-private",
is_flag=True,
default=False,
help="Include hosts marked 'private' in the crawl",
)
@click.option(
"-t",
"--threads",
type=int,
default=10,
help="max threads to start in parallel",
# callback=set_global,
)
@click.option(
"--timeout-minutes",
"timeout",
type=int,
default=120,
help="per-host timeout, in minutes",
# callback=set_global,
)
@click.option(
"--startid",
type=int,
metavar="ID",
default=0,
help="Start crawling at host ID (default=0)",
)
@click.option(
"--stopid",
type=int,
metavar="ID",
default=0,
help="Stop crawling before host ID (default=no limit)",
)
@click.option(
"--category",
"categories",
multiple=True,
help="Category to scan (default=all), can be repeated",
)
@click.option(
"--disable-fedmsg",
"fedmsg",
is_flag=True,
default=True,
help="Disable fedora-messaging notifications at the beginning and end of crawl",
)
@click.option(
"--canary",
is_flag=True,
default=False,
help="Fast crawl by only checking if mirror can be reached",
)
@click.option(
"--repodata",
is_flag=True,
default=False,
help="Fast crawl by only checking if the repodata is up to date",
)
@click.option(
"--continent",
"continents",
multiple=True,
callback=validate_continents,
help="Limit crawling by continent. Exclude by prefixing with'^'",
)
@click.option(
"--debug",
"-d",
is_flag=True,
default=False,
help="enable printing of debug-level messages",
)
@click.pass_context
def main(ctx, config, debug, startid, stopid, **kwargs):
ctx.ensure_object(dict)
setup_logging(debug)
ctx.obj["options"] = ctx.params
config = read_config(config)
ctx.obj["config"] = config
db_manager = get_db_manager(config)
session = db_manager.Session()

# Get *all* of the mirrors
hosts = mirrormanager2.lib.get_mirrors(
session,
private=False,
order_by_crawl_duration=True,
admin_active=True,
user_active=True,
site_private=False,
site_user_active=True,
site_admin_active=True,
)

# # Get a list of host names for fedmsg
# host_names = [
# host.name
# for host in hosts
# if (not host.id < options["startid"] and not host.id >= options["stopid"])
# ]

# Limit our host list down to only the ones we really want to crawl
hosts = [host for host in hosts if (host.id >= startid and (not stopid or host.id < stopid))]
ctx.obj["hosts"] = hosts

session.close()


@main.command()
@click.pass_context
def crawl(ctx):
options = ctx.obj["options"]
config = ctx.obj["config"]
starttime = time.monotonic()
host_ids = [host.id for host in ctx.obj["hosts"]]

# And then, for debugging, only do one host
# hosts = [hosts.next()]

# hostlist = [dict(id=id, host=host) for id, host in zip(hosts, host_names)]
# msg = dict(hosts=hostlist)
# msg["options"] = options
# notify(options, "start", msg)

# Before we do work, chdir to /var/tmp/. mirrormanager1 did this and I'm
# not totally sure why...
os.chdir("/var/tmp")

results = run_in_threadpool(
partial(worker, options, config),
host_ids,
max_threads=options["threads"],
timeout=options["timeout"],
)

# Put a bow on the results for fedmsg
results = [
dict(result=result, host=host.name, id=host.id)
for result, host in zip(results, ctx.obj["hosts"])
]
# notify(options, "complete", dict(results=results))
print(results)

# if options["canary"]:
# mode = " in canary mode"
# elif options["repodata"]:
# mode = " in repodata mode"
# else:
# mode = ""
# logger.info("%d of %d hosts failed%s" % (hosts_failed, current_host, mode))
logger.info("Crawler finished after %d seconds", (time.monotonic() - starttime))
return results


@main.command()
@click.option(
"--prop-repo",
default="rawhide",
help="Repository prefix to use for propagation. Defaults to 'rawhide'.",
)
@click.pass_context
def propagation(ctx, prop_repo):
"""Print out information about repomd.xml propagation.
Defaults to development/rawhide/x86_64/os/repodata
Only the category 'Fedora Linux' is supported
"""
print(ctx.obj)
print(ctx.params)
# options = ctx.obj["options"]
pass
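
For orientation, the run_in_threadpool(partial(worker, options, config), host_ids, ...) call in crawl implies a per-host callable shaped roughly like the sketch below. The real worker lives in mirrormanager2/crawler/crawler.py, which is not part of this excerpt, so the body and the result shape here are hypothetical illustrations only.

# Hypothetical sketch only -- the real worker is in crawler.py (not shown).
# Reuses the get_db_manager import from cli.py above.
def worker(options, config, host_id):
    # Each thread opens its own session: SQLAlchemy sessions must not be
    # shared across threads.
    db_manager = get_db_manager(config)
    session = db_manager.Session()
    try:
        # ... look up the host by host_id, crawl it, record the outcome ...
        return {"id": host_id, "status": "ok"}  # illustrative result shape
    finally:
        session.close()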
54 changes: 54 additions & 0 deletions mirrormanager2/crawler/connection_pool.py
@@ -0,0 +1,54 @@
import logging
from functools import partial
from urllib.parse import urlsplit

from .ftp_connector import FTPConnector
from .http_connector import HTTPConnector, HTTPSConnector

logger = logging.getLogger("crawler")


def _get_connection_class(scheme):
    if scheme == "http":
        return HTTPConnector
    if scheme == "https":
        return HTTPSConnector
    if scheme == "ftp":
        return FTPConnector
    # Fail loudly instead of returning None and crashing later in get().
    raise ValueError(f"Unsupported URL scheme: {scheme}")


class ConnectionPool:
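    """Cache one connector per network location.

    Connectors are keyed by netloc; a connector removes itself from the
    pool through the on_closed callback when it is closed.
    """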
def __init__(self, config, debuglevel=0, timeout_minutes=120):
self._connections = {}
self.config = config
self.debuglevel = debuglevel
self.timeout_minutes = timeout_minutes

def get(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
if netloc not in self._connections:
connection_class = _get_connection_class(scheme)
self._connections[netloc] = connection_class(
debuglevel=self.debuglevel,
timeout_minutes=self.timeout_minutes,
on_closed=partial(self._remove_connection, netloc),
)
# self._connections[netloc] = self._connect(netloc)
# self._connections[netloc].set_debuglevel(self.debuglevel)
return self._connections[netloc]

def close(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
try:
connection = self._connections[netloc]
except KeyError:
return
connection.close()

    def _remove_connection(self, netloc, connection):
        # pop() instead of del: the connection may already have been evicted.
        self._connections.pop(netloc, None)

def close_all(self):
for connection in list(self._connections.values()):
connection.close()
self._connections = {}
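
A minimal usage sketch of the pool, assuming config is the dict returned by read_config; the mirror URL is made up:

# Minimal usage sketch; the URL is hypothetical.
pool = ConnectionPool(config, debuglevel=0, timeout_minutes=120)
connector = pool.get("https://mirror.example.com/pub/fedora/linux")
# ... drive the connector, e.g. connector.check_dir(url, directory) ...
pool.close_all()  # close every cached connection and empty the pool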
103 changes: 103 additions & 0 deletions mirrormanager2/crawler/connector.py
@@ -0,0 +1,103 @@
import hashlib
import logging
from urllib.parse import urlsplit

logger = logging.getLogger("crawler")


class TryLater(Exception):
pass


class ForbiddenExpected(Exception):
pass


class Connector:
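    """Base class for the per-scheme mirror connectors.

    Subclasses provide _connect, _close, _get_file and _check_dir.
    """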
scheme = None

def __init__(self, debuglevel, timeout_minutes, on_closed):
self.debuglevel = debuglevel
        # ftplib and http.client take the timeout in seconds
self.timeout = timeout_minutes * 60
self._connection = None
self._on_closed = on_closed

def open(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
self._connection = self._connect(netloc)
self._connection.set_debuglevel(self.debuglevel)
return self._connection

def close(self):
if self._connection is not None:
self._close()
self._on_closed(self)

def _connect(self, url):
raise NotImplementedError

def _close(self):
raise NotImplementedError

def _get_file(self, url):
raise NotImplementedError

    # TODO: back off on TryLater with message
    # f"Server load exceeded on {host!r} - try later ({try_later_delay} seconds)"
def check_dir(self, url, directory):
return self._check_dir(url, directory)

def _check_dir(self, url, directory):
raise NotImplementedError

def compare_sha256(self, directory, filename, graburl):
"""looks for a FileDetails object that matches the given URL"""
found = False
contents = self._get_file(graburl)
sha256 = hashlib.sha256(contents).hexdigest()
for fd in list(directory.fileDetails):
if fd.filename == filename and fd.sha256 is not None:
if fd.sha256 == sha256:
found = True
break
return found

def _get_dir_url(self, url, directory, category_prefix_length):
dirname = directory.name[category_prefix_length:]
return f"{url}/{dirname}"

def check_category(
self,
url,
trydirs,
category_prefix_length,
timeout,
only_repodata,
):
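        """Check the crawled directories of one category on this mirror.

        Returns a mapping of directory objects to their status, or None
        if a directory's status could not be decided at this level.
        """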
statuses = {}
for d in trydirs:
timeout.check()

if not d.readable:
continue

# d.files is a dict which contains the last (maybe 10) files
# of the current directory. umdl copies the pickled dict
# into the database. It is either a dict or nothing.
if not isinstance(d.files, dict):
continue

if only_repodata and not d.name.endswith("/repodata"):
continue

dir_url = self._get_dir_url(url, d, category_prefix_length)

dir_status = self.check_dir(dir_url, d)
if dir_status is None:
# could be a dir with no files, or an unreadable dir.
# defer decision on this dir, let a child decide.
return
statuses[d] = dir_status

return statuses
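
To make the contract concrete, here is a sketch of what a minimal subclass could look like, built on the standard-library http.client. The real HTTPConnector lives in mirrormanager2/crawler/http_connector.py and is not shown in this commit excerpt, so this class is illustrative only (error and status handling, and _check_dir, are omitted).

import http.client

class ExampleHTTPConnector(Connector):
    # Illustrative only -- the real HTTPConnector (http_connector.py) is
    # not part of this excerpt.
    scheme = "http"

    def _connect(self, netloc):
        # http.client takes the timeout in seconds, matching self.timeout;
        # HTTPConnection also provides the set_debuglevel() that open() calls.
        return http.client.HTTPConnection(netloc, timeout=self.timeout)

    def _close(self):
        self._connection.close()
        self._connection = None

    def _get_file(self, url):
        # compare_sha256() expects the raw bytes of the file.
        scheme, netloc, path, query, fragment = urlsplit(url)
        self._connection.request("GET", path)
        return self._connection.getresponse().read()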
11 changes: 11 additions & 0 deletions mirrormanager2/crawler/constants.py
@@ -0,0 +1,11 @@
# hard coded list of continents; let's hope this does not change all the time
# this is according to GeoIP
CONTINENTS = ["AF", "AN", "AS", "EU", "NA", "OC", "SA", "--"]

# propagation URLs
PROPAGATION_ARCH = "x86_64"

# number of minutes to wait if a signal is received to shut down the crawler
SHUTDOWN_TIMEOUT = 5

THREAD_TIMEOUT = 120 # minutes