From 12f4c42d73e8f296aa6daf769d7830860879b6b2 Mon Sep 17 00:00:00 2001 From: Tim Laurence Date: Sun, 18 Mar 2018 23:32:15 -0400 Subject: [PATCH] Major update, mainly better version checks. (#30) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note: API change to use the proper units as documented by Nagios. You may notice large jumps in your graphs if units were previously being ignored. Note: Memory usage no-longer includes cache to match the way ‘docker stats’ does it. You likely will see a drop in memory usage from this. Made version check much better, it can now handle non-official registries. Added ability to specify size of a KB (1000 or 1024 bytes) Moved testing to use Pytest Added traceback to error handling for better bug reports. Handle unsupported memory units better, i.e. not with assert. Adjusted default registry to match Docker documentation. Round and truncate displayed values as appropriate --- .travis.yml | 7 +- README.md | 20 +- README.txt | 91 ++- check_docker | 293 +++++-- check_swarm | 4 +- test_check_docker.py | 1785 +++++++++++++++++------------------------- test_check_swarm.py | 594 +++++++------- 7 files changed, 1313 insertions(+), 1481 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5a67307..0a24556 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,11 +4,10 @@ python: - "3.4" - "3.5" - "3.6" + - "3.7-dev" install: - - pip install codeclimate-test-reporter coverage==4.3.4 pyfakefs + - pip install codeclimate-test-reporter coverage==4.3.4 pyfakefs pytest # command to run tests script: - - COVERAGE_FILE=.coverage.check_docker coverage run ./test_check_docker.py - - COVERAGE_FILE=.coverage.check_swarm coverage run ./test_check_swarm.py - - coverage combine .coverage.check_* + - coverage run --include='check_*' -m pytest - codeclimate-test-reporter || echo "Ignoring Code Climate reporter upload failure" diff --git a/README.md b/README.md index 8752c55..752db8e 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ With check_docker can use it to check and alert on - container health checks are passing? - uptime, i.e. is it able to stay running for a long enough time? - the presence of a container or containers matching specified names -- image version (experimental!), does the running image match that in the remote registry? +- image version, does the running image match that in the remote registry? With check_swarm you can alert @@ -56,11 +56,13 @@ With wget usage: check_docker [-h] [--connection [//docker.socket|:] | --secure-connection [:]] - [--timeout TIMEOUT] + [--binary_units | --decimal_units] [--timeout TIMEOUT] [--containers CONTAINERS [CONTAINERS ...]] [--present] [--cpu WARN:CRIT] [--memory WARN:CRIT:UNITS] [--status STATUS] [--health] [--uptime WARN:CRIT] - [--version] [--restarts WARN:CRIT] + [--version] + [--insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...]] + [--restarts WARN:CRIT] Check docker containers. @@ -71,6 +73,10 @@ With wget /var/run/docker.sock) --secure-connection [:] Where to find TLS protected docker daemon socket. + --binary_units Use a base of 1024 when doing calculations of KB, MB, + GB, & TB (This is default) + --decimal_units Use a base of 1000 when doing calculations of KB, MB, + GB, & TB --timeout TIMEOUT Connection timeout in seconds. (default: 10.0) --containers CONTAINERS [CONTAINERS ...] One or more RegEx that match the names of the @@ -82,7 +88,7 @@ With wget limits. Valid values are 0 - 100. 
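The new `--binary_units` / `--decimal_units` switch documented above only changes the base used to interpret KB, MB, GB, and TB thresholds. A minimal sketch of that conversion, built from the `UNIT_ADJUSTMENTS_TEMPLATE` and the dict comprehension this patch adds to check_docker (`build_unit_adjustments` is a hypothetical helper shown only for illustration):

```python
# Powers copied from UNIT_ADJUSTMENTS_TEMPLATE in this patch; the chosen base
# (1024 with --binary_units, the default, or 1000 with --decimal_units) is
# raised to each power once at startup.
UNIT_ADJUSTMENTS_TEMPLATE = {'%': 0, 'B': 0, 'KB': 1, 'MB': 2, 'GB': 3, 'TB': 4}

def build_unit_adjustments(units_base=1024):
    # Hypothetical wrapper around the comprehension used in perform_checks().
    return {unit: units_base ** power for unit, power in UNIT_ADJUSTMENTS_TEMPLATE.items()}

assert build_unit_adjustments(1024)['MB'] == 1024 ** 2   # --binary_units
assert build_unit_adjustments(1000)['MB'] == 1000000     # --decimal_units
```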
--memory WARN:CRIT:UNITS Check memory usage taking into account any limits. - Valid values for units are %,b,k,m,g. + Valid values for units are %,B,KB,MB,GB. --status STATUS Desired container status (running, exited, etc). (default: None) --health Check container's health check status @@ -90,7 +96,11 @@ With wget infrequent crashes are tolerated. --version Check if the running images are the same version as those in the registry. Useful for finding stale - images. Only works with public registry. + images. Does not support login. + --insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...] + List of registries to connect to with http(no TLS). + Useful when using "--version" with images from + insecure registries. --restarts WARN:CRIT Container restart thresholds. ## check_swarm Usage diff --git a/README.txt b/README.txt index 74d4ae3..3f29ab9 100644 --- a/README.txt +++ b/README.txt @@ -20,8 +20,7 @@ With check_docker can use it to check and alert on - container health checks are passing? - uptime, i.e. is it able to stay running for a long enough time? - the presence of a container or containers matching specified names -- image version (experimental!), does the running image match that in - the remote registry? +- image version, does the running image match that in the remote registry? With check_swarm you can alert @@ -39,45 +38,55 @@ check_docker Usage :: - usage: check_docker [-h] - [--connection [//docker.socket|:] - | --secure-connection [:]] - [--timeout TIMEOUT] - [--containers CONTAINERS [CONTAINERS ...]] [--present] - [--cpu WARN:CRIT] [--memory WARN:CRIT:UNITS] - [--status STATUS] [--health] [--uptime WARN:CRIT] - [--version] [--restarts WARN:CRIT] - - Check docker containers. - - optional arguments: - -h, --help show this help message and exit - --connection [//docker.socket|:] - Where to find docker daemon socket. (default: - /var/run/docker.sock) - --secure-connection [:] - Where to find TLS protected docker daemon socket. - --timeout TIMEOUT Connection timeout in seconds. (default: 10.0) - --containers CONTAINERS [CONTAINERS ...] - One or more RegEx that match the names of the - container(s) to check. If omitted all containers are - checked. (default: ['all']) - --present Modifies --containers so that each RegEx must match at - least one container. - --cpu WARN:CRIT Check cpu usage percentage taking into account any - limits. Valid values are 0 - 100. - --memory WARN:CRIT:UNITS - Check memory usage taking into account any limits. - Valid values for units are %,b,k,m,g. - --status STATUS Desired container status (running, exited, etc). - (default: None) - --health Check container's health check status - --uptime WARN:CRIT Minimum container uptime in seconds. Use when - infrequent crashes are tolerated. - --version Check if the running images are the same version as - those in the registry. Useful for finding stale - images. Only works with public registry. - --restarts WARN:CRIT Container restart thresholds. +usage: check_docker [-h] + [--connection [//docker.socket|:] + | --secure-connection [:]] + [--binary_units | --decimal_units] [--timeout TIMEOUT] + [--containers CONTAINERS [CONTAINERS ...]] [--present] + [--cpu WARN:CRIT] [--memory WARN:CRIT:UNITS] + [--status STATUS] [--health] [--uptime WARN:CRIT] + [--version] + [--insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...]] + [--restarts WARN:CRIT] + +Check docker containers. 
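The reworked `--version` check compares the running image's local RepoDigest against the `Docker-Content-Digest` the registry reports for the image's manifest URL, falling back to plain HTTP for registries listed via `--insecure-registries`. A rough sketch of the URL normalization only, under the simplifying assumption that the image reference is well formed (`manifest_url` is illustrative; the script's real logic lives in `parse_image_name` and `normalize_image_name_to_manifest_url` later in this patch):

```python
# Simplified illustration of how an image reference becomes the manifest URL
# that the version check HEADs; defaults mirror the constants in this patch.
DEFAULT_PUBLIC_REGISTRY = 'registry-1.docker.io'

def manifest_url(image, insecure_registries=()):
    registry, _, rest = image.partition('/')
    if '.' not in registry and ':' not in registry:
        # No registry component given, fall back to Docker Hub.
        registry, rest = DEFAULT_PUBLIC_REGISTRY, image
    name, _, tag = rest.partition(':')
    tag = tag or 'latest'
    if registry == DEFAULT_PUBLIC_REGISTRY and '/' not in name:
        name = 'library/' + name  # official images live under library/
    scheme = 'http' if registry in insecure_registries else 'https'
    return '{}://{}/v2/{}/manifests/{}'.format(scheme, registry, name, tag)

assert manifest_url('busybox') == \
    'https://registry-1.docker.io/v2/library/busybox/manifests/latest'
assert manifest_url('registry.example.com:5000/app:1.2',
                    ['registry.example.com:5000']) == \
    'http://registry.example.com:5000/v2/app/manifests/1.2'
```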
+ +optional arguments: + -h, --help show this help message and exit + --connection [//docker.socket|:] + Where to find docker daemon socket. (default: + /var/run/docker.sock) + --secure-connection [:] + Where to find TLS protected docker daemon socket. + --binary_units Use a base of 1024 when doing calculations of KB, MB, + GB, & TB (This is default) + --decimal_units Use a base of 1000 when doing calculations of KB, MB, + GB, & TB + --timeout TIMEOUT Connection timeout in seconds. (default: 10.0) + --containers CONTAINERS [CONTAINERS ...] + One or more RegEx that match the names of the + container(s) to check. If omitted all containers are + checked. (default: ['all']) + --present Modifies --containers so that each RegEx must match at + least one container. + --cpu WARN:CRIT Check cpu usage percentage taking into account any + limits. Valid values are 0 - 100. + --memory WARN:CRIT:UNITS + Check memory usage taking into account any limits. + Valid values for units are %,B,KB,MB,GB. + --status STATUS Desired container status (running, exited, etc). + (default: None) + --health Check container's health check status + --uptime WARN:CRIT Minimum container uptime in seconds. Use when + infrequent crashes are tolerated. + --version Check if the running images are the same version as + those in the registry. Useful for finding stale + images. Does not support login. + --insecure-registries INSECURE_REGISTRIES [INSECURE_REGISTRIES ...] + List of registries to connect to with http(no TLS). + Useful when using "--version" with images from + insecure registries. + --restarts WARN:CRIT Container restart thresholds. check_swarm Usage ----------------- diff --git a/check_docker b/check_docker index 0c092c4..79feb80 100755 --- a/check_docker +++ b/check_docker @@ -1,12 +1,15 @@ #!/usr/bin/env python3 import os import stat -from collections import deque +import traceback +from collections import deque, namedtuple from datetime import datetime, timezone import logging from sys import argv from http.client import HTTPConnection -from urllib.request import AbstractHTTPHandler, HTTPHandler, HTTPSHandler, OpenerDirector +from urllib.error import HTTPError, URLError +from urllib.request import AbstractHTTPHandler, HTTPHandler, HTTPSHandler, OpenerDirector, HTTPRedirectHandler, Request, \ + HTTPDefaultErrorHandler, HTTPErrorProcessor import argparse import json import socket @@ -18,7 +21,7 @@ __author__ = 'Tim Laurence' __copyright__ = "Copyright 2017" __credits__ = ['Tim Laurence'] __license__ = "GPL" -__version__ = "1.0.5" +__version__ = "2.0.0" ''' nrpe compatible check for docker containers. @@ -32,17 +35,22 @@ dependency. DEFAULT_SOCKET = '/var/run/docker.sock' DEFAULT_TIMEOUT = 10.0 DEFAULT_PORT = 2375 -DEFAULT_MEMORY_UNITS = 'b' +DEFAULT_MEMORY_UNITS = 'B' DEFAULT_HEADERS = [('Accept', 'application/vnd.docker.distribution.manifest.v2+json')] -DEFAULT_PUBLIC_REGISTRY = 'https://index.docker.io' +DEFAULT_PUBLIC_REGISTRY = 'registry-1.docker.io' DEFAULT_PUBLIC_AUTH = 'https://auth.docker.io' -UNIT_ADJUSTMENTS = { - '%': 1, - 'b': 1, - 'k': 1024, - 'm': 1024 * 1024, - 'g': 1024 * 1024 * 1024 + +# The second value is the power to raise the base to. 
+UNIT_ADJUSTMENTS_TEMPLATE = { + '%': 0, + 'B': 0, + 'KB': 1, + 'MB': 2, + 'GB': 3, + 'TB': 4 } +unit_adjustments = None + OK_RC = 0 WARNING_RC = 1 CRITICAL_RC = 2 @@ -53,6 +61,8 @@ rc = -1 messages = [] performance_data = [] +ImageName = namedtuple('ImageName', "registry name tag full_name") + # Hacked up urllib to handle sockets ############################################################################################# @@ -81,10 +91,36 @@ class SocketFileHandler(AbstractHTTPHandler): return self.do_open(self.SocketFileToHttpConnectionAdaptor, req) +# Got some help from this example https://gist.github.com/FiloSottile/2077115 +class HeadRequest(Request): + def get_method(self): + return "HEAD" + + +class HEADRedirectHandler(HTTPRedirectHandler): + """ + Subclass the HTTPRedirectHandler to make it use our + HeadRequest also on the redirected URL + """ + + def redirect_request(self, req, fp, code, msg, headers, newurl): + if code in (301, 302, 303, 307): + newurl = newurl.replace(' ', '%20') + newheaders = dict((k, v) for k, v in req.headers.items() + if k.lower() not in ("content-length", "content-type")) + return HeadRequest(newurl, + headers=newheaders, + origin_req_host=req.get_origin_req_host(), + unverifiable=True) + else: + raise HTTPError(req.get_full_url(), code, msg, headers, fp) + + better_urllib_get = OpenerDirector() better_urllib_get.addheaders = DEFAULT_HEADERS.copy() better_urllib_get.add_handler(HTTPHandler()) better_urllib_get.add_handler(HTTPSHandler()) +better_urllib_get.add_handler(HTTPRedirectHandler()) better_urllib_get.add_handler(SocketFileHandler()) better_urllib_head = OpenerDirector() @@ -92,6 +128,9 @@ better_urllib_head.method = 'HEAD' better_urllib_head.addheaders = DEFAULT_HEADERS.copy() better_urllib_head.add_handler(HTTPHandler()) better_urllib_head.add_handler(HTTPSHandler()) +better_urllib_head.add_handler(HTTPDefaultErrorHandler()) +better_urllib_head.add_handler(HTTPRedirectHandler()) +better_urllib_head.add_handler(HTTPErrorProcessor()) better_urllib_head.add_handler(SocketFileHandler()) @@ -132,39 +171,45 @@ def parse_thresholds(spec, include_units=True, units_required=True): def evaluate_numeric_thresholds(container, value, warn, crit, name, short_name, min=None, max=None, units='', greater_than=True): - perf_string = "{}_{}={}{};{};{}".format(container, short_name, value, units, warn, crit) + + # Some units don't have decimal places + rounded_value = int(value) if units in ['B','%', None] else round(value, 2) + + perf_string = "{}_{}={}{};{};{}".format(container, short_name, rounded_value, units, warn, crit) if min is not None: perf_string += ';{}'.format(min) if max is not None: perf_string += ';{}'.format(max) + global performance_data performance_data.append(perf_string) if greater_than: if value >= crit: - critical("{} {} is {}{}".format(container, name, value, units)) + critical("{} {} is {}{}".format(container, name, rounded_value, units)) elif value >= warn: - warning("{} {} is {}{}".format(container, name, value, units)) + warning("{} {} is {}{}".format(container, name, rounded_value, units)) else: - ok("{} {} is {}{}".format(container, name, value, units)) + ok("{} {} is {}{}".format(container, name, rounded_value, units)) else: if value <= crit: - critical("{} {} is {}{}".format(container, name, value, units)) + critical("{} {} is {}{}".format(container, name, rounded_value, units)) elif value <= warn: - warning("{} {} is {}{}".format(container, name, value, units)) + warning("{} {} is {}{}".format(container, name, rounded_value, units)) 
else: - ok("{} {} is {}{}".format(container, name, value, units)) + ok("{} {} is {}{}".format(container, name, rounded_value, units)) @lru_cache() def get_url(url): response = better_urllib_get.open(url, timeout=timeout) - return process_urllib_response(response), response.code + return process_urllib_response(response), response.status @lru_cache() def head_url(url, auth_token=None): if auth_token: better_urllib_head.addheaders.append(('Authorization', 'Bearer ' + auth_token)) + # Follow redirects response = better_urllib_head.open(url, timeout=timeout) if auth_token: better_urllib_head.addheaders.pop() @@ -189,11 +234,12 @@ def get_image_info(name): @lru_cache() -def get_manifest_auth_token(image_name, auth_source, registry='registry.docker.io', action='pull'): - url = "{auth_source}/token?service={registry}&scope=repository:{image_name}:{action}".format( - auth_source=auth_source, registry=registry, image_name=image_name, action=action) - logger.debug(url) - response, _ = get_url(url) +def get_manifest_auth_token(www_authenticate_header): + # TODO: Pass the error on to be handled better + assert 'Bearer realm=' in www_authenticate_header, "Auth header is not one I can process" + header_list = www_authenticate_header.replace('Bearer realm=', '').replace('"', '').split(',') + auth_url = header_list[0] + '?' + '&'.join(header_list[1:]) + response, _ = get_url(auth_url) return response['token'] @@ -205,6 +251,7 @@ def get_stats(container): content, _ = get_url(daemon + '/containers/{container}/stats?stream=0'.format(container=container)) return content + def get_containers(names, require_present): containers_list, _ = get_url(daemon + '/containers/json?all=1') all_container_names = set(x['Names'][0][1:] for x in containers_list) @@ -225,6 +272,63 @@ def get_containers(names, require_present): return filtered +def get_container_digest(container): + # find registry and tag + inspection = get_container_info(container) + image_id = inspection['Image'] + image_info = get_image_info(image_id) + try: + return image_info['RepoDigests'][0].split('@')[1] + except IndexError: + return None + + +def get_container_image_urls(container): + inspection = get_container_info(container) + image_id = inspection['Image'] + image_info = get_image_info(image_id) + return image_info['RepoTags'] + + +def normalize_image_name_to_manifest_url(image_name, insecure_registries): + parsed_url = parse_image_name(image_name) + + lower_insecure = [reg.lower() for reg in insecure_registries] + + # Registry query url + scheme = 'http' if parsed_url.registry.lower() in lower_insecure else 'https' + url = '{scheme}://{registry}/v2/{image_name}/manifests/{image_tag}'.format(scheme=scheme, + registry=parsed_url.registry, + image_name=parsed_url.name, + image_tag=parsed_url.tag) + return url, parsed_url.registry + + +def get_digest_from_registry(url): + # query registry + try: + registry_info = head_url(url=url) + except HTTPError as e: + if e.code == 401: # Convert unauthorized response to regular response + registry_info = e.fp + else: + raise e + + if registry_info.status == 401: # HTTP unauthorized + + # Find auth server + # TODO: Handle logging in if needed + www_authenticate_header = registry_info.headers.get('Www-Authenticate') + # Get auth token + token = get_manifest_auth_token(www_authenticate_header) + + # query registry again + registry_info = head_url(url=url, auth_token=token) + + registry_hash = registry_info.getheader('Docker-Content-Digest', None) + return registry_hash + + def set_rc(new_rc): global rc rc = 
new_rc if new_rc > rc else rc @@ -258,29 +362,73 @@ def require_running(name): func(container, *args, **kwargs) else: # container is not running, can't perform check - critical('{container} is not "running", cannot check {check}")'.format(container=container, - check=name)) + critical('{container} is not "running", cannot check {check}"'.format(container=container, + check=name)) return wrapper return inner_decorator +def parse_image_name(image_name): + """ + Parses image names into their constituent parts. + :param image_name: + :return: ImageName + """ + + # These are based on information found here + # https://docs.docker.com/engine/reference/commandline/tag/#extended-description + # https://github.com/docker/distribution/blob/master/reference/regexp.go + host_segment_re = '[a-zA-Z0-9]([a-zA-Z0-9-]*[a-zA-Z0-9])?' + hostname_re = '({host_segment}\.)+{host_segment}'.format(host_segment=host_segment_re) + registry_re = '((?P({hostname_re}(:\d+)?|{host_segment_re}:\d+))/)'.format( + host_segment_re=host_segment_re, hostname_re=hostname_re) + name_component_ends_re = '[a-z0-9]' + name_component_middle_re = '[a-z0-9._-]' # Ignoring spec limit of two _ + name_component_re = '({end}{middle}*{end}|{end})'.format(end=name_component_ends_re, + middle=name_component_middle_re) + image_name_re = "(?P({name_component}/)*{name_component})".format(name_component=name_component_re) + image_tag_re = '(?P[a-zA-Z0-9_][a-zA-Z0-9_.-]*)' + full_re = '^{registry}?{image_name}(:{image_tag})?$'.format(registry=registry_re, image_name=image_name_re, + image_tag=image_tag_re) + parsed = re.match(full_re, image_name) + + registry = parsed.group('registry') if parsed.group('registry') else DEFAULT_PUBLIC_REGISTRY + + image_name = parsed.group('image_name') + image_name = image_name if '/' in image_name or registry != DEFAULT_PUBLIC_REGISTRY else 'library/' + image_name + + image_tag = parsed.group('image_tag') + image_tag = image_tag if image_tag else 'latest' + + full_image_name = "{registry}/{image_name}:{image_tag}".format( + registry=registry, + image_name=image_name, + image_tag=image_tag) + + return ImageName(registry=registry, name=image_name, tag=image_tag, full_name=full_image_name) + + # Checks ############################################################################################# @require_running(name='memory') def check_memory(container, warn, crit, units): - assert units in UNIT_ADJUSTMENTS, "Invalid memory units" + if not units in unit_adjustments: + unknown("Memory units must be one of {}".format(list(unit_adjustments.keys()))) + return inspection = get_stats(container) + # Subtracting cache to match `docker stats` does. 
+ adjusted_usage = inspection['memory_stats']['usage'] - inspection['memory_stats']['stats']['total_cache'] if units == '%': max = 100 - usage = int(100 * inspection['memory_stats']['usage'] / inspection['memory_stats']['limit']) + usage = int(100 * adjusted_usage / inspection['memory_stats']['limit']) else: - max = inspection['memory_stats']['limit'] / UNIT_ADJUSTMENTS[units] - usage = inspection['memory_stats']['usage'] / UNIT_ADJUSTMENTS[units] + max = inspection['memory_stats']['limit'] / unit_adjustments[units] + usage = adjusted_usage / unit_adjustments[units] evaluate_numeric_thresholds(container=container, value=usage, warn=warn, crit=crit, units=units, name='memory', short_name='mem', min=0, max=max) @@ -345,35 +493,39 @@ def check_restarts(container, warn, crit, units=None): short_name='re', min=0, max=graph_padding) -def check_version(container): - # find registry and tag - inspection = get_container_info(container) - image_id = inspection['Image'] - image_inspection = get_image_info(image_id) - image_tag = image_inspection['RepoTags'][0] - try: - image_digest = image_inspection['RepoDigests'][0].split('@')[1] - except IndexError: +def check_version(container, insecure_registries): + image_digest = get_container_digest(container) + if image_digest is None: unknown('Checksum missing for "{}", try doing a pull'.format(container)) return - registry = DEFAULT_PUBLIC_REGISTRY - full_image_tag = 'library/' + image_tag - - image_name, image_version = full_image_tag.split(':') + image_urls = get_container_image_urls(container=container) + if len(image_urls) > 1: + unknown('"{}" has multiple tags/names. Unsure which one to use to check the version.'.format(container)) + return + elif len(image_urls) == 0: + unknown('"{}" has last no repository tag. Is this anywhere else?'.format(container)) + return - token = get_manifest_auth_token(image_name, DEFAULT_PUBLIC_AUTH) + url, registry = normalize_image_name_to_manifest_url(image_urls[0], insecure_registries) - # query registry - url = '{registry}/v2/{image_name}/manifests/{image_version}'.format(registry=registry, image_name=image_name, - image_version=image_version) - reg_info = head_url(url=url, auth_token=token) + try: + registry_hash = get_digest_from_registry(url) + except URLError as e: + if e.reason.reason == 'UNKNOWN_PROTOCOL': + unknown( + "TLS error connecting to registry {} for {}, should you use the '--insecure-registry' flag?" \ + .format(registry, container)) + return - registry_hash = reg_info.getheader('Docker-Content-Digest', None) if registry_hash is None: - raise IndexError('Docker-Content-Digest header missing, cannot check version') - if registry_hash != image_digest: - critical("{} is out of date".format(container)) + unknown("Cannot check version, Registry didn't return 'Docker-Content-Digest header for {} while checking {}." 
\ + .format(container, url)) + return + if registry_hash == image_digest: + ok("{}'s version matches registry".format(container)) + return + critical("{}'s version does not match registry".format(container)) def calculate_cpu_capacity_precentage(info, stats): @@ -444,6 +596,20 @@ def process_args(args): metavar='[:]', help='Where to find TLS protected docker daemon socket.') + base_group = parser.add_mutually_exclusive_group() + base_group.add_argument('--binary_units', + dest='units_base', + action='store_const', + const=1024, + help='Use a base of 1024 when doing calculations of KB, MB, GB, & TB (This is default)') + + base_group.add_argument('--decimal_units', + dest='units_base', + action='store_const', + const=1000, + help='Use a base of 1000 when doing calculations of KB, MB, GB, & TB') + parser.set_defaults(units_base=1024) + # Connection timeout parser.add_argument('--timeout', dest='timeout', @@ -482,7 +648,7 @@ def process_args(args): action='store', type=str, metavar='WARN:CRIT:UNITS', - help='Check memory usage taking into account any limits. Valid values for units are %%,b,k,m,g.') + help='Check memory usage taking into account any limits. Valid values for units are %%,B,KB,MB,GB.') # State parser.add_argument('--status', @@ -511,7 +677,16 @@ def process_args(args): dest='version', default=None, action='store_true', - help='Check if the running images are the same version as those in the registry. Useful for finding stale images. Only works with public registry.') + help='Check if the running images are the same version as those in the registry. Useful for finding stale images. Does not support login.') + + # Version + parser.add_argument('--insecure-registries', + dest='insecure_registries', + action='store', + nargs='+', + type=str, + default=[], + help='List of registries to connect to with http(no TLS). Useful when using "--version" with images from insecure registries.') # Restart parser.add_argument('--restarts', @@ -572,6 +747,10 @@ def print_results(): def perform_checks(raw_args): args = process_args(raw_args) + + global unit_adjustments + unit_adjustments = {key: args.units_base ** value for key, value in UNIT_ADJUSTMENTS_TEMPLATE.items()} + if socketfile_permissions_failure(args): unknown("Cannot access docker socket file. 
User ID={}, socket file={}".format(os.getuid(), args.connection)) elif no_checks_present(args): @@ -593,7 +772,7 @@ def perform_checks(raw_args): # Check version if args.version: - check_version(container) + check_version(container, args.insecure_registries) # below are checks that require a 'running' status @@ -618,10 +797,12 @@ def perform_checks(raw_args): check_restarts(container, *parse_thresholds(args.restarts, include_units=False)) except Exception as e: - unknown("Exception raised during check: {}".format(repr(e))) + traceback.print_exc() + unknown("Exception raised during check': {}".format(repr(e))) print_results() + if __name__ == '__main__': perform_checks(argv[1:]) exit(rc) diff --git a/check_swarm b/check_swarm index f3afe9a..f4db9fa 100755 --- a/check_swarm +++ b/check_swarm @@ -16,7 +16,7 @@ __author__ = 'Tim Laurence' __copyright__ = "Copyright 2017" __credits__ = ['Tim Laurence'] __license__ = "GPL" -__version__ = "1.0.5" +__version__ = "2.0.0" ''' nrpe compatible check for docker swarm @@ -91,7 +91,7 @@ better_urllib_head.add_handler(SocketFileHandler()) @lru_cache() def get_url(url): response = better_urllib_get.open(url, timeout=timeout) - return process_urllib_response(response), response.code + return process_urllib_response(response), response.status def process_urllib_response(response): diff --git a/test_check_docker.py b/test_check_docker.py index aee7f3f..9d08c02 100644 --- a/test_check_docker.py +++ b/test_check_docker.py @@ -1,1102 +1,749 @@ -import argparse import json -import sys from io import BytesIO import stat from datetime import datetime, timezone, timedelta -import unittest from unittest.mock import patch from urllib.error import HTTPError -from pyfakefs import fake_filesystem_unittest +import pytest from importlib.machinery import SourceFileLoader from urllib import request -__author__ = 'tim' +cd = SourceFileLoader('check_docker', './check_docker').load_module() -# This is needed because `check_docker` does not end a a .py so it won't be found by default1 -check_docker = SourceFileLoader('check_docker', './check_docker').load_module() +__author__ = 'tim' class FakeHttpResponse(BytesIO): - def __init__(self, content, http_code): - self.code = http_code + def __init__(self, content, http_code, headers=None): + self.status = http_code + self.headers = headers if headers else {} super(FakeHttpResponse, self).__init__(content) + def getheader(self, header, default): + return self.headers.get(header, default) + + +@pytest.fixture +def check_docker(): + # This is needed because `check_docker` does not end a a .py so it won't be found by default + check_docker = SourceFileLoader('check_docker', './check_docker').load_module() + check_docker.rc = -1 + check_docker.timeout = 1 + check_docker.messages = [] + check_docker.performance_data = [] + check_docker.daemon = 'socket:///notreal' + check_docker.get_url.cache_clear() + + return check_docker + + +@pytest.fixture +def check_docker_with_units(check_docker): + check_docker.unit_adjustments = {key: 1024 ** value for key, value in + check_docker.UNIT_ADJUSTMENTS_TEMPLATE.items()} + return check_docker + + +def test_get_url(check_docker, monkeypatch): + obj = {'foo': 'bar'} + encoded = json.dumps(obj=obj).encode('utf-8') + expected_response = FakeHttpResponse(content=encoded, http_code=200) + + def mock_open(*args, **kwargs): + return expected_response + + monkeypatch.setattr(check_docker.better_urllib_get, 'open', value=mock_open) + response, _ = check_docker.get_url(url='/test') + assert response == obj + + 
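One behaviour change worth calling out before the memory tests below: usage now has the cache subtracted so the reported figure lines up with `docker stats`. A small sketch of that adjustment (`effective_memory_usage` is an illustrative name; `check_memory` does this inline against the container's `memory_stats`):

```python
# Cache is subtracted from usage, matching the adjusted_usage line in
# check_memory and the memory test cases below.
def effective_memory_usage(memory_stats):
    return memory_stats['usage'] - memory_stats['stats']['total_cache']

stats = {'limit': 10, 'usage': 3, 'stats': {'total_cache': 1}}
assert effective_memory_usage(stats) == 2
assert 100 * effective_memory_usage(stats) / stats['limit'] == 20.0
```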
+@pytest.mark.parametrize("func", [ + 'get_stats', + 'get_state', + 'get_image_info' +]) +def test_get_url_calls(check_docker, func): + # TODO + with patch('check_docker.get_url', return_value=({'State': 'State'}, 200)) as patched: + getattr(check_docker, func)('container') + assert patched.call_count == 1 + + +@pytest.mark.parametrize("value, rc, messages, perf_data", [ + (1, cd.OK_RC, ['OK: container metric is 1B'], ['container_met=1B;2;3;0;10']), + (2, cd.WARNING_RC, ['WARNING: container metric is 2B'], ['container_met=2B;2;3;0;10']), + (3, cd.CRITICAL_RC, ['CRITICAL: container metric is 3B'], ['container_met=3B;2;3;0;10']) +]) +def test_evaluate_numeric_thresholds(check_docker, value, rc, messages, perf_data): + check_docker.evaluate_numeric_thresholds(container='container', + value=value, + warn=2, + crit=3, + name='metric', + short_name='met', + min=0, + max=10, + units='B' + ) + assert check_docker.rc == rc + assert check_docker.messages == messages + assert check_docker.performance_data == perf_data + + +@pytest.mark.parametrize('func,arg,rc,messages', + ( + ('ok', "OK test", cd.OK_RC, ['OK: OK test']), + ('warning', "WARN test", cd.WARNING_RC, ['WARNING: WARN test']), + ('critical', "CRIT test", cd.CRITICAL_RC, ['CRITICAL: CRIT test']), + ('unknown', "UNKNOWN test", cd.UNKNOWN_RC, ['UNKNOWN: UNKNOWN test']), + )) +def test_status_update(check_docker, func, arg, rc, messages): + getattr(check_docker, func)(arg) + assert check_docker.rc == rc + assert check_docker.messages == messages + + +@pytest.mark.parametrize('input, units_required, expected', ( + ('1:2:3', True, (1, 2, '3')), + ('1:2', False, (1, 2, None)), + ('1:2:3', False, (1, 2, '3')), + +)) +def test_parse_thresholds(check_docker, input, units_required, expected): + result = check_docker.parse_thresholds(input, units_required=units_required) + assert expected == tuple(result) + + +@pytest.mark.parametrize('spec, kwargs, exception', ( + ('1:2', {}, ValueError), + ('1:2:b', {'include_units': False}, ValueError), + ('1:2', {'include_units': True}, ValueError), + ("1", {}, IndexError), + (":1", {}, ValueError), + (":1:c", {}, ValueError), + ("1:", {}, ValueError), + ("1::c", {}, ValueError), + ('1:2:', {'units_required': True}, ValueError), + ("a:1:c", {}, ValueError), + ("1:b:c", {}, ValueError), +) + ) +def test_parse_thresholds_exceptions(check_docker, spec, kwargs, exception): + with pytest.raises(exception): + check_docker.parse_thresholds(spec, **kwargs) + + +def test_set_rc(check_docker): + # Can I do a basic set + check_docker.set_rc(check_docker.OK_RC) + assert check_docker.rc == check_docker.OK_RC + + # Does it prevent downgrades of rc + check_docker.set_rc(check_docker.WARNING_RC) + assert check_docker.rc == check_docker.WARNING_RC + check_docker.set_rc(check_docker.OK_RC) + assert check_docker.rc == check_docker.WARNING_RC + + +@pytest.mark.parametrize('response, expected_status', ( + ({'State': {'Running': True}}, cd.OK_RC), + ({'State': {'Status': 'stopped'}}, cd.CRITICAL_RC), + ({'State': {'Running': False}}, cd.CRITICAL_RC), + ({'State': {'foo': False}}, cd.UNKNOWN_RC) +)) +def test_check_status(monkeypatch, check_docker, response, expected_status): + def mock_response(*args, **kwargs): + encoded = json.dumps(obj=response).encode('utf-8') + return FakeHttpResponse(encoded, 200) + + monkeypatch.setattr(check_docker.better_urllib_get, 'open', value=mock_response) + check_docker.check_status(container='container', desired_state='running') + assert check_docker.rc == expected_status + + 
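The perfdata and status messages produced by `evaluate_numeric_thresholds` (exercised above) now round values for display: bytes and percentages are truncated to whole numbers, while other units keep two decimal places. A standalone sketch of that rule, mirroring the `rounded_value` expression in check_docker (`round_for_display` is a hypothetical name):

```python
def round_for_display(value, units):
    # Bytes and percentages are shown as integers; KB/MB/GB/TB values keep
    # two decimals, matching rounded_value in evaluate_numeric_thresholds.
    return int(value) if units in ('B', '%', None) else round(value, 2)

assert round_for_display(1.75, 'B') == 1
assert round_for_display(1.756, 'GB') == 1.76
```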
+@pytest.mark.parametrize('response, expected_status', ( + ({'State': {'Health': {'Status': 'healthy'}, 'Running': True}}, cd.OK_RC), + ({'State': {'Health': {'Status': 'unhealthy'}, 'Running': True}}, cd.CRITICAL_RC), + ({'State': {'Running': True}}, cd.UNKNOWN_RC), + ({'State': {'Health': {}, 'Running': True}}, cd.UNKNOWN_RC), + ({'State': {'Health': {'Status': 'starting'}, 'Running': True}}, cd.UNKNOWN_RC) +)) +def test_check_health(monkeypatch, check_docker, response, expected_status): + def mock_response(*args, **kwargs): + encoded = json.dumps(obj=response).encode('utf-8') + return FakeHttpResponse(encoded, 200) + + monkeypatch.setattr(check_docker.better_urllib_get, 'open', value=mock_response) + check_docker.check_health(container='container') + assert check_docker.rc == expected_status + + +@pytest.mark.parametrize('memory_stats, warn, crit, units, expected_status', ( + ({'limit': 10, 'usage': 1, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.OK_RC), + ({'limit': 10, 'usage': 2, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.WARNING_RC), + ({'limit': 10, 'usage': 3, 'stats': {'total_cache': 1}}, 1, 2, 'B', cd.CRITICAL_RC), + ({'limit': 10, 'usage': 1, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.OK_RC), + ({'limit': 10, 'usage': 3, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.WARNING_RC), + ({'limit': 10, 'usage': 4, 'stats': {'total_cache': 1}}, 20, 30, '%', cd.CRITICAL_RC), + ({'limit': 10, 'usage': 4, 'stats': {'total_cache': 1}}, 20, 30, 'BAD_UNITS', cd.UNKNOWN_RC), +)) +def test_check_memory(monkeypatch, check_docker_with_units, memory_stats, warn, crit, units, expected_status): + response = { + 'memory_stats': memory_stats, + 'State': {'Running': True} + } + + def mock_response(*args, **kwargs): + encoded = json.dumps(obj=response).encode('utf-8') + return FakeHttpResponse(encoded, 200) + + monkeypatch.setattr(check_docker_with_units.better_urllib_get, 'open', value=mock_response) + check_docker_with_units.check_memory(container='container', warn=warn, crit=crit, units=units) + assert check_docker_with_units.rc == expected_status + + +cpu_param_fields = 'host_config, cpu_stats, precpu_stats, warn, crit, expected_status, exspected_percent' +cpu_parm_tests = (({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'percpu_usage': [15], 'total_usage': 15}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0}, + 10, 20, cd.OK_RC, 5), + ({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'percpu_usage': [25], 'total_usage': 25}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0}, + 10, 20, cd.WARNING_RC, 15), + ({"NanoCpus": 1000000000, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0}, + 10, 20, cd.CRITICAL_RC, 25), + ({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 10000}, + {'cpu_usage': {'percpu_usage': [15], 'total_usage': 15}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0}, + 10, 20, cd.CRITICAL_RC, 50), + ({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': 
{'percpu_usage': [10], 'total_usage': 10}, 'online_cpus': 1, 'system_cpu_usage': 0}, + 10, 20, cd.CRITICAL_RC, 25), + ({"NanoCpus": 0, "CpuPeriod": 1, "CpuQuota": 2}, + {'cpu_usage': {'percpu_usage': [35], 'total_usage': 35}, 'online_cpus': 1, 'system_cpu_usage': 100}, + {'cpu_usage': {'percpu_usage': [10], 'total_usage': 10}, 'system_cpu_usage': 0}, + 10, 20, cd.CRITICAL_RC, 25), + ({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'total_usage': 36}, 'online_cpus': 2, 'system_cpu_usage': 200}, + {'cpu_usage': {'total_usage': 10}, 'system_cpu_usage': 0}, + 10, 20, cd.WARNING_RC, 13), + ({"NanoCpus": 0, "CpuPeriod": 0, "CpuQuota": 0}, + {'cpu_usage': {'percpu_usage': [35, 1], 'total_usage': 36}, 'system_cpu_usage': 200}, + {'cpu_usage': {'total_usage': 10}, 'system_cpu_usage': 0}, + 10, 20, cd.WARNING_RC, 13 + ) + ) + + +@pytest.mark.parametrize(cpu_param_fields, cpu_parm_tests) +def test_check_cpu(monkeypatch, check_docker, host_config, cpu_stats, precpu_stats, warn, crit, expected_status, + exspected_percent): + container_stats = { + 'cpu_stats': cpu_stats, + 'precpu_stats': precpu_stats + } + container_info = { + 'State': {'Running': True}, + "HostConfig": host_config + } + + def mock_stats_response(*args, **kwargs): + return container_stats + + def mock_info_response(*args, **kwargs): + return container_info + + monkeypatch.setattr(check_docker, 'get_stats', value=mock_stats_response) + monkeypatch.setattr(check_docker, 'get_container_info', value=mock_info_response) + + check_docker.check_cpu(container='container', warn=warn, crit=crit) + assert check_docker.rc == expected_status + + +@pytest.mark.parametrize(cpu_param_fields, cpu_parm_tests) +def test_calculate_cpu(check_docker, host_config, cpu_stats, precpu_stats, warn, crit, expected_status, + exspected_percent): + container_stats = { + 'cpu_stats': cpu_stats, + 'precpu_stats': precpu_stats + } + container_info = { + 'State': {'Running': True}, + "HostConfig": host_config + } + + pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) + assert pecentage == exspected_percent + + +def test_require_running(check_docker, monkeypatch): + """ This confirms the 'require_running decorator is working properly with a stopped container""" + container_info = {'RestartCount': 0, 'State': {'Running': False}} + + def mock_info_response(*args, **kwargs): + return container_info + + monkeypatch.setattr(check_docker, 'get_container_info', value=mock_info_response) + + check_docker.check_restarts(container='container', warn=1, crit=2) + assert check_docker.rc == check_docker.CRITICAL_RC + + +@pytest.mark.parametrize("restarts, exspected_status", ( + (0, cd.OK_RC), + (1, cd.WARNING_RC), + (3, cd.CRITICAL_RC), +)) +def test_restarts(check_docker, monkeypatch, restarts, exspected_status): + container_info = {'RestartCount': restarts, 'State': {'Running': True}} + + def mock_info_response(*args, **kwargs): + return container_info + + monkeypatch.setattr(check_docker, 'get_container_info', value=mock_info_response) + + check_docker.check_restarts(container='container', warn=1, crit=2) + assert check_docker.rc == exspected_status + + +@pytest.mark.parametrize("uptime, warn, crit, exspected_status", ( + (timedelta(seconds=0), 10, 5, cd.CRITICAL_RC), + (timedelta(seconds=9), 10, 1, cd.WARNING_RC), + (timedelta(seconds=10), 2, 1, cd.OK_RC), + (timedelta(days=1, seconds=0), 2, 1, cd.OK_RC) +)) +def test_check_uptime1(monkeypatch, check_docker, uptime, warn, crit, exspected_status): + time = 
datetime.now(tz=timezone.utc) - uptime + time_str = time.strftime("%Y-%m-%dT%H:%M:%S.0000000000Z") + json_results = { + 'State': {'StartedAt': time_str, + 'Running': True}, + } + + def mock_response(*args, **kwargs): + encoded = json.dumps(obj=json_results).encode('utf-8') + return FakeHttpResponse(encoded, 200) + + monkeypatch.setattr(check_docker.better_urllib_get, 'open', value=mock_response) + + check_docker.check_uptime(container='container', warn=warn, crit=crit) + assert check_docker.rc == exspected_status + -class TestUtil(unittest.TestCase): - def test_get_url(self): - obj = {'foo': 'bar'} - encoded = json.dumps(obj=obj).encode('utf-8') - expected_response = FakeHttpResponse(content=encoded, http_code=200) - with patch('check_docker.better_urllib_get.open', return_value=expected_response): - response, _ = check_docker.get_url(url='/test') - self.assertDictEqual(response, obj) - - def test_get_stats(self): - with patch('check_docker.get_url', return_value=([], 200)) as patched: - check_docker.get_stats('container') - self.assertEqual(patched.call_count, 1) - - def test_get_state(self): - with patch('check_docker.get_url', return_value=({'State': {}}, 200)) as patched: - check_docker.get_state('container') - self.assertEqual(patched.call_count, 1) - - def test_get_get_image_info(self): - with patch('check_docker.get_url', return_value=([], 200)) as patched: - check_docker.get_image_info('container') - self.assertEqual(patched.call_count, 1) - - -class TestReporting(unittest.TestCase): - def setUp(self): - check_docker.rc = -1 - check_docker.messages = [] - check_docker.performance_data = [] - - def test_evaluate_numeric_thresholds_ok(self): - # Test OK - check_docker.evaluate_numeric_thresholds(container='container', - value=1, - warn=2, - crit=3, - name='metric', - short_name='met', - min=0, - max=10, - units='b' - ) - self.assertEqual(check_docker.rc, check_docker.OK_RC, "Incorrect return code") - self.assertListEqual(check_docker.messages, ['OK: container metric is 1b']) - self.assertListEqual(check_docker.performance_data, ['container_met=1b;2;3;0;10']) - - def test_evaluate_numeric_thresholds_warn(self): - # Test warn - check_docker.evaluate_numeric_thresholds(container='container', - value=2, - warn=2, - crit=3, - name='metric', - short_name='met', - min=0, - max=10, - units='b' - ) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC, "Incorrect return code") - self.assertListEqual(check_docker.messages, ['WARNING: container metric is 2b']) - self.assertListEqual(check_docker.performance_data, ['container_met=2b;2;3;0;10']) - - def test_evaluate_numeric_thresholds_crit(self): - # Test crit - check_docker.evaluate_numeric_thresholds(container='container', - value=3, - warn=2, - crit=3, - name='metric', - short_name='met', - min=0, - max=10, - units='b' - ) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC, "Incorrect return code") - self.assertListEqual(check_docker.messages, ['CRITICAL: container metric is 3b']) - self.assertListEqual(check_docker.performance_data, ['container_met=3b;2;3;0;10']) - - def test_ok(self): - check_docker.ok("OK test") - self.assertEqual(check_docker.rc, check_docker.OK_RC) - self.assertListEqual(check_docker.messages, ['OK: OK test']) - - def test_warn(self): - check_docker.warning("WARN test") - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - self.assertListEqual(check_docker.messages, ['WARNING: WARN test']) - - def test_crit(self): - check_docker.critical("CRIT test") - self.assertEqual(check_docker.rc, 
check_docker.CRITICAL_RC) - self.assertListEqual(check_docker.messages, ['CRITICAL: CRIT test']) - - def test_unknown(self): - check_docker.unknown("UNKNOWN test") - self.assertEqual(check_docker.rc, check_docker.UNKNOWN_RC) - self.assertListEqual(check_docker.messages, ['UNKNOWN: UNKNOWN test']) - - def test_parse_thresholds_with_units(self): - a = check_docker.parse_thresholds('1:2:3') - self.assertTupleEqual(tuple(a), (1, 2, '3')) - - def test_parse_thresholds_with_missing_units(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, '1:2') - - def test_parse_thresholds_with_units_when_disabled(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, '1:2:b', include_units=False) - - def test_parse_thresholds_missing_units_when_optional(self): - a = check_docker.parse_thresholds('1:2', units_required=False) - self.assertTupleEqual(tuple(a), (1, 2, None)) - - def test_parse_thresholds_with_units_when_optional(self): - a = check_docker.parse_thresholds('1:2:3', units_required=False) - self.assertTupleEqual(tuple(a), (1, 2, '3')) - - def test_parse_thresholds_missing_units_when_not_optional(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, '1:2', units_required=True) - - def test_parse_thresholds_with_units_when_not_optional(self): - a = check_docker.parse_thresholds('1:2:3', units_required=True) - self.assertTupleEqual(tuple(a), (1, 2, '3')) - - def test_parse_thresholds_missing_crit(self): - self.assertRaises(IndexError, check_docker.parse_thresholds, "1") - - def test_parse_thresholds_blank_warn(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, ":1") - self.assertRaises(ValueError, check_docker.parse_thresholds, ":1:c") - - def test_parse_thresholds_blank_crit(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, "1:") - self.assertRaises(ValueError, check_docker.parse_thresholds, "1::c") - - def test_parse_thresholds_blank_units(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, '1:2:', units_required=True) - - def test_parse_thresholds_str_warn(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, "a:1:c") - - def test_parse_thresholds_str_crit(self): - self.assertRaises(ValueError, check_docker.parse_thresholds, "1:b:c") - - def test_set_rc(self): - # Can I do a basic set - check_docker.set_rc(check_docker.OK_RC) - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - # Does it prevent downgrades of rc - check_docker.set_rc(check_docker.WARNING_RC) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - check_docker.set_rc(check_docker.OK_RC) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - -class TestChecks(fake_filesystem_unittest.TestCase): - def setUp(self): - check_docker.rc = -1 - check_docker.messages = [] - check_docker.performance_data = [] - check_docker.get_url.cache_clear() - - def test_check_status1(self): - json_results = { - 'State': {'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_status(container='container', desired_state='running') - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_check_status2(self): - json_results = { - 'State': {'Status': 'stopped'}, - 'State': {'Status': 'stopped'}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_status(container='container', desired_state='running') - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - # This how older docker engines 
display state - def test_check_status3(self): - json_results = { - 'State': {'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_status(container='container', desired_state='running') - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - # This how older docker engines display state - def test_check_status4(self): - json_results = { - 'State': {'Running': False}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_status(container='container', desired_state='running') - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - # This how older docker engines display state - def test_check_status5(self): - json_results = { - 'State': {'foo': False}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_status(container='container', desired_state='running') - self.assertEqual(check_docker.rc, check_docker.UNKNOWN_RC) - - def test_check_health1(self): - json_results = { - 'State': {'Health': {'Status': 'healthy'}, 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_health(container='container') - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_check_health2(self): - json_results = { - 'State': {'Health': {'Status': 'unhealthy'}, 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_health(container='container') - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_check_health3(self): - json_results = { - 'State': {'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_health(container='container') - self.assertEqual(check_docker.rc, check_docker.UNKNOWN_RC) - - def test_check_health4(self): - json_results = { - 'State': {'Health': {}, 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_health(container='container') - self.assertEqual(check_docker.rc, check_docker.UNKNOWN_RC) - - def test_check_health5(self): - json_results = { - 'State': {'Health': {'Status': 'starting'}, 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_health(container='container') - self.assertEqual(check_docker.rc, check_docker.UNKNOWN_RC) - - def test_check_memory1(self): - container_info = { - 'State': {'Running': True}, - 'memory_stats': {'limit': 10, - 'usage': 0 - } - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=1, crit=2, units='b') - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_check_memory2(self): - container_info = { - 'memory_stats': {'limit': 10, - 'usage': 1 - }, - 'State': {'Running': True} - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=1, crit=2, units='b') - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_check_memory3(self): - container_info = { - 'memory_stats': {'limit': 10, - 'usage': 2 - }, - 'State': {'Running': True} - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=1, crit=2, units='b') - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_check_memory4(self): - 
container_info = { - 'memory_stats': {'limit': 10, - 'usage': 1 - }, - 'State': {'Running': True} - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=20, crit=30, units='%') - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_check_memory5(self): - container_info = { - 'memory_stats': {'limit': 10, - 'usage': 2 - }, - 'State': {'Running': True} - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=20, crit=30, units='%') - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_check_memory6(self): - container_info = { - 'memory_stats': {'limit': 10, - 'usage': 3 - }, - 'State': {'Running': True} - } - - with patch('check_docker.get_url', return_value=(container_info, 200)): - check_docker.check_memory(container='container', warn=20, crit=30, units='%') - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_check_cpu1(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [15], - 'total_usage': 15}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - } - - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_calculate_cpu1(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [15], - 'total_usage': 15}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 5) - - def test_check_cpu2(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [25], - 'total_usage': 25}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 15) - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_calculate_cpu2(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [25], - 'total_usage': 25}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - 
"HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 15) - - def test_check_cpu3(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_calculate_cpu3(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 1000000000, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 25) - - def test_check_cpu4(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [15], - 'total_usage': 15}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 10000, - } - - } - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_calculate_cpu4(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [15], - 'total_usage': 15}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 10000, - } - - } - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 50) - - def test_check_cpu5(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - 
def test_calculate_cpu5(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'online_cpus': 1, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 25) - - def test_check_cpu6(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 1, - "CpuQuota": 2, - } - - } - - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_calculate_cpu6(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35], - 'total_usage': 35}, - 'online_cpus': 1, - 'system_cpu_usage': 100}, - 'precpu_stats': {'cpu_usage': {'percpu_usage': [10], - 'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 1, - "CpuQuota": 2, - } - - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 25) - - def test_check_cpu7(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'total_usage': 36}, - 'online_cpus': 2, - 'system_cpu_usage': 200}, - 'precpu_stats': {'cpu_usage': {'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_calculate_cpu7(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'total_usage': 36}, - 'online_cpus': 2, - 'system_cpu_usage': 200}, - 'precpu_stats': {'cpu_usage': {'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 13) - - def test_check_cpu8(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35, 1], - 'total_usage': 36}, - 'system_cpu_usage': 200}, - 'precpu_stats': {'cpu_usage': {'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - with patch('check_docker.get_container_info', return_value=container_info): - with patch('check_docker.get_stats', return_value=container_stats): - 
check_docker.check_cpu(container='container', warn=10, crit=20) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_calculate_cpu8(self): - container_stats = { - 'cpu_stats': {'cpu_usage': {'percpu_usage': [35, 1], - 'total_usage': 36}, - 'system_cpu_usage': 200}, - 'precpu_stats': {'cpu_usage': {'total_usage': 10}, - 'system_cpu_usage': 0, - } - } - container_info = { - 'State': {'Running': True}, - "HostConfig": { - "NanoCpus": 0, - "CpuPeriod": 0, - "CpuQuota": 0, - } - - } - - pecentage = check_docker.calculate_cpu_capacity_precentage(info=container_info, stats=container_stats) - self.assertEqual(pecentage, 13) - - def test_require_running(self): - """ This the 'require_running decorator is working properly with a stopped container""" - container_info = {'RestartCount': 0, 'State': {'Running': False}} - - with patch('check_docker.get_container_info', return_value=container_info): - check_docker.check_restarts(container='container', warn=1, crit=2) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_restarts1(self): - container_info = {'RestartCount': 0, 'State': {'Running': True}} - - with patch('check_docker.get_container_info', return_value=container_info): - check_docker.check_restarts(container='container', warn=1, crit=2) - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_restarts2(self): - container_info = {'RestartCount': 1, 'State': {'Running': True}} - - with patch('check_docker.get_container_info', return_value=container_info): - check_docker.check_restarts(container='container', warn=1, crit=2) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_restarts3(self): - container_info = {'RestartCount': 3, 'State': {'Running': True}} - - with patch('check_docker.get_container_info', return_value=container_info): - check_docker.check_restarts(container='container', warn=1, crit=2) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_check_uptime1(self): - now_string = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") - now_string += ".0000000000Z" - json_results = { - 'State': {'StartedAt': now_string, - 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_uptime(container='container', warn=10, crit=5) - self.assertEqual(check_docker.rc, check_docker.CRITICAL_RC) - - def test_check_uptime2(self): - ten = timedelta(seconds=10) - then = datetime.now(tz=timezone.utc) - ten - now_string = then.strftime("%Y-%m-%dT%H:%M:%S") - now_string += ".0000000000Z" - json_results = { - 'State': {'StartedAt': now_string, - 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_uptime(container='container', warn=20, crit=1) - self.assertEqual(check_docker.rc, check_docker.WARNING_RC) - - def test_check_uptime3(self): - ten = timedelta(seconds=10) - then = datetime.now(tz=timezone.utc) - ten - now_string = then.strftime("%Y-%m-%dT%H:%M:%S") - now_string += ".0000000000Z" - json_results = { - 'State': {'StartedAt': now_string, - 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_uptime(container='container', warn=2, crit=1) - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - def test_check_uptime4(self): - ten = timedelta(days=1, seconds=0) - then = datetime.now(tz=timezone.utc) - ten - now_string = then.strftime("%Y-%m-%dT%H:%M:%S") - now_string += ".0000000000Z" - json_results = { - 'State': 
{'StartedAt': now_string, - 'Running': True}, - } - with patch('check_docker.get_url', return_value=(json_results, 200)): - check_docker.check_uptime(container='container', warn=2, crit=1) - self.assertEqual(check_docker.rc, check_docker.OK_RC) - - -class TestArgs(unittest.TestCase): - sample_containers_json = [ +@pytest.fixture +def sample_containers_json(): + return [ {'Names': ['/thing1']}, {'Names': ['/thing2']} ] - def setUp(self): - check_docker.rc = -1 - - def test_args_restart(self): - args = ('--restarts', 'non-default') - result = check_docker.process_args(args=args) - self.assertEqual(result.restarts, 'non-default') - - def test_args_status(self): - args = ('--status', 'non-default') - result = check_docker.process_args(args=args) - self.assertEqual(result.status, 'non-default') - - def test_args_memory(self): - args = ('--memory', 'non-default') - result = check_docker.process_args(args=args) - self.assertEqual(result.memory, 'non-default') - - def test_args_containers(self): - args = ('--containers', 'non-default') - result = check_docker.process_args(args=args) - self.assertListEqual(result.containers, ['non-default']) - - def test_args_containers_blank(self): - args = ('--containers',) - try: - self.assertRaises(argparse.ArgumentError, check_docker.process_args, args=args) - except SystemExit: # Argument failures exit as well - pass - - def test_args_present(self): - result = check_docker.process_args(args=()) - self.assertFalse(result.present) - args = ('--present',) - result = check_docker.process_args(args=args) - self.assertTrue(result.present) - - def test_args_timeout(self): - args = ('--timeout', '9999') - result = check_docker.process_args(args=args) - self.assertEqual(result.timeout, 9999.0) - - def test_args_connection(self): - args = ('--connection', '/foo') - result = check_docker.process_args(args=args) - self.assertEqual(result.connection, '/foo') - self.assertEqual(check_docker.daemon, 'socket:///foo:') - - args = ('--connection', 'foo.com/bar') - result = check_docker.process_args(args=args) - self.assertEqual(result.connection, 'foo.com/bar') - self.assertEqual(check_docker.daemon, 'http://foo.com/bar') - - def test_args_secure_connection(self): - args = ('--secure-connection', 'non-default') - result = check_docker.process_args(args=args) - self.assertEqual(result.secure_connection, 'non-default') - - args = ('--secure-connection', 'foo.com/bar') - result = check_docker.process_args(args=args) - self.assertEqual(result.secure_connection, 'foo.com/bar') - self.assertEqual(check_docker.daemon, 'https://foo.com/bar') - - def test_args_mixed_connection(self): - args = ('--connection', 'non-default', '--secure-connection', 'non-default') - try: - self.assertRaises(argparse.ArgumentError, check_docker.process_args, args) - except SystemExit: # Argument failures exit as well - pass - - def test_missing_check(self): - args = tuple() - result = check_docker.process_args(args=args) - self.assertTrue(check_docker.no_checks_present(result)) - - def test_present_check(self): - args = ('--status', 'running') - result = check_docker.process_args(args=args) - self.assertFalse(check_docker.no_checks_present(result)) - - def test_get_containers_1(self): - with patch('check_docker.get_url', return_value=(self.sample_containers_json, 200)): - container_list = check_docker.get_containers('all', False) - self.assertSetEqual(container_list, {'thing1', 'thing2'}) - - def test_get_containers_2(self): - with patch('check_docker.get_url', return_value=(self.sample_containers_json, 
200)): - container_list = check_docker.get_containers(['thing.*'], False) - self.assertSetEqual(container_list, {'thing1', 'thing2'}) - - def test_get_containers_3(self): - with patch('check_docker.get_url', return_value=(self.sample_containers_json, 200)): - with patch('check_docker.unknown') as patched: - container_list = check_docker.get_containers({'foo'}, False) - self.assertSetEqual(container_list, set()) - self.assertEqual(patched.call_count, 0) - - def test_get_containers_4(self): - with patch('check_docker.get_url', return_value=(self.sample_containers_json, 200)): - with patch('check_docker.critical') as patched: - container_list = check_docker.get_containers({'foo'}, True) - self.assertSetEqual(container_list, set()) - self.assertEqual(patched.call_count, 1) - - -class TestSocket(fake_filesystem_unittest.TestCase): - def setUp(self): - check_docker.rc = -1 - check_docker.messages = [] - check_docker.performance_data = [] - self.setUpPyfakefs() - - def test_socketfile_failure_false(self): - self.fs.CreateFile('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666)) - args = ('--status', 'running', '--connection', '/tmp/socket') - result = check_docker.process_args(args=args) - self.assertFalse(check_docker.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_filetype(self): - self.fs.CreateFile('/tmp/not_socket', contents='testing') - args = ('--status', 'running', '--connection', '/tmp/not_socket') - result = check_docker.process_args(args=args) - self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_missing(self): - args = ('--status', 'running', '--connection', '/tmp/missing') - result = check_docker.process_args(args=args) - self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_unwriteable(self): - self.fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--status', 'running', '--connection', '/tmp/unwritable') - result = check_docker.process_args(args=args) - self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_unreadable(self): - self.fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--status', 'running', '--connection', '/tmp/unreadable') - result = check_docker.process_args(args=args) - self.assertTrue(check_docker.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_http(self): - self.fs.CreateFile('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--status', 'running', '--connection', 'http://127.0.0.1') - result = check_docker.process_args(args=args) - self.assertFalse(check_docker.socketfile_permissions_failure(parsed_args=result)) - - -class TestPerform(fake_filesystem_unittest.TestCase): - def setUp(self): - self.setUpPyfakefs() - self.fs.CreateFile(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) - self.containers = [{'Names': ['/thing1']}, ] - self.http_success_with_empty_payload = ('{}', 200) - - def test_no_containers(self): - args = ['--cpu', '0:0'] - with patch('check_docker.get_url', return_value=([], 200)): - with patch('check_docker.unknown') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_cpu(self): - args = ['--cpu', '0:0'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_cpu') 
as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_mem(self): - args = ['--memory', '0:0'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_memory') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_health(self): - args = ['--health'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_health') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_restarts(self): - args = ['--restarts', '1:1'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_restarts') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_status(self): - args = ['--status', 'running'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_status') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_uptime(self): - args = ['--uptime', '0:0'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_uptime') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_version(self): - args = ['--version'] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.check_version') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_no_checks(self): - args = [] - with patch('check_docker.get_url', return_value=(self.containers, 200)): - with patch('check_docker.unknown') as patched: - check_docker.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - -class TestOutput(unittest.TestCase): - def setUp(self): - check_docker.messages = [] - check_docker.performance_data = [] - check_docker.messages = [] +def test_args_help(check_docker, capsys): + args = tuple() + check_docker.process_args(args=args) + out, err = capsys.readouterr() + assert 'usage: ' in out - def test_print_results1(self): - check_docker.messages = [] - check_docker.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, '') - def test_print_results2(self): - check_docker.messages = ['TEST'] - check_docker.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, 'TEST') +def test_args_restart(check_docker): + args = ('--restarts', 'non-default') + result = check_docker.process_args(args=args) + assert result.restarts == 'non-default' - def test_print_results3(self): - check_docker.messages = ['FOO', 'BAR'] - check_docker.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, 'FOO; BAR') - def test_print_results4(self): - check_docker.messages = ['FOO', 'BAR'] - check_docker.performance_data = ['1;2;3;4;'] +def test_args_status(check_docker): + args = ('--status', 'non-default') + result = check_docker.process_args(args=args) + assert result.status == 'non-default' - check_docker.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, 'FOO; BAR|1;2;3;4;') +def test_args_memory(check_docker): + args = ('--memory', 'non-default') + result = check_docker.process_args(args=args) + assert result.memory, 'non-default' -class 
TestVersion(unittest.TestCase): - def test_package_present(self): - req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker", method="HEAD") - with request.urlopen(req) as resp: - self.assertEqual(resp.getcode(), 200) - def test_ensure_new_version(self): - version = check_docker.__version__ - req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker&version={version}". - format(version=version), method="HEAD") +def test_args_containers(check_docker): + args = ('--containers', 'non-default') + result = check_docker.process_args(args=args) + assert result.containers == ['non-default'] + + +def test_args_containers_blank(check_docker): + args = ('--containers',) + with pytest.raises(SystemExit): + check_docker.process_args(args=args) + + +def test_args_present(check_docker): + result = check_docker.process_args(args=()) + assert not result.present + args = ('--present',) + result = check_docker.process_args(args=args) + assert result.present + + +def test_args_timeout(check_docker): + args = ('--timeout', '9999') + result = check_docker.process_args(args=args) + assert result.timeout == 9999.0 + + +def test_args_connection(check_docker): + args = ('--connection', '/foo') + result = check_docker.process_args(args=args) + assert result.connection == '/foo' + assert check_docker.daemon == 'socket:///foo:' + + args = ('--connection', 'foo.com/bar') + result = check_docker.process_args(args=args) + assert result.connection == 'foo.com/bar' + assert check_docker.daemon == 'http://foo.com/bar' + + +def test_args_secure_connection(check_docker): + check_docker.rc = -1 + args = ('--secure-connection', 'non-default') + result = check_docker.process_args(args=args) + assert result.secure_connection == 'non-default' + + args = ('--secure-connection', 'foo.com/bar') + result = check_docker.process_args(args=args) + assert result.secure_connection == 'foo.com/bar' + assert check_docker.daemon == 'https://foo.com/bar' - try: - with request.urlopen(req) as resp: - http_code = resp.getcode() - except HTTPError as e: - http_code = e.code - self.assertEqual(http_code, 404, "Version already exists") +@pytest.mark.parametrize('args', ( + ('--connection', 'non-default', '--secure-connection', 'non-default'), + ('--binary_units', '--decimal_units') +)) +def test_exclusive_args(check_docker, args): + with pytest.raises(SystemExit): + check_docker.process_args(args) -if __name__ == '__main__': - unittest.main(buffer=True) + +@pytest.mark.parametrize('arg, one_kb', ( + ('--binary_units', 1024), + ('--decimal_units', 1000) +)) +def test_units_base(check_docker, fs, arg, one_kb): + # Assert value is driven by argprase results + assert check_docker.unit_adjustments is None, "unit_adjustments has no sensible default wihout knowing the base" + + # Confirm default value is set + parsed_args = check_docker.process_args([]) + assert parsed_args.units_base == 1024, "units_base should default to 1024" + + # Confirm value is updated by argparse flags + parsed_args = check_docker.process_args([arg]) + assert parsed_args.units_base == one_kb, "units_base should be influenced by units flags" + + fs.CreateFile(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + with patch('check_docker.get_containers', return_value=['container1']), \ + patch('check_docker.get_stats', + return_value={'memory_stats': {'limit': one_kb, 'usage': one_kb, 'stats': {'total_cache': 0}}}), \ + patch('check_docker.get_state', return_value={'Running': True}): + 
check_docker.perform_checks(['--memory', '0:0:KB', arg]) + + # Confirm unit adjustment table was updated by argument + assert check_docker.unit_adjustments['KB'] == one_kb + + # Confirm output shows unit conversion specified by arg + assert check_docker.performance_data == ['container1_mem=1.0KB;0;0;0;1.0'] + + +def test_missing_check(check_docker): + check_docker.rc = -1 + args = tuple() + result = check_docker.process_args(args=args) + assert check_docker.no_checks_present(result) + + +def test_present_check(check_docker): + check_docker.rc = -1 + args = ('--status', 'running') + result = check_docker.process_args(args=args) + assert not check_docker.no_checks_present(result) + + +def test_get_containers_1(check_docker, sample_containers_json): + with patch('check_docker.get_url', return_value=(sample_containers_json, 200)): + container_list = check_docker.get_containers('all', False) + assert container_list == {'thing1', 'thing2'} + + +def test_get_containers_2(check_docker, sample_containers_json): + with patch('check_docker.get_url', return_value=(sample_containers_json, 200)): + container_list = check_docker.get_containers(['thing.*'], False) + assert container_list == {'thing1', 'thing2'} + + +def test_get_containers_3(check_docker, sample_containers_json): + check_docker.rc = -1 + with patch('check_docker.get_url', return_value=(sample_containers_json, 200)): + with patch('check_docker.unknown') as patched: + container_list = check_docker.get_containers({'foo'}, False) + assert container_list == set() + assert patched.call_count == 0 + + +def test_get_containers_4(check_docker, sample_containers_json): + check_docker.rc = -1 + with patch('check_docker.get_url', return_value=(sample_containers_json, 200)): + with patch('check_docker.critical') as patched: + container_list = check_docker.get_containers({'foo'}, True) + assert container_list == set() + assert patched.call_count == 1 + + +def test_socketfile_failure_false(check_docker, fs): + fs.CreateFile('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ('--status', 'running', '--connection', '/tmp/socket') + result = check_docker.process_args(args=args) + assert not check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_filetype(check_docker, fs): + fs.CreateFile('/tmp/not_socket', contents='testing') + args = ('--status', 'running', '--connection', '/tmp/not_socket') + result = check_docker.process_args(args=args) + assert check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_missing(check_docker, fs): + args = ('--status', 'running', '--connection', '/tmp/missing') + result = check_docker.process_args(args=args) + check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_unwriteable(check_docker, fs): + fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--status', 'running', '--connection', '/tmp/unwritable') + result = check_docker.process_args(args=args) + assert check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_unreadable(check_docker, fs): + fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--status', 'running', '--connection', '/tmp/unreadable') + result = check_docker.process_args(args=args) + assert check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_http(check_docker, fs): + fs.CreateFile('/tmp/http', contents='', 
st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--status', 'running', '--connection', 'http://127.0.0.1') + result = check_docker.process_args(args=args) + assert not check_docker.socketfile_permissions_failure(parsed_args=result) + + +def test_perform_with_no_containers(check_docker, fs): + fs.CreateFile(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--cpu', '0:0'] + with patch('check_docker.get_url', return_value=([], 200)): + with patch('check_docker.unknown') as patched: + check_docker.perform_checks(args) + assert patched.call_count == 1 + + +@pytest.mark.parametrize("args, called", ( + (['--cpu', '0:0'], 'check_docker.check_cpu'), + (['--memory', '0:0'], 'check_docker.check_memory'), + (['--health'], 'check_docker.check_health'), + (['--restarts', '1:1'], 'check_docker.check_restarts'), + (['--status', 'running'], 'check_docker.check_status'), + (['--uptime', '0:0'], 'check_docker.check_uptime'), + (['--version'], 'check_docker.check_version'), + ([], 'check_docker.unknown') +)) +def test_perform(check_docker, fs, args, called): + fs.CreateFile(check_docker.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + with patch('check_docker.get_url', return_value=([{'Names': ['/thing1']}, ], 200)): + with patch(called) as patched: + check_docker.perform_checks(args) + assert patched.call_count == 1 + + +@pytest.mark.parametrize("messages, perf_data, exspected", ( + ([], [], ''), + (['TEST'], [], 'TEST'), + (['FOO', 'BAR'], [], 'FOO; BAR'), + (['FOO', 'BAR'], ['1;2;3;4;'], 'FOO; BAR|1;2;3;4;') +)) +def test_print_results(check_docker, capsys, messages, perf_data, exspected): + check_docker.messages = messages + check_docker.performance_data = perf_data + check_docker.print_results() + out, err = capsys.readouterr() + assert out.strip() == exspected + + +def test_package_present(): + req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker", method="HEAD") + with request.urlopen(req) as resp: + assert resp.getcode() == 200 + + +def test_ensure_new_version(): + version = cd.__version__ + req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker&version={version}". 
+ format(version=version), method="HEAD") + + try: + with request.urlopen(req) as resp: + http_code = resp.getcode() + except HTTPError as e: + http_code = e.code + assert http_code == 404, "Version already exists" + + +@pytest.mark.parametrize('url, expected', ( + ("short", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/short", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/short:latest")), + + ("simple/name", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="simple/name", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/simple/name:latest")), + ("library/ubuntu", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/ubuntu", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/ubuntu:latest")), + ("docker/stevvooe/app", + cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="docker/stevvooe/app", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/docker/stevvooe/app:latest")), + ("aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb", + cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb", + tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa/aa/aa/aa/aa/aa/aa/aa/aa/bb/bb/bb/bb/bb/bb:latest")), + ("aa/aa/bb/bb/bb", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa/aa/bb/bb/bb", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa/aa/bb/bb/bb:latest")), + ("a/a/a/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/a/a/a", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/a/a/a:latest")), + ("a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/a", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/a:latest")), + ("a/aa", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/aa", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/aa:latest")), + ("a/aa/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="a/aa/a", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/a/aa/a:latest")), + ("foo.com", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo.com", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo.com:latest")), + ("foo.com:8080/bar", + cd.ImageName(registry="foo.com:8080", name="bar", tag="latest", full_name="foo.com:8080/bar:latest")), + ("foo.com/bar", cd.ImageName(registry="foo.com", name="bar", tag="latest", full_name="foo.com/bar:latest")), + ("foo.com/bar/baz", + cd.ImageName(registry="foo.com", name="bar/baz", tag="latest", full_name="foo.com/bar/baz:latest")), + + ("localhost:8080/bar", + cd.ImageName(registry="localhost:8080", name="bar", tag="latest", full_name="localhost:8080/bar:latest")), + ("sub-dom1.foo.com/bar/baz/quux", cd.ImageName(registry="sub-dom1.foo.com", name="bar/baz/quux", tag="latest", + full_name="sub-dom1.foo.com/bar/baz/quux:latest")), + ("blog.foo.com/bar/baz", + cd.ImageName(registry="blog.foo.com", name="bar/baz", tag="latest", full_name="blog.foo.com/bar/baz:latest")), + ("aa-a/a", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="aa-a/a", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/aa-a/a:latest")), + ("foo_bar", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo_bar", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo_bar:latest")), + ("foo_bar.com", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/foo_bar.com", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/foo_bar.com:latest")), + ("foo.com/foo_bar", + 
cd.ImageName(registry="foo.com", name="foo_bar", tag="latest", full_name="foo.com/foo_bar:latest")), + ("b.gcr.io/test.example.com/my-app", + cd.ImageName(registry="b.gcr.io", name="test.example.com/my-app", tag="latest", + full_name="b.gcr.io/test.example.com/my-app:latest")), + ("xn--n3h.com/myimage", + cd.ImageName(registry="xn--n3h.com", name="myimage", tag="latest", full_name="xn--n3h.com/myimage:latest")), + ("xn--7o8h.com/myimage", + cd.ImageName(registry="xn--7o8h.com", name="myimage", tag="latest", full_name="xn--7o8h.com/myimage:latest")), + ("example.com/xn--7o8h.com/myimage", + cd.ImageName(registry="example.com", name="xn--7o8h.com/myimage", tag="latest", + full_name="example.com/xn--7o8h.com/myimage:latest")), + ("example.com/some_separator__underscore/myimage", + cd.ImageName(registry="example.com", name="some_separator__underscore/myimage", tag="latest", + full_name="example.com/some_separator__underscore/myimage:latest")), + ("do__cker/docker", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="do__cker/docker", tag="latest", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/do__cker/docker:latest")), + ("b.gcr.io/test.example.com/my-app", + cd.ImageName(registry="b.gcr.io", name="test.example.com/my-app", tag="latest", + full_name="b.gcr.io/test.example.com/my-app:latest")), + ("registry.io/foo/project--id.module--name.ver---sion--name", + cd.ImageName(registry="registry.io", name="foo/project--id.module--name.ver---sion--name", tag="latest", + full_name="registry.io/foo/project--id.module--name.ver---sion--name:latest")), + ("Asdf.com/foo/bar", + cd.ImageName(registry="Asdf.com", name="foo/bar", tag="latest", full_name="Asdf.com/foo/bar:latest")), + ("host.tld:12/name:tag", + cd.ImageName(registry="host.tld:12", name="name", tag="tag", full_name="host.tld:12/name:tag")), + ("host.tld/name:tag", cd.ImageName(registry="host.tld", name="name", tag="tag", full_name="host.tld/name:tag")), + ("name/name:tag", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="name/name", tag="tag", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/name/name:tag")), + ("name:tag", cd.ImageName(registry=cd.DEFAULT_PUBLIC_REGISTRY, name="library/name", tag="tag", + full_name=cd.DEFAULT_PUBLIC_REGISTRY + "/library/name:tag")), + ("host:21/name:tag", cd.ImageName(registry='host:21', name="name", tag="tag", + full_name="host:21/name:tag")), +)) +def test_parse_image_name(check_docker, url, expected): + parsed_name = check_docker.parse_image_name(url) + assert parsed_name == expected + + +def test_get_manifest_auth_token(check_docker): + with patch('check_docker.get_url', return_value=({'token': 'test'}, 200)): + www_authenticate_header = 'Bearer realm="https://example.com/token",service="example.com",scope="repository:test:pull"' + token = check_docker.get_manifest_auth_token(www_authenticate_header) + assert token == 'test' + + +def test_get_container_image_urls(check_docker): + container_response = {'Image': 'test'} + image_response = {'RepoTags': ['test']} + with patch('check_docker.get_container_info', return_value=container_response), \ + patch('check_docker.get_image_info', return_value=image_response): + urls = check_docker.get_container_image_urls('container') + assert urls == ['test'] + + +@pytest.mark.parametrize('image_url, expected_normal_url', ( + ('foo', 'https://' + cd.DEFAULT_PUBLIC_REGISTRY + '/v2/library/foo/manifests/latest'), + ('insecure.com/foo', 'http://insecure.com/v2/foo/manifests/latest'), +)) +def test_normalize_image_name_to_manifest_url(check_docker, image_url, 
expected_normal_url): + insecure_registries = ('insecure.com',) + normal_url, _ = check_docker.normalize_image_name_to_manifest_url(image_url, insecure_registries) + assert normal_url == expected_normal_url + + +@pytest.mark.parametrize('image_response, expected_digest', ( + ({'RepoDigests': []}, None), + ({'RepoDigests': ['name@AAAAAA']}, 'AAAAAA'), +)) +def test_get_container_digest(check_docker, image_response, expected_digest): + container_response = {'Image': 'test'} + with patch('check_docker.get_container_info', return_value=container_response), \ + patch('check_docker.get_image_info', return_value=image_response): + digest = check_docker.get_container_digest('container') + assert digest == expected_digest + + +def test_get_digest_from_registry_no_auth(check_docker): + response = FakeHttpResponse(content=b"", http_code=200, headers={'Docker-Content-Digest': "test_token"}) + + with patch('check_docker.head_url', return_value=response): + digest = check_docker.get_digest_from_registry('https://example.com/v2/test/manifests/lastest') + assert digest == "test_token" + + +@pytest.mark.parametrize('local_container_digest,registry_container_digest, image_urls, expected_rc', ( + ('AAAA', 'AAAA', ('example.com/foo',), cd.OK_RC), + ('AAAA', 'BBBB', ('example.com/foo',), cd.CRITICAL_RC), + (None, '', ('example.com/foo',), cd.UNKNOWN_RC), + ('', None, ('example.com/foo',), cd.UNKNOWN_RC), + ('AAAA', 'AAAA', ('example.com/foo', 'example.com/bar'), cd.UNKNOWN_RC), + ('AAAA', 'AAAA', tuple(), cd.UNKNOWN_RC), +)) +def test_check_version(check_docker, local_container_digest, registry_container_digest, image_urls, expected_rc): + with patch('check_docker.get_container_digest', return_value=local_container_digest), \ + patch('check_docker.get_container_image_urls', return_value=image_urls), \ + patch('check_docker.get_digest_from_registry', return_value=registry_container_digest): + check_docker.check_version('container', tuple()) + assert check_docker.rc == expected_rc diff --git a/test_check_swarm.py b/test_check_swarm.py index 8058bf6..337bfe8 100644 --- a/test_check_swarm.py +++ b/test_check_swarm.py @@ -1,343 +1,329 @@ import argparse import json -import sys from io import BytesIO import stat -from datetime import datetime, timezone, timedelta -import unittest from unittest.mock import patch from urllib.error import HTTPError -from pyfakefs import fake_filesystem_unittest +import pytest from importlib.machinery import SourceFileLoader from urllib import request __author__ = 'tim' # This is needed because `check_swarm` does not end a a .py so it won't be found by default1 -check_swarm = SourceFileLoader('check_swarm', './check_swarm').load_module() +cs = SourceFileLoader('check_swarm', './check_swarm').load_module() + + +@pytest.fixture +def check_swarm(): + # This is needed because `check_docker` does not end a a .py so it won't be found by default + check_swarm = SourceFileLoader('check_swarm', './check_swarm').load_module() + check_swarm.rc = -1 + check_swarm.timeout = 1 + check_swarm.messages = [] + check_swarm.performance_data = [] + check_swarm.daemon = 'socket:///notreal' + check_swarm.get_url.cache_clear() + return check_swarm class FakeHttpResponse(BytesIO): def __init__(self, content, http_code): - self.code = http_code + self.status = http_code super(FakeHttpResponse, self).__init__(content) -class TestUtil(unittest.TestCase): - def setUp(self): - check_swarm.rc = -1 - self.services = [{'Spec': {"Name": 'FOO'}}, - {'Spec': {"Name": 'BAR'}}] - - def test_get_url(self): - obj = {'foo': 
'bar'} - encoded = json.dumps(obj=obj).encode('utf-8') - expected_response = FakeHttpResponse(content=encoded, http_code=200) - with patch('check_swarm.better_urllib_get.open', return_value=expected_response): - response, _ = check_swarm.get_url(url='/test') - self.assertDictEqual(response, obj) - - def test_get_swarm_status(self): - with patch('check_swarm.get_url', return_value=('', 999)): - response = check_swarm.get_swarm_status() - self.assertEqual(response, 999) - - def test_get_service_info(self): - with patch('check_swarm.get_url', return_value=('FOO', 999)): - response_data, response_status = check_swarm.get_service_info('FOO') - self.assertEqual(response_data, 'FOO') - self.assertEqual(response_status, 999) - - def test_get_services_not_swarm(self): - with patch('check_swarm.get_url', return_value=('', 406)): - check_swarm.get_services('FOO') - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) - - def test_get_services_error(self): - with patch('check_swarm.get_url', return_value=('', 500)): - check_swarm.get_services('FOO') - self.assertEqual(check_swarm.rc, check_swarm.UNKNOWN_RC) - - def test_get_services_all(self): - with patch('check_swarm.get_url', return_value=(self.services, 200)): - result=check_swarm.get_services('all') - self.assertEqual(len(result), len(self.services)) - -class TestReporting(unittest.TestCase): - def setUp(self): - check_swarm.rc = -1 - check_swarm.messages = [] - check_swarm.performance_data = [] - - def test_ok(self): - check_swarm.ok("OK test") - self.assertEqual(check_swarm.rc, check_swarm.OK_RC) - self.assertListEqual(check_swarm.messages, ['OK: OK test']) - - def test_warn(self): - check_swarm.warning("WARN test") - self.assertEqual(check_swarm.rc, check_swarm.WARNING_RC) - self.assertListEqual(check_swarm.messages, ['WARNING: WARN test']) - - def test_crit(self): - check_swarm.critical("CRIT test") - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) - self.assertListEqual(check_swarm.messages, ['CRITICAL: CRIT test']) - - def test_unknown(self): - check_swarm.unknown("UNKNOWN test") - self.assertEqual(check_swarm.rc, check_swarm.UNKNOWN_RC) - self.assertListEqual(check_swarm.messages, ['UNKNOWN: UNKNOWN test']) - - def test_set_rc(self): - # Can I do a basic set - check_swarm.set_rc(check_swarm.OK_RC) - self.assertEqual(check_swarm.rc, check_swarm.OK_RC) - - # Does it prevent downgrades of rc - check_swarm.set_rc(check_swarm.WARNING_RC) - self.assertEqual(check_swarm.rc, check_swarm.WARNING_RC) - check_swarm.set_rc(check_swarm.OK_RC) - self.assertEqual(check_swarm.rc, check_swarm.WARNING_RC) - - def test_process_url_status_ok(self): - check_swarm.process_url_status(200, ok_msg='ok_msg', critical_msg='critical_msg', unknown_msg='unknown_msg') - self.assertEqual(check_swarm.rc, check_swarm.OK_RC) - self.assertListEqual(check_swarm.messages , ['OK: ok_msg']) - - - def test_process_url_status_critical(self): - check_swarm.process_url_status(404, ok_msg='ok_msg', critical_msg='critical_msg', unknown_msg='unknown_msg') - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) - self.assertListEqual(check_swarm.messages , ['CRITICAL: critical_msg']) - - - def test_process_url_status_unknown(self): - check_swarm.process_url_status(418, ok_msg='ok_msg', critical_msg='critical_msg', unknown_msg='unknown_msg') - self.assertEqual(check_swarm.rc, check_swarm.UNKNOWN_RC) - self.assertListEqual(check_swarm.messages , ['UNKNOWN: unknown_msg']) - -class TestArgs(unittest.TestCase): - def setUp(self): - check_swarm.rc = -1 - - def 
test_args_timeout(self): - args = ('--timeout', '9999', "--service", "FOO") - result = check_swarm.process_args(args=args) - self.assertEqual(result.timeout, 9999.0) - - def test_args_connection(self): - args = ('--connection', '/foo', "--service", "FOO") - result = check_swarm.process_args(args=args) - self.assertEqual(result.connection, '/foo') - self.assertEqual(check_swarm.daemon, 'socket:///foo:') - - args = ('--connection', 'example.com/bar', "--service", "FOO") - result = check_swarm.process_args(args=args) - self.assertEqual(result.connection, 'example.com/bar') - self.assertEqual(check_swarm.daemon, 'http://example.com/bar') - - def test_args_secure_connection(self): - args = ('--secure-connection', 'non-default', "--service", "FOO") - result = check_swarm.process_args(args=args) - self.assertEqual(result.secure_connection, 'non-default') - - args = ('--secure-connection', 'example.com/bar', "--service", "FOO") - result = check_swarm.process_args(args=args) - self.assertEqual(result.secure_connection, 'example.com/bar') - self.assertEqual(check_swarm.daemon, 'https://example.com/bar') - - def test_args_mixed_connection(self): - args = ('--connection', 'non-default', '--secure-connection', 'non-default', "--service", "FOO") - try: - self.assertRaises(argparse.ArgumentError, check_swarm.process_args, args) - except SystemExit: # Argument failures exit as well - pass - - def test_args_mixed_checks(self): - args = ('--swarm', "--service", "FOO") - try: - self.assertRaises(argparse.ArgumentError, check_swarm.process_args, args) - except SystemExit: # Argument failures exit as well - pass - - def test_missing_check(self): - args = tuple() - with self.assertRaises(SystemExit): - check_swarm.process_args(args=args) - self.assertTrue(': error: one of the arguments' in sys.stderr.getvalue()) - - -class TestSocket(fake_filesystem_unittest.TestCase): - def setUp(self): - check_swarm.rc = -1 - check_swarm.messages = [] - check_swarm.performance_data = [] - self.setUpPyfakefs() - - def test_socketfile_failure_false(self): - self.fs.CreateFile('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666)) - args = ('--swarm', '--connection', '/tmp/socket') - result = check_swarm.process_args(args=args) - self.assertFalse(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_filetype(self): - self.fs.CreateFile('/tmp/not_socket', contents='testing') - args = ('--swarm', '--connection', '/tmp/not_socket') - result = check_swarm.process_args(args=args) - self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_missing(self): - args = ('--swarm', '--connection', '/tmp/missing') - result = check_swarm.process_args(args=args) - self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_unwriteable(self): - self.fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--swarm', '--connection', '/tmp/unwritable') - result = check_swarm.process_args(args=args) - self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_unreadable(self): - self.fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--swarm', '--connection', '/tmp/unreadable') - result = check_swarm.process_args(args=args) - self.assertTrue(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - def test_socketfile_failure_http(self): - 
self.fs.CreateFile('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000)) - args = ('--swarm', '--connection', 'http://127.0.0.1') - result = check_swarm.process_args(args=args) - self.assertFalse(check_swarm.socketfile_permissions_failure(parsed_args=result)) - - -class TestPerform(fake_filesystem_unittest.TestCase): - def setUp(self): - self.setUpPyfakefs() - self.fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) - self.services = [{'Spec': {"Name": 'FOO'}}, - {'Spec': {"Name": 'BAR'}}] - self.service = {'Spec': {"Name": 'FOO'}} - - self.http_success_with_empty_payload = ('{}', 200) - check_swarm.rc = -1 - - def test_check_swarm_called(self): - args = ['--swarm'] - with patch('check_swarm.get_url', return_value=(self.services, 200)): - with patch('check_swarm.check_swarm') as patched: - check_swarm.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_swarm_results_OK(self): - args = ['--swarm'] - with patch('check_swarm.get_swarm_status', return_value=200): - check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.OK_RC) - - def test_check_swarm_results_CRITICAL(self): - args = ['--swarm'] - with patch('check_swarm.get_swarm_status', return_value=406): - check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) - - def test_check_service_called(self): - args = ['--service', 'FOO'] - with patch('check_swarm.get_url', return_value=(self.services, 200)): - with patch('check_swarm.check_service') as patched: - check_swarm.perform_checks(args) - self.assertEqual(patched.call_count, 1) - - def test_check_service_results_OK(self): - args = ['--service', 'FOO'] - with patch('check_swarm.get_services', return_value=['FOO','BAR']): - with patch('check_swarm.get_service_info', return_value=(self.service, 200)): - check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.OK_RC) - - def test_check_service_results_FAIL_missing(self): - args = ['--service', 'missing1'] - with patch('check_swarm.get_url', return_value=(self.services, 200)): - check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) - - def test_check_service_results_FAIL_unknown(self): - args = ['--service', 'FOO'] - with patch('check_swarm.get_services', return_value=['FOO','BAR']): - with patch('check_swarm.get_service_info', return_value=('', 500)): - check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.UNKNOWN_RC) - - def test_check_no_services(self): - args = ['--service', 'missing2'] - with patch('check_swarm.get_url', return_value=([], 200)): +def test_get_url(check_swarm, monkeypatch): + obj = {'foo': 'bar'} + encoded = json.dumps(obj=obj).encode('utf-8') + expected_response = FakeHttpResponse(content=encoded, http_code=200) + + def mock_open(*args, **kwargs): + return expected_response + + monkeypatch.setattr(check_swarm.better_urllib_get, 'open', value=mock_open) + response, _ = check_swarm.get_url(url='/test') + assert response == obj + + +def test_get_swarm_status(check_swarm): + with patch('check_swarm.get_url', return_value=('', 999)): + response = check_swarm.get_swarm_status() + assert response == 999 + + +def test_get_service_info(check_swarm): + with patch('check_swarm.get_url', return_value=('FOO', 999)): + response_data, response_status = check_swarm.get_service_info('FOO') + assert response_data == 'FOO' + assert response_status == 999 + + +def test_get_services_not_swarm(check_swarm): + with 
patch('check_swarm.get_url', return_value=('', 406)): + check_swarm.get_services('FOO') + assert check_swarm.rc == check_swarm.CRITICAL_RC + + +def test_get_services_error(check_swarm): + with patch('check_swarm.get_url', return_value=('', 500)): + check_swarm.get_services('FOO') + assert check_swarm.rc == check_swarm.UNKNOWN_RC + + +def test_get_services_all(check_swarm): + services = [{'Spec': {"Name": 'FOO'}}, + {'Spec': {"Name": 'BAR'}}] + with patch('check_swarm.get_url', return_value=(services, 200)): + result = check_swarm.get_services('all') + assert len(result) == len(services) + + +@pytest.mark.parametrize('func,arg,rc,messages', + ( + ('ok', "OK test", cs.OK_RC, ['OK: OK test']), + ('warning', "WARN test", cs.WARNING_RC, ['WARNING: WARN test']), + ('critical', "CRIT test", cs.CRITICAL_RC, ['CRITICAL: CRIT test']), + ('unknown', "UNKNOWN test", cs.UNKNOWN_RC, ['UNKNOWN: UNKNOWN test']), + )) +def test_status_update(check_swarm, func, arg, rc, messages): + getattr(check_swarm, func)(arg) + assert check_swarm.rc == rc + assert check_swarm.messages == messages + + +def test_set_rc(check_swarm): + # Can I do a basic set + check_swarm.set_rc(check_swarm.OK_RC) + assert check_swarm.rc == check_swarm.OK_RC + + # Does it prevent downgrades of rc + check_swarm.set_rc(check_swarm.WARNING_RC) + assert check_swarm.rc == check_swarm.WARNING_RC + check_swarm.set_rc(check_swarm.OK_RC) + assert check_swarm.rc == check_swarm.WARNING_RC + + +@pytest.mark.parametrize('code, expected_rc, expected_messages', ( + (200, cs.OK_RC, ['OK: ok_msg']), + (404, cs.CRITICAL_RC, ['CRITICAL: critical_msg']), + (418, cs.UNKNOWN_RC, ['UNKNOWN: unknown_msg']), +)) +def test_process_url_status_ok(check_swarm, code, expected_rc, expected_messages): + check_swarm.process_url_status(code, ok_msg='ok_msg', critical_msg='critical_msg', unknown_msg='unknown_msg') + assert check_swarm.rc == expected_rc + assert check_swarm.messages == expected_messages + + +def test_args_timeout(check_swarm): + args = ('--timeout', '9999', '--swarm') + result = check_swarm.process_args(args=args) + assert result.timeout == 9999.0 + + +def test_args_connection(check_swarm): + args = ('--connection', '/foo', '--swarm') + result = check_swarm.process_args(args=args) + assert result.connection == '/foo' + assert check_swarm.daemon == 'socket:///foo:' + + args = ('--connection', 'foo.com/bar', '--swarm') + result = check_swarm.process_args(args=args) + assert result.connection == 'foo.com/bar' + assert check_swarm.daemon == 'http://foo.com/bar' + + +def test_args_secure_connection(check_swarm): + args = ('--secure-connection', 'non-default', '--swarm') + result = check_swarm.process_args(args=args) + assert result.secure_connection == 'non-default' + + args = ('--secure-connection', 'foo.com/bar', '--swarm') + result = check_swarm.process_args(args=args) + assert result.secure_connection == 'foo.com/bar' + assert check_swarm.daemon == 'https://foo.com/bar' + + +def test_args_mixed_connection(check_swarm): + args = ('--connection', 'non-default', '--secure-connection', 'non-default', '--swarm') + with pytest.raises(SystemExit): + check_swarm.process_args(args) + + +def test_missing_check(check_swarm): + try: + with pytest.raises(argparse.ArgumentError): + check_swarm.process_args(tuple()) + except SystemExit: # Argument failures exit as well + pass + + +def test_args_mixed_checks(check_swarm): + try: + with pytest.raises(argparse.ArgumentError): + check_swarm.process_args(['--swarm', "--service", "FOO"]) + except SystemExit: # Argument failures 
exit as well + pass + + +def test_socketfile_failure_false(check_swarm, fs): + fs.CreateFile('/tmp/socket', contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ('--swarm', '--connection', '/tmp/socket') + result = check_swarm.process_args(args=args) + assert not check_swarm.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_filetype(check_swarm, fs): + fs.CreateFile('/tmp/not_socket', contents='testing') + args = ('--swarm', '--connection', '/tmp/not_socket') + result = check_swarm.process_args(args=args) + assert check_swarm.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_missing(check_swarm, fs): + args = ('--swarm', '--connection', '/tmp/missing') + result = check_swarm.process_args(args=args) + check_swarm.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_unwriteable(check_swarm, fs): + fs.CreateFile('/tmp/unwritable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--swarm', '--connection', '/tmp/unwritable') + result = check_swarm.process_args(args=args) + assert check_swarm.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_unreadable(check_swarm, fs): + fs.CreateFile('/tmp/unreadable', contents='', st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--swarm', '--connection', '/tmp/unreadable') + result = check_swarm.process_args(args=args) + assert check_swarm.socketfile_permissions_failure(parsed_args=result) + + +def test_socketfile_failure_http(check_swarm, fs): + fs.CreateFile('/tmp/http', contents='', st_mode=(stat.S_IFSOCK | 0o000)) + args = ('--swarm', '--connection', 'http://127.0.0.1') + result = check_swarm.process_args(args=args) + assert not check_swarm.socketfile_permissions_failure(parsed_args=result) + + +@pytest.fixture() +def services(): + return [{'Spec': {"Name": 'FOO'}}, {'Spec': {"Name": 'BAR'}}] + + +def test_check_swarm_called(check_swarm, fs, services): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--swarm'] + with patch('check_swarm.get_url', return_value=(services, 200)): + with patch('check_swarm.check_swarm') as patched: check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) + assert patched.call_count == 1 + + +def test_check_swarm_results_OK(check_swarm, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--swarm'] + with patch('check_swarm.get_swarm_status', return_value=200): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.OK_RC + + +def test_check_swarm_results_CRITICAL(check_swarm, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--swarm'] + with patch('check_swarm.get_swarm_status', return_value=406): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.CRITICAL_RC + - def test_check_missing_service(self): - args = ['--service', 'missing3'] - with patch('check_swarm.get_url', return_value=(self.services, 200)): +def test_check_service_called(check_swarm, services, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'FOO'] + with patch('check_swarm.get_url', return_value=(services, 200)): + with patch('check_swarm.check_service') as patched: check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) + assert patched.call_count == 1 - def test_check_not_swarm_service(self): - args = 
['--service', 'missing4'] - with patch('check_swarm.get_url', return_value=('', 406)): + +def test_check_service_results_OK(check_swarm, services, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'FOO'] + with patch('check_swarm.get_services', return_value=['FOO', 'BAR']): + with patch('check_swarm.get_service_info', return_value=(services, 200)): check_swarm.perform_checks(args) - self.assertEqual(check_swarm.rc, check_swarm.CRITICAL_RC) + assert check_swarm.rc == cs.OK_RC -class TestOutput(unittest.TestCase): - def setUp(self): - check_swarm.messages = [] - check_swarm.performance_data = [] +def test_check_service_results_FAIL_missing(check_swarm, services, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'missing1'] + with patch('check_swarm.get_url', return_value=(services, 200)): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.CRITICAL_RC - check_swarm.messages = [] - def test_print_results1(self): - check_swarm.messages = [] - check_swarm.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, '') +def test_check_service_results_FAIL_unknown(check_swarm, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'FOO'] + with patch('check_swarm.get_services', return_value=['FOO', 'BAR']): + with patch('check_swarm.get_service_info', return_value=('', 500)): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.UNKNOWN_RC - def test_print_results2(self): - check_swarm.messages = ['TEST'] - check_swarm.print_results() - output = sys.stdout.getvalue().strip() - def test_print_results3(self): - check_swarm.messages = ['FOO', 'BAR'] - check_swarm.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, 'FOO; BAR') +def test_check_no_services(check_swarm,fs ): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'missing2'] + with patch('check_swarm.get_url', return_value=([], 200)): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.CRITICAL_RC - def test_print_results4(self): - check_swarm.messages = ['FOO', 'BAR'] - check_swarm.print_results() - output = sys.stdout.getvalue().strip() - self.assertEqual(output, 'FOO; BAR') +def test_check_missing_service(check_swarm, services, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'missing3'] + with patch('check_swarm.get_url', return_value=(services, 200)): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.CRITICAL_RC -class TestVersion(unittest.TestCase): - def test_package_present(self): - req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker", method="HEAD") - with request.urlopen(req) as resp: - self.assertEqual(resp.getcode(), 200) +def test_check_not_swarm_service(check_swarm, fs): + fs.CreateFile(check_swarm.DEFAULT_SOCKET, contents='', st_mode=(stat.S_IFSOCK | 0o666)) + args = ['--service', 'missing4'] + with patch('check_swarm.get_url', return_value=('', 406)): + check_swarm.perform_checks(args) + assert check_swarm.rc == cs.CRITICAL_RC - def test_ensure_new_version(self): - version = check_swarm.__version__ - req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker&version={version}". 
- format(version=version), method="HEAD") - try: - with request.urlopen(req) as resp: - http_code = resp.getcode() - except HTTPError as e: - http_code = e.code - self.assertEqual(http_code, 404, "Version already exists") +@pytest.mark.parametrize("messages, perf_data, expected", ( + ([], [], ''), + (['TEST'], [], 'TEST'), + (['FOO', 'BAR'], [], 'FOO; BAR'), +)) +def test_print_results(check_swarm, capsys, messages, perf_data, expected): + check_swarm.messages = messages + check_swarm.performance_data = perf_data + check_swarm.print_results() + out, err = capsys.readouterr() + assert out.strip() == expected -if __name__ == '__main__': - unittest.main(buffer=True) +def test_package_present(): + req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker", method="HEAD") + with request.urlopen(req) as resp: + assert resp.getcode() == 200 + + +def test_ensure_new_version(): + version = cs.__version__ + req = request.Request("https://pypi.python.org/pypi?:action=doap&name=check_docker&version={version}". + format(version=version), method="HEAD") + + try: + with request.urlopen(req) as resp: + http_code = resp.getcode() + except HTTPError as e: + http_code = e.code + assert http_code == 404, "Version already exists"
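
The test_print_results parametrizations (here and in the check_docker tests earlier in this patch) encode the standard Nagios plugin output convention: status messages joined with '; ', and performance data appended after a literal '|'. A minimal sketch of that formatting, using illustrative names rather than the plugins' actual print_results code:

    def format_nagios_output(messages, performance_data):
        # Human-readable status text first, e.g. "OK: OK test; WARNING: WARN test"
        output = '; '.join(messages)
        # Performance data (metrics for graphing) follows a literal '|', space separated.
        if performance_data:
            output += '|' + ' '.join(performance_data)
        return output

    assert format_nagios_output([], []) == ''
    assert format_nagios_output(['FOO', 'BAR'], []) == 'FOO; BAR'
    assert format_nagios_output(['FOO', 'BAR'], ['1;2;3;4;']) == 'FOO; BAR|1;2;3;4;'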