-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathk8s_resource_check
173 lines (131 loc) · 5.78 KB
/
k8s_resource_check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/usr/bin/python
__author__ = 'Ben Pingilley'
import argparse
import sys
import ssl
import urllib
from kubernetes import client, config
from kubernetes.client.rest import ApiException
# Define the command line interface first so that --help and usage errors
# work even on hosts where the kubeconfig file is missing or unreadable.
parser = argparse.ArgumentParser(description='Interface Nagios with Kubernetes API for Monitoring.')
parser.add_argument(
    "-e",
    "--errored",
    dest="errored",
    choices=['deployments', 'ingress', 'pods', 'nodes'],
    help="List deployments, ingress resources, pods or nodes which are not running."
)
# Parse arguments (argparse exits here on --help or on an invalid choice)
options = parser.parse_args()
# Load the cluster credentials only after the arguments are known to be valid
config.load_kube_config('/etc/kubernetes/admin.conf')
def _http_status(url, context, timeout=10):
    """Return the HTTP status code obtained by fetching ``url``.

    Works on both Python 2 (``urllib2``) and Python 3 (``urllib.request``).
    HTTP error statuses (4xx/5xx) are returned as integers rather than
    raised, so the caller can decide which codes count as healthy.

    Args:
        url: Absolute URL to fetch.
        context: ``ssl.SSLContext`` used for HTTPS requests.
        timeout: Seconds to wait before giving up, so that one stalled
            service cannot hang the whole check.

    Raises:
        IOError: If the URL cannot be reached (connection refused,
            timeout, DNS failure, ...).
    """
    try:
        from urllib.request import urlopen  # Python 3
        from urllib.error import HTTPError
    except ImportError:
        from urllib2 import urlopen, HTTPError  # Python 2

    try:
        response = urlopen(url, timeout=timeout, context=context)
    except HTTPError as err:
        # The server answered, just not with a 2xx status.
        return err.code
    try:
        return response.getcode()
    finally:
        response.close()


def ingress():
    """Probe the backend service of every ingress and report failures.

    For each ingress in the cluster, the service named by its first HTTP
    path is looked up and its cluster IP is fetched over HTTP.  Any status
    other than 200 or 403 (reachable but forbidden) counts as a failure.
    Ingresses without an HTTP rule are skipped because there is nothing
    to probe.

    Writes the failures to stdout and exits with status 2 (Nagios
    CRITICAL) if there are any; otherwise writes an OK line and returns.
    """
    # One entry per ingress backend that could not be verified
    problems = []
    core_v1_api_instance = client.CoreV1Api()
    # NOTE(review): ExtensionsV1beta1Api is tied to older kubernetes client
    # releases -- confirm against the installed client version.
    ext_v1_api_instance = client.ExtensionsV1beta1Api()
    # GET /apis/extensions/v1beta1/ingresses
    all_ingress = ext_v1_api_instance.list_ingress_for_all_namespaces()
    # Ignore invalid certs. Built once; it does not change between requests.
    context = ssl._create_unverified_context()

    for item in all_ingress.items:
        rules = item.spec.rules or []
        if not rules or rules[0].http is None or not rules[0].http.paths:
            continue
        namespace = item.metadata.namespace
        name = rules[0].http.paths[0].backend.service_name
        try:
            # GET /api/v1/namespaces/{namespace}/services/{name}
            service = core_v1_api_instance.read_namespaced_service(name, namespace, pretty='true')
        except ApiException:
            problems.append('Error reaching service %s.%s\n' % (namespace, name))
            continue

        # Create URL with port
        url = 'http://%s:%s' % (service.spec.cluster_ip, service.spec.ports[0].port)
        # Hard coded authserver until a health check is added
        if 'authserver' in name:
            url += '/.well-known/openid-configuration'

        try:
            code = _http_status(url, context)
        except IOError as e:
            problems.append('Error reaching %s: %s\n' % (url, e))
            continue
        # 403 means it works but was forbidden
        if code not in (200, 403):
            problems.append('%s\t%s\n' % (url, code))

    # If URLs have been added, return list and exit 2 (Critical)
    if problems:
        sys.stdout.write(''.join(problems))
        sys.exit(2)
    sys.stdout.write('OK - All ingress resources returned 200 or 403\n')
def pods():
    """Report every pod that has a container stuck in a waiting state.

    A container's ``state.waiting`` is only populated while it is not
    running (its ``reason`` explains why), so each pod's container
    statuses are scanned for one.  All containers are inspected, not just
    the first; the first waiting reason found is the one reported.

    Writes one ``phase<TAB>reason<TAB>namespace.name`` line per affected
    pod to stdout and exits with status 2 (Nagios CRITICAL) if there are
    any; otherwise writes an OK line and returns.
    """
    # One entry per pod that has a waiting container
    errored = []
    core_v1_api_instance = client.CoreV1Api()
    # GET /api/v1/pods
    all_pods = core_v1_api_instance.list_pod_for_all_namespaces(watch=False)

    for pod in all_pods.items:
        if pod.status is None:
            continue
        # container_statuses is None until containers have been created
        statuses = pod.status.container_statuses or []
        waiting_reasons = [
            status.state.waiting.reason
            for status in statuses
            if status.state is not None and status.state.waiting is not None
        ]
        if waiting_reasons:
            errored.append("%s\t%s\t%s.%s\n" % (
                pod.status.phase,
                waiting_reasons[0],
                pod.metadata.namespace,
                pod.metadata.name,
            ))

    # If pods have been added, return list and exit 2 (Critical)
    if errored:
        sys.stdout.write(''.join(errored))
        sys.exit(2)
    sys.stdout.write('OK - All pods are functioning without errors\n')
def nodes():
    """Report every node whose Ready condition is not ``KubeletReady``.

    The Ready condition is located by its ``type`` rather than by a fixed
    position, because the number and order of node conditions is not
    guaranteed.  A node with no Ready condition at all is reported with
    the reason ``ReadyConditionMissing``.

    Writes one ``reason<TAB>node-name`` line per affected node to stdout
    and exits with status 2 (Nagios CRITICAL) if there are any; otherwise
    writes an OK line and returns.
    """
    # One entry per node that is not ready
    errored = []
    core_v1_api_instance = client.CoreV1Api()
    # GET /api/v1/nodes
    all_nodes = core_v1_api_instance.list_node()

    for node in all_nodes.items:
        conditions = node.status.conditions or []
        ready = next((cond for cond in conditions if cond.type == 'Ready'), None)
        reason = ready.reason if ready is not None else 'ReadyConditionMissing'
        # KubeletReady means node is functioning without errors
        if reason != 'KubeletReady':
            errored.append("%s\t%s\n" % (reason, node.metadata.name))

    # If nodes have been added, return list and exit 2 (Critical)
    if errored:
        sys.stdout.write(''.join(errored))
        sys.exit(2)
    sys.stdout.write('OK - All nodes are functioning without errors\n')
def deployments():
    """Report every deployment that has unavailable replicas.

    A deployment is considered healthy when ``status.unavailable_replicas``
    is either unset (``None``) or zero.

    Writes one line per stalled deployment to stdout and exits with status
    2 (Nagios CRITICAL) if there are any; otherwise writes an OK line and
    returns.
    """
    # One entry per deployment with unavailable replicas
    stalled = []
    # NOTE(review): AppsV1beta1Api is tied to older kubernetes client
    # releases -- confirm against the installed client version.
    apps_v1_api_instance = client.AppsV1beta1Api()
    # GET /apis/apps/v1beta1/deployments
    all_deployments = apps_v1_api_instance.list_deployment_for_all_namespaces()

    for deployment in all_deployments.items:
        unavailable = deployment.status.unavailable_replicas
        # None and 0 both mean every replica is available
        if unavailable:
            stalled.append("%s.%s\tUnavailable Replicas: %s\\%s\n" % (
                deployment.metadata.namespace,
                deployment.metadata.name,
                unavailable,
                deployment.status.replicas,
            ))

    # If deployments have been added, return list and exit 2 (Critical)
    if stalled:
        sys.stdout.write(''.join(stalled))
        sys.exit(2)
    sys.stdout.write('OK - All deployments are healthy\n')
# This dictionary routes to the correct function based on user input of -e
routes = {'deployments': deployments, 'ingress': ingress, 'pods': pods, 'nodes': nodes}
# -e is optional for argparse, so options.errored is None when it is omitted.
# Report that as Nagios UNKNOWN (exit 3) instead of failing with a KeyError.
if options.errored is None:
    sys.stdout.write('UNKNOWN - no resource type given, use -e with one of: %s\n' % ', '.join(sorted(routes)))
    sys.exit(3)
# Call the check selected on the command line
routes[options.errored]()