From 321d1473d805599f919016bbcbeed27c184fc269 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Viktor=20Sandstr=C3=B6m?=
Date: Fri, 15 Dec 2023 12:10:42 +0100
Subject: [PATCH] Feature/kube event listener (#114)

* Adding event listener to update app statuses in Serve
---
 apps/controller.py                    |   2 +-
 apps/tasks.py                         | 345 +++++++++++++-------------
 apps/views.py                         |  14 +-
 fixtures/periodic_tasks_fixtures.json |  28 ---
 requirements.txt                      |   1 +
 5 files changed, 183 insertions(+), 207 deletions(-)

diff --git a/apps/controller.py b/apps/controller.py
index 8e2da9cf..8cc33752 100644
--- a/apps/controller.py
+++ b/apps/controller.py
@@ -22,7 +22,7 @@ def delete(options):
 
 def deploy(options):
     print("STARTING DEPLOY FROM CONTROLLER")
-    _ = os.environ["BASE_PATH"]
+
     app = Apps.objects.get(slug=options["app_slug"], revision=options["app_revision"])
     if app.chart_archive and app.chart_archive != "":
         try:
diff --git a/apps/tasks.py b/apps/tasks.py
index 5dc171bb..bbab8017 100644
--- a/apps/tasks.py
+++ b/apps/tasks.py
@@ -4,12 +4,14 @@
 
 import requests
 from celery import shared_task
+from celery.signals import worker_ready
 from django.apps import apps
 from django.conf import settings
 from django.core.exceptions import EmptyResultSet
 from django.db import transaction
 from django.db.models import Q
 from django.utils import timezone
+from kubernetes import client, config, watch
 
 from models.models import Model, ObjectType
 from projects.models import S3, BasicAuth, Environment, MLFlow
@@ -18,6 +20,13 @@
 from . import controller
 from .models import AppInstance, Apps, AppStatus, ResourceData
 
+K8S_STATUS_MAP = {
+    "CrashLoopBackOff": "Error",
+    "Completed": "Retrying...",
+    "ContainerCreating": "Created",
+    "PodInitializing": "Pending",
+}
+
 ReleaseName = apps.get_model(app_label=settings.RELEASENAME_MODEL)
 
@@ -173,17 +182,14 @@ def post_delete_hooks(instance):
 def delete_and_deploy_resource(instance_pk, new_release_name):
     appinstance = AppInstance.objects.select_for_update().get(pk=instance_pk)
 
-    if appinstance and appinstance.state != "Deleted":
+    if appinstance:  # The instance does exist.
         parameters = appinstance.parameters
         results = controller.delete(parameters)
         if results.returncode == 0:
             post_delete_hooks(appinstance)
-            parameters["release"] = new_release_name
-            appinstance.parameters.update(parameters)
-            appinstance.save()
-            deploy_resource(instance_pk)
+            deploy_resource(appinstance.pk)
             try:
                 rel_name_obj = ReleaseName.objects.get(
                     name=new_release_name, project=appinstance.project, status="active"
                 )
@@ -200,50 +206,32 @@
 @transaction.atomic
 def deploy_resource(instance_pk, action="create"):
     print("TASK - DEPLOY RESOURCE...")
-    app_instance = AppInstance.objects.select_for_update().get(pk=instance_pk)
-    status = AppStatus(appinstance=app_instance)
-
-    if (action == "create") or (action == "update"):
-        parameters = app_instance.parameters
-        status.status_type = "Created"
-        status.info = parameters["release"]
-
-        # For backwards-compatibility with old ingress spec:
-        if "ingress" not in parameters:
-            parameters["ingress"] = dict()
-
-        app_instance.parameters = parameters
-        print("App Instance paramenters: {}".format(app_instance))
-        app_instance.save()
+    appinstance = AppInstance.objects.select_for_update().get(pk=instance_pk)
 
-    results = controller.deploy(app_instance.parameters)
+    results = controller.deploy(appinstance.parameters)
     stdout, stderr = process_helm_result(results)
 
     if results.returncode == 0:
         print("Helm install succeeded")
-        status.status_type = "Installed"
-        app_instance.state = "Running"
+
         helm_info = {
             "success": True,
             "info": {"stdout": stdout, "stderr": stderr},
         }
     else:
         print("Helm install failed")
-        status.status_type = "Failed"
-        app_instance.state = "Failed"
         helm_info = {
             "success": False,
             "info": {"stdout": stdout, "stderr": stderr},
         }
 
-    app_instance.info["helm"] = helm_info
-    app_instance.save()
-    status.save()
+    appinstance.info["helm"] = helm_info
+    appinstance.save()
 
     if results.returncode != 0:
-        print(app_instance.info["helm"])
+        print(appinstance.info["helm"])
     else:
-        post_create_hooks(app_instance)
+        post_create_hooks(appinstance)
 
 
 @shared_task
@@ -266,7 +254,8 @@ def delete_resource(pk):
 
     if results.returncode == 0 or "release: not found" in results.stderr.decode("utf-8"):
         status = AppStatus(appinstance=appinstance)
-        status.status_type = "Terminated"
+        status.status_type = "Deleting..."
+        appinstance.state = "Deleting..."
         status.save()
         print("CALLING POST DELETE HOOKS")
         post_delete_hooks(appinstance)
@@ -275,6 +264,7 @@
         status.status_type = "FailedToDelete"
         status.save()
         appinstance.state = "FailedToDelete"
+        appinstance.save(update_fields=["state"])
 
 
 @shared_task
@@ -296,148 +286,6 @@ def delete_resource_permanently(appinstance):
     appinstance.delete()
 
 
-@app.task
-@transaction.atomic
-def check_status():
-    # TODO: Fix for multicluster setup.
-    args = [
-        "kubectl",
-        "-n",
-        settings.NAMESPACE,
-        "get",
-        "po",
-        "-l",
-        "type=app",
-        "-o",
-        "json",
-    ]
-    results = subprocess.run(
-        args,
-        capture_output=True,
-        check=True,
-        timeout=settings.KUBE_API_REQUEST_TIMEOUT,
-    )
-    res_json = json.loads(results.stdout.decode("utf-8"))
-    app_statuses = dict()
-    # TODO: Handle case of having many pods (could have many replicas,
-    # or could be right after update)
-    for item in res_json["items"]:
-        release = item["metadata"]["labels"]["release"]
-        phase = item["status"]["phase"]
-
-        deletion_timestamp = []
-        if "deletionTimestamp" in item["metadata"]:
-            deletion_timestamp = item["metadata"]["deletionTimestamp"]
-            phase = "Terminated"
-        num_containers = -1
-        try:
-            num_containers = len(item["status"]["containerStatuses"])
-        except:  # noqa E722 TODO: Add exception
-            print("Failed to get number of containers.")
-            pass
-        num_cont_ready = 0
-        if "containerStatuses" in item["status"]:
-            for container in item["status"]["containerStatuses"]:
-                if container["ready"]:
-                    num_cont_ready += 1
-        if phase == "Running" and num_cont_ready != num_containers:
-            phase = "Waiting"
-        app_statuses[release] = {
-            "phase": phase,
-            "num_cont": num_containers,
-            "num_cont_ready": num_cont_ready,
-            "deletion_status": deletion_timestamp,
-        }
-
-    # Fetch all app instances whose state is not "Deleted"
-    instances = AppInstance.objects.filter(~Q(state="Deleted"))
-
-    for instance in instances:
-        release = instance.parameters["release"]
-        if release in app_statuses:
-            current_status = app_statuses[release]["phase"]
-            try:
-                latest_status = AppStatus.objects.filter(appinstance=instance).latest("time").status_type
-            except:  # noqa E722 TODO: Add exception
-                latest_status = "Unknown"
-            if current_status != latest_status:
-                print("New status for release {}".format(release))
-                print("Current status: {}".format(current_status))
-                print("Previous status: {}".format(latest_status))
-                status = AppStatus(appinstance=instance)
-                # if app_statuses[release]['deletion_status']:
-                #     status.status_type = "Terminated"
-                # else:
-                status.status_type = app_statuses[release]["phase"]
-                # status.info = app_statuses[release]
-                status.save()
-            # else:
-            #     print("No update for release: {}".format(release))
-        else:
-            delete_exists = AppStatus.objects.filter(appinstance=instance, status_type="Terminated").exists()
-            if delete_exists:
-                status = AppStatus(appinstance=instance)
-                status.status_type = "Deleted"
-                status.save()
-                instance.state = "Deleted"
-                instance.deleted_on = timezone.now()
-                instance.save()
-
-    # Fetch all app instances whose state is "Deleted" and check whether
-    # there are related pods which are still running
-    pod_status = "None"
-    instances = AppInstance.objects.filter(state="Deleted")
-    for instance in instances:
-        if "url" in instance.table_field:
-            # Find the app instance release name
-            app_release = instance.parameters["release"]  # e.g 'rfc058c6f'
-            # Now check if there exists a pod with that release
-            cmd = f'kubectl -n {settings.NAMESPACE} get po -l release="{app_release}"'
-
-            try:
-                # returns a byte-like object
-                result = subprocess.run(
-                    cmd,
-                    shell=True,
-                    capture_output=True,
-                    timeout=settings.KUBE_API_REQUEST_TIMEOUT,
-                )
-                result_stdout = result.stdout.decode("utf-8")
-                result_stderr = result.stderr.decode("utf-8")
-            except subprocess.CalledProcessError:
-                print("Error running the command: {}".format(cmd))
-
-            if result_stdout != "" and "No resources found in default namespace." not in result_stderr:
-                # Extract the the status of the related release pod
-                cmd = (
-                    "kubectl"
-                    f" -n {settings.NAMESPACE}"
-                    f" get po -l release={app_release} "
-                    ' -o jsonpath="{.items[0].status.phase}"'
-                )
-                try:
-                    result = subprocess.run(
-                        cmd,
-                        shell=True,
-                        capture_output=True,
-                        timeout=settings.KUBE_API_REQUEST_TIMEOUT,
-                    )
-                    pod_status = result.stdout.decode("utf-8")
-                    print(pod_status)
-                except subprocess.CalledProcessError:
-                    print("Error running the command: {}".format(cmd))
-
-                if pod_status == "Running" and instance.state == "Deleted":
-                    print("INFO: running pod associated to app (Deleted)")
-                    print("INFO: DELETE RESOURCE with release: {}".format(app_release))
-                    cmd = "helm" + " delete " + app_release
-                    try:
-                        result = subprocess.run(cmd, shell=True, capture_output=True)
-                        print(result)
-                    except subprocess.CalledProcessError:
-                        print("Error running the command: {}".format(cmd))
-
-
 @app.task
 def get_resource_usage():
     timestamp = time.time()
@@ -649,3 +497,154 @@ def delete_old_objects():
     old_apps = AppInstance.objects.filter(created_on__lt=threshold_time, app__category__name="Develop")
     for app_ in old_apps:
         delete_resource.delay(app_.pk)
+
+
+@shared_task(bind=True, max_retries=None)
+def init_event_listener(self, namespace, label_selector):
+    """
+    The event listener takes each incoming pod event and checks whether a
+    corresponding appinstance should be updated. It compares creation
+    timestamps so that the recorded status always reflects the youngest pod
+    in a Helm release.
+    """
+    k8s_api = setup_client()
+    k8s_watch = watch.Watch()
+    try:
+        status_data = {}
+        for event in k8s_watch.stream(k8s_api.list_namespaced_pod, namespace=namespace, label_selector=label_selector):
+            pod = event["object"]
+
+            status = get_status(pod)
+            status = status[:15]  # truncate to fit the status field
+            release = pod.metadata.labels["release"]
+            creation_timestamp = pod.metadata.creation_timestamp
+            deletion_timestamp = pod.metadata.deletion_timestamp
+
+            appinstance = AppInstance.objects.filter(parameters__contains={"release": release}).last()
+
+            if appinstance:
+                # First time this release is seen: record its status
+                if release not in status_data:
+                    status_data[release] = {
+                        "creation_timestamp": creation_timestamp,
+                        "deletion_timestamp": deletion_timestamp,
+                        "status": status,
+                    }
+
+                # Skip events from older pods and unchanged statuses
+                if (
+                    creation_timestamp < status_data[release]["creation_timestamp"]
+                    or status == status_data[release]["status"]
+                ):
+                    continue
+
+                # Same pod with a deletion timestamp: mark the instance as deleted
+                elif creation_timestamp == status_data[release]["creation_timestamp"] and deletion_timestamp:
+                    status = "Deleted"
+                    appinstance.deleted_on = timezone.now()
+
+                # Newer pod or new status: record it and update the instance
+                status_data[release] = {
+                    "creation_timestamp": creation_timestamp,
+                    "deletion_timestamp": deletion_timestamp,
+                    "status": status,
+                }
+                status_object = AppStatus(appinstance=appinstance)
+                update_status(appinstance, status_object, status)
+    except Exception as exc:
+        print("Event listener exception occurred", exc)
+        # Catch exceptions to trigger a retry
+        raise self.retry(exc=exc, countdown=10)
+
+
+@worker_ready.connect
+def on_worker_ready(**kwargs):
+    """
+    This function starts the event listener in a given namespace
+    as soon as the Celery worker is ready.
+    """
+    label_selector = "type=app"
+    NAMESPACE = settings.NAMESPACE
+    sync_all_statuses(namespace=NAMESPACE, label_selector=label_selector)
+    init_event_listener.apply_async(args=(NAMESPACE, label_selector), countdown=1)
+
+
+def setup_client():
+    """
+    Sets up the Kubernetes Python client.
+    """
+    try:
+        config.load_incluster_config()
+    except config.ConfigException:
+        try:
+            config.load_kube_config(settings.KUBECONFIG)
+        except config.ConfigException as e:
+            raise config.ConfigException(
+                "Could not set the cluster config. Try to use the cluster.conf file or set the in-cluster config"
+            ) from e
+
+    k8s_api = client.CoreV1Api()
+
+    return k8s_api
+
+
+def get_status(pod):
+    """
+    Returns the status of a pod by looping through each container
+    and checking its state.
+    """
+    container_statuses = pod.status.container_statuses
+
+    if container_statuses is not None:
+        for container_status in container_statuses:
+            state = container_status.state
+
+            if state is not None:
+                terminated = state.terminated
+
+                if terminated is not None:
+                    reason = terminated.reason
+                    return mapped_status(reason)
+
+                waiting = state.waiting
+
+                if waiting is not None:
+                    reason = waiting.reason
+                    return mapped_status(reason)
+                else:
+                    running = state.running
+                    ready = container_status.ready
+                    if running and ready:
+                        return "Running"
+                    else:
+                        return "Pending"
+
+    return pod.status.phase
+
+
+def mapped_status(reason: str) -> str:
+    return K8S_STATUS_MAP.get(reason, reason)
+
+
+def sync_all_statuses(namespace, label_selector):
+    """
+    Syncs the status of every app that has a pod on the cluster.
+    """
+    k8s_api = setup_client()
+    for pod in k8s_api.list_namespaced_pod(namespace=namespace, label_selector=label_selector).items:
+        status = pod.status.phase
+        release = pod.metadata.labels["release"]
+        appinstance = AppInstance.objects.filter(parameters__contains={"release": release}).last()
+        if appinstance:
+            status_object = AppStatus(appinstance=appinstance)
+            update_status(appinstance, status_object, status)
+
+
+@transaction.atomic
+def update_status(appinstance, status_object, status):
+    """
+    Helper function to update the status of an appinstance and a status object.
+    """
+    status_object.status_type = status
+    status_object.save()
+    appinstance.state = status
+    appinstance.save(update_fields=["state"])
diff --git a/apps/views.py b/apps/views.py
index c67224eb..680ed4d6 100644
--- a/apps/views.py
+++ b/apps/views.py
@@ -266,7 +266,7 @@ def update_app_instance(self, request, project, appinstance, app_settings, body):
         appinstance.access = access
         appinstance.app_dependencies.set(app_deps)
         appinstance.model_dependencies.set(model_deps)
-        appinstance.save()
+        appinstance.save(update_fields=["flavor", "name", "description", "parameters", "access"])
         self.update_resource(request, appinstance, current_release_name)
 
     def update_resource(self, request, appinstance, current_release_name):
@@ -281,10 +281,14 @@ def update_resource(self, request, appinstance, current_release_name):
 
         appinstance.table_field.update({"url": new_url})
         if new_release_name and current_release_name != new_release_name:
             # This handles the case where a user creates a new subdomain; we must update the helm release as well
-            _ = delete_and_deploy_resource.delay(appinstance.pk, new_release_name)
-        else:
-            # Otherwise, we update the resources in the same helm release
-            _ = deploy_resource.delay(appinstance.pk, "update")
+            delete_resource.delay(appinstance.pk)
+            parameters = appinstance.parameters
+            parameters["release"] = new_release_name
+            parameters["appname"] = new_release_name
+            appinstance.parameters.update(parameters)
+            appinstance.save(update_fields=["parameters", "table_field"])
+
+        deploy_resource.delay(appinstance.pk, "update")
 
         appinstance.save()
diff --git a/fixtures/periodic_tasks_fixtures.json b/fixtures/periodic_tasks_fixtures.json
index b38db14b..3c3624b6 100644
--- a/fixtures/periodic_tasks_fixtures.json
+++ b/fixtures/periodic_tasks_fixtures.json
@@ -27,34 +27,6 @@
         "model": "django_celery_beat.periodictask",
         "pk": 1
     },
-    {
-        "fields": {
-            "args": "[]",
-            "clocked": null,
-            "crontab": null,
-            "date_changed": "2021-02-26T14:03:40.168Z",
-            "description": "",
-            "enabled": true,
-            "exchange": null,
-            "expire_seconds": null,
-            "expires": null,
-            "headers": "{}",
-            "interval": 1,
-            "kwargs": "{}",
-            "last_run_at": "2021-02-26T14:03:37.169Z",
-            "name": "check_app_status",
-            "one_off": false,
-            "priority": null,
-            "queue": null,
-            "routing_key": null,
-            "solar": null,
-            "start_time": null,
-            "task": "apps.tasks.check_status",
-            "total_run_count": 174
-        },
-        "model": "django_celery_beat.periodictask",
-        "pk": 3
-    },
     {
         "fields": {
             "args": "[]",
diff --git a/requirements.txt b/requirements.txt
index a2b48c46..74e49d6e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,6 +24,7 @@ drf-nested-routers==0.93.4
 s3fs==2023.9.2
 minio==7.1.17
 celery==5.3.4
+kubernetes==28.1.0
 
 # Building with global-options takes a very long time
 #Pillow==9.4.0 --global-option="build_ext" --global-option="--disable-tiff" --global-option="--disable-freetype" --global-option="--disable-lcms" --global-option="--disable-webp" --global-option="--disable-webpmux" --global-option="--disable-imagequant" --global-option="--disable-xcb" --global-option="--disable-zlib"
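For reference, the "youngest pod wins" reduction that init_event_listener applies to the watch stream can be sketched in isolation. This is a minimal illustration only: reduce_events, the tuple-based event format, and the sample data below are stand-ins invented for this sketch (plain values instead of V1Pod objects and the Django models), not code from this patch.

from datetime import datetime, timezone

def reduce_events(events):
    """Fold (release, created_at, deleted, status) tuples into one status per
    release, always preferring the youngest pod, mirroring init_event_listener."""
    status_data = {}
    emitted = {}
    for release, created_at, deleted, status in events:
        seen = status_data.get(release)
        if seen is None:
            # First event for this release: record it without emitting an update.
            status_data[release] = {"creation_timestamp": created_at, "status": status}
            continue
        # Ignore events from older pods and unchanged statuses.
        if created_at < seen["creation_timestamp"] or status == seen["status"]:
            continue
        # The same pod carrying a deletion timestamp means the app is gone.
        if created_at == seen["creation_timestamp"] and deleted:
            status = "Deleted"
        status_data[release] = {"creation_timestamp": created_at, "status": status}
        emitted[release] = status  # stands in for update_status(...)
    return emitted

t0 = datetime(2023, 12, 15, 10, 0, tzinfo=timezone.utc)
t1 = datetime(2023, 12, 15, 12, 0, tzinfo=timezone.utc)
events = [
    ("rfc058c6f", t0, False, "Pending"),
    ("rfc058c6f", t1, False, "Running"),  # newer pod: its status wins
    ("rfc058c6f", t0, True, "Error"),     # older pod deleted: ignored
]
print(reduce_events(events))  # {'rfc058c6f': 'Running'}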
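Similarly, the container-state precedence that get_status applies (a terminated reason wins over a waiting reason, which wins over the running/ready check, all filtered through K8S_STATUS_MAP) can be exercised without a cluster. SimpleNamespace objects stand in for the kubernetes client's V1ContainerState models here; this is an illustrative sketch, not the patch's code.

from types import SimpleNamespace as NS

K8S_STATUS_MAP = {
    "CrashLoopBackOff": "Error",
    "Completed": "Retrying...",
    "ContainerCreating": "Created",
    "PodInitializing": "Pending",
}

def container_status(state, ready):
    # Terminated wins over waiting, which wins over the running/ready check.
    if state.terminated is not None:
        return K8S_STATUS_MAP.get(state.terminated.reason, state.terminated.reason)
    if state.waiting is not None:
        return K8S_STATUS_MAP.get(state.waiting.reason, state.waiting.reason)
    return "Running" if state.running and ready else "Pending"

crashing = NS(terminated=None, waiting=NS(reason="CrashLoopBackOff"), running=None)
booting = NS(terminated=None, waiting=NS(reason="ContainerCreating"), running=None)
healthy = NS(terminated=None, waiting=None, running=NS(started_at=None))

print(container_status(crashing, ready=False))  # Error
print(container_status(booting, ready=False))   # Created
print(container_status(healthy, ready=True))    # Running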