diff --git a/Tests/kaas/k8s-version-recency/k8s-version-recency-check.py b/Tests/kaas/k8s-version-recency/k8s-version-recency-check.py old mode 100644 new mode 100755 index e43ff69ba..9f0015f39 --- a/Tests/kaas/k8s-version-recency/k8s-version-recency-check.py +++ b/Tests/kaas/k8s-version-recency/k8s-version-recency-check.py @@ -24,11 +24,11 @@ import aiohttp import asyncio import datetime +from dateutil import relativedelta import getopt import kubernetes_asyncio import logging import logging.config -import math import re import requests import sys @@ -36,9 +36,9 @@ MAJOR_VERSION_CADENCE = None -MINOR_VERSION_CADENCE = 4 # months -PATCH_VERSION_CADENCE = 1 # week -CVE_VERSION_CADENCE = 3 # days +MINOR_VERSION_CADENCE_MONTHS = 4 +PATCH_VERSION_CADENCE_WEEKS = 1 +CVE_VERSION_CADENCE_DAYS = 3 CVE_SEVERITY = 8 # CRITICAL logging_config = { @@ -62,7 +62,7 @@ } } -logger = logging.getLogger("k8s-version-recency-check") +logger = logging.getLogger(__name__) class ConfigException(BaseException): @@ -83,13 +83,12 @@ def print_usage(): print(""" K8s Version Recency Compliance Check -Usage: k8s-version-recency-check.py [-h] [-k|--kubeconfig PATH/TO/KUBECONFIG] +Usage: k8s-version-recency-check.py [-h] [-c|--config PATH/TO/CONFIG] -k|--kubeconfig PATH/TO/KUBECONFIG The K8s version recency check returns 0 if the version of the tested cluster is still acceptable, otherwise -it returns 1 for an out-of-date minor version, 2 for an out-of-date patch level version or 3 if the currently -used version should be updated due to a highly critical CVE. +it returns 2 for an out-of date version or 3 if the used version should be updated due to a highly critical CVE. - -c/--config PATH/TOCONFIG - Path to the config file of the test script + -c/--config PATH/TO/CONFIG - Path to the config file of the test script -k/--kubeconfig PATH/TO/KUBECONFIG - Path to the kubeconfig of the server we want to check -h - Output help """) @@ -142,7 +141,7 @@ def initialize_config(config): # Otherwise, we initialize logging with the included literal setup_logging(config.logging or logging_config) - if not config.kubeconfig: + if config.kubeconfig is None: print("A kubeconfig needs to be set in order to test a k8s cluster version.") raise ConfigException @@ -167,39 +166,44 @@ def __init__(self, major=0, minor=0, patch=0): def __eq__(self, other): if not isinstance(other, K8sVersionInfo): - return False + raise TypeError return self.major == other.major and self.minor == other.minor and self.patch == other.patch def __gt__(self, other): if not isinstance(other, K8sVersionInfo): - return False + raise TypeError patchcomp = self.minor == other.minor and self.patch > other.patch return self.major > other.major or (self.major == other.major and (self.minor > other.minor or patchcomp)) def __ge__(self, other): if not isinstance(other, K8sVersionInfo): - return False + raise TypeError patchcomp = self.minor == other.minor and self.patch >= other.patch return self.major > other.major or (self.major == other.major and (self.minor > other.minor or patchcomp)) def __lt__(self, other): if not isinstance(other, K8sVersionInfo): - return False + raise TypeError patchcomp = self.minor == other.minor and self.patch < other.patch return self.major < other.major or (self.major == other.major and (self.minor < other.minor or patchcomp)) def __le__(self, other): if not isinstance(other, K8sVersionInfo): - return False + raise TypeError patchcomp = self.minor == other.minor and self.patch <= other.patch return self.major < other.major or (self.major == other.major and (self.minor < other.minor or patchcomp)) @classmethod - def extract_version(cls, string, separator="."): + def extract_version(cls, string, separator=".", strip=None): + if strip is None: + strip = ["v"] + for s in strip: + string = string.strip(s) components = string.strip().split(separator) return cls(int(components[0]), int(components[1]), int(components[2])) def check_for_version(self, major=None, minor=None, patch=None): + """Check if a version or part of the version is equal to the given version numbers""" return (major is None or self.major == major) and \ (minor is None or self.minor == minor) and \ (patch is None or self.patch == patch) @@ -209,42 +213,57 @@ def __str__(self): class CVEVersionInfo: - def __init__(self, lower_version, upper_version, less_than=False, equal=False): + """Class that contains a CVE version info. + + Attributes: + upper_version (K8sVersionInfo): Last version with the CVE + lower_version (K8sVersionInfo): First version with the CVE; this value will be set if either an affected version + is directly set in a CVE dataset or if the CVE dataset is in a non-standard format. + If the variable is set, `lower_version` and `upper_version` create a range of affected versions. + equal (bool): check if the version is equal to the `upper_version`, (less than is always checked, since the + format is build like this) + """ + def __init__(self, lower_version, upper_version, equal=False): self.lower_version = lower_version self.upper_version = upper_version - self.less_than = less_than self.equal = equal def __eq__(self, other): if not isinstance(other, CVEVersionInfo): - return False - return self.lower_version == other.lower_version and self.upper_version == other.upper_version and \ - self.less_than == other.less_than and self.equal == self.equal + raise TypeError + return self.lower_version == other.lower_version and \ + self.upper_version == other.upper_version and \ + self.equal == self.equal def is_version_affected(self, version_info): + # See the following link for more information about the format + # https://www.cve.org/AllResources/CveServices#cve-json-5 + + # Check if an `upper version` exists if self.upper_version: + # Check if a `lower version` exists and compare the version against it if self.lower_version: gt = self.lower_version <= version_info else: gt = True - if self.less_than: - if self.equal: - return gt and self.upper_version >= version_info - return gt and self.upper_version > version_info + # Compare the version either with `less than` or `less than or equal` against the `upper version` + if self.equal: + return gt and self.upper_version >= version_info + return gt and self.upper_version > version_info else: + # If no upper version exists, we only need to check if the version is equal to the `lower version` return self.lower_version == version_info def diff_months(date1, date2): - return abs((date1.year - date2.year) * 12 + date1.month - date2.month) + r = relativedelta.relativedelta(date2, date1) + return r.months + (12 * r.years) def diff_weeks(date1, date2): - day1 = (date1 - datetime.timedelta(days=date1.weekday())) - day2 = (date2 - datetime.timedelta(days=date2.weekday())) - diff = day2 - day1 - return abs((diff.days / 7) + math.ceil(diff.seconds / 86400)) + delta = date1 - date2 + return abs(delta.days / 7) def diff_days(date1, date2): @@ -254,48 +273,46 @@ def diff_days(date1, date2): async def request_cve_data(session: aiohttp.ClientSession, cveid: str) -> dict: """Request for a single CVE data item.""" - resp = await session.request('GET', f"https://cveawg.mitre.org/api/cve/{cveid}", - headers={"Accept": "application/json"}) - return await resp.json() + async with session.get( + f"https://cveawg.mitre.org/api/cve/{cveid}", + headers={"Accept": "application/json"} + ) as resp: + return await resp.json() def parse_cve_version_information(cve_version_info): """Parse the CVE version information according to their CVE JSON 5.0 schema""" vi_lower_version = None vi_upper_version = None - less_than = False equal = False # Extract the version if it is viable, but it's not a requirement try: - vi_lower_version = K8sVersionInfo().extract_version(cve_version_info['version'].strip("v")) + vi_lower_version = K8sVersionInfo.extract_version(cve_version_info['version']) except ValueError: pass if 'lessThanOrEqual' in cve_version_info: - vi_upper_version = K8sVersionInfo().extract_version(cve_version_info['lessThanOrEqual'].strip("v")) - vi_upper_version.patch += 1 - less_than = True + vi_upper_version = K8sVersionInfo.extract_version(cve_version_info['lessThanOrEqual']) equal = True elif 'lessThan' in cve_version_info: - vi_upper_version = K8sVersionInfo().extract_version(cve_version_info['lessThan'].strip("v")) - less_than = True + vi_upper_version = K8sVersionInfo.extract_version(cve_version_info['lessThan']) # This shouldn't happen, but if it happens, we look for non-standard descriptions # According to this(https://www.cve.org/AllResources/CveServices#cve-json-5), # this isn't how the data should be described - if not vi_lower_version and not vi_upper_version: + if vi_lower_version is None and vi_upper_version is None: if re.search(r'v?\d+.\d+.x', cve_version_info['version']): vdata = cve_version_info['version'].strip("v").split(".") vi_lower_version = K8sVersionInfo(vdata[0], vdata[1], 0) - vi_upper_version = K8sVersionInfo(vdata[0], int(vdata[1])+1, 0) + vi_upper_version = K8sVersionInfo(vdata[0], vdata[1], 0) if re.search(r'v?\d+.\d+.\d+\s+-\s+v?\d+.\d+.\d+', cve_version_info['version']): vdata = cve_version_info['version'].split("-") - vi_lower_version = K8sVersionInfo().extract_version(vdata[0].strip("v")) - vi_upper_version = K8sVersionInfo().extract_version(vdata[1].strip("v")) + vi_lower_version = K8sVersionInfo.extract_version(vdata[0]) + vi_upper_version = K8sVersionInfo.extract_version(vdata[1]) - return CVEVersionInfo(vi_lower_version, vi_upper_version, less_than, equal) + return CVEVersionInfo(vi_lower_version, vi_upper_version, equal) async def collect_cve_versions(): @@ -349,8 +366,11 @@ async def collect_cve_versions(): if version_info['status'] == "affected" ] for cvev in affected_kubernetes_versions: - if cvev not in cfvs: - cfvs.append(cvev) + try: + if cvev not in cfvs: + cfvs.append(cvev) + except TypeError: + pass return cfvs @@ -363,73 +383,51 @@ async def get_k8s_cluster_version(kubeconfig): version_api = kubernetes_asyncio.client.VersionApi(api) ret = await version_api.get_code() - version = K8sVersionInfo.extract_version(ret.git_version.strip("v")) + version = K8sVersionInfo.extract_version(ret.git_version) version.date = datetime.datetime.strptime(ret.build_date, '%Y-%m-%dT%H:%M:%SZ') return version, cluster_config.current_context['name'] -def is_current_version(version, k8s_version_list, cve_version_list=[]): - """Filter the versions depending on the usage times set by the Standard and - the times set for CVE versions. - """ - try: - if diff_months(version.date, datetime.datetime.now()) >= MINOR_VERSION_CADENCE: - return False - - for kv in k8s_version_list: - if version.check_for_version(major=kv.major, minor=kv.minor) and \ - version.patch < kv.patch: - - if diff_weeks(datetime.datetime.now(), kv.date) >= PATCH_VERSION_CADENCE: - return False - - if kv in cve_version_list and \ - diff_days(datetime.datetime.now(), kv.date) >= CVE_VERSION_CADENCE: - return False - except (KeyError, IndexError, TypeError) as e: - logger.debug(f"An error occurred during version filtering: {e}") - return False - else: - return True - - -def collect_accepted_k8s_versions(cve_version_list=[]): - """Collect a list of k8s versions that comply to the cadence time set by the standard""" - - k8s_versions = [] +def check_k8s_version_recency(version, cve_version_list=None): + """Check a given K8s cluster version against the list of released versions in order to find out, if the version + is an accepted recent version according to the standard.""" + if cve_version_list is None: + cve_version_list = list() github_headers = { "Accept": "application/vnd.github+json", "X-GitHub-Api-Version": "2022-11-28" } - # Request latest version - resp = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases/latest", - headers=github_headers).json() - - # Latest version - lv = K8sVersionInfo.extract_version(resp['tag_name'].strip("v")) - lv.date = datetime.datetime.strptime(resp['published_at'], '%Y-%m-%dT%H:%M:%SZ') - # Request the latest 100 version (the next are not needed, since these versions are too old) response = requests.get("https://api.github.com/repos/kubernetes/kubernetes/releases?per_page=100", headers=github_headers).json() - # Iterate all versions until the first patch of the previous latest version for r in response: - v = K8sVersionInfo.extract_version(r['tag_name'].split("-")[0].strip("v")) + v = K8sVersionInfo.extract_version(r['tag_name'].split("-")[0]) v.date = datetime.datetime.strptime(r['published_at'], '%Y-%m-%dT%H:%M:%SZ') - if not r['draft'] and not r['prerelease']: - k8s_versions.append(v) + if r['draft'] or r['prerelease']: + continue + + # Check if the version is recent + if v.minor >= version.minor: + if diff_months(v.date, datetime.datetime.now()) >= MINOR_VERSION_CADENCE_MONTHS: + return False + + if version.check_for_version(major=v.major, minor=v.minor) and version.patch < v.patch: + if diff_weeks(datetime.datetime.now(), v.date) >= PATCH_VERSION_CADENCE_WEEKS: + return False - # Stop adding new version if the version we added is the previous latest minor versions first patch, - # since it is most of the time around 3-6 months until a new version comes out or an old one goes into EOL - if v.check_for_version(lv.major, (lv.minor-1), 0): + if v in cve_version_list and \ + diff_days(datetime.datetime.now(), v.date) >= CVE_VERSION_CADENCE_DAYS: + return False + + if v.minor == (version.minor + 1) and v.patch == 0: break - return [v for v in k8s_versions if is_current_version(v, k8s_versions, cve_version_list)] + return True async def main(argv): @@ -440,20 +438,21 @@ async def main(argv): return 1 cve_versions = await collect_cve_versions() - k8s_versions = collect_accepted_k8s_versions(cve_versions) cluster_version, cluster_name = await get_k8s_cluster_version(config.kubeconfig) - for k8sv in k8s_versions: - if k8sv == cluster_version: - logger.info("The K8s cluster version %s of cluster '%s' is still in the recency time window." % - (str(cluster_version), cluster_name)) - return 0 + if check_k8s_version_recency(cluster_version, cve_versions): + logger.info("The K8s cluster version %s of cluster '%s' is still in the recency time window." % + (str(cluster_version), cluster_name)) + return 0 for cvev in cve_versions: - if cvev.is_version_affected(cluster_version): - logger.error("The K8s cluster version %s of cluster '%s' is an outdated version " - "with a possible CRITICAL CVE." % (str(cluster_version), cluster_name)) - return 3 + try: + if cvev.is_version_affected(cluster_version): + logger.error("The K8s cluster version %s of cluster '%s' is an outdated version " + "with a possible CRITICAL CVE." % (str(cluster_version), cluster_name)) + return 3 + except TypeError as e: + logger.error(f"An error occurred during CVE check: {e}") logger.error("The K8s cluster version %s of cluster '%s' is outdated according to the Standard." % (str(cluster_version), cluster_name))