-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathzabbix.py
175 lines (130 loc) · 6.84 KB
/
zabbix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = 'maximus'
import logging
from pyzabbix import ZabbixAPI
import base64
# Module-level logger named after this module; ZabbixAgent writes its debug output to it.
log = logging.getLogger(__name__)
class ZbxException(Exception):
    """Error raised when a Zabbix lookup yields no result or an ambiguous one.

    :param message: human-readable description of the failure; kept on the
        ``message`` attribute and also used as the exception text.
    """

    def __init__(self, message):
        # Hand the message to Exception as well, so that str(exc), exc.args
        # and tracebacks show it instead of an empty string.
        super().__init__(message)
        self.message = message
class ZabbixAgent(object):
    """Wrapper around a ``ZabbixAPI`` client for host, item, map and image lookups.

    Lookup methods raise ``ZbxException`` when the API returns nothing, or
    returns several records where exactly one is expected.
    """

    def __init__(self, url, login, password):
        """Create the ``ZabbixAPI`` client and log the API version it reports.

        :param url: passed to ``ZabbixAPI`` as ``url``.
        :param login: passed to ``ZabbixAPI`` as ``user``.
        :param password: passed to ``ZabbixAPI`` as ``password``.
        """
        self.url = url
        self.login = login
        self.password = password
        self.zbx_api = ZabbixAPI(url=url, use_authenticate=False, user=login, password=password)
        log.debug('Object ZabbixAgent created')
        log.debug('API ver. %s', self.api_ver())

    @staticmethod
    def _expect_one(records, not_found, ambiguous):
        """Return the only element of *records* or raise ``ZbxException``.

        :param records: list returned by an API ``get`` call.
        :param not_found: exception text used when *records* is empty.
        :param ambiguous: exception text used when *records* has several entries.
        """
        if not records:
            raise ZbxException(not_found)
        if len(records) > 1:
            raise ZbxException(ambiguous)
        return records[0]

    def _get_hostid(self, hostname):
        """Return the ``hostid`` of the first host the API returns for a ``name`` filter.

        :raises ZbxException: if no host matches *hostname*.
        """
        reply = self.zbx_api.host.get(filter={'name': hostname}, output='shorten')
        if not reply:
            raise ZbxException("hostname: {} not found".format(hostname))
        hostid = reply[0]['hostid']
        log.debug('hostID %s', hostid)
        return hostid

    def _sum_lastvalues(self, hostid, item):
        """Return the integer ``lastvalue`` of *item* on host *hostid*.

        *item* is a single item key, or several keys joined with ``+`` whose
        values are added together. Every key must resolve to exactly one item.

        :raises ZbxException: if a key matches no item or more than one item.
        :raises ValueError: if a ``lastvalue`` cannot be parsed by ``int()``.
        """
        if '+' in item:
            keys = [key.strip() for key in item.split('+')]
        else:
            # A plain key is sent to the API verbatim (no stripping).
            keys = [item]

        total = 0
        for key in keys:
            records = self.zbx_api.item.get(filter={'hostid': hostid, 'key_': key},
                                            output=['lastvalue'])
            log.debug('hostID %s item %s: %s', hostid, key, records)
            record = self._expect_one(records,
                                      'item: {} not found'.format(key),
                                      'return items expected one item: {}'.format(records))
            total += int(record['lastvalue'])
        return total

    def get_item_data(self, hostname, item):
        """Return the ``lastvalue`` of item key *item* on host *hostname* as an int.

        :raises ZbxException: if the host or the item is not found, or the key
            matches more than one item.
        """
        hostid = self._get_hostid(hostname)
        item_data = self.zbx_api.item.get(filter={'hostid': hostid, 'key_': item},
                                          output=['lastvalue'])
        log.debug('hostID %s item %s: %s', hostid, item, item_data)
        record = self._expect_one(item_data,
                                  'item: {} not found'.format(item),
                                  'return items expected one item')
        return int(record['lastvalue'])

    def get_item_data2(self, hostname, item_in, item_out):
        """Return ``(in_value, out_value)`` for two item expressions on one host.

        Each expression is a single item key or several keys joined with ``+``;
        joined keys are fetched separately and their ``lastvalue`` fields summed.

        :raises ZbxException: if the host is not found, or any key of either
            expression matches no item or more than one item.
        """
        hostid = self._get_hostid(hostname)
        value_in = self._sum_lastvalues(hostid, item_in)
        value_out = self._sum_lastvalues(hostid, item_out)
        return value_in, value_out

    def api_ver(self):
        """Return whatever ``ZabbixAPI.api_version()`` reports."""
        return self.zbx_api.api_version()

    def scan_map(self, map_name):
        """Return the single map named *map_name* with its elements and links.

        :raises ZbxException: if no map, or more than one map, has that name.
        """
        # NOTE(review): this requests 'elements' while scan_map_all requests
        # 'elementid' -- confirm which field the targeted Zabbix version provides.
        map_data = self.zbx_api.map.get(
            filter={'name': map_name},
            selectSelements=['elements', 'selementid', 'elementtype', 'iconid_off', 'x', 'y'],
            selectLinks=['selementid1', 'selementid2', 'linkid'],
        )
        return self._expect_one(map_data,
                                'map: {} not found'.format(map_name),
                                'return maps expected one map')

    def scan_map_all(self):
        """Return the list of all maps the API returns, with elements and links.

        :raises ZbxException: if the API returns no maps.
        """
        maps_data = self.zbx_api.map.get(
            selectSelements=['elementid', 'selementid', 'elementtype', 'iconid_off', 'x', 'y'],
            selectLinks=['selementid1', 'selementid2', 'linkid'],
        )
        if not maps_data:
            raise ZbxException('maps not found')
        # Several maps are the expected result here; the whole list is returned.
        return maps_data

    def get_hostname(self, hostid):
        """Return the ``host`` field of the host with id *hostid*.

        :raises ZbxException: if the API returns zero or several hosts.
        """
        hosts = self.zbx_api.host.get(hostids=hostid, output=['host'])
        return self._expect_one(hosts,
                                'hostname not found',
                                'return hostnames expected one hostname')['host']

    def get_mapname(self, mapid):
        """Return the ``name`` of the map with id *mapid*.

        :raises ZbxException: if the API returns zero or several maps.
        """
        maps = self.zbx_api.map.get(sysmapids=mapid, output=['name'])
        return self._expect_one(maps,
                                'map name not found',
                                'return map names expected one map name')['name']

    def get_triggername(self, triggerid):
        """Return the ``description`` of the trigger with id *triggerid*.

        :raises ZbxException: if the API returns zero or several triggers.
        """
        # The API filter parameter is the plural 'triggerids', like the
        # hostids / sysmapids / groupids / imageids used by the sibling lookups.
        triggers = self.zbx_api.trigger.get(triggerids=triggerid, output=['description'])
        return self._expect_one(triggers,
                                'trigger name not found',
                                'return trigger names expected one trigger name')['description']

    def get_hostgroupname(self, groupid):
        """Return the ``name`` of the host group with id *groupid*.

        :raises ZbxException: if the API returns zero or several host groups.
        """
        groups = self.zbx_api.hostgroup.get(groupids=groupid, output=['name'])
        return self._expect_one(groups,
                                'hostgroup name not found',
                                'return hostgroup names expected one hostgroup name')['name']

    def get_imagename(self, imageid):
        """Return the ``name`` of the image with id *imageid*.

        :raises ZbxException: if the API returns zero or several images.
        """
        images = self.zbx_api.image.get(imageids=imageid, output=['name'])
        return self._expect_one(images,
                                'image name not found',
                                'return image names expected one image name')['name']

    def image_to_zabbix(self, pathfn, zbx_img_name):
        """Upload the file at *pathfn* as the Zabbix image named *zbx_img_name*.

        The file is read in binary mode and sent base64-encoded. If an image
        with that name already exists its content is updated, otherwise a new
        image is created.
        """
        with open(pathfn, 'rb') as img:
            b64img = base64.b64encode(img.read()).decode()
        image_data = self.zbx_api.image.get(filter={'name': zbx_img_name})
        if image_data:
            self.zbx_api.image.update(imageid=image_data[0]['imageid'], image=b64img)
        else:
            # NOTE(review): imagetype=2 is presumably "background" in the Zabbix
            # image object reference -- confirm against the API docs.
            self.zbx_api.image.create(name=zbx_img_name, imagetype=2, image=b64img)

    def image_get(self, imageid):
        """Return the ``image`` field of the image with id *imageid*.

        :raises ZbxException: if the API returns no image for *imageid*.
        """
        image_data = self.zbx_api.image.get(imageids=imageid, select_image=True)
        if not image_data:
            # Report a missing image the same way the other lookups do,
            # instead of failing with IndexError.
            raise ZbxException('image: {} not found'.format(imageid))
        return image_data[0]['image']