forked from HariSekhon/Nagios-Plugins
-
Notifications
You must be signed in to change notification settings - Fork 0
/
check_consul_service_leader_elected.py
executable file
·157 lines (137 loc) · 5.95 KB
/
check_consul_service_leader_elected.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python
# vim:ts=4:sts=4:sw=4:et
#
# Author: Hari Sekhon
# Date: 2018-06-25 18:16:27 +0100 (Mon, 25 Jun 2018)
#
# https://github.com/harisekhon/nagios-plugins
#
# License: see accompanying Hari Sekhon LICENSE file
#
# If you're using my code you're welcome to connect with me on LinkedIn and optionally send me feedback
# to help improve or steer this or other code I publish
#
# https://www.linkedin.com/in/harisekhon
#
"""
Nagios Plugin to check a Consul service's leader election for a specific leader key in the KV store using API v1
Optional regex check applies to the elected leader to ensure one of the nodes we expect is actually what is elected as
the leader for the service
Tested on Consul 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import os
import re
import sys
import time
import traceback
libdir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'pylib'))
sys.path.append(libdir)
try:
# pylint: disable=wrong-import-position
from harisekhon.utils import isStr, isList, support_msg_api, validate_chars, validate_regex, log
from harisekhon.utils import CriticalError, UnknownError
from harisekhon.nagiosplugin import RestNagiosPlugin
except ImportError as _:
print(traceback.format_exc(), end='')
sys.exit(4)
__author__ = 'Hari Sekhon'
__version__ = '0.2'
class CheckConsulServiceLeaderElected(RestNagiosPlugin):
def __init__(self):
super(CheckConsulServiceLeaderElected, self).__init__()
self.name = 'Consul'
self.default_port = 8500
self.path = '/v1/kv/'
self.key = None
self.auth = False
self.regex = None
self.msg = 'Consul message not defined yet'
# closure factory
@staticmethod
def check_response_code(msg):
def tmp(req):
if req.status_code != 200:
err = ''
if req.content and isStr(req.content) and len(req.content.split('\n')) < 2:
err += ': ' + req.content
raise CriticalError("{0}: '{1}' {2}{3}".format(msg, req.status_code, req.reason, err))
return tmp
def add_options(self):
super(CheckConsulServiceLeaderElected, self).add_options()
self.add_opt('-k', '--key', help='Key to query for elected leader')
self.add_opt('-r', '--regex',
help="Regex to compare the service's elected leader's hostname value against" + \
"(optional - this will incur an extra query against the session info)")
def process_options(self):
super(CheckConsulServiceLeaderElected, self).process_options()
self.key = self.get_opt('key')
self.regex = self.get_opt('regex')
if not self.key:
self.usage('--key not defined')
self.key = self.key.lstrip('/')
validate_chars(self.key, 'key', r'\w\/-')
if self.regex:
validate_regex(self.regex, 'key')
self.path += '{}'.format(self.key)
def parse_consul_json(self, name, content):
json_data = None
try:
json_data = json.loads(content)
except ValueError:
raise UnknownError("non-json {} data returned by consul at {}:{}: '{}'. {}"\
.format(name, self.host, self.port, content, support_msg_api()))
if not json_data:
raise UnknownError("blank {} contents returned by consul at {}:{}! '{}'. {}"\
.format(name, self.host, self.port, content, support_msg_api()))
if not isList(json_data):
raise UnknownError('non-list {} returned by consul at {}:{} for session data. {}'\
.format(name, self.host, self.port, support_msg_api()))
return json_data
def extract_session(self, req):
json_data = self.parse_consul_json('key', req.content)
for item in json_data:
if 'Session' in item:
return item['Session']
return None
def validate_session(self, req, session):
json_data = self.parse_consul_json('session', req.content)
if len(json_data) > 1:
raise UnknownError("more than 1 session returned for session '{}'!".format(session))
session_data = json_data[0]
if 'ID' not in session_data:
raise UnknownError('no session ID found in session data!')
if session_data['ID'] != session:
raise UnknownError("session ID '{}'returned does not match requested session '{}' !!"\
.format(session_data['ID'], session))
if 'Node' not in session_data:
raise UnknownError('Node field not found in session data! {}'.format(support_msg_api()))
leader_host = session_data['Node']
self.msg += ", leader='{}'".format(leader_host)
if not re.match(self.regex, leader_host):
self.critical()
self.msg += " (doesn't match expected regex '{}')".format(self.regex)
def run(self):
self.request.check_response_code = \
self.check_response_code("failed to retrieve Consul key '{0}'".format(self.key))
start = time.time()
req = self.query()
session = self.extract_session(req)
if not session:
raise CriticalError('no leader found for service associated with key \'{}\' (no session)!'.format(self.key))
log.info("session = '%s'", session)
self.msg = 'Consul service leader found'
#if self.verbose:
# self.msg += " for service at key '{}'".format(self.key)
if self.regex:
self.path = '/v1/session/info/{}'.format(session)
req = self.query()
self.validate_session(req, session)
query_time = time.time() - start
self.msg += ' | query_time={}'.format(query_time)
if __name__ == '__main__':
CheckConsulServiceLeaderElected().main()