From 961e75986387915be4051d02edb26f05f35ea243 Mon Sep 17 00:00:00 2001 From: Paige Rubendall Date: Tue, 28 Nov 2023 17:11:00 -0500 Subject: [PATCH] adding cerberus with krknkubernetes client --- cerberus/kubernetes/client.py | 173 +++++++++------------------------- requirements.txt | 1 + start_cerberus.py | 70 ++++++-------- 3 files changed, 74 insertions(+), 170 deletions(-) diff --git a/cerberus/kubernetes/client.py b/cerberus/kubernetes/client.py index 220e5733..daf815b3 100644 --- a/cerberus/kubernetes/client.py +++ b/cerberus/kubernetes/client.py @@ -1,72 +1,33 @@ import re -import os import sys import yaml import time import logging import requests -import urllib3 from collections import defaultdict -from kubernetes import client, config +from kubernetes import client import cerberus.invoke.command as runcommand from kubernetes.client.rest import ApiException +from krkn_lib.k8s import KrknKubernetes pods_tracker = defaultdict(dict) kubeconfig_path_global = "" +cli = None +cmd_timeout = 60 # Load kubeconfig and initialize kubernetes python client -def initialize_clients(kubeconfig_path, chunk_size, timeout): - global cli - global api_client - global client_config +def initialize_globals(kubeconfig, chunk_size, timeout): global request_chunk_size global cmd_timeout global kubeconfig_path_global + global cli - """Initialize object and create clients from specified kubeconfig""" - client_config = client.Configuration() - http_proxy = os.getenv("http_proxy", None) - """Proxy has auth header""" - if http_proxy and "@" in http_proxy: - proxy_auth = http_proxy.split("@")[0].split("//")[1] - user_pass = proxy_auth.split(":")[0] - client_config.username = user_pass[0] - client_config.password = user_pass[1] - client_config.ssl_ca_cert = False - client_config.verify_ssl = False - config.load_kube_config(config_file=kubeconfig_path, persist_config=True, client_configuration=client_config) - proxy_url = http_proxy - if proxy_url: - client_config.proxy = proxy_url - if proxy_auth: - client_config.proxy_headers = urllib3.util.make_headers(proxy_basic_auth=proxy_auth) - - client.Configuration.set_default(client_config) - cli = client.CoreV1Api() cmd_timeout = timeout request_chunk_size = str(chunk_size) - kubeconfig_path_global = kubeconfig_path - logging.info("client set") - - -def list_continue_helper(func, *args, **keyword_args): - ret_overall = [] - try: - ret = func(*args, **keyword_args) - ret_overall.append(ret) - continue_string = ret.metadata._continue - - while continue_string: - ret = func(*args, **keyword_args, _continue=continue_string) - ret_overall.append(ret) - continue_string = ret.metadata._continue - - except ApiException as e: - logging.error("Exception when calling CoreV1Api->%s: %s\n" % (str(func), e)) - - return ret_overall + kubeconfig_path_global = kubeconfig + cli = KrknKubernetes(kubeconfig_path=kubeconfig, request_chunk_size=request_chunk_size) # List nodes in the cluster @@ -74,11 +35,11 @@ def list_nodes(label_selector=None): nodes = [] try: if label_selector: - ret = list_continue_helper( + ret = cli.list_continue_helper( cli.list_node, pretty=True, label_selector=label_selector, limit=request_chunk_size ) else: - ret = list_continue_helper(cli.list_node, pretty=True, limit=request_chunk_size) + ret = cli.list_continue_helper(cli.list_node, pretty=True, limit=request_chunk_size) except ApiException as e: logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) @@ -89,16 +50,6 @@ def list_nodes(label_selector=None): return nodes -# List all namespaces -def list_namespaces(): - namespaces = [] - ret_overall = list_continue_helper(cli.list_namespace, pretty=True, limit=request_chunk_size) - for ret_items in ret_overall: - for namespace in ret_items.items: - namespaces.append(namespace.metadata.name) - return namespaces - - # Monitor the status of all specified namespaces # and set the status to true or false def monitor_namespaces_status(watch_namespaces, watch_terminating_namespaces, iteration, iter_track_time): @@ -107,15 +58,16 @@ def monitor_namespaces_status(watch_namespaces, watch_terminating_namespaces, it if watch_terminating_namespaces: watch_nodes_start_time = time.time() try: - ret = cli.list_namespace(pretty=True) + ret = cli.list_all_namespaces() except ApiException as e: - logging.error("Exception when calling CoreV1Api->list_namespace: %s\n" % e) + logging.info("Exception when calling CoreV1Api->list_namespace: %s\n" % e) sys.exit(1) - for namespace in ret.items: - if namespace.metadata.name in watch_namespaces: - if namespace.status.phase != "Active": - namespaces.append(namespace.metadata.name) - none_terminating = False + for ret_item in ret: + for namespace in ret_item.items: + if namespace.metadata.name in watch_namespaces: + if namespace.status.phase != "Active": + namespaces.append(namespace.metadata.name) + none_terminating = False iter_track_time["watch_terminating_namespaces"] = time.time() - watch_nodes_start_time logging.info("Iteration %s: No Terminating Namespaces status: %s" % (iteration, str(none_terminating))) else: @@ -134,36 +86,18 @@ def get_node_info(node): logging.error("Exception when calling CoreV1Api->read_node_status: %s\n" % e) -# Get status of a pod in a namespace -def get_pod_status(pod, namespace): - try: - return cli.read_namespaced_pod_status(pod, namespace, pretty=True) - except ApiException as e: - logging.error("Exception when calling CoreV1Api->read_namespaced_pod_status: %s\n" % e) - - # Outputs a json blob with information about all the nodes def get_all_nodes_info(): try: - return list_continue_helper(cli.list_node, limit=request_chunk_size) + return cli.list_nodes() except ApiException as e: logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e) -# Outputs a json blob with informataion about all pods in a given namespace -def get_all_pod_info(namespace): - try: - ret = list_continue_helper(cli.list_namespaced_pod, namespace, pretty=True, limit=request_chunk_size) - except ApiException as e: - logging.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e) - - return ret - - # Check if all the watch_namespaces are valid def check_namespaces(namespaces): try: - valid_namespaces = list_namespaces() + valid_namespaces = cli.list_namespaces() regex_namespaces = set(namespaces) - set(valid_namespaces) final_namespaces = set(namespaces) - set(regex_namespaces) valid_regex = set() @@ -185,7 +119,7 @@ def check_namespaces(namespaces): # Check the namespace name for default SDN def check_sdn_namespace(): - namespaces = list_namespaces() + namespaces = cli.list_namespaces() if "openshift-ovn-kubernetes" in namespaces: return "openshift-ovn-kubernetes" if "openshift-sdn" in namespaces: @@ -197,31 +131,13 @@ def check_sdn_namespace(): sys.exit(1) -# Monitor the status of the cluster nodes and set the status to true or false -def monitor_nodes(): - notready_nodes = [] - all_nodes_info_list = get_all_nodes_info() - for all_nodes_info in all_nodes_info_list: - for node_info in all_nodes_info.items: - node = node_info.metadata.name - node_kerneldeadlock_status = "False" - for condition in node_info.status.conditions: - if condition.type == "KernelDeadlock": - node_kerneldeadlock_status = condition.status - elif condition.type == "Ready": - node_ready_status = condition.status - else: - continue - if node_kerneldeadlock_status != "False" or node_ready_status != "True": - notready_nodes.append(node) - status = False if notready_nodes else True - return status, notready_nodes - - def process_nodes(watch_nodes, iteration, iter_track_time): if watch_nodes: watch_nodes_start_time = time.time() - watch_nodes_status, failed_nodes = monitor_nodes() + try: + watch_nodes_status, failed_nodes = cli.monitor_nodes() + except Exception as e: + logging.error("Caught error during node status" + str(e)) iter_track_time["watch_nodes"] = time.time() - watch_nodes_start_time logging.info("Iteration %s: Node status: %s" % (iteration, watch_nodes_status)) else: @@ -236,7 +152,7 @@ def process_nodes(watch_nodes, iteration, iter_track_time): # Track the pods that were crashed/restarted during the sleep interval of an iteration def namespace_sleep_tracker(namespace, pods_tracker, ignore_patterns): crashed_restarted_pods = defaultdict(list) - all_pod_info_list = get_all_pod_info(namespace) + all_pod_info_list = cli.get_all_pod_info(namespace) if all_pod_info_list is not None and len(all_pod_info_list) > 0: for all_pod_info in all_pod_info_list: for pod_info in all_pod_info.items: @@ -292,7 +208,7 @@ def monitor_namespace(namespace, ignore_pattern=None): notready_pods = set() match = False notready_containers = defaultdict(list) - all_pod_info_list = get_all_pod_info(namespace) + all_pod_info_list = cli.get_all_pod_info(namespace) if all_pod_info_list is not None and len(all_pod_info_list) > 0: for all_pod_info in all_pod_info_list: for pod_info in all_pod_info.items: @@ -339,6 +255,8 @@ def process_namespace(iteration, namespace, failed_pods_components, failed_pod_c # Get cluster operators and return yaml def get_cluster_operators(): + + # should update with kubernetes api operators_status = runcommand.invoke("kubectl get co -o yaml --kubeconfig " + kubeconfig_path_global, cmd_timeout) status_yaml = yaml.load(operators_status, Loader=yaml.FullLoader) return status_yaml @@ -377,34 +295,32 @@ def process_cluster_operator(distribution, watch_cluster_operators, iteration, i # Check for NoSchedule taint in all the master nodes -def check_master_taint(master_nodes, master_label): +def check_master_taint(): schedulable_masters = [] - - for master_node in master_nodes: - node_info = get_node_info(master_node) - node = node_info.metadata.name + # updating to use get_nodes_infos(self) from krkn_lib + node_list_info = cli.get_nodes_infos() + for node_info in node_list_info: + node = node_info.name NoSchedule_taint = False try: - if node_info.spec is not None: - if node_info.spec.taints is not None: - for taint in node_info.spec.taints: - if taint.key == str(master_label) and taint.effect == "NoSchedule": - NoSchedule_taint = True - break - if not NoSchedule_taint: - schedulable_masters.append(node) + if node_info.taint is not None: + for taint in node_info.taint: + if node_info.node_type == "master" and taint.effect == "NoSchedule": + NoSchedule_taint = True + if not NoSchedule_taint: + schedulable_masters.append(node) except Exception as e: logging.info("Exception getting master nodes" + str(e)) schedulable_masters.append(node) return schedulable_masters -def process_master_taint(master_nodes, master_label, iteration, iter_track_time): +def process_master_taint(watch_master_schedulable, iteration, iter_track_time): schedulable_masters = [] - if len(master_nodes) > 0: + if watch_master_schedulable: if iteration % 10 == 1: check_taint_start_time = time.time() - schedulable_masters = check_master_taint(master_nodes, master_label) + schedulable_masters = check_master_taint() iter_track_time["check_master_taint"] = time.time() - check_taint_start_time return schedulable_masters @@ -418,6 +334,7 @@ def is_url_available(url, header=None): else: return True except Exception: + logging.info(f"Url: {url} is not available") return False @@ -446,7 +363,7 @@ def get_csrs(): def get_host() -> str: """Returns the Kubernetes server URL""" - return client.configuration.Configuration.get_default_copy().host + return cli.get_host() def get_clusterversion_string() -> str: diff --git a/requirements.txt b/requirements.txt index accad4a8..4f844353 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ slack_sdk pyfiglet prometheus_api_client coverage +krkn-lib>=1.4.2 diff --git a/start_cerberus.py b/start_cerberus.py index 536638f8..dedbdf4c 100644 --- a/start_cerberus.py +++ b/start_cerberus.py @@ -18,7 +18,7 @@ import cerberus.server.server as server import cerberus.inspect.inspect as inspect import cerberus.invoke.command as runcommand -import cerberus.kubernetes.client as kubecli +import cerberus.kubernetes.client as cerberus_kubecli import cerberus.slack.slack_client as slackcli import cerberus.prometheus.client as promcli import cerberus.database.client as dbcli @@ -121,25 +121,25 @@ def main(cfg): logging.error("Proper kubeconfig not set, please set proper kubeconfig path") print_final_status_json(-1, "Unknown", 1) sys.exit(1) - os.environ["KUBECONFIG"] = str(kubeconfig_path) - logging.info("Initializing client to talk to the Kubernetes cluster") - kubecli.initialize_clients(kubeconfig_path, request_chunk_size, cmd_timeout) + try: + os.environ["KUBECONFIG"] = str(kubeconfig_path) + # krkn-lib-kubernetes init + cerberus_kubecli.initialize_globals(kubeconfig_path, request_chunk_size, cmd_timeout) + except Exception: + cerberus_kubecli.initialize_globals(None, request_chunk_size, cmd_timeout) if "openshift-sdn" in watch_namespaces: - sdn_namespace = kubecli.check_sdn_namespace() + sdn_namespace = cerberus_kubecli.check_sdn_namespace() watch_namespaces = [namespace.replace("openshift-sdn", sdn_namespace) for namespace in watch_namespaces] # Check if all the namespaces under watch_namespaces are valid - watch_namespaces = kubecli.check_namespaces(watch_namespaces) + watch_namespaces = cerberus_kubecli.check_namespaces(watch_namespaces) # Cluster info - logging.info("Fetching cluster info") - cv = kubecli.get_clusterversion_string() - if cv != "": - logging.info(cv) - else: - logging.info("Cluster version CRD not detected, skipping") - logging.info("Server URL: %s" % kubecli.get_host()) + # Need to get clusterversion + cv = "" + cerberus_host_url = cerberus_kubecli.get_host() + logging.info("Server URL: %s" % cerberus_host_url) # Run http server using a separate thread if cerberus is asked # to publish the status. It is served by the http server. @@ -167,23 +167,8 @@ def main(cfg): logging.info("Detailed inspection of failed components has been enabled") inspect.delete_inspect_directory() - # get list of all master nodes with provided labels in the config - master_nodes = [] - master_label = "" - if watch_master_schedulable["enabled"]: - master_label = watch_master_schedulable["label"] - nodes = kubecli.list_nodes(master_label) - if len(nodes) == 0: - logging.error( - "No master node found for the label %s. Please check master node config." % (master_label) - ) # noqa - print_final_status_json(-1, "Unknown", 1) - sys.exit(1) - else: - master_nodes.extend(nodes) - # Use cluster_info to get the api server url - api_server_url = kubecli.get_host() + "/healthz" + api_server_url = cerberus_host_url + "/healthz" # Counter for if api server is not ok api_fail_count = 0 @@ -193,7 +178,7 @@ def main(cfg): pool = multiprocessing.Pool(int(cores_usage_percentage * multiprocessing.cpu_count()), init_worker) manager = multiprocessing.Manager() pods_tracker = manager.dict() - + logging.info("set up pool") # Track time taken for different checks in each iteration global time_tracker time_tracker = {} @@ -232,7 +217,7 @@ def main(cfg): iteration_start_time = time.time() iteration += 1 - + logging.info("starting iteration top") # Read the config for info when slack integration is enabled if slack_integration: weekday = runcommand.invoke("date '+%A'")[:-1] @@ -248,7 +233,7 @@ def main(cfg): if iteration == 1: pool.starmap( - kubecli.namespace_sleep_tracker, + cerberus_kubecli.namespace_sleep_tracker, zip(watch_namespaces, repeat(pods_tracker), repeat(watch_namespaces_ignore_pattern)), ) @@ -264,21 +249,21 @@ def main(cfg): ) = pool.map( smap, [ - functools.partial(kubecli.is_url_available, api_server_url), + functools.partial(cerberus_kubecli.is_url_available, api_server_url), functools.partial( - kubecli.process_master_taint, master_nodes, master_label, iteration, iter_track_time + cerberus_kubecli.process_master_taint, watch_master_schedulable, iteration, iter_track_time ), - functools.partial(kubecli.process_nodes, watch_nodes, iteration, iter_track_time), + functools.partial(cerberus_kubecli.process_nodes, watch_nodes, iteration, iter_track_time), functools.partial( - kubecli.process_cluster_operator, + cerberus_kubecli.process_cluster_operator, distribution, watch_cluster_operators, iteration, iter_track_time, ), - functools.partial(kubecli.process_routes, watch_url_routes, iter_track_time), + functools.partial(cerberus_kubecli.process_routes, watch_url_routes, iter_track_time), functools.partial( - kubecli.monitor_namespaces_status, + cerberus_kubecli.monitor_namespaces_status, watch_namespaces, watch_terminating_namespaces, iteration, @@ -286,7 +271,7 @@ def main(cfg): ), ], ) - + logging.info("done partial") # Increment api_fail_count if api server url is not ok if not server_status: api_fail_count += 1 @@ -300,7 +285,7 @@ def main(cfg): # Monitor all the namespaces parallely watch_namespaces_start_time = time.time() pool.starmap( - kubecli.process_namespace, + cerberus_kubecli.process_namespace, zip( repeat(iteration), watch_namespaces, @@ -309,6 +294,7 @@ def main(cfg): repeat(watch_namespaces_ignore_pattern), ), ) + logging.info("after partial 2") watch_namespaces_status = False if failed_pods_components else True iter_track_time["watch_namespaces"] = time.time() - watch_namespaces_start_time @@ -392,7 +378,7 @@ def main(cfg): if distribution == "openshift": watch_csrs_start_time = time.time() - csrs = kubecli.get_csrs() + csrs = cerberus_kubecli.get_csrs() pending_csr = [] for csr in csrs["items"]: # find csr status @@ -488,7 +474,7 @@ def main(cfg): # Track pod crashes/restarts during the sleep interval in all namespaces parallely multiprocessed_output = pool.starmap( - kubecli.namespace_sleep_tracker, + cerberus_kubecli.namespace_sleep_tracker, zip(watch_namespaces, repeat(pods_tracker), repeat(watch_namespaces_ignore_pattern)), )