From cb01cea729a1cc16d4cc1d2262682c10f7c10e17 Mon Sep 17 00:00:00 2001 From: Nikolay Yurin Date: Mon, 31 Jul 2023 13:47:23 -0700 Subject: [PATCH] src/patchset.py: Implement Patchset service Patchset service process patchset nodes: - Wait for parent checkout node to be available - Download checkout node tarball - Apply patches and calculate patchset hash - Upload new tarball Signed-off-by: Nikolay Yurin --- .gitignore | 2 + config/kernelci.toml | 5 + docker-compose.yaml | 14 ++ src/monitor.py | 7 +- src/patchset.py | 317 +++++++++++++++++++++++++++++++++++++++++++ src/tarball.py | 47 ++++--- 6 files changed, 370 insertions(+), 22 deletions(-) create mode 100755 src/patchset.py diff --git a/.gitignore b/.gitignore index 4a1428479..985b144f1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .env .docker-env data +*.pyc +*.venv diff --git a/config/kernelci.toml b/config/kernelci.toml index 4edaa6056..296f6ad8b 100644 --- a/config/kernelci.toml +++ b/config/kernelci.toml @@ -13,6 +13,11 @@ kdir = "/home/kernelci/data/src/linux" output = "/home/kernelci/data/output" storage_config = "docker-host" +[patchset] +kdir = "/home/kernelci/data/src/linux-patchset" +output = "/home/kernelci/data/output" +storage_config = "docker-host" + [scheduler] output = "/home/kernelci/data/output" diff --git a/docker-compose.yaml b/docker-compose.yaml index 1ab1de07c..824edb9ce 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -153,3 +153,17 @@ services: - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}' - 'run' - '--mode=holdoff' + + patchset: + <<: *base-service + container_name: 'kernelci-pipeline-patchset' + command: + - './pipeline/patchset.py' + - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}' + - 'run' + volumes: + - './src:/home/kernelci/pipeline' + - './config:/home/kernelci/config' + - './data/ssh:/home/kernelci/data/ssh' + - './data/src:/home/kernelci/data/src' + - './data/output:/home/kernelci/data/output' diff --git a/src/monitor.py b/src/monitor.py index dd23b2bf4..152d4da64 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -60,9 +60,10 @@ def _run(self, sub_id): event = self._api.receive_event(sub_id) obj = event.data dt = datetime.datetime.fromisoformat(event['time']) - commit = (obj['data']['kernel_revision']['commit'][:12] - if 'kernel_revision' in obj['data'] - else str(None)) + try: + commit = obj['data']['kernel_revision']['commit'][:12] + except (KeyError, TypeError): + commit = str(None) result = result_map[obj['result']] if obj['result'] else str(None) print(self.LOG_FMT.format( time=dt.strftime('%Y-%m-%d %H:%M:%S.%f'), diff --git a/src/patchset.py b/src/patchset.py new file mode 100755 index 000000000..6050a48b7 --- /dev/null +++ b/src/patchset.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +# +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# Copyright (C) 2022 Collabora Limited +# Author: Nikolay Yurin + +import os +import sys +import json +import requests +import time +import tempfile +import hashlib +from datetime import datetime, timedelta +from urllib.parse import urlparse +from urllib.request import urlopen + +import kernelci +import kernelci.build +import kernelci.config +from kernelci.legacy.cli import Args, Command, parse_opts +import kernelci.storage + +from tarball import Tarball + +# FIXME: make patchset service configuration option +ALLOWED_DOMAINS = {"patchwork.kernel.org"} +PATCHSET_SHORT_HASH_LEN = 13 +PROCESSING_DELAY_SECS = 30 +PATCH_TMP_FILE_PREFIX = "kernel-patch" + + +class Patchset(Tarball): + TAR_CREATE_CMD = """\ +set -e +cd {target_dir}/ +tar --create --transform "s/^/{prefix}\\//" * | gzip > {tarball_path} +""" + + APPLY_PATCH_SHELL_CMD = """\ +set -e +cd {kdir} +patch -p1 < {patch_file} +""" + + # FIXME: I really don"t have a good idea what I"m doing here + # This code probably needs rework and put into kernelci.patch + def _hash_patch(self, patch_name, patch_file): + allowed_prefixes = { + b"old mode", # Old file permissions + b"new mode", # New file permissions + b"-", # This convers both removed lines and source file + b"+", # This convers both added lines and target file + # "@" I don"t know how we should handle hunks yet + } + hashable_patch_lines = [] + for line in patch_file.readlines(): + if not line: + continue + + for prefix in allowed_prefixes: + if line.startswith(prefix): + hashable_patch_lines.append(line) + break + + hashable_content = b"/n".join(hashable_patch_lines) + self.log.debug( + "Hashable content:\n" + + hashable_content.decode("utf-8") + ) + patch_hash_digest = hashlib.sha256(hashable_content).hexdigest() + self.log.debug(f"Patch {patch_name} hash: {patch_hash_digest}") + return patch_hash_digest + + # FIXME: move into kernelci.patch + def _apply_patch(self, kdir, patch_name, patch_url): + self.log.info( + f"Applying patch {patch_name}, url: {patch_url}", + ) + try: + encoding = urlopen(patch_url).headers.get_charsets()[0] + except Exception as e: + self.log.warn( + "Failed to fetch encoding from patch " + f"{patch_name} headers: {e}" + ) + self.log.warn("Falling back to utf-8 encoding") + encoding = "utf-8" + + with tempfile.NamedTemporaryFile( + prefix="{}-{}-".format(PATCH_TMP_FILE_PREFIX, patch_name), + encoding=encoding + ) as tmp_f: + if not kernelci.build._download_file(patch_url, tmp_f.name): + raise FileNotFoundError( + f"Error downloading patch from {patch_url}" + ) + + kernelci.shell_cmd(self.APPLY_PATCH_SHELL_CMD.format( + kdir=kdir, + patch_file=tmp_f.name, + )) + + return self._hash_patch(patch_name, tmp_f) + + # FIXME: move into kernelci.patch + def _apply_patches(self, kdir, patch_artifacts): + patchset_hash = hashlib.sha256() + for patch_name, patch_url in patch_artifacts.items(): + patch_hash = self._apply_patch(kdir, patch_name, patch_url) + patchset_hash.update(patch_hash.encode("utf-8")) + + patchset_hash_digest = patchset_hash.hexdigest() + self.log.debug(f"Patchset hash: {patchset_hash_digest}") + return patchset_hash_digest + + def _download_checkout_archive(self, tarball_url, retries=3): + self.log.info(f"Downloading checkout tarball, url: {tarball_url}") + tar_filename = os.path.basename(urlparse(tarball_url).path) + kernelci.build.pull_tarball( + kdir=self._kdir, + url=tarball_url, + dest_filename=tar_filename, + retries=retries, + delete=True + ) + + def _update_node(self, node, revision, tarball_url): + updated_node = node.copy() + updated_node.update({ + "data": { + "kernel_revision": revision, + }, + "state": "available", + "artifacts": { + "tarball": tarball_url, + }, + "holdoff": str(datetime.utcnow() + timedelta(minutes=10)) + }) + try: + self._api.node.update(updated_node) + except requests.exceptions.HTTPError as err: + err_msg = json.loads(err.response.content).get("detail", []) + self.log.error(err_msg) + + def _setup(self, *args): + return self._api_helper.subscribe_filters({ + "op": "created", + "name": "patchset", + "state": "running", + }) + + def _has_allowed_domain(self, url): + domain = urlparse(url).hostname + if domain not in ALLOWED_DOMAINS: + raise RuntimeError( + "Forbidden mbox domain %s, allowed domains: %s", + domain, + ALLOWED_DOMAINS + ) + + def _validate_patch_artifacts(self, node_id, patch_artifacts): + if not patch_artifacts: + raise ValueError( + "No patch artifacts available for node %s", + node_id, + ) + + for patch_mbox_url in patch_artifacts.values(): + self._has_allowed_domain(patch_mbox_url) + + def _gen_checkout_name(self, checkout_node): + revision = checkout_node["data"]["kernel_revision"] + return "-".join([ + "linux", + revision["tree"], + revision["branch"], + revision["describe"], + ]) + + def _process_patchset(self, checkout_node, patchset_node): + patch_artifacts = patchset_node.get("artifacts") + self._validate_patch_artifacts(patchset_node["id"], patch_artifacts) + self._download_checkout_archive(checkout_node["artifacts"]["tarball"]) + + checkout_name = self._gen_checkout_name(checkout_node) + checkout_path = os.path.join(self._kdir, checkout_name) + + patchset_hash = self._apply_patches(checkout_path, patch_artifacts) + patchset_hash_short = patchset_hash[:PATCHSET_SHORT_HASH_LEN] + + tarball_path = self._make_tarball( + target_dir=checkout_path, + tarball_name=f"{checkout_name}-{patchset_hash_short}" + ) + tarball_url = self._push_tarball(tarball_path) + + patchset_revision = { + **checkout_node["data"]["kernel_revision"], + "patchset": patchset_hash, + } + self._update_node( + node=patchset_node, + revision=patchset_revision, + tarball_url=tarball_url + ) + + def _mark_failed(self, patchset_node): + node = patchset_node.copy() + node.update({ + "state": "done", + "result": "fail", + }) + try: + self._api.node.update(node) + except requests.exceptions.HTTPError as err: + err_msg = json.loads(err.response.content).get("detail", []) + self.log.error(err_msg) + + def _mark_failed_if_no_parent(self, patchset_node): + if not patchset_node["parent"]: + self.log.error( + f"Patchset node {patchset_node['id']} as has no parent" + "checkout node , marking node as failed", + ) + self._mark_failed(patchset_node) + return True + + return False + + def _mark_failed_if_parent_failed(self, patchset_node, checkout_node): + if ( + checkout_node["state"] == "done" and + checkout_node["result"] == "fail" + ): + self.log.error( + f"Parent checkout node {checkout_node['id']} failed, " + f"marking patchset node {patchset_node['id']} as failed", + ) + self._mark_failed(patchset_node) + return True + + return False + + def _run(self, sub_id): + self.log.info("Listening for new trigger events") + self.log.info("Press Ctrl-C to stop.") + + while True: + patchset_nodes = self._api.node.find({ + "name": "patchset", + "state": "running", + }) + + if patchset_nodes: + self.log.debug(f"Found patchset nodes: {patchset_nodes}") + + for patchset_node in patchset_nodes: + if self._mark_failed_if_no_parent(patchset_node): + continue + + checkout_node = self._api.node.get(patchset_node["parent"]) + + if self._mark_failed_if_parent_failed( + patchset_node, + checkout_node + ): + continue + + if checkout_node["state"] == "running": + self.log.info( + f"Patchset node {patchset_node['id']} is waiting " + f"for checkout node {checkout_node['id']} to complete", + ) + continue + + try: + self.log.info( + f"Processing patchset node: {patchset_node['id']}", + ) + self._process_patchset(checkout_node, patchset_node) + except Exception as e: + self.log.error( + f"Patchset node {patchset_node['id']} " + f"processing failed: {e}", + ) + self.log.traceback() + self._mark_failed(patchset_node) + + self.log.info( + f"Waiting {PROCESSING_DELAY_SECS} seconds for a new nodes...", + ) + time.sleep(PROCESSING_DELAY_SECS) + + +class cmd_run(Command): + help = ( + "Wait for a checkout node to be available " + "and push a source+patchset tarball" + ) + args = [ + Args.kdir, Args.output, Args.api_config, Args.storage_config, + ] + opt_args = [ + Args.verbose, Args.storage_cred, + ] + + def __call__(self, configs, args): + return Patchset(configs, args).run(args) + + +if __name__ == "__main__": + opts = parse_opts("patchset", globals()) + configs = kernelci.config.load("config/pipeline.yaml") + status = opts.command(configs, opts) + sys.exit(0 if status is True else 1) diff --git a/src/tarball.py b/src/tarball.py index 97292048a..a25adcd83 100755 --- a/src/tarball.py +++ b/src/tarball.py @@ -5,13 +5,12 @@ # Copyright (C) 2022 Collabora Limited # Author: Guillaume Tucker # Author: Jeny Sadadia +# Author: Nikolay Yurin from datetime import datetime, timedelta -import logging import os import re import sys -import urllib.parse import json import requests @@ -32,6 +31,11 @@ class Tarball(Service): + TAR_CREATE_CMD = """\ +set -e +cd {target_dir} +git archive --format=tar --prefix={prefix}/ HEAD | gzip > {tarball_path} +""" def __init__(self, configs, args): super().__init__(configs, args, 'tarball') @@ -50,7 +54,7 @@ def _find_build_config(self, node): revision = node['data']['kernel_revision'] tree = revision['tree'] branch = revision['branch'] - for name, config in self._build_configs.items(): + for config in self._build_configs.values(): if config.tree.name == tree and config.branch == branch: return config @@ -59,25 +63,22 @@ def _update_repo(self, config): kernelci.build.update_repo(config, self._kdir) self.log.info("Repo updated") - def _make_tarball(self, config, describe): - name = '-'.join(['linux', config.tree.name, config.branch, describe]) - tarball = f"{name}.tar.gz" - self.log.info(f"Making tarball {tarball}") - output_path = os.path.relpath(self._output, self._kdir) - cmd = """\ -set -e -cd {kdir} -git archive --format=tar --prefix={name}/ HEAD | gzip > {output}/{tarball} -""".format(kdir=self._kdir, name=name, output=output_path, tarball=tarball) + def _make_tarball(self, target_dir, tarball_name): + self.log.info(f"Making tarball {tarball_name}") + tarball_path = os.path.join(self._output, f"{tarball_name}.tar.gz") + cmd = self.TAR_CREATE_CMD.format( + target_dir=target_dir, + prefix=tarball_name, + tarball_path=tarball_path + ) self.log.info(cmd) kernelci.shell_cmd(cmd) self.log.info("Tarball created") - return tarball + return tarball_path - def _push_tarball(self, config, describe): - tarball_name = self._make_tarball(config, describe) - tarball_path = os.path.join(self._output, tarball_name) - self.log.info(f"Uploading {tarball_path}") + def _push_tarball(self, tarball_path): + tarball_name = os.path.basename(tarball_path) + self.log.info(f"Uploading {tarball_name}") tarball_url = self._storage.upload_single((tarball_path, tarball_name)) self.log.info(f"Upload complete: {tarball_url}") os.unlink(tarball_path) @@ -134,11 +135,19 @@ def _run(self, sub_id): continue self._update_repo(build_config) + describe = kernelci.build.git_describe( build_config.tree.name, self._kdir ) version = self._get_version_from_describe() - tarball_url = self._push_tarball(build_config, describe) + tarball_name = '-'.join([ + 'linux', + build_config.tree.name, + build_config.branch, + describe + ]) + tarball_path = self._make_tarball(self._kdir, tarball_name) + tarball_url = self._push_tarball(tarball_path) self._update_node(checkout_node, describe, version, tarball_url) return True