diff --git a/.travis.yml b/.travis.yml index 9ae8c42..c99db65 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: python +dist: focal python: - '3.8' sudo: required diff --git a/mirroroperator/operator.py b/mirroroperator/operator.py index a4f114d..62f779d 100644 --- a/mirroroperator/operator.py +++ b/mirroroperator/operator.py @@ -1,10 +1,11 @@ import ast -import kubernetes +import json import logging import time import os - from http import HTTPStatus +import kubernetes + from kubernetes.client.rest import ApiException from mirroroperator.registrymirror import RegistryMirror from mirroroperator.exceptions import NoCRDException @@ -17,7 +18,11 @@ CRD_PLURAL = "registrymirrors" -class MirrorOperator(object): +# pylint: disable=too-few-public-methods +class MirrorOperator: + # pylint: disable=fixme + # FIXME: pylint warning: redefined-outer-name: Redefining name 'env_vars' from outer scope + # pylint: disable=redefined-outer-name def __init__(self, env_vars): """ :param env_vars: dictionary includes namespace, @@ -26,6 +31,8 @@ def __init__(self, env_vars): ss_ds_labels (used in RegistryMirror, optional), ss_ds_template_lables (used in RegistryMirror, optional) ss_ds_tolerations (used in RegistryMirror, optional) + addressing_scheme ('hostess' or 'services', defaults to 'hostess', optional) + imageswap_namespace (used in MirrorOperator, default to the operator namespace, optional) hostess_docker_image (used in RegistryMirror), hostess_docker_tag (used in RegistryMirror), image_pull_secrets(used in RegistryMirror, optional), @@ -36,8 +43,8 @@ def __init__(self, env_vars): raise TypeError("Missing docker certificate secret") self.registry_mirror_vars = env_vars kubernetes.config.load_incluster_config() - self.crd_api = kubernetes.client.ExtensionsV1beta1Api() self.object_api = kubernetes.client.CustomObjectsApi() + self.core_api = kubernetes.client.CoreV1Api() def watch_registry_mirrors(self): watcher = kubernetes.watch.Watch() @@ -50,25 +57,57 @@ def watch_registry_mirrors(self): ): registry_mirror_kwargs = event['object'].copy() registry_mirror_kwargs.update(self.registry_mirror_vars) - LOGGER.debug("RM kwargs: {}".format(registry_mirror_kwargs)) + LOGGER.debug("RM kwargs: %s", registry_mirror_kwargs) mirror = RegistryMirror( event_type=event['type'], **registry_mirror_kwargs ) mirror.apply() + if self.registry_mirror_vars['addressing_scheme'] == 'services': + self.update_imageswap_config() except ApiException as e: status = HTTPStatus(e.status) if status == HTTPStatus.NOT_FOUND: raise NoCRDException( "CRD not found. Please ensure you create a CRD with group" - " - %s, version - %s and plural - %s before this operator" - " can run.", - CRD_GROUP, CRD_VERSION, CRD_PLURAL) - else: - LOGGER.exception( - "Error watching custom object events", - exc_info=True - ) + " - {}, version - {} and plural - {} before this operator" + " can run.".format(CRD_GROUP, CRD_VERSION, CRD_PLURAL)) from e + LOGGER.exception( + "Error watching custom object events", + exc_info=True + ) + def update_imageswap_config(self): + registrymirrors = self.object_api.list_cluster_custom_object( + CRD_GROUP, CRD_VERSION, CRD_PLURAL) + imageswap_config = "default:\n" + for mirror in registrymirrors['items']: + service_name = "registry-mirror-" + mirror['metadata']['name'] + service_namespace = self.registry_mirror_vars['namespace'] + try: + mirror_service = self.core_api.read_namespaced_service( + service_name, service_namespace) + except ApiException as api_exception: + json_error = json.loads(api_exception.body) + code = HTTPStatus(int(json_error['code'])) + if code == HTTPStatus.NOT_FOUND: + LOGGER.info("Serfice %s not (yet) configured", service_name) + else: + LOGGER.error("API returned status: %s, msg: %s", + code, json_error['message']) + continue + service_ip = mirror_service.spec.cluster_ip + if 'masqueradeUrl' in mirror['spec']: + masqueraded_name = mirror['spec']['masqueradeUrl'] + else: + masqueraded_name = "mirror-" + mirror['spec']['upstreamUrl'] + imageswap_config += "{0}:{1}/{0}\n".format(masqueraded_name, service_ip) + LOGGER.info("Imageswap config: %s", imageswap_config) + imageswap_namespace = self.registry_mirror_vars['imageswap_namespace'] + self.core_api.patch_namespaced_config_map( + "imageswap-maps", + imageswap_namespace, + {"data": {"maps": imageswap_config}} + ) def safely_eval_env(env_var): return ast.literal_eval(os.environ.get(env_var) @@ -79,14 +118,18 @@ def safely_eval_env(env_var): logging.basicConfig(level=logging.INFO) # Get organization specific variables from env + env_namespace=os.environ.get("NAMESPACE", "default") env_vars = dict( - namespace=os.environ.get("NAMESPACE", "default"), + namespace=env_namespace, # optional to allow for image to be pulled from elsewhere docker_registry=os.environ.get( "DOCKER_REGISTRY", "docker.io"), + # pylint: disable=fixme # TODO: remove 'hostess_docker_registry' in 1.0.0 hostess_docker_registry=os.environ.get( "HOSTESS_DOCKER_REGISTRY", "docker.io"), + addressing_scheme=os.environ.get("ADDRESSING_SCHEME", "hostess"), + imageswap_namespace=os.environ.get("IMAGESWAP_NAMESPACE", env_namespace), hostess_docker_image=os.environ.get("HOSTESS_DOCKER_IMAGE", "ocadotechnology/mirror-hostess"), hostess_docker_tag=os.environ.get("HOSTESS_DOCKER_TAG", "1.1.0"), diff --git a/mirroroperator/registrymirror.py b/mirroroperator/registrymirror.py index ed38651..7997fd6 100644 --- a/mirroroperator/registrymirror.py +++ b/mirroroperator/registrymirror.py @@ -1,11 +1,11 @@ -import bitmath -from kubernetes import client -from kubernetes.client.rest import ApiException import logging import copy import base64 import json from http import HTTPStatus +import bitmath +from kubernetes import client +from kubernetes.client.rest import ApiException REGISTRY_CERT_DIR = '/etc/registry-certs' UPSTREAM_CERT_DIR = '/etc/upstream-certs' @@ -17,8 +17,12 @@ LOGGER = logging.getLogger(__name__) -class RegistryMirror(object): +# pylint: disable=too-many-instance-attributes +class RegistryMirror: + # pylint: disable=too-many-arguments + # pylint: disable=too-many-locals def __init__(self, event_type, namespace, docker_registry, + addressing_scheme, hostess_docker_registry, hostess_docker_image, hostess_docker_tag, docker_certificate_secret, **kwargs): self.event_type = event_type @@ -27,6 +31,7 @@ def __init__(self, event_type, namespace, docker_registry, # Option to set Docker registry only for hostess images is # deprecated in favor of setting it for all used images self.hostess_docker_registry = hostess_docker_registry + self.addressing_scheme = addressing_scheme self.hostess_docker_image = hostess_docker_image self.hostess_docker_tag = hostess_docker_tag self.docker_certificate_secret = docker_certificate_secret @@ -73,7 +78,9 @@ def __init__(self, event_type, namespace, docker_registry, requests={"storage": "20Gi"} ).to_dict() - cache_size_limit = int(bitmath.parse_string_unsafe(self.volume_claim_spec.resources['requests']['storage']).to_GB() * 0.8) + cache_size_limit = int( + bitmath.parse_string_unsafe( + self.volume_claim_spec.resources['requests']['storage']).to_GB() * 0.8) self.nginx_config_template = ''' log_format json_combined escape=json @@ -112,6 +119,7 @@ def __init__(self, event_type, namespace, docker_registry, set $upstream_endpoint https://{upstream_fqdn}; location / {{{{ + rewrite /v2/{masquerade_url}/(.*) /v2/$1 break; proxy_ssl_trusted_certificate {shared_cert_mount_path}/{cert_file}; limit_except HEAD GET OPTIONS {{{{ deny all; @@ -131,6 +139,7 @@ def __init__(self, event_type, namespace, docker_registry, }}}}'''.format(registry_cert_dir=REGISTRY_CERT_DIR, cache_dir=CACHE_DIR, cache_size_limit=cache_size_limit, upstream_fqdn=upstream_url, zone="the_zone", + masquerade_url=self.masquerade_url, healthcheck_path=HEALTH_CHECK_PATH, shared_cert_mount_path=SHARED_CERT_MOUNT_PATH, cert_file=CERT_FILE) @@ -155,6 +164,7 @@ def __init__(self, event_type, namespace, docker_registry, self.core_api = client.CoreV1Api() self.apps_api = client.AppsV1Api() + self.object_api = client.CustomObjectsApi() self.priority_class = kwargs.get( "spec", {}).get("priorityClass", "user-standard") @@ -166,6 +176,16 @@ def apply(self): self.namespace ) + if self.addressing_scheme == "services": + certificate = self.run_action_and_parse_error( + self.object_api.get_namespaced_custom_object, + group="cert-manager.io", + version="v1", + plural="certificates", + name=self.full_name, + namespace=self.namespace, + ) + service_headless = self.run_action_and_parse_error( self.core_api.read_namespaced_service, self.full_name + "-headless", @@ -191,10 +211,13 @@ def apply(self): ) self.update_services(service, service_headless) + if self.addressing_scheme == "services": + self.update_certificate(certificate) self.update_stateful_set(stateful_set) self.update_daemon_set(daemon_set) self.update_secret(secret) + # pylint: disable=no-self-use def run_action_and_parse_error(self, func, *args, **kwargs): """ Helper method to avoid try/excepts all over the place + does the exception handling and parsing. @@ -214,9 +237,13 @@ def run_action_and_parse_error(self, func, *args, **kwargs): try: json_error = json.loads(api_exception.body) code = HTTPStatus(int(json_error['code'])) - LOGGER.exception( - "API returned status: %s, msg: %s, method: %s", - code, json_error['message'], func) + if code == HTTPStatus.NOT_FOUND: + LOGGER.info("API object not found: %s, %s", + json_error['message'], func) + else: + LOGGER.exception( + "API returned status: %s, msg: %s, method: %s", + code, json_error['message'], func) except json.decoder.JSONDecodeError as e: LOGGER.error("Decoder exception loading error msg: %s;" @@ -232,16 +259,8 @@ def generate_daemon_set(self, daemon_set): daemon_set.metadata = copy.deepcopy(self.metadata) daemon_set.metadata.name = self.daemon_set_name daemon_set.metadata.labels = ds_labels - daemon_set.spec = client.V1DaemonSetSpec( - min_ready_seconds=10, - selector= {"matchLabels": self.labels}, - template=client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta( - labels=ds_pod_labels - ), - spec=client.V1PodSpec( - containers=[ - client.V1Container( + + hostess_container = client.V1Container( name="mirror-hostess", env=[ client.V1EnvVar( @@ -288,10 +307,12 @@ def generate_daemon_set(self, daemon_set): mount_path="/var/lock/hostess", ), ], - ), - client.V1Container( + ) + + certinstall_container = client.V1Container( name="certificate-installation", args=[ + # pylint: disable=line-too-long "cp /source/tls.crt /target/tls.crt; while :; do sleep 2073600; done" ], command=[ @@ -316,7 +337,33 @@ def generate_daemon_set(self, daemon_set): read_only=True), ], ) - ], + + if self.addressing_scheme == "hostess": + daemonset_containers = [hostess_container, certinstall_container] + docker_cert_path = self.masquerade_url + elif self.addressing_scheme == "services": + daemonset_containers = [certinstall_container] + service = self.run_action_and_parse_error( + self.core_api.read_namespaced_service, + self.full_name, + self.namespace + ) + if not service: + LOGGER.error("Service %s not found. Daemonset not created", self.full_name) + return None + docker_cert_path = service.spec.cluster_ip + else: + LOGGER.error("Unknown addressing scheme: %s", self.addressing_scheme) + + daemon_set.spec = client.V1DaemonSetSpec( + min_ready_seconds=10, + selector= {"matchLabels": self.labels}, + template=client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta( + labels=ds_pod_labels + ), + spec=client.V1PodSpec( + containers=daemonset_containers, tolerations=self.ss_ds_tolerations, image_pull_secrets=[{"name": name} for name in self.image_pull_secrets.split(",")], @@ -343,7 +390,7 @@ def generate_daemon_set(self, daemon_set): client.V1Volume( name="docker-certs", host_path=client.V1HostPathVolumeSource( - path="/etc/docker/certs.d/{}".format(self.masquerade_url) + path="/etc/docker/certs.d/{}".format(docker_cert_path) ), ), client.V1Volume( @@ -369,11 +416,12 @@ def generate_headless_service(self, service_headless): def get_upstream_credentials(self): credentials_secret = None if self.credentials_secret_name: - credentials_secret = self.run_action_and_parse_error(self.core_api.read_namespaced_secret, - self.credentials_secret_name, - self.namespace) + credentials_secret = self.run_action_and_parse_error( + self.core_api.read_namespaced_secret, + self.credentials_secret_name, + self.namespace) if not credentials_secret: - LOGGER.error("No secret named %s was found in the %s namespace, will use unauth access", + LOGGER.info("No secret named %s was found in the %s namespace, will use unauth access", self.credentials_secret_name, self.namespace) return None @@ -425,14 +473,28 @@ def generate_stateful_set(self): ) ] - volumes.append( - client.V1Volume( - name="tls", - secret=client.V1SecretVolumeSource( - secret_name=self.docker_certificate_secret - ), + if self.addressing_scheme == 'hostess': + volumes.append( + client.V1Volume( + name="tls", + secret=client.V1SecretVolumeSource( + secret_name=self.docker_certificate_secret + ), + ) ) - ) + elif self.addressing_scheme == 'services': + volumes.append( + client.V1Volume( + name="tls", + secret=client.V1SecretVolumeSource( + secret_name=self.full_name + "-tls" + ), + ) + ) + else: + LOGGER.error("Unknown addressing scheme: %s", self.addressing_scheme) + return None + volumes.append( client.V1Volume( name="nginx-config", @@ -510,7 +572,7 @@ def generate_stateful_set(self): cp /etc/nginx/conf.d/default.conf /tmp/nginx/default.conf NAMESERVERS=$(cat /etc/resolv.conf | grep "nameserver" | awk '{{print $2}}' | tr '\n' ' ') if [ ! "$NAMESERVERS" == "" ]; then - sed -E -i "s/(#)(resolver)(;)/\\2 ${NAMESERVERS}\\3/" /tmp/nginx/default.conf + sed -E -i "s/(#)(resolver)(;)/\\2 ${NAMESERVERS} ipv6=off\\3/" /tmp/nginx/default.conf fi ''' stateful_set.spec.template = client.V1PodTemplateSpec( @@ -562,6 +624,17 @@ def generate_stateful_set(self): initial_delay_seconds=3, period_seconds=3 ), + liveness_probe=client.V1Probe( + _exec=client.V1ExecAction( + command=['test', + '/etc/registry-certs/..data/tls.crt', + '-ot', + '/var/run/nginx.pid' + ] + ), + initial_delay_seconds=60, + period_seconds=60 + ), ports=[client.V1ContainerPort( container_port=5000, name="https" @@ -576,7 +649,8 @@ def generate_stateful_set(self): priority_class_name=self.priority_class ) ) - stateful_set.spec.update_strategy = client.V1StatefulSetUpdateStrategy(type="RollingUpdate",) + stateful_set.spec.update_strategy = client.V1StatefulSetUpdateStrategy( + type="RollingUpdate",) stateful_set.metadata.labels = ss_labels return stateful_set @@ -593,6 +667,69 @@ def generate_secret(self, secret): secret.data = {"default.conf": base64.b64encode(nginx_config.encode()).decode()} return secret + def update_certificate(self, certificate): + """ Create a certificate for IP address of the given service """ + service = self.run_action_and_parse_error( + self.core_api.read_namespaced_service, + self.full_name, + self.namespace + ) + if not service: + LOGGER.error("Service not found. Cannot create Certificate") + return + + if not certificate: + cert_metadata = copy.deepcopy(self.metadata) + certificate = { + "apiVersion": "cert-manager.io/v1", + "kind": "Certificate", + "metadata": cert_metadata, + "spec": { + "ipAddresses": [service.spec.cluster_ip], + "duration": "17520h0m0s", + "issuerRef": { "kind": "Issuer", "name": "mirror-operator-issuer" }, + "privateKey": { + "algorithm": "RSA", + "encoding": "PKCS1", + "size": 2048 + }, + "secretName": self.full_name + "-tls", + "secretTemplate": cert_metadata, + "subject": { + "organizations": ["mirror-operator"], + }, + "usages": ["server auth", "client auth"], + } + } + self.run_action_and_parse_error(self.object_api.create_namespaced_custom_object, + group="cert-manager.io", + version="v1", + namespace="kube-extra", + plural="certificates", + body=certificate, + ) + + LOGGER.info("Certificate for the %s (service %s) created", + service.spec.cluster_ip, self.full_name) + else: + # NOTE: It will be needed if Service address will be changed. + patch_body = { + "spec": {"ipAddresses": [service.spec.cluster_ip]} + } + + self.run_action_and_parse_error(self.object_api.patch_namespaced_custom_object, + name=self.full_name, + group="cert-manager.io", + version="v1", + namespace="kube-extra", + plural="certificates", + body=patch_body, + ) + + LOGGER.info("Updating Certificate for the service %s", self.full_name) + + + def update_services(self, service, service_headless): empty_service = client.V1Service( metadata=copy.deepcopy(self.metadata), diff --git a/tests/test_operator.py b/tests/test_operator.py index 8e99082..30d3408 100644 --- a/tests/test_operator.py +++ b/tests/test_operator.py @@ -55,6 +55,7 @@ def setUp(self): env_var_dict = { "namespace": "default", "docker_registry": "docker.io", + "addressing_scheme": "hostess", "hostess_docker_registry": "docker.io", "hostess_docker_image": "ocadotechnology/mirror-hostess", "hostess_docker_tag": None, diff --git a/tests/test_regmirror.py b/tests/test_regmirror.py index 1b1210b..9726d75 100644 --- a/tests/test_regmirror.py +++ b/tests/test_regmirror.py @@ -18,6 +18,7 @@ def setUp(self): self.env_var_dict = { "namespace": "default", "docker_registry": "docker.io", + "addressing_scheme": "hostess", "hostess_docker_registry": "docker.io", "hostess_docker_image": "ocadotechnology/mirror-hostess", "hostess_docker_tag": 2,