Skip to content

Commit

Permalink
WIP: crawler refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Aurélien Bompard <[email protected]>
  • Loading branch information
abompard committed Feb 2, 2024
1 parent c9576c9 commit 1acf9cb
Show file tree
Hide file tree
Showing 18 changed files with 1,947 additions and 1 deletion.
Empty file.
315 changes: 315 additions & 0 deletions mirrormanager2/crawler/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
import logging
import os
import time

import click
from rich.console import Console
from rich.progress import Progress
from rich.table import Table

import mirrormanager2.lib
from mirrormanager2.lib.database import get_db_manager

from ..utility.common import read_config
from .constants import CONTINENTS
from .crawler import CrawlResult, worker
from .log import setup_logging
from .threads import run_in_threadpool

logger = logging.getLogger("crawler")

# def notify(options, topic, msg):
# if not options["fedmsg"]:
# return
#
# mirrormanager2.lib.notifications.fedmsg_publish(
# f"mirrormanager.crawler.{topic}",
# msg,
# )
DEFAULT_THREAD_COUNT = min(100, os.cpu_count() * 20)


def validate_continents(ctx, param, value):
value = [c.upper() for c in value]
for item in value:
cont = item.lstrip("^")
if cont not in CONTINENTS:
raise click.BadParameter(
f"Unknown continent {cont}. Known continents: {', '.join(CONTINENTS)}"
)
return value


@click.group()
@click.option(
"-c",
"--config",
envvar="MM2_CONFIG",
default="/etc/mirrormanager/mirrormanager2.cfg",
help="Configuration file to use",
)
@click.option(
"--include-private",
is_flag=True,
default=False,
help="Include hosts marked 'private' in the crawl",
)
@click.option(
"-t",
"--threads",
type=int,
default=DEFAULT_THREAD_COUNT,
help="max threads to start in parallel",
)
@click.option(
"--timeout-minutes",
"global_timeout",
type=int,
default=120,
callback=lambda ctx, param, value: value * 60,
help="global timeout, in minutes",
)
@click.option(
"--host-timeout",
"host_timeout",
type=int,
default=30,
help="host timeout, in seconds",
)
@click.option(
"--startid",
type=int,
metavar="ID",
default=0,
help="Start crawling at host ID (default=0)",
)
@click.option(
"--stopid",
type=int,
metavar="ID",
default=0,
help="Stop crawling before host ID (default=no limit)",
)
@click.option(
"--category",
"categories",
multiple=True,
help="Category to scan (default=all), can be repeated",
)
@click.option(
"--disable-fedmsg",
"fedmsg",
is_flag=True,
default=True,
help="Disable fedora-messaging notifications at the beginning and end of crawl",
)
@click.option(
"--continent",
"continents",
multiple=True,
callback=validate_continents,
help="Limit crawling by continent. Exclude by prefixing with'^'",
)
@click.option(
"--debug",
"-d",
is_flag=True,
default=False,
help="enable printing of debug-level messages",
)
@click.pass_context
def main(ctx, config, debug, startid, stopid, **kwargs):
ctx.ensure_object(dict)
ctx.obj["console"] = Console()

setup_logging(debug, ctx.obj["console"])
ctx.obj["options"] = ctx.params
config = read_config(config)
ctx.obj["config"] = config
db_manager = get_db_manager(config)
session = db_manager.Session()

# Get *all* of the mirrors
hosts = mirrormanager2.lib.get_mirrors(
session,
private=False,
order_by_crawl_duration=True,
admin_active=True,
user_active=True,
site_private=False,
site_user_active=True,
site_admin_active=True,
)

# # Get a list of host names for fedmsg
# host_names = [
# host.name
# for host in hosts
# if (not host.id < options["startid"] and not host.id >= options["stopid"])
# ]

# Limit our host list down to only the ones we really want to crawl
hosts = [host for host in hosts if (host.id >= startid and (not stopid or host.id < stopid))]
ctx.obj["hosts"] = hosts

session.close()


@main.command()
@click.option(
"--canary",
is_flag=True,
default=False,
help="Fast crawl by only checking if mirror can be reached",
)
@click.option(
"--repodata",
is_flag=True,
default=False,
help="Fast crawl by only checking if the repodata is up to date",
)
@click.pass_context
def crawl(ctx, **kwargs):
options = ctx.obj["options"]
options.update(ctx.params)
config = ctx.obj["config"]
starttime = time.monotonic()
host_ids = [host.id for host in ctx.obj["hosts"]]

# And then, for debugging, only do one host
# hosts = [hosts.next()]

# hostlist = [dict(id=id, host=host) for id, host in zip(hosts, host_names)]
# msg = dict(hosts=hostlist)
# msg["options"] = options
# notify(options, "start", msg)

# Before we do work, chdir to /var/tmp/. mirrormanager1 did this and I'm
# not totally sure why...
os.chdir("/var/tmp")

results = []
try:
with Progress(console=ctx.obj["console"], refresh_per_second=2) as progress:
task_global = progress.add_task(
f"Crawling {len(host_ids)} mirrors", total=len(host_ids)
)
futures = run_in_threadpool(
worker,
host_ids,
fn_args=(options, config, progress),
timeout=options["global_timeout"],
executor_kwargs={
"max_workers": options["threads"],
},
)
for future in futures:
progress.advance(task_global)
results.append(future)
except (KeyboardInterrupt, TimeoutError):
raise
else:
report_crawl(ctx.obj["console"], options, results)

# # Put a bow on the results for fedmsg
# results = [
# dict(result=result, host=host.name, id=host.id)
# for result, host in zip(results, ctx.obj["hosts"])
# ]
# notify(options, "complete", dict(results=results))

# if options["canary"]:
# mode = " in canary mode"
# elif options["repodata"]:
# mode = " in repodata mode"
# else:
# mode = ""
# logger.info("%d of %d hosts failed%s" % (hosts_failed, current_host, mode))
logger.info("Crawler finished after %d seconds", (time.monotonic() - starttime))


# def worker_progress_wrapper(progress):
# def _decorator(f):
# @wraps(f)
# def wrapper(options, config, host_id):
# task = progress.ask_task()
# result = f(*args, **kwds)
#
# return wrapper
# return _decorator


def report_crawl(console: Console, options: dict, results: list[CrawlResult]):
table = Table(title="Results")
table.add_column("Host Name")
table.add_column("Status")
table.add_column("Duration")
if options.get("canary"):
table.add_column("Total directories")
table.add_column("Unreadable directories")
table.add_column("Changed to up2date")
table.add_column("Changed to NOT up2date")
table.add_column("Unchanged")
table.add_column("Unknown")
table.add_column("HostCategoryDirs created")
table.add_column("HostCategoryDirs deleted")

def _to_str(int_or_none):
str(int_or_none) if int_or_none is not None else ""

for result in results:
row = [
result.host_name,
result.status,
f"{result.duration:.0f}s",
]
if options.get("canary"):
row.extend(
[
_to_str(result.total_directories),
_to_str(result.unreadable),
_to_str(result.up2date),
_to_str(result.not_up2date),
_to_str(result.unchanged),
_to_str(result.unknown),
_to_str(result.hcds_created),
_to_str(result.hcds_deleted),
]
)
table.add_row(row)
console.print(table)

# logger.info("Crawl results for %s", result.host_name)
# logger.info("Status: %s", result.status)
# logger.info(f"Crawl duration: {result.duration:.0f} seconds")
# if result.status == CrawlStatus.SUCCESS:
# logger.info("Total directories: %s", result.total_directories)
# logger.info("Unreadable directories: %s", result.unreadable)
# logger.info("Changed to up2date: %s", result.up2date)
# logger.info("Changed to not up2date: %s", result.not_up2date)
# logger.info("Unchanged: %s", result.unchanged)
# logger.info("Unknown disposition: %s", result.unknown)
# logger.info("New HostCategoryDirs created: %s", result.hcds_created)
# logger.info(
# "HostCategoryDirs now deleted on the master, marked not " "up2date: %s",
# result.hcds_deleted,
# )


@main.command()
@click.option(
"--prop-repo",
default="rawhide",
help="Repository prefix to use for propagation. Defaults to 'rawhide'.",
)
@click.pass_context
def propagation(ctx, prop_repo, **kwargs):
"""Print out information about repomd.xml propagation.
Defaults to development/rawhide/x86_64/os/repodata
Only the category 'Fedora Linux' is supported
"""
print(ctx.obj)
print(ctx.params)
# options = ctx.obj["options"]
pass
58 changes: 58 additions & 0 deletions mirrormanager2/crawler/connection_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
from functools import partial
from urllib.parse import urlsplit

from .ftp_connector import FTPConnector
from .http_connector import HTTPConnector, HTTPSConnector
from .rsync_connector import RsyncConnector

logger = logging.getLogger("crawler")


def _get_connection_class(scheme):
if scheme == "http":
return HTTPConnector
if scheme == "https":
return HTTPSConnector
if scheme == "ftp":
return FTPConnector
if scheme == "rsync":
return RsyncConnector


class ConnectionPool:
def __init__(self, config, debuglevel=0, timeout=600):
self._connections = {}
self.config = config
self.debuglevel = debuglevel
self.timeout = timeout

def get(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
if netloc not in self._connections:
connection_class = _get_connection_class(scheme)
self._connections[(scheme, netloc)] = connection_class(
config=self.config,
debuglevel=self.debuglevel,
timeout=self.timeout,
on_closed=partial(self._remove_connection, scheme, netloc),
)
# self._connections[(scheme, netloc)] = self._connect(netloc)
# self._connections[(scheme, netloc)].set_debuglevel(self.debuglevel)
return self._connections[(scheme, netloc)]

def close(self, url):
scheme, netloc, path, query, fragment = urlsplit(url)
try:
connection = self._connections[(scheme, netloc)]
except KeyError:
return
connection.close()

def _remove_connection(self, scheme, netloc, connection):
del self._connections[(scheme, netloc)]

def close_all(self):
for connection in list(self._connections.values()):
connection.close()
self._connections = {}
Loading

0 comments on commit 1acf9cb

Please sign in to comment.