From c8ec0e1935803932100e1a7bb86789b28530f865 Mon Sep 17 00:00:00 2001
From: Daniel Alley
Date: Tue, 21 Dec 2021 00:11:26 -0500
Subject: [PATCH] Swap createrepo_c for rpmrepo_metadata

[noissue]
---
 .ci/ansible/Containerfile.j2        |   2 +
 pulp_rpm/app/metadata_parsing.py    | 333 ----------------------------
 pulp_rpm/app/models/package.py      |  77 ++++++-
 pulp_rpm/app/settings.py            |   1 -
 pulp_rpm/app/tasks/synchronizing.py |  72 +++---
 5 files changed, 122 insertions(+), 363 deletions(-)
 delete mode 100644 pulp_rpm/app/metadata_parsing.py

diff --git a/.ci/ansible/Containerfile.j2 b/.ci/ansible/Containerfile.j2
index 6b131be344..100861b713 100644
--- a/.ci/ansible/Containerfile.j2
+++ b/.ci/ansible/Containerfile.j2
@@ -1,5 +1,7 @@
 FROM {{ ci_base | default("ghcr.io/pulp/pulp-ci-centos:" + pulp_container_tag) }}
 
+RUN yum install -y rust cargo
+
 # Add source directories to container
 {% for item in plugins %}
 {% if item.source.startswith("./") %}
diff --git a/pulp_rpm/app/metadata_parsing.py b/pulp_rpm/app/metadata_parsing.py
deleted file mode 100644
index bbb063001c..0000000000
--- a/pulp_rpm/app/metadata_parsing.py
+++ /dev/null
@@ -1,333 +0,0 @@
-import bz2
-import collections
-import gzip
-import logging
-import lzma
-import os
-import re
-from django.conf import settings
-from gettext import gettext as _
-
-import createrepo_c as cr
-from xml.etree.cElementTree import iterparse
-
-log = logging.getLogger(__name__)
-
-NS_STRIP_RE = re.compile("{.*?}")
-
-
-def iterative_files_changelog_parser(file_extension, filelists_xml_path, other_xml_path):
-    """
-    Iteratively parse filelists.xml and other.xml, to avoid over-use of memory.
-
-    createrepo_c parses everything in bulk, into memory. For large repositories such as
-    RHEL 7 or OL 7, this can require more than 5gb of memory. That isn't acceptable, especially
-    when many repositories are being synced at once. The main offenders are other.xml (changelogs)
-    and filelists.xml (list of owned files). These also happen to be relatively easy to parse.
-
-    This function, ported from Pulp 2, takes a path to filelists.xml and other.xml, creates
-    a streaming parser for each, and then yields one package worth of data from each file.
-    """
-    # it's basically always gzip, but we'll cover our bases w/ all the possibilites
-    if file_extension == "gz":
-        open_func = gzip.open
-    elif file_extension == "xz":
-        open_func = lzma.open
-    elif file_extension == "bz2":
-        open_func = bz2.open
-    elif file_extension == "xml":
-        open_func = open
-    else:
-        raise TypeError("Unknown metadata compression type")
-    # TODO: zstd
-
-    with open_func(filelists_xml_path) as filelists_xml, open_func(other_xml_path) as other_xml:
-        filelists_parser = iterparse(filelists_xml, events=("start", "end"))
-        filelists_xml_iterator = iter(filelists_parser)
-
-        other_parser = iterparse(other_xml, events=("start", "end"))
-        other_xml_iterator = iter(other_parser)
-
-        # get a hold of the root element so we can clear it
-        # this prevents the entire parsed document from building up in memory
-        try:
-            filelists_root_element = next(filelists_xml_iterator)[1]
-            other_root_element = next(other_xml_iterator)[1]
-        # I know. This is a terrible misuse of SyntaxError. Don't blame the messenger.
- except SyntaxError: - log.error("failed to parse XML metadata file") - raise - - while True: - for event, filelists_element in filelists_xml_iterator: - # if we're not at a fully parsed package element, keep going - if event != "end": - continue - # make this work whether the file has namespace as part of the tag or not - if not ( - filelists_element.tag == "package" - or re.sub(NS_STRIP_RE, "", filelists_element.tag) == "package" - ): - continue - - break - - for event, other_element in other_xml_iterator: - # if we're not at a fully parsed package element, keep going - if event != "end": - continue - # make this work whether the file has namespace as part of the tag or not - if not ( - other_element.tag == "package" - or re.sub(NS_STRIP_RE, "", other_element.tag) == "package" - ): - continue - - break - - (filelists_pkgid, files) = process_filelists_package_element(filelists_element) - (other_pkgid, changelogs) = process_other_package_element(other_element) - - filelists_root_element.clear() # clear all previously parsed ancestors of the root - other_root_element.clear() - - assert ( - filelists_pkgid == other_pkgid - ), "Package id for filelists.xml ({}) and other.xml ({}) do not match".format( - filelists_pkgid, other_pkgid - ) - - yield filelists_pkgid, files, changelogs - - -def process_filelists_package_element(element): - """Parse one package element from the filelists.xml.""" - pkgid = element.attrib["pkgid"] - - files = [] - for subelement in element: - if subelement.tag == "file" or re.sub(NS_STRIP_RE, "", subelement.tag) == "file": - basename, filename = os.path.split(subelement.text) - basename = f"{basename}/" - ftype = subelement.attrib.get("type") - files.append((ftype, basename, filename)) - - return pkgid, files - - -def process_other_package_element(element): - """Parse package element from other.xml.""" - pkgid = element.attrib["pkgid"] - changelogs = [] - - for subelement in element: - if subelement.tag == "changelog" or re.sub(NS_STRIP_RE, "", subelement.tag) == "changelog": - author = subelement.attrib["author"] - date = int(subelement.attrib["date"]) - text = subelement.text - changelogs.append((author, date, text)) - - # make sure the changelogs are sorted by date - changelogs.sort(key=lambda t: t[1]) - - if settings.KEEP_CHANGELOG_LIMIT is not None: - # always keep at least one changelog, even if the limit is set to 0 - changelog_limit = settings.KEEP_CHANGELOG_LIMIT or 1 - # changelogs are listed in chronological order, grab the last N changelogs from the list - changelogs = changelogs[-changelog_limit:] - return pkgid, changelogs - - -def warningcb(warning_type, message): - """Optional callback for warnings about wierd stuff and formatting in XML. - - Args: - warning_type (int): One of the XML_WARNING_* constants. - message (str): Message. - """ - log.warn("PARSER WARNING: %s" % message) - return True # continue parsing - - -def parse_repodata( - primary_xml_path, filelists_xml_path, other_xml_path, only_primary=False, mirror=False -): - """ - Parse repodata to extract package info. - - Args: - primary_xml_path (str): a path to a downloaded primary.xml - filelists_xml_path (str): a path to a downloaded filelists.xml - other_xml_path (str): a path to a downloaded other.xml - - Kwargs: - only_primary (bool): If true, only the metadata in primary.xml will be parsed. 
- - Returns: - dict: createrepo_c package objects with the pkgId as a key - """ - packages = collections.OrderedDict() - - nevras = set() - pkgid_warning_triggered = False - nevra_warning_triggered = False - - def pkgcb(pkg): - """ - A callback which is used when a whole package entry in xml is parsed. - - Args: - pkg(preaterepo_c.Package): a parsed metadata for a package - - """ - nonlocal pkgid_warning_triggered - nonlocal nevra_warning_triggered - - ERR_MSG = _( - "The repository metadata being synced into Pulp is erroneous in a way that " - "makes it ambiguous (duplicate {}), and therefore we do not allow it to be synced in " - "'mirror_complete' mode. Please choose a sync policy which does not mirror " - "repository metadata.\n\n" - "Please read https://github.com/pulp/pulp_rpm/issues/2402 for more details." - ) - WARN_MSG = _( - "The repository metadata being synced into Pulp is erroneous in a way that " - "makes it ambiguous (duplicate {}). Yum, DNF and Pulp try to handle these problems, " - "but unexpected things may happen.\n\n" - "Please read https://github.com/pulp/pulp_rpm/issues/2402 for more details." - ) - - if not pkgid_warning_triggered and pkg.pkgId in packages: - pkgid_warning_triggered = True - if mirror: - raise Exception(ERR_MSG.format("PKGIDs")) - else: - log.warn(WARN_MSG.format("PKGIDs")) - if not nevra_warning_triggered and pkg.nevra() in nevras: - nevra_warning_triggered = True - if mirror: - raise Exception(ERR_MSG.format("NEVRAs")) - else: - log.warn(WARN_MSG.format("NEVRAs")) - packages[pkg.pkgId] = pkg - nevras.add(pkg.nevra()) - - def newpkgcb(pkgId, name, arch): - """ - A callback which is used when a new package entry is encountered. - - Only opening element is parsed at that moment. - This function has to return a package which parsed data will be added to - or None if a package should be skipped. - - pkgId, name and arch of a package can be used to skip further parsing. Available - only for filelists.xml and other.xml. - - Args: - pkgId(str): pkgId of a package - name(str): name of a package - arch(str): arch of a package - - Returns: - createrepo_c.Package: a package which parsed data should be added to. - - If None is returned, further parsing of a package will be skipped. - - """ - return packages.get(pkgId, None) - - cr.xml_parse_primary(primary_xml_path, pkgcb=pkgcb, warningcb=warningcb, do_files=False) - if not only_primary: - cr.xml_parse_filelists(filelists_xml_path, newpkgcb=newpkgcb, warningcb=warningcb) - cr.xml_parse_other(other_xml_path, newpkgcb=newpkgcb, warningcb=warningcb) - return packages - - -class MetadataParser: - """Parser for RPM metadata.""" - - def __init__(self): - """Initialize empty (use one of the alternate constructors).""" - self.primary_xml_path = None - self.filelists_xml_path = None - self.other_xml_path = None - - @staticmethod - def from_metadata_files(primary_xml_path, filelists_xml_path, other_xml_path): - """Construct a parser from the three main metadata files.""" - parser = MetadataParser() - parser.primary_xml_path = primary_xml_path - parser.filelists_xml_path = filelists_xml_path - parser.other_xml_path = other_xml_path - return parser - - def count_packages(self): - """Count the total number of packages.""" - # It would be much faster to just read the number in the header of the metadata. - # But there's no way to do that, and also we can't necessarily rely on that number because - # of duplicates. 
-        len(
-            parse_repodata(
-                self.primary_xml_path,
-                self.filelists_xml_path,
-                self.other_xml_path,
-                only_primary=True,
-            )
-        )
-
-    def parse_packages_iterative(self, file_extension, skip_srpms=False, mirror=False):
-        """Parse packages iteratively using the hybrid parser."""
-        extra_repodata_parser = iterative_files_changelog_parser(
-            file_extension, self.filelists_xml_path, self.other_xml_path
-        )
-        seen_pkgids = set()
-        # We *do not* want to skip srpms when parsing primary because otherwise we run into
-        # trouble when we encounter them again on the iterative side of the parser. Just skip
-        # them at the end.
-        for pkg in self.parse_packages(only_primary=True, mirror=mirror):
-            pkgid = pkg.pkgId
-            while True:
-                pkgid_extra, files, changelogs = next(extra_repodata_parser)
-                if pkgid_extra in seen_pkgids:
-                    # This is a dirty hack to handle cases that "shouldn't" happen.
-                    # Sometimes repositories have packages listed twice under the same
-                    # pkgid. This is a problem because the primary.xml parsing
-                    # deduplicates the entries by placing them into a dict keyed by pkgid.
-                    # So if the iterative parser(s) run into a package we've seen before,
-                    # we should skip it and move on.
-                    continue
-                else:
-                    seen_pkgids.add(pkgid)
-                    break
-
-            assert pkgid == pkgid_extra, (
-                "Package id from primary metadata ({}), does not match package id "
-                "from filelists, other metadata ({})"
-            ).format(pkgid, pkgid_extra)
-
-            if skip_srpms and pkg.arch == "src":
-                continue
-
-            pkg.files = files
-            pkg.changelogs = changelogs
-            yield pkg
-
-    def parse_packages(self, only_primary=False, skip_srpms=False, mirror=False):
-        """Parse packages using the traditional createrepo_c parser."""
-        packages = parse_repodata(
-            self.primary_xml_path,
-            self.filelists_xml_path,
-            self.other_xml_path,
-            only_primary=only_primary,
-            mirror=mirror,
-        )
-        while True:
-            try:
-                (pkgid, pkg) = packages.popitem(last=False)
-            except KeyError:
-                break
-
-            if skip_srpms and pkg.arch == "src":
-                continue
-
-            yield pkg
diff --git a/pulp_rpm/app/models/package.py b/pulp_rpm/app/models/package.py
index 290989913a..3a2ac75fc7 100644
--- a/pulp_rpm/app/models/package.py
+++ b/pulp_rpm/app/models/package.py
@@ -2,6 +2,7 @@
 
 import createrepo_c as cr
 
+from django.conf import settings
 from django.db import models
 from django.db.models import Window, F
 from django.db.models.functions import RowNumber
@@ -264,6 +265,69 @@ class Meta:
     class ReadonlyMeta:
         readonly = ["evr"]
 
+    @classmethod
+    def rpmrepo_to_dict(cls, package):
+        """
+        Convert rpmrepo_metadata package object to dict for instantiating Package object.
+
+        Args:
+            package(rpmrepo_metadata.Package): a RPM/SRPM package to convert
+
+        Returns:
+            dict: all data for RPM/SRPM content creation
+
+        """
+        changelogs = package.changelogs
+
+        # make sure the changelogs are sorted by date
+        changelogs.sort(key=lambda t: t[1])
+
+        if settings.KEEP_CHANGELOG_LIMIT is not None:
+            # always keep at least one changelog, even if the limit is set to 0
+            changelog_limit = settings.KEEP_CHANGELOG_LIMIT or 1
+            # changelogs are listed in chronological order, grab the last N changelogs from the list
+            changelogs = changelogs[-changelog_limit:]
+
+        return {
+            PULP_PACKAGE_ATTRS.ARCH: package.arch,
+            PULP_PACKAGE_ATTRS.CHANGELOGS: changelogs,
+            PULP_PACKAGE_ATTRS.CHECKSUM_TYPE: getattr(
+                CHECKSUM_TYPES, package.checksum_type.upper()
+            ),
+            PULP_PACKAGE_ATTRS.CONFLICTS: package.conflicts,
+            PULP_PACKAGE_ATTRS.DESCRIPTION: package.description,
+            PULP_PACKAGE_ATTRS.ENHANCES: package.enhances,
+            PULP_PACKAGE_ATTRS.EPOCH: package.epoch,
+            PULP_PACKAGE_ATTRS.FILES: package.files_split,
+            PULP_PACKAGE_ATTRS.LOCATION_BASE: "",  # TODO, delete this entirely
+            PULP_PACKAGE_ATTRS.LOCATION_HREF: package.location_href,
+            PULP_PACKAGE_ATTRS.NAME: package.name,
+            PULP_PACKAGE_ATTRS.OBSOLETES: package.obsoletes,
+            PULP_PACKAGE_ATTRS.PKGID: package.pkgid,
+            PULP_PACKAGE_ATTRS.PROVIDES: package.provides,
+            PULP_PACKAGE_ATTRS.RECOMMENDS: package.recommends,
+            PULP_PACKAGE_ATTRS.RELEASE: package.release,
+            PULP_PACKAGE_ATTRS.REQUIRES: package.requires,
+            PULP_PACKAGE_ATTRS.RPM_BUILDHOST: package.rpm_buildhost,
+            PULP_PACKAGE_ATTRS.RPM_GROUP: package.rpm_group,
+            PULP_PACKAGE_ATTRS.RPM_HEADER_END: package.rpm_header_range[1],
+            PULP_PACKAGE_ATTRS.RPM_HEADER_START: package.rpm_header_range[0],
+            PULP_PACKAGE_ATTRS.RPM_LICENSE: package.rpm_license,
+            PULP_PACKAGE_ATTRS.RPM_PACKAGER: package.packager,
+            PULP_PACKAGE_ATTRS.RPM_SOURCERPM: package.rpm_sourcerpm,
+            PULP_PACKAGE_ATTRS.RPM_VENDOR: package.rpm_vendor,
+            PULP_PACKAGE_ATTRS.SIZE_ARCHIVE: package.size_archive,
+            PULP_PACKAGE_ATTRS.SIZE_INSTALLED: package.size_installed,
+            PULP_PACKAGE_ATTRS.SIZE_PACKAGE: package.size_package,
+            PULP_PACKAGE_ATTRS.SUGGESTS: package.suggests,
+            PULP_PACKAGE_ATTRS.SUMMARY: package.summary,
+            PULP_PACKAGE_ATTRS.SUPPLEMENTS: package.supplements,
+            PULP_PACKAGE_ATTRS.TIME_BUILD: package.time_build,
+            PULP_PACKAGE_ATTRS.TIME_FILE: package.time_file,
+            PULP_PACKAGE_ATTRS.URL: package.url,
+            PULP_PACKAGE_ATTRS.VERSION: package.version,
+        }
+
     @classmethod
     def createrepo_to_dict(cls, package):
         """
@@ -276,9 +340,20 @@ def createrepo_to_dict(cls, package):
             dict: all data for RPM/SRPM content creation
 
         """
+        changelogs = getattr(package, CR_PACKAGE_ATTRS.CHANGELOGS, [])
+
+        # make sure the changelogs are sorted by date
+        changelogs.sort(key=lambda t: t[1])
+
+        if settings.KEEP_CHANGELOG_LIMIT is not None:
+            # always keep at least one changelog, even if the limit is set to 0
+            changelog_limit = settings.KEEP_CHANGELOG_LIMIT or 1
+            # changelogs are listed in chronological order, grab the last N changelogs from the list
+            changelogs = changelogs[-changelog_limit:]
+
         return {
             PULP_PACKAGE_ATTRS.ARCH: getattr(package, CR_PACKAGE_ATTRS.ARCH),
-            PULP_PACKAGE_ATTRS.CHANGELOGS: getattr(package, CR_PACKAGE_ATTRS.CHANGELOGS, []),
+            PULP_PACKAGE_ATTRS.CHANGELOGS: changelogs,
             PULP_PACKAGE_ATTRS.CHECKSUM_TYPE: getattr(
                 CHECKSUM_TYPES, getattr(package, CR_PACKAGE_ATTRS.CHECKSUM_TYPE).upper()
             ),
diff --git a/pulp_rpm/app/settings.py b/pulp_rpm/app/settings.py
index 6dc9d51ae6..4227d88592 100644
--- a/pulp_rpm/app/settings.py
+++ b/pulp_rpm/app/settings.py @@ -8,6 +8,5 @@ INSTALLED_APPS = ["django_readonly_field", "dynaconf_merge"] ALLOW_AUTOMATIC_UNSAFE_ADVISORY_CONFLICT_RESOLUTION = False DEFAULT_ULN_SERVER_BASE_URL = "https://linux-update.oracle.com/" -RPM_ITERATIVE_PARSING = True KEEP_CHANGELOG_LIMIT = 10 SOLVER_DEBUG_LOGS = True diff --git a/pulp_rpm/app/tasks/synchronizing.py b/pulp_rpm/app/tasks/synchronizing.py index d5c00cf604..d847f0b343 100644 --- a/pulp_rpm/app/tasks/synchronizing.py +++ b/pulp_rpm/app/tasks/synchronizing.py @@ -20,6 +20,7 @@ from aiohttp.web_exceptions import HTTPNotFound import createrepo_c as cr +import rpmrepo_metadata import libcomps from pulpcore.plugin.models import ( @@ -88,7 +89,6 @@ from pulp_rpm.app.comps import strdict_to_dict, dict_digest from pulp_rpm.app.shared_utils import is_previous_version, get_sha256, urlpath_sanitize -from pulp_rpm.app.metadata_parsing import MetadataParser import gi @@ -728,10 +728,6 @@ async def run_repomdrecord_download(name, location_href, downloader): result = await downloader.run() return name, location_href, result - file_extension = [ - record.location_href for record in repomd.records if record.type == "primary" - ][0].split(".")[-1] - for record in repomd.records: record_checksum_type = getattr(CHECKSUM_TYPES, record.checksum_type.upper()) checksum_types[record.type] = record_checksum_type @@ -824,7 +820,7 @@ async def run_repomdrecord_download(name, location_href, downloader): except FileNotFoundError: raise - await self.parse_repository_metadata(repomd, repomd_files, file_extension) + await self.parse_repository_metadata(repomd, repomd_files) async def parse_distribution_tree(self): """Parse content from the file treeinfo if present.""" @@ -855,7 +851,7 @@ async def parse_distribution_tree(self): dc.extra_data = self.treeinfo await self.put(dc) - async def parse_repository_metadata(self, repomd, metadata_results, file_extension): + async def parse_repository_metadata(self, repomd, metadata_results): """Parse repository metadata.""" needed_metadata = set(PACKAGE_REPODATA) - set(metadata_results.keys()) @@ -879,7 +875,6 @@ async def parse_repository_metadata(self, repomd, metadata_results, file_extensi metadata_results["primary"], metadata_results["filelists"], metadata_results["other"], - file_extension=file_extension, ) groups_list = [] @@ -1110,28 +1105,60 @@ async def parse_packages_components(self, comps_result): return dc_groups - async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_extension="gz"): + async def parse_packages(self, primary_xml, filelists_xml, other_xml): """Parse packages from the remote repository.""" - parser = MetadataParser.from_metadata_files( + package_parser = rpmrepo_metadata.PackageParser( primary_xml.path, filelists_xml.path, other_xml.path ) progress_data = { "message": "Parsed Packages", "code": "sync.parsing.packages", - "total": parser.count_packages(), + "total": package_parser.total_packages, } async with ProgressReport(**progress_data) as packages_pb: # skip SRPM if defined skip_srpms = "srpm" in self.skip_types and not self.mirror_metadata - async def on_package(pkg): - """Callback when handling a completed package. + nevras = set() + checksums = set() + pkgid_warning_triggered = False + nevra_warning_triggered = False + + ERR_MSG = _( + "The repository metadata being synced into Pulp is erroneous in a way that " + "makes it ambiguous (duplicate {}), and therefore we do not allow it to be " + "synced in 'mirror_complete' mode. 
Please choose a sync policy which does "
+                "not mirror repository metadata.\n\n"
+                "Please read https://github.com/pulp/pulp_rpm/issues/2402 for more details."
+            )
+            WARN_MSG = _(
+                "The repository metadata being synced into Pulp is erroneous in a way that "
+                "makes it ambiguous (duplicate {}). Yum, DNF and Pulp try to handle these "
+                "problems, but unexpected things may happen.\n\n"
+                "Please read https://github.com/pulp/pulp_rpm/issues/2402 for more details."
+            )
+
+            for pkg in package_parser:
+                if not pkgid_warning_triggered and pkg.pkgid in checksums:
+                    pkgid_warning_triggered = True
+                    if self.mirror_metadata:
+                        raise Exception(ERR_MSG.format("PKGIDs"))
+                    else:
+                        log.warning(WARN_MSG.format("PKGIDs"))
+                if not nevra_warning_triggered and pkg.nevra() in nevras:
+                    nevra_warning_triggered = True
+                    if self.mirror_metadata:
+                        raise Exception(ERR_MSG.format("NEVRAs"))
+                    else:
+                        log.warning(WARN_MSG.format("NEVRAs"))
+                nevras.add(pkg.nevra())
+                checksums.add(pkg.pkgid)
+
+                if pkg.arch == "src" and skip_srpms:
+                    continue
 
-                Args:
-                    pkg (createrepo_c.Package): A completed createrepo_c package.
-                """
                 if self.mirror_metadata:
                     uses_base_url = pkg.location_base
                     illegal_relative_path = self.is_illegal_relative_path(pkg.location_href)
@@ -1139,7 +1166,7 @@ async def on_package(pkg):
                     if uses_base_url or illegal_relative_path:
                         raise ValueError(MIRROR_INCOMPATIBLE_REPO_ERR_MSG)
 
-                package = Package(**Package.createrepo_to_dict(pkg))
+                package = Package(**Package.rpmrepo_to_dict(pkg))
                 base_url = pkg.location_base or self.remote_url
                 url = urlpath_sanitize(base_url, package.location_href)
                 del pkg  # delete it as soon as we're done with it
@@ -1174,17 +1201,6 @@ async def on_package(pkg):
                     await packages_pb.aincrement()  # TODO: don't do this for every individual package
                     await self.put(dc)
 
-        if settings.RPM_ITERATIVE_PARSING:
-            for pkg in parser.parse_packages_iterative(
-                file_extension, skip_srpms=skip_srpms, mirror=self.mirror_metadata
-            ):
-                await on_package(pkg)
-        else:
-            for pkg in parser.parse_packages(
-                skip_srpms=skip_srpms, mirror=self.mirror_metadata
-            ):
-                await on_package(pkg)
-
     async def parse_advisories(self, result):
         """Parse advisories from the remote repository."""
        updateinfo_xml_path = result.path
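Notes on the techniques this patch removes or introduces follow; none of the code below is part of the patch itself.

The compression handling that leaves with metadata_parsing.py is self-contained and worth keeping on hand. A minimal standalone sketch of the same extension-to-opener dispatch, standard library only; the zstd gap flagged by the original TODO still applies:

    import bz2
    import gzip
    import lzma

    # maps metadata filename extension -> opener, mirroring the removed if/elif chain
    OPENERS = {"gz": gzip.open, "xz": lzma.open, "bz2": bz2.open, "xml": open}

    def open_metadata(path):
        """Open a (possibly compressed) repodata file based on its extension."""
        extension = path.rsplit(".", 1)[-1]
        try:
            open_func = OPENERS[extension]
        except KeyError:
            # TODO: zstd, as the original code also noted
            raise TypeError("Unknown metadata compression type")
        return open_func(path)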
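The memory-bounded parsing strategy the deleted module used (stream with iterparse, clear the root element after each package so parsed subtrees never accumulate) is easy to demonstrate in isolation. A sketch for filelists.xml only; the element and attribute names follow the removed code, and the file path in the usage line is hypothetical:

    import gzip
    import re
    from xml.etree.ElementTree import iterparse

    NS_STRIP_RE = re.compile("{.*?}")

    def iter_filelists_packages(path):
        """Yield (pkgid, files) one package at a time, keeping memory usage flat."""
        with gzip.open(path) as fp:
            iterator = iter(iterparse(fp, events=("start", "end")))
            # the first event is the "start" of the root element; keep it so we can clear it
            _, root = next(iterator)
            for event, element in iterator:
                # only act on fully parsed <package> elements, namespaced or not
                if event != "end" or re.sub(NS_STRIP_RE, "", element.tag) != "package":
                    continue
                pkgid = element.attrib["pkgid"]
                files = [
                    sub.text for sub in element
                    if re.sub(NS_STRIP_RE, "", sub.tag) == "file"
                ]
                yield pkgid, files
                root.clear()  # discard everything parsed so far

    for pkgid, files in iter_filelists_packages("filelists.xml.gz"):  # hypothetical path
        print(pkgid, len(files))

Clearing the root (rather than each package element) is the detail that matters: without it, ElementTree keeps every parsed package attached to the document tree, which is exactly the multi-gigabyte blow-up the deleted docstring describes.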
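The changelog trimming now duplicated between rpmrepo_to_dict and createrepo_to_dict reduces to one small function. A sketch with KEEP_CHANGELOG_LIMIT passed in as a plain argument instead of read from Django settings:

    def limit_changelogs(changelogs, keep_limit):
        """Sort (author, date, text) tuples by date and keep only the newest entries."""
        changelogs = sorted(changelogs, key=lambda t: t[1])
        if keep_limit is None:
            return changelogs  # no limit configured; keep everything
        # always keep at least one changelog, even if the limit is set to 0
        return changelogs[-(keep_limit or 1):]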
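Finally, the new sync-side duplicate detection can be exercised against a repository on disk. This sketch assumes only the rpmrepo_metadata surface the patch itself relies on (PackageParser, total_packages, pkgid, nevra()); nothing beyond that is verified here, and the repodata paths are hypothetical:

    import rpmrepo_metadata

    def find_duplicates(primary_path, filelists_path, other_path):
        """Walk repository metadata once and report duplicate pkgids and NEVRAs."""
        parser = rpmrepo_metadata.PackageParser(primary_path, filelists_path, other_path)
        print(f"metadata declares {parser.total_packages} packages")
        seen_pkgids, seen_nevras = set(), set()
        for pkg in parser:
            if pkg.pkgid in seen_pkgids:
                print(f"duplicate pkgid: {pkg.pkgid}")
            if pkg.nevra() in seen_nevras:
                print(f"duplicate NEVRA: {pkg.nevra()}")
            seen_pkgids.add(pkg.pkgid)
            seen_nevras.add(pkg.nevra())

    find_duplicates(
        "repodata/primary.xml.gz",    # hypothetical paths
        "repodata/filelists.xml.gz",
        "repodata/other.xml.gz",
    )

Duplicate pkgids or NEVRAs are exactly the ambiguity that makes a repository unmirrorable byte-for-byte, which is why the sync task raises in mirror_complete mode but only warns otherwise.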