-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathkibana_test.py
104 lines (92 loc) · 2.64 KB
/
kibana_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import copy
import json
from elastalert.kibana import add_filter
from elastalert.kibana import dashboard_temp
from elastalert.kibana import filters_from_dashboard
from elastalert.kibana import kibana4_dashboard_link
# Dashboard fixture reduced to its "services.filter" section. It carries three
# active "must" filters: a time range on @timestamp (id 0), a field match on
# _log_type (id 1) and a querystring (id 2). The JSON text is decoded right
# away so the tests below work with a plain dict.
test_dashboard = json.loads('''{
"title": "AD Lock Outs",
"services": {
"filter": {
"list": {
"0": {
"type": "time",
"field": "@timestamp",
"from": "now-7d",
"to": "now",
"mandate": "must",
"active": true,
"alias": "",
"id": 0
},
"1": {
"type": "field",
"field": "_log_type",
"query": "\\"active_directory\\"",
"mandate": "must",
"active": true,
"alias": "",
"id": 1
},
"2": {
"type": "querystring",
"query": "ad.security_auditing_code:4740",
"mandate": "must",
"active": true,
"alias": "",
"id": 2
}
},
"ids": [
0,
1,
2
]
}
}
}''')
def test_filters_from_dashboard():
    """Check the filters extracted from the ``test_dashboard`` fixture.

    The field filter must come back as a ``term`` filter and the querystring
    filter as a ``query_string`` query.
    """
    extracted = filters_from_dashboard(test_dashboard)

    expected_term = {'term': {'_log_type': '"active_directory"'}}
    expected_query = {
        'query': {'query_string': {'query': 'ad.security_auditing_code:4740'}},
    }

    assert expected_term in extracted
    assert expected_query in extracted
def test_add_filter():
    """Check the dashboard entry that ``add_filter`` writes for term filters.

    Two shapes are covered: a scalar term value, expected as a quoted query,
    and a list of values, expected as a parenthesised ``AND`` group. Either
    way the entry is read back from slot ``'1'`` of the filter list.
    """
    cases = (
        ({"term": {"this": "that"}}, '"that"'),
        ({"term": {"this": ["that", "those"]}}, '("that" AND "those")'),
    )

    for es_filter, expected_query in cases:
        # Work on a private copy so the shared template is never modified.
        dashboard = copy.deepcopy(dashboard_temp)
        add_filter(dashboard, es_filter)

        stored = dashboard['services']['filter']['list']['1']
        assert stored == {
            'type': 'field',
            'field': 'this',
            'query': expected_query,
            'mandate': 'must',
            'active': True,
            'alias': '',
            'id': 1,
        }
def test_url_encoded():
    """Check that the generated Kibana 4 link contains no raw punctuation.

    Every character is tested individually. Testing the whole character
    string as a single substring would pass vacuously, because that exact
    sequence never occurs in a URL.
    """
    url = kibana4_dashboard_link('example.com/#/Dashboard', '2015-01-01T00:00:00Z', '2017-01-01T00:00:00Z')

    # Punctuation that must be percent-encoded wherever it appears.
    # NOTE(review): '?', '&' and '=' are deliberately not listed. They are
    # legal query-string delimiters and the link presumably carries its
    # settings in a query parameter - confirm against kibana4_dashboard_link.
    must_be_escaped = "',\":;()"

    leaked = [char for char in must_be_escaped if char in url]
    assert not leaked, 'unescaped characters %r found in %r' % (leaked, url)
def test_url_env_substitution(environ):
    """Check that ``$VAR`` placeholders in the dashboard URL are expanded.

    ``environ`` is supplied by the test harness (presumably a fixture wrapping
    the process environment - verify in conftest). The host and port are set
    on it before the link is built.
    """
    environ.update({'KIBANA_HOST': 'kibana', 'KIBANA_PORT': '5601'})

    dashboard_template = 'http://$KIBANA_HOST:$KIBANA_PORT/#/Dashboard'
    link = kibana4_dashboard_link(dashboard_template, '2015-01-01T00:00:00Z', '2017-01-01T00:00:00Z')

    assert link.startswith('http://kibana:5601/#/Dashboard')