diff --git a/.gitignore b/.gitignore index 6494908ad..0122bc1d0 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ .docker-env data *.pyc +*.venv + diff --git a/config/kernelci.toml b/config/kernelci.toml index 4edaa6056..c1f3be162 100644 --- a/config/kernelci.toml +++ b/config/kernelci.toml @@ -13,6 +13,15 @@ kdir = "/home/kernelci/data/src/linux" output = "/home/kernelci/data/output" storage_config = "docker-host" +[patchset] +kdir = "/home/kernelci/data/src/linux-patchset" +output = "/home/kernelci/data/output" +storage_config = "docker-host" +patchset_tmp_file_prefix = "kernel-patch" +patchset_short_hash_len = 13 +allowed_domains = ["patchwork.kernel.org"] +polling_delay_secs = 30 + [scheduler] output = "/home/kernelci/data/output" diff --git a/docker-compose.yaml b/docker-compose.yaml index 21c7c8c9c..43746b88a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -193,3 +193,18 @@ services: - '--mode=holdoff' extra_hosts: - "host.docker.internal:host-gateway" + + patchset: + <<: *base-service + container_name: 'kernelci-pipeline-patchset' + command: + - './pipeline/patchset.py' + - '--settings=${KCI_SETTINGS:-/home/kernelci/config/kernelci.toml}' + - 'run' + volumes: + - './src:/home/kernelci/pipeline' + - './config:/home/kernelci/config' + - './data/ssh:/home/kernelci/data/ssh' + - './data/src:/home/kernelci/data/src' + - './data/output:/home/kernelci/data/output' + diff --git a/src/monitor.py b/src/monitor.py index e4a24f20d..0f1796956 100755 --- a/src/monitor.py +++ b/src/monitor.py @@ -60,9 +60,10 @@ def _run(self, sub_id): event = self._api.receive_event(sub_id) obj = event.data dt = datetime.datetime.fromisoformat(event['time']) - commit = (obj['data']['kernel_revision']['commit'][:12] - if 'kernel_revision' in obj['data'] - else str(None)) + try: + commit = obj['data']['kernel_revision']['commit'][:12] + except (KeyError, TypeError): + commit = str(None) result = result_map[obj['result']] if obj['result'] else str(None) print(self.LOG_FMT.format( time=dt.strftime('%Y-%m-%d %H:%M:%S.%f'), diff --git a/src/patchset.py b/src/patchset.py new file mode 100755 index 000000000..c37ce22a2 --- /dev/null +++ b/src/patchset.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +# +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# Copyright (c) Meta Platforms, Inc. and affiliates. +# Author: Nikolay Yurin + +import os +import sys +import json +import requests +import time +import tempfile +import hashlib +from datetime import datetime, timedelta +from urllib.parse import urlparse +from urllib.request import urlopen + +import kernelci +import kernelci.build +import kernelci.config +from kernelci.legacy.cli import Args, Command, parse_opts +import kernelci.storage + +from base import Service +from tarball import Tarball + + +class Patchset(Tarball): + TAR_CREATE_CMD = """\ +set -e +cd {target_dir} +tar --create --transform "s/^/{prefix}\\//" * | gzip > {tarball_path} +""" + + APPLY_PATCH_SHELL_CMD = """\ +set -e +cd {checkout_path} +patch -p1 < {patch_file} +""" + + # FIXME: I really don"t have a good idea what I"m doing here + # This code probably needs rework and put into kernelci.patch + def _hash_patch(self, patch_name, patch_file): + allowed_prefixes = { + b"old mode", # Old file permissions + b"new mode", # New file permissions + b"-", # This convers both removed lines and source file + b"+", # This convers both added lines and target file + # "@" I don"t know how we should handle hunks yet + } + hashable_patch_lines = [] + for line in patch_file.readlines(): + if not line: + continue + + for prefix in allowed_prefixes: + if line.startswith(prefix): + hashable_patch_lines.append(line) + break + + hashable_content = b"/n".join(hashable_patch_lines) + self.log.debug( + "Hashable content:\n" + + hashable_content.decode("utf-8") + ) + patch_hash_digest = hashlib.sha256(hashable_content).hexdigest() + self.log.debug(f"Patch {patch_name} hash: {patch_hash_digest}") + return patch_hash_digest + + # FIXME: move into kernelci.patch + def _apply_patch(self, checkout_path, patch_name, patch_url): + self.log.info( + f"Applying patch {patch_name}, url: {patch_url}", + ) + try: + encoding = urlopen(patch_url).headers.get_charsets()[0] + except Exception as e: + self.log.warn( + "Failed to fetch encoding from patch " + f"{patch_name} headers: {e}" + ) + self.log.warn("Falling back to utf-8 encoding") + encoding = "utf-8" + + with tempfile.NamedTemporaryFile( + prefix="{}-{}-".format( + self._service_config.patchset_tmp_file_prefix, + patch_name + ), + encoding=encoding + ) as tmp_f: + if not kernelci.build._download_file(patch_url, tmp_f.name): + raise FileNotFoundError( + f"Error downloading patch from {patch_url}" + ) + + kernelci.shell_cmd(self.APPLY_PATCH_SHELL_CMD.format( + checkout_path=checkout_path, + patch_file=tmp_f.name, + )) + + return self._hash_patch(patch_name, tmp_f) + + # FIXME: move into kernelci.patch + def _apply_patches(self, checkout_path, patch_artifacts): + patchset_hash = hashlib.sha256() + for patch_name, patch_url in patch_artifacts.items(): + patch_hash = self._apply_patch(checkout_path, patch_name, patch_url) + patchset_hash.update(patch_hash.encode("utf-8")) + + patchset_hash_digest = patchset_hash.hexdigest() + self.log.debug(f"Patchset hash: {patchset_hash_digest}") + return patchset_hash_digest + + def _download_checkout_archive(self, download_path, tarball_url, retries=3): + self.log.info(f"Downloading checkout tarball, url: {tarball_url}") + tar_filename = os.path.basename(urlparse(tarball_url).path) + kernelci.build.pull_tarball( + kdir=download_path, + url=tarball_url, + dest_filename=tar_filename, + retries=retries, + delete=True + ) + + def _update_node( + self, + patchset_node, + checkout_node, + tarball_url, + patchset_hash + ): + patchset_data = checkout_node.get("data", {}).copy() + patchset_data["kernel_revision"]["patchset"] = patchset_hash + + updated_node = patchset_node.copy() + updated_node["artifacts"]["tarball"] = tarball_url + updated_node["state"] = "available" + updated_node["data"] = patchset_data + updated_node["holdoff"] = str( + datetime.utcnow() + timedelta(minutes=10) + ) + + try: + self._api.node.update(updated_node) + except requests.exceptions.HTTPError as err: + err_msg = json.loads(err.response.content).get("detail", []) + self.log.error(err_msg) + + def _setup(self, *args): + return self._api_helper.subscribe_filters({ + "op": "created", + "name": "patchset", + "state": "running", + }) + + def _has_allowed_domain(self, url): + domain = urlparse(url).hostname + if domain not in self._service_config.allowed_domains: + raise RuntimeError( + "Forbidden mbox domain %s, allowed domains: %s", + domain, + self._service_config.allowed_domains, + ) + + def _get_patch_artifacts(self, patchset_node): + node_artifacts = patchset_node.get("artifacts") + if not node_artifacts: + raise ValueError( + "Patchset node %s has no artifacts", + patchset_node["id"], + ) + + for patch_mbox_url in node_artifacts.values(): + self._has_allowed_domain(patch_mbox_url) + + return node_artifacts + + def _gen_checkout_name(self, checkout_node): + revision = checkout_node["data"]["kernel_revision"] + return "-".join([ + "linux", + revision["tree"], + revision["branch"], + revision["describe"], + ]) + + def _process_patchset(self, checkout_node, patchset_node): + patch_artifacts = self._get_patch_artifacts(patchset_node) + + # Tarball download implicitely removes destination dir + # there's no need to cleanup this directory + self._download_checkout_archive( + download_path=self._service_config.kdir, + tarball_url=checkout_node["artifacts"]["tarball"] + ) + + checkout_name = self._gen_checkout_name(checkout_node) + checkout_path = os.path.join(self._service_config.kdir, checkout_name) + + patchset_hash = self._apply_patches(checkout_path, patch_artifacts) + patchset_hash_short = patchset_hash[ + :self._service_config.patchset_short_hash_len + ] + + tarball_path = self._make_tarball( + target_dir=checkout_path, + tarball_name=f"{checkout_name}-{patchset_hash_short}" + ) + tarball_url = self._push_tarball(tarball_path) + + self._update_node( + patchset_node=patchset_node, + checkout_node=checkout_node, + tarball_url=tarball_url, + patchset_hash=patchset_hash + ) + + def _mark_failed(self, patchset_node): + node = patchset_node.copy() + node.update({ + "state": "done", + "result": "fail", + }) + try: + self._api.node.update(node) + except requests.exceptions.HTTPError as err: + err_msg = json.loads(err.response.content).get("detail", []) + self.log.error(err_msg) + + def _mark_failed_if_no_parent(self, patchset_node): + if not patchset_node["parent"]: + self.log.error( + f"Patchset node {patchset_node['id']} as has no parent" + "checkout node , marking node as failed", + ) + self._mark_failed(patchset_node) + return True + + return False + + def _mark_failed_if_parent_failed(self, patchset_node, checkout_node): + if ( + checkout_node["state"] == "done" and + checkout_node["result"] == "fail" + ): + self.log.error( + f"Parent checkout node {checkout_node['id']} failed, " + f"marking patchset node {patchset_node['id']} as failed", + ) + self._mark_failed(patchset_node) + return True + + return False + + def _run(self, _sub_id): + self.log.info("Listening for new trigger events") + self.log.info("Press Ctrl-C to stop.") + + while True: + patchset_nodes = self._api.node.find({ + "name": "patchset", + "state": "running", + }) + + if patchset_nodes: + self.log.debug(f"Found patchset nodes: {patchset_nodes}") + + for patchset_node in patchset_nodes: + if self._mark_failed_if_no_parent(patchset_node): + continue + + checkout_node = self._api.node.get(patchset_node["parent"]) + + if self._mark_failed_if_parent_failed( + patchset_node, + checkout_node + ): + continue + + if checkout_node["state"] == "running": + self.log.info( + f"Patchset node {patchset_node['id']} is waiting " + f"for checkout node {checkout_node['id']} to complete", + ) + continue + + try: + self.log.info( + f"Processing patchset node: {patchset_node['id']}", + ) + self._process_patchset(checkout_node, patchset_node) + except Exception as e: + self.log.error( + f"Patchset node {patchset_node['id']} " + f"processing failed: {e}", + ) + self.log.traceback() + self._mark_failed(patchset_node) + + self.log.info( + "Waiting %d seconds for a new nodes..." % + self._service_config.polling_delay_secs, + ) + time.sleep(self._service_config.polling_delay_secs) + + +class cmd_run(Command): + help = ( + "Wait for a checkout node to be available " + "and push a source+patchset tarball" + ) + args = [ + Args.kdir, Args.output, Args.api_config, Args.storage_config, + ] + opt_args = [ + Args.verbose, Args.storage_cred, + ] + + def __call__(self, configs, args): + return Patchset(configs, args).run(args) + + +if __name__ == "__main__": + opts = parse_opts("patchset", globals()) + configs = kernelci.config.load("config") + status = opts.command(configs, opts) + sys.exit(0 if status is True else 1) diff --git a/src/tarball.py b/src/tarball.py index e0f3e992b..7d27b7a9f 100755 --- a/src/tarball.py +++ b/src/tarball.py @@ -5,13 +5,12 @@ # Copyright (C) 2022 Collabora Limited # Author: Guillaume Tucker # Author: Jeny Sadadia +# Author: Nikolay Yurin from datetime import datetime, timedelta -import logging import os import re import sys -import urllib.parse import json import requests @@ -32,25 +31,30 @@ class Tarball(Service): - - def __init__(self, configs, args): - super().__init__(configs, args, 'tarball') - self._build_configs = configs['build_configs'] - self._kdir = args.kdir - self._output = args.output - if not os.path.exists(self._output): - os.makedirs(self._output) - self._verbose = args.verbose - self._storage_config = configs['storage_configs'][args.storage_config] + TAR_CREATE_CMD = """\ +set -e +cd {target_dir} +git archive --format=tar --prefix={prefix}/ HEAD | gzip > {tarball_path} +""" + + def __init__(self, global_configs, service_config): + super().__init__(global_configs, service_config, 'tarball') + self._service_config = service_config + self._build_configs = global_configs['build_configs'] + if not os.path.exists(self._service_config.output): + os.makedirs(self._service_config.output) + storage_config = global_configs['storage_configs'][ + service_config.storage_config + ] self._storage = kernelci.storage.get_storage( - self._storage_config, args.storage_cred + storage_config, service_config.storage_cred ) def _find_build_config(self, node): revision = node['data']['kernel_revision'] tree = revision['tree'] branch = revision['branch'] - for name, config in self._build_configs.items(): + for config in self._build_configs.values(): if config.tree.name == tree and config.branch == branch: return config @@ -61,13 +65,16 @@ def _update_repo(self, config): ''' self.log.info(f"Updating repo for {config.name}") try: - kernelci.build.update_repo(config, self._kdir) + kernelci.build.update_repo(config, self._service_config.kdir) except Exception as err: self.log.error(f"Failed to update: {err}, cleaning stale repo") # safeguard, make sure it is git repo - if not os.path.exists(os.path.join(self._kdir, '.git')): - self.log.error(f"{self._kdir} is not a git repo") - raise Exception(f"{self._kdir} is not a git repo") + if not os.path.exists( + os.path.join(self._service_config.kdir, '.git') + ): + err_msg = f"{self._service_config.kdir} is not a git repo" + self.log.error(err_msg) + raise Exception(err_msg) # cleanup the repo and return True, so we try again kernelci.shell_cmd(f"rm -rf {self._kdir}") return True @@ -75,24 +82,24 @@ def _update_repo(self, config): self.log.info("Repo updated") return False - def _make_tarball(self, config, describe): - name = '-'.join(['linux', config.tree.name, config.branch, describe]) - tarball = f"{name}.tar.gz" - self.log.info(f"Making tarball {tarball}") - output_path = os.path.relpath(self._output, self._kdir) - cmd = """\ -set -e -cd {kdir} -git archive --format=tar --prefix={name}/ HEAD | gzip > {output}/{tarball} -""".format(kdir=self._kdir, name=name, output=output_path, tarball=tarball) + def _make_tarball(self, target_dir, tarball_name): + self.log.info(f"Making tarball {tarball_name}") + tarball_path = os.path.join( + self._service_config.output, + f"{tarball_name}.tar.gz" + ) + cmd = self.TAR_CREATE_CMD.format( + target_dir=target_dir, + prefix=tarball_name, + tarball_path=tarball_path + ) self.log.info(cmd) kernelci.shell_cmd(cmd) self.log.info("Tarball created") - return tarball + return tarball_path - def _push_tarball(self, config, describe): - tarball_name = self._make_tarball(config, describe) - tarball_path = os.path.join(self._output, tarball_name) + def _push_tarball(self, tarball_path): + tarball_name = os.path.basename(tarball_path) self.log.info(f"Uploading {tarball_path}") tarball_url = self._storage.upload_single((tarball_path, tarball_name)) self.log.info(f"Upload complete: {tarball_url}") @@ -100,7 +107,9 @@ def _push_tarball(self, config, describe): return tarball_url def _get_version_from_describe(self): - describe_v = kernelci.build.git_describe_verbose(self._kdir) + describe_v = kernelci.build.git_describe_verbose( + self._service_config.kdir + ) version = KVER_RE.match(describe_v).groupdict() return { key: value @@ -157,14 +166,22 @@ def _run(self, sub_id): os._exit(1) describe = kernelci.build.git_describe( - build_config.tree.name, self._kdir + build_config.tree.name, self._service_config.kdir ) version = self._get_version_from_describe() - tarball_url = self._push_tarball(build_config, describe) + tarball_name = '-'.join([ + 'linux', + build_config.tree.name, + build_config.branch, + describe + ]) + tarball_path = self._make_tarball( + self._service_config.kdir, + tarball_name + ) + tarball_url = self._push_tarball(tarball_path) self._update_node(checkout_node, describe, version, tarball_url) - return True - class cmd_run(Command): help = "Wait for a new revision event and push a source tarball"