adding cerberus with krknkubernetes client
paigerube14 committed Jan 15, 2024
1 parent 3d28498 commit 14bdbe6
Showing 3 changed files with 74 additions and 170 deletions.
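This change replaces the hand-rolled kubernetes-python client setup in cerberus/kubernetes/client.py with the KrknKubernetes wrapper from krkn-lib, dropping the local pagination, namespace, node, and pod helpers in favor of the wrapper's methods. Below is a minimal sketch of the new bootstrap, assuming only the krkn-lib calls that appear in this diff; the kubeconfig path and chunk size are placeholder values.

# Sketch of the new client bootstrap; mirrors initialize_globals() in the diff below.
from krkn_lib.k8s import KrknKubernetes

kubeconfig = "~/.kube/config"   # placeholder; cerberus passes its configured kubeconfig path
request_chunk_size = 250        # placeholder; cerberus passes its configured chunk size

# One wrapper object replaces the hand-built kubernetes.client.CoreV1Api setup;
# the manual proxy/SSL/kubeconfig handling removed below is no longer done here.
cli = KrknKubernetes(kubeconfig_path=kubeconfig, request_chunk_size=request_chunk_size)

# Former module-level helpers now delegate to the wrapper, for example:
nodes = cli.list_nodes()   # replaces the local list_node pagination helper
host = cli.get_host()      # replaces client.configuration.Configuration.get_default_copy().host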
173 changes: 45 additions & 128 deletions cerberus/kubernetes/client.py
@@ -1,84 +1,45 @@
import re
import os
import sys
import yaml
import time
import logging
import requests
import urllib3
from collections import defaultdict
from kubernetes import client, config
from kubernetes import client
import cerberus.invoke.command as runcommand
from kubernetes.client.rest import ApiException
from krkn_lib.k8s import KrknKubernetes

pods_tracker = defaultdict(dict)

kubeconfig_path_global = ""
cli = None
cmd_timeout = 60


# Load kubeconfig and initialize kubernetes python client
def initialize_clients(kubeconfig_path, chunk_size, timeout):
global cli
global api_client
global client_config
def initialize_globals(kubeconfig, chunk_size, timeout):
global request_chunk_size
global cmd_timeout
global kubeconfig_path_global
global cli

"""Initialize object and create clients from specified kubeconfig"""
client_config = client.Configuration()
http_proxy = os.getenv("http_proxy", None)
"""Proxy has auth header"""
if http_proxy and "@" in http_proxy:
proxy_auth = http_proxy.split("@")[0].split("//")[1]
user_pass = proxy_auth.split(":")[0]
client_config.username = user_pass[0]
client_config.password = user_pass[1]
client_config.ssl_ca_cert = False
client_config.verify_ssl = False
config.load_kube_config(config_file=kubeconfig_path, persist_config=True, client_configuration=client_config)
proxy_url = http_proxy
if proxy_url:
client_config.proxy = proxy_url
if proxy_auth:
client_config.proxy_headers = urllib3.util.make_headers(proxy_basic_auth=proxy_auth)

client.Configuration.set_default(client_config)
cli = client.CoreV1Api()
cmd_timeout = timeout
request_chunk_size = str(chunk_size)
kubeconfig_path_global = kubeconfig_path
logging.info("client set")


def list_continue_helper(func, *args, **keyword_args):
ret_overall = []
try:
ret = func(*args, **keyword_args)
ret_overall.append(ret)
continue_string = ret.metadata._continue

while continue_string:
ret = func(*args, **keyword_args, _continue=continue_string)
ret_overall.append(ret)
continue_string = ret.metadata._continue

except ApiException as e:
logging.error("Exception when calling CoreV1Api->%s: %s\n" % (str(func), e))

return ret_overall
kubeconfig_path_global = kubeconfig
cli = KrknKubernetes(kubeconfig_path=kubeconfig, request_chunk_size=request_chunk_size)


# List nodes in the cluster
def list_nodes(label_selector=None):
nodes = []
try:
if label_selector:
ret = list_continue_helper(
ret = cli.list_continue_helper(
cli.list_node, pretty=True, label_selector=label_selector, limit=request_chunk_size
)
else:
ret = list_continue_helper(cli.list_node, pretty=True, limit=request_chunk_size)
ret = cli.list_continue_helper(cli.list_node, pretty=True, limit=request_chunk_size)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e)

@@ -89,16 +50,6 @@ def list_nodes(label_selector=None):
return nodes


# List all namespaces
def list_namespaces():
namespaces = []
ret_overall = list_continue_helper(cli.list_namespace, pretty=True, limit=request_chunk_size)
for ret_items in ret_overall:
for namespace in ret_items.items:
namespaces.append(namespace.metadata.name)
return namespaces


# Monitor the status of all specified namespaces
# and set the status to true or false
def monitor_namespaces_status(watch_namespaces, watch_terminating_namespaces, iteration, iter_track_time):
@@ -107,15 +58,16 @@ def monitor_namespaces_status(watch_namespaces, watch_terminating_namespaces, iteration, iter_track_time):
if watch_terminating_namespaces:
watch_nodes_start_time = time.time()
try:
ret = cli.list_namespace(pretty=True)
ret = cli.list_all_namespaces()
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_namespace: %s\n" % e)
logging.info("Exception when calling CoreV1Api->list_namespace: %s\n" % e)
sys.exit(1)
for namespace in ret.items:
if namespace.metadata.name in watch_namespaces:
if namespace.status.phase != "Active":
namespaces.append(namespace.metadata.name)
none_terminating = False
for ret_item in ret:
for namespace in ret_item.items:
if namespace.metadata.name in watch_namespaces:
if namespace.status.phase != "Active":
namespaces.append(namespace.metadata.name)
none_terminating = False
iter_track_time["watch_terminating_namespaces"] = time.time() - watch_nodes_start_time
logging.info("Iteration %s: No Terminating Namespaces status: %s" % (iteration, str(none_terminating)))
else:
@@ -134,36 +86,18 @@ def get_node_info(node):
logging.error("Exception when calling CoreV1Api->read_node_status: %s\n" % e)


# Get status of a pod in a namespace
def get_pod_status(pod, namespace):
try:
return cli.read_namespaced_pod_status(pod, namespace, pretty=True)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->read_namespaced_pod_status: %s\n" % e)


# Outputs a json blob with information about all the nodes
def get_all_nodes_info():
try:
return list_continue_helper(cli.list_node, limit=request_chunk_size)
return cli.list_nodes()
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_node: %s\n" % e)


# Outputs a json blob with information about all pods in a given namespace
def get_all_pod_info(namespace):
try:
ret = list_continue_helper(cli.list_namespaced_pod, namespace, pretty=True, limit=request_chunk_size)
except ApiException as e:
logging.error("Exception when calling CoreV1Api->list_namespaced_pod: %s\n" % e)

return ret


# Check if all the watch_namespaces are valid
def check_namespaces(namespaces):
try:
valid_namespaces = list_namespaces()
valid_namespaces = cli.list_namespaces()
regex_namespaces = set(namespaces) - set(valid_namespaces)
final_namespaces = set(namespaces) - set(regex_namespaces)
valid_regex = set()
@@ -185,7 +119,7 @@ def check_namespaces(namespaces):

# Check the namespace name for default SDN
def check_sdn_namespace():
namespaces = list_namespaces()
namespaces = cli.list_namespaces()
if "openshift-ovn-kubernetes" in namespaces:
return "openshift-ovn-kubernetes"
if "openshift-sdn" in namespaces:
@@ -197,31 +131,13 @@ def check_sdn_namespace():
sys.exit(1)


# Monitor the status of the cluster nodes and set the status to true or false
def monitor_nodes():
notready_nodes = []
all_nodes_info_list = get_all_nodes_info()
for all_nodes_info in all_nodes_info_list:
for node_info in all_nodes_info.items:
node = node_info.metadata.name
node_kerneldeadlock_status = "False"
for condition in node_info.status.conditions:
if condition.type == "KernelDeadlock":
node_kerneldeadlock_status = condition.status
elif condition.type == "Ready":
node_ready_status = condition.status
else:
continue
if node_kerneldeadlock_status != "False" or node_ready_status != "True":
notready_nodes.append(node)
status = False if notready_nodes else True
return status, notready_nodes


def process_nodes(watch_nodes, iteration, iter_track_time):
if watch_nodes:
watch_nodes_start_time = time.time()
watch_nodes_status, failed_nodes = monitor_nodes()
try:
watch_nodes_status, failed_nodes = cli.monitor_nodes()
except Exception as e:
logging.error("Caught error during node status" + str(e))
iter_track_time["watch_nodes"] = time.time() - watch_nodes_start_time
logging.info("Iteration %s: Node status: %s" % (iteration, watch_nodes_status))
else:
@@ -236,7 +152,7 @@ def process_nodes(watch_nodes, iteration, iter_track_time):
# Track the pods that were crashed/restarted during the sleep interval of an iteration
def namespace_sleep_tracker(namespace, pods_tracker, ignore_patterns):
crashed_restarted_pods = defaultdict(list)
all_pod_info_list = get_all_pod_info(namespace)
all_pod_info_list = cli.get_all_pod_info(namespace)
if all_pod_info_list is not None and len(all_pod_info_list) > 0:
for all_pod_info in all_pod_info_list:
for pod_info in all_pod_info.items:
@@ -292,7 +208,7 @@ def monitor_namespace(namespace, ignore_pattern=None):
notready_pods = set()
match = False
notready_containers = defaultdict(list)
all_pod_info_list = get_all_pod_info(namespace)
all_pod_info_list = cli.get_all_pod_info(namespace)
if all_pod_info_list is not None and len(all_pod_info_list) > 0:
for all_pod_info in all_pod_info_list:
for pod_info in all_pod_info.items:
@@ -339,6 +255,8 @@ def process_namespace(iteration, namespace, failed_pods_components, failed_pod_c

# Get cluster operators and return yaml
def get_cluster_operators():

# should update with kubernetes api
operators_status = runcommand.invoke("kubectl get co -o yaml --kubeconfig " + kubeconfig_path_global, cmd_timeout)
status_yaml = yaml.load(operators_status, Loader=yaml.FullLoader)
return status_yaml
@@ -377,34 +295,32 @@ def process_cluster_operator(distribution, watch_cluster_operators, iteration, i


# Check for NoSchedule taint in all the master nodes
def check_master_taint(master_nodes, master_label):
def check_master_taint():
schedulable_masters = []

for master_node in master_nodes:
node_info = get_node_info(master_node)
node = node_info.metadata.name
# updating to use get_nodes_infos(self) from krkn_lib
node_list_info = cli.get_nodes_infos()
for node_info in node_list_info:
node = node_info.name
NoSchedule_taint = False
try:
if node_info.spec is not None:
if node_info.spec.taints is not None:
for taint in node_info.spec.taints:
if taint.key == str(master_label) and taint.effect == "NoSchedule":
NoSchedule_taint = True
break
if not NoSchedule_taint:
schedulable_masters.append(node)
if node_info.taint is not None:
for taint in node_info.taint:
if node_info.node_type == "master" and taint.effect == "NoSchedule":
NoSchedule_taint = True
if not NoSchedule_taint:
schedulable_masters.append(node)
except Exception as e:
logging.info("Exception getting master nodes" + str(e))
schedulable_masters.append(node)
return schedulable_masters


def process_master_taint(master_nodes, master_label, iteration, iter_track_time):
def process_master_taint(watch_master_schedulable, iteration, iter_track_time):
schedulable_masters = []
if len(master_nodes) > 0:
if watch_master_schedulable:
if iteration % 10 == 1:
check_taint_start_time = time.time()
schedulable_masters = check_master_taint(master_nodes, master_label)
schedulable_masters = check_master_taint()
iter_track_time["check_master_taint"] = time.time() - check_taint_start_time
return schedulable_masters

Expand All @@ -418,6 +334,7 @@ def is_url_available(url, header=None):
else:
return True
except Exception:
logging.info(f"Url: {url} is not available")
return False


@@ -446,7 +363,7 @@ def get_csrs():

def get_host() -> str:
"""Returns the Kubernetes server URL"""
return client.configuration.Configuration.get_default_copy().host
return cli.get_host()


def get_clusterversion_string() -> str:
1 change: 1 addition & 0 deletions requirements.txt
@@ -9,3 +9,4 @@ slack_sdk
pyfiglet
prometheus_api_client
coverage
krkn-lib>=1.4.2
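
The new krkn-lib>=1.4.2 pin supplies the KrknKubernetes wrapper that cerberus/kubernetes/client.py now delegates to. A hedged usage sketch follows the call sites in the diff above; the namespace name is a placeholder, and the return shapes are only what the callers above unpack.

# Sketch only; cli is the KrknKubernetes instance built as in initialize_globals() above.
status, notready_nodes = cli.monitor_nodes()        # replaces the removed local monitor_nodes()
namespaces = cli.list_namespaces()                  # replaces the removed local list_namespaces()
pod_info = cli.get_all_pod_info("openshift-etcd")   # placeholder namespace; replaces the removed get_all_pod_info()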