WIP: crawler refactoring
Signed-off-by: Aurélien Bompard <[email protected]>
abompard committed Feb 1, 2024
1 parent c9576c9 commit 7f2ef60
Showing 18 changed files with 1,771 additions and 1 deletion.
239 changes: 239 additions & 0 deletions mirrormanager2/crawler/cli.py
@@ -0,0 +1,239 @@
import logging
import os
import time

import click
from tqdm.auto import tqdm as TQDM
from tqdm.contrib.concurrent import ensure_lock

import mirrormanager2.lib
from mirrormanager2.lib.database import get_db_manager

from ..utility.common import read_config
from .constants import CONTINENTS
from .crawler import worker
from .log import setup_logging
from .threads import run_in_threadpool

logger = logging.getLogger("crawler")

# def notify(options, topic, msg):
# if not options["fedmsg"]:
# return
#
# mirrormanager2.lib.notifications.fedmsg_publish(
# f"mirrormanager.crawler.{topic}",
# msg,
# )


def validate_continents(ctx, param, value):
value = [c.upper() for c in value]
for item in value:
cont = item.lstrip("^")
if cont not in CONTINENTS:
raise click.BadParameter(
f"Unknown continent {cont}. Known continents: {', '.join(CONTINENTS)}"
)
return value
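
# Example (a sketch, not from this diff): ("eu", "^af") normalizes to
# ["EU", "^AF"], i.e. crawl Europe, exclude Africa. The "^" prefix is stripped
# here only for validation; the exclusion itself is presumably applied by the
# crawler code that consumes these options.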


@click.group()
@click.option(
"-c",
"--config",
envvar="MM2_CONFIG",
default="/etc/mirrormanager/mirrormanager2.cfg",
help="Configuration file to use",
)
@click.option(
"--include-private",
is_flag=True,
default=False,
help="Include hosts marked 'private' in the crawl",
)
@click.option(
"-t",
"--threads",
type=int,
default=10,
    help="Maximum number of threads to run in parallel",
)
@click.option(
"--timeout-minutes",
"global_timeout",
type=int,
default=120,
callback=lambda ctx, param, value: value * 60,
    help="Global timeout, in minutes",
)
@click.option(
"--host-timeout",
"host_timeout",
type=int,
default=30,
    help="Host timeout, in seconds",
)
@click.option(
"--startid",
type=int,
metavar="ID",
default=0,
help="Start crawling at host ID (default=0)",
)
@click.option(
"--stopid",
type=int,
metavar="ID",
default=0,
help="Stop crawling before host ID (default=no limit)",
)
@click.option(
"--category",
"categories",
multiple=True,
help="Category to scan (default=all), can be repeated",
)
@click.option(
"--disable-fedmsg",
"fedmsg",
is_flag=True,
default=True,
help="Disable fedora-messaging notifications at the beginning and end of crawl",
)
@click.option(
"--canary",
is_flag=True,
default=False,
    help="Fast crawl by only checking if the mirror can be reached",
)
@click.option(
"--repodata",
is_flag=True,
default=False,
help="Fast crawl by only checking if the repodata is up to date",
)
@click.option(
"--continent",
"continents",
multiple=True,
callback=validate_continents,
    help="Limit crawling by continent. Exclude by prefixing with '^'",
)
@click.option(
"--debug",
"-d",
is_flag=True,
default=False,
    help="Enable printing of debug-level messages",
)
@click.pass_context
def main(ctx, config, debug, startid, stopid, **kwargs):
ctx.ensure_object(dict)
setup_logging(debug)
ctx.obj["options"] = ctx.params
config = read_config(config)
ctx.obj["config"] = config
db_manager = get_db_manager(config)
session = db_manager.Session()

    # Get all the active, public mirrors (the query itself filters on the flags below)
hosts = mirrormanager2.lib.get_mirrors(
session,
private=False,
order_by_crawl_duration=True,
admin_active=True,
user_active=True,
site_private=False,
site_user_active=True,
site_admin_active=True,
)

# # Get a list of host names for fedmsg
# host_names = [
# host.name
# for host in hosts
# if (not host.id < options["startid"] and not host.id >= options["stopid"])
# ]

# Limit our host list down to only the ones we really want to crawl
hosts = [host for host in hosts if (host.id >= startid and (not stopid or host.id < stopid))]
ctx.obj["hosts"] = hosts

session.close()


@main.command()
@click.pass_context
def crawl(ctx):
options = ctx.obj["options"]
config = ctx.obj["config"]
starttime = time.monotonic()
host_ids = [host.id for host in ctx.obj["hosts"]]

# And then, for debugging, only do one host
# hosts = [hosts.next()]

# hostlist = [dict(id=id, host=host) for id, host in zip(hosts, host_names)]
# msg = dict(hosts=hostlist)
# msg["options"] = options
# notify(options, "start", msg)

# Before we do work, chdir to /var/tmp/. mirrormanager1 did this and I'm
# not totally sure why...
os.chdir("/var/tmp")

results = []
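    # ensure_lock() yields tqdm's shared write lock so it can be handed to the
    # worker threads via the executor initializer below (TQDM.set_lock); a
    # shared lock keeps progress-bar output from interleaving across threads.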
with ensure_lock(TQDM) as tqdm_lock:
with TQDM(total=len(host_ids), unit="mirrors") as progress_bar:
futures = run_in_threadpool(
worker,
host_ids,
fn_args=(options, config),
timeout=options["global_timeout"],
executor_kwargs={
"max_workers": options["threads"],
"initializer": TQDM.set_lock,
"initargs": (tqdm_lock,),
},
)
for future in futures:
progress_bar.update()
results.append(future)

# # Put a bow on the results for fedmsg
# results = [
# dict(result=result, host=host.name, id=host.id)
# for result, host in zip(results, ctx.obj["hosts"])
# ]
# notify(options, "complete", dict(results=results))
logger.info(repr(results))

# if options["canary"]:
# mode = " in canary mode"
# elif options["repodata"]:
# mode = " in repodata mode"
# else:
# mode = ""
# logger.info("%d of %d hosts failed%s" % (hosts_failed, current_host, mode))
logger.info("Crawler finished after %d seconds", (time.monotonic() - starttime))
return results
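
# Example invocations (a sketch: the installed entry-point name and the exact
# continent codes are assumptions, as neither appears in this diff):
#
#   mm2-crawler --threads 20 --continent EU --continent ^NA crawl
#   mm2-crawler --canary --host-timeout 10 crawl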


@main.command()
@click.option(
"--prop-repo",
default="rawhide",
help="Repository prefix to use for propagation. Defaults to 'rawhide'.",
)
@click.pass_context
def propagation(ctx, prop_repo):
    """Print information about repomd.xml propagation.

    Defaults to development/rawhide/x86_64/os/repodata.
    Only the 'Fedora Linux' category is supported.
    """
    print(ctx.obj)
    print(ctx.params)
    # options = ctx.obj["options"]
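
# Sketch of what the propagation check might eventually do, inferred from the
# docstring above (the helper names are hypothetical, not part of this diff):
#
#   for host in ctx.obj["hosts"]:
#       url = f"{host_base_url(host)}/development/rawhide/x86_64/os/repodata/repomd.xml"
#       checksum = fetch_repomd_checksum(url)          # hypothetical helper
#       report_propagation(host, prop_repo, checksum)  # hypothetical helper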
58 changes: 58 additions & 0 deletions mirrormanager2/crawler/connection_pool.py
@@ -0,0 +1,58 @@
import logging
from functools import partial
from urllib.parse import urlsplit

from .ftp_connector import FTPConnector
from .http_connector import HTTPConnector, HTTPSConnector
from .rsync_connector import RsyncConnector

logger = logging.getLogger("crawler")


def _get_connection_class(scheme):
    if scheme == "http":
        return HTTPConnector
    if scheme == "https":
        return HTTPSConnector
    if scheme == "ftp":
        return FTPConnector
    if scheme == "rsync":
        return RsyncConnector
    # Fail loudly instead of returning None, which would only surface later as
    # a confusing "NoneType is not callable" in ConnectionPool.get().
    raise ValueError(f"Unsupported URL scheme: {scheme!r}")


class ConnectionPool:
def __init__(self, config, debuglevel=0, timeout=600):
self._connections = {}
self.config = config
self.debuglevel = debuglevel
self.timeout = timeout

def get(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
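        # Connections are pooled per (scheme, netloc): the same host reached
        # over different protocols gets a separate connection for each.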
        if (scheme, netloc) not in self._connections:
connection_class = _get_connection_class(scheme)
self._connections[(scheme, netloc)] = connection_class(
config=self.config,
debuglevel=self.debuglevel,
timeout=self.timeout,
on_closed=partial(self._remove_connection, scheme, netloc),
)
# self._connections[(scheme, netloc)] = self._connect(netloc)
# self._connections[(scheme, netloc)].set_debuglevel(self.debuglevel)
return self._connections[(scheme, netloc)]

def close(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
try:
connection = self._connections[(scheme, netloc)]
except KeyError:
return
connection.close()

def _remove_connection(self, scheme, netloc, connection):
del self._connections[(scheme, netloc)]

def close_all(self):
for connection in list(self._connections.values()):
connection.close()
self._connections = {}
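
# Usage sketch (a minimal example, assuming the config dict the crawler passes
# around elsewhere; "mirror.example.com" is a placeholder):
#
#   pool = ConnectionPool(config, timeout=30)
#   connector = pool.get("https://mirror.example.com/pub/fedora/linux/")
#   ...  # crawl through the connector
#   pool.close_all()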
