-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathedl_functions.py
265 lines (240 loc) · 14.3 KB
/
edl_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
from __init__ import *
import base64
import http.client
import requests
import ssl
import re
import xml.etree.ElementTree as ET
# Check_01: Validate the JSON input file
# Import input variables
def check_json(file):
    """Load the input JSON file and return its two required sections.

    Args:
        file: Path of the JSON input file.

    Returns:
        A ``(firewall_connection, dynamic_lists)`` tuple holding the values
        stored under the ``"firewall_connection"`` and ``"dynamic_lists"``
        keys of the file.

    Raises:
        SystemExit: With exit status 1 when the file cannot be opened, is not
            valid JSON, or lacks one of the required keys. The problem is
            logged as critical before exiting.
    """
    try:
        with open(file) as input_json:
            settings = json.load(input_json)
        firewall_connection = settings["firewall_connection"]
        dynamic_lists = settings["dynamic_lists"]
    except OSError as error:
        logger.critical(f"Cannot open \"{file}\": {error}")
        raise SystemExit(1)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        logger.critical(f"Error reading \"{file}\", check syntax!")
        raise SystemExit(1)
    except (KeyError, TypeError) as error:
        # KeyError: a required key is absent.
        # TypeError: the top-level JSON value cannot be indexed by key.
        logger.critical(f"Input file \"{file}\" is missing a required section: {error}")
        raise SystemExit(1)
    logger.success(f"Input file \"{file}\" read successfully!")
    return firewall_connection, dynamic_lists
# Check_02: Validate the API RESTCONF port
# Convert the API RESTCONF port to int
def check_api_port(not_valid_api_port):
    """Validate the firewall API port and return it as an ``int``.

    Args:
        not_valid_api_port: The port value as read from the input; anything
            accepted by ``int()``.

    Returns:
        The port converted to ``int``.

    Raises:
        SystemExit: With exit status 1 when the value cannot be converted to
            an integer or lies outside the 1-65535 TCP port range. The
            problem is logged as critical before exiting.
    """
    try:
        api_port = int(not_valid_api_port)
    except (TypeError, ValueError):
        # TypeError covers values such as None that int() rejects outright.
        api_port = None
    if api_port is None or not 1 <= api_port <= 65535:
        logger.critical(f"Firewall API Restconf port {not_valid_api_port} is incorrect!")
        raise SystemExit(1)
    logger.success(f"Firewall API Restconf port {not_valid_api_port} is correct!")
    return api_port
# Check_03: Validate the API RESTCONF token and check the connection to the firewall
def fw_get_token(username, password, fw_address, api_port):
    """Build the Basic-auth token and verify it against the firewall.

    The token is the Base64 encoding of ``"<username>:<password>"``. It is
    checked by sending a GET request for the device state through
    ``fw_send_request``.

    Args:
        username: API user name.
        password: API user password.
        fw_address: Firewall address passed on to ``fw_send_request``.
        api_port: Firewall API port passed on to ``fw_send_request``.

    Returns:
        The Base64 token string when the firewall answers with status 200
        or 201.

    Raises:
        SystemExit: With exit status 1 when the request fails or the
            firewall answers with any other status.
    """
    credentials = f"{username}:{password}".encode("UTF-8")
    api_token = base64.b64encode(credentials).decode()
    answer = fw_send_request(api_token,
                             fw_address,
                             api_port,
                             "GET",
                             "/restconf/data/huawei-device:device-state",
                             None)
    if not answer:
        # A falsy answer means the request itself failed.
        raise SystemExit(1)
    status, reason, _reply = answer
    if status not in (200, 201):
        logger.critical("Connection to the firewall was not successful")
        logger.critical(reason)
        raise SystemExit(1)
    logger.success(f"Connection to the firewall:\"{fw_address}\" was successful")
    return api_token
# Connecting to firewall Huawei USG
def fw_send_request(api_token, fw_address, fw_api_port, method, url, body):
    """Send a single HTTPS request to the firewall and return its answer.

    Args:
        api_token: Base64 credentials placed in the ``Authorization: Basic``
            header.
        fw_address: Firewall host to connect to.
        fw_api_port: Firewall API port to connect to.
        method: HTTP method, e.g. ``"GET"`` or ``"DELETE"``.
        url: Request path.
        body: Request body, or ``None`` for requests without one.

    Returns:
        A ``(status, reason, reply)`` tuple taken from the HTTP response
        (``reply`` is the raw response body), or ``0`` when any exception
        occurred while sending the request or reading the response.
    """
    request_headers = {
        "Cache-Control": "no-cache,no-store",
        "Connection": "Keep-Alive",
        "Accept": "application/yang-data+xml",
        "Content-Type": "application/yang-data+xml",
        "Authorization": "Basic " + api_token
    }
    # Pin the protocol to TLS 1.2 instead of using the default HTTPS context.
    # NOTE(review): no certificate-verification settings are configured on
    # this context here - confirm that this is intended.
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    tls_context.set_ciphers(RESTRICTED_SERVER_CIPHERS)
    connection = http.client.HTTPSConnection(fw_address, fw_api_port, timeout=600, context=tls_context)
    try:
        connection.request(method, url, body, request_headers)
        response = connection.getresponse()
        result = (response.status, response.reason, response.read())
        connection.close()
        logger.debug(f"Firewall connection successfully established!")
    except Exception as error:
        # Any failure is logged and reported to the caller as 0.
        connection.close()
        logger.critical(error)
        result = 0
    return result
# Create new Dynamic list group
def fw_create_new_edl(name, api_token, fw_address, fw_api_port, fw_vsys):
    """Create the dynamic list address group on the firewall.

    Sends a PUT request for ``addr-group=<fw_vsys>,<name>`` and logs whether
    the firewall accepted it.

    Args:
        name: Name of the dynamic list group.
        api_token: Base64 credentials for ``fw_send_request``.
        fw_address: Firewall address.
        fw_api_port: Firewall API port.
        fw_vsys: Virtual system that owns the group.
    """
    answer = fw_send_request(api_token,
                             fw_address,
                             fw_api_port,
                             "PUT",
                             f"/restconf/data/huawei-address-set:address-set/addr-group={fw_vsys},{name}",
                             None)
    # fw_send_request returns a (status, reason, reply) tuple for every HTTP
    # response, so the tuple is truthy even for 4xx/5xx answers. Only a 2xx
    # status counts as success.
    if answer and 200 <= answer[0] < 300:
        logger.success(f"Dynamic list group \"{name}\" has been successfully created on firewall")
    else:
        logger.error(f"Dynamic list group \"{name}\" was not created in the firewall!")
# Delete old object from group-address-objects and from Firewall
def fw_erase_old_edl(name, check, api_token, fw_address, fw_api_port, fw_vsys):
    """Detach every sublist from a dynamic list group and delete it.

    ``check`` is parsed as XML. For each child of the root element, two
    DELETE requests are sent: one removing the element from the group, one
    removing the address object itself. Each outcome is logged.

    Args:
        name: Name of the dynamic list group.
        check: XML document (string or bytes) describing the group.
        api_token: Base64 credentials for ``fw_send_request``.
        fw_address: Firewall address.
        fw_api_port: Firewall API port.
        fw_vsys: Virtual system that owns the group.

    Raises:
        xml.etree.ElementTree.ParseError: If ``check`` is not well-formed XML.
        IndexError: If a child element has fewer than two sub-elements.
    """
    root = ET.fromstring(check)
    for element in root:
        # Presumably the first sub-element is the element id and the second
        # the sublist name - TODO confirm against the firewall reply schema.
        element_id = element[0].text
        sublist_name = element[1].text

        # Delete old object from group-address-objects
        detach_answer = fw_send_request(api_token,
                                        fw_address,
                                        fw_api_port,
                                        "DELETE",
                                        f"/restconf/data/huawei-address-set:address-set/addr-group={fw_vsys},{name}/elements={element_id}",
                                        None)
        # A response tuple is truthy for any HTTP status, so the status code
        # itself has to be checked.
        if detach_answer and 200 <= detach_answer[0] < 300:
            logger.success(f"Sublist \"{sublist_name}\" detached from dynamic list group \"{name}\" successfully!")
        else:
            logger.error(f"Sublist \"{sublist_name}\" was not detached from dynamic list group \"{name}\"!")

        # Delete old object from Firewall
        delete_answer = fw_send_request(api_token,
                                        fw_address,
                                        fw_api_port,
                                        "DELETE",
                                        f"/restconf/data/huawei-address-set:address-set/addr-object={fw_vsys},{sublist_name}",
                                        None)
        if delete_answer and 200 <= delete_answer[0] < 300:
            logger.success(f"Sublist \"{sublist_name}\" deleted successfully!")
        else:
            logger.error(f"Sublist \"{sublist_name}\" was not deleted!")
# Get txt Dynamic List
def get_edl(name, url, timeout=30):
    """Download a text dynamic list and return its non-empty records.

    Args:
        name: Name of the dynamic list, used in log messages only.
        url: Address the list is downloaded from.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot block the run indefinitely.

    Returns:
        A list with one string per non-blank line of the response, with
        surrounding whitespace (including ``\\r`` from CRLF line endings)
        removed, or ``0`` when the download fails or the server answers with
        a status other than 200.
    """
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logger.error(f"Connection to the Dynamic List \"{name}\" was not successful, returned code - {response.status_code}")
            return 0
        logger.success(f"Connection to the Dynamic List \"{name}\" was successfully")
        stripped_lines = (line.strip() for line in response.text.splitlines())
        return [line for line in stripped_lines if line]
    except Exception as e:
        logger.error(f"Connection to the Dynamic List \"{name}\" was not successful, check Dynamic List address \"{url}\"")
        logger.error(e)
        return 0
# Check and delete address list from Firewall
def fw_check_obj(name, api_token, fw_address, fw_api_port, fw_vsys):
    """Delete an address object (sublist) from the firewall if it exists.

    A GET request checks for the object. It is treated as existing when the
    firewall answers with status 200 and the reply contains ``<data>``. An
    existing object is then removed with a DELETE request.

    Args:
        name: Name of the sublist (address object).
        api_token: Base64 credentials for ``fw_send_request``.
        fw_address: Firewall address.
        fw_api_port: Firewall API port.
        fw_vsys: Virtual system that owns the object.
    """
    object_url = f"/restconf/data/huawei-address-set:address-set/addr-object={fw_vsys},{name}"
    logger.info(f"Getting information about sublist \"{name}\"...")
    answer = fw_send_request(api_token, fw_address, fw_api_port, "GET", object_url, None)
    if not answer:
        logger.error(f"Sublist deletion problem: \"{name}\"")
        return

    status, _reason, reply = answer
    # Match on the raw bytes so a reply that is not valid UTF-8 cannot raise.
    if status != 200 or b"<data>" not in reply:
        logger.info(f"Sublist \"{name}\" does not exist")
        return

    logger.info(f"Sublist \"{name}\" is exists")
    # Delete address list from Firewall
    delete_answer = fw_send_request(api_token, fw_address, fw_api_port, "DELETE", object_url, None)
    # A response tuple is truthy for any HTTP status, so only a 2xx status
    # is reported as a successful deletion.
    if delete_answer and 200 <= delete_answer[0] < 300:
        logger.success(f"Sublist \"{name}\" deleted successfully")
    else:
        logger.error(f"Sublist \"{name}\" was not deleted!")
# Deploy
def _edl_record_to_xml(record, elem_id, index, name):
    """Render one dynamic list record as an ``<elements>`` XML fragment.

    Returns an empty string, after logging an error, when ``record`` is not
    a valid IPv4/IPv6 address or network.
    """
    try:
        network = ipaddress.ip_network(record)
    except ValueError:
        logger.error(f"Dynamic list \"{name}\" contains invalid IP address: \"{record}\" with index \"{index}\"")
        return ""
    if network.version == 4:
        return f"<elements><elem-id>{elem_id}</elem-id><address-ipv4>{network}</address-ipv4></elements>"
    return f"<elements><elem-id>{elem_id}</elem-id><address-ipv6>{network.exploded}</address-ipv6></elements>"


def fw_deploy_edl(edl, name, api_token, fw_address, fw_api_port, fw_vsys):
    """Deploy a dynamic list to the firewall as sublists attached to a group.

    The records are split into consecutive chunks of ``RANGE_LIST`` entries.
    For each chunk ``k``, any existing address object ``<name>_<k>`` is
    removed through ``fw_check_obj`` and a new one is created with a POST
    request. After all chunks are deployed, the group ``<name>`` is updated
    with a PATCH request listing every sublist and a "changed at" timestamp.

    Records are addressed by position, so duplicate records and lists whose
    last record starts a new chunk are handled correctly. A failed request
    is logged and does not stop the run.

    Args:
        edl: List of address/network strings.
        name: Name of the dynamic list group; sublists are named
            ``<name>_<k>``.
        api_token: Base64 credentials for ``fw_send_request``.
        fw_address: Firewall address.
        fw_api_port: Firewall API port.
        fw_vsys: Virtual system that owns the objects.
    """
    if not edl:
        logger.error(f"Dynamic list \"{name}\" is empty, nothing to deploy")
        return

    group_elements = []
    for start in range(0, len(edl), RANGE_LIST):
        sublist_id = start // RANGE_LIST
        sublist_name = f"{name}_{sublist_id}"
        # elem-id is the record's position inside its own sublist.
        elements = "".join(
            _edl_record_to_xml(record, offset, start + offset, name)
            for offset, record in enumerate(edl[start:start + RANGE_LIST])
        )
        body = f"<addr-object>{elements}</addr-object>"

        # Delete old sublist
        fw_check_obj(sublist_name, api_token, fw_address, fw_api_port, fw_vsys)

        # Deploy new sublist
        logger.info(f"Deploy addr-object \"{sublist_name}\"")
        answer = fw_send_request(api_token,
                                 fw_address,
                                 fw_api_port,
                                 "POST",
                                 f"/restconf/data/huawei-address-set:address-set/addr-object={fw_vsys},{sublist_name}",
                                 body)
        # fw_send_request returns 0 on failure, so it must not be unpacked
        # blindly.
        if answer and answer[0] == 201:
            logger.success(f"Deploy addr-object \"{sublist_name}\" was successful!")
        else:
            logger.error(f"Deploy addr-object \"{sublist_name}\" was not successful!")
        group_elements.append(
            f"<elements><elem-id>{sublist_id}</elem-id><addrset-name>{sublist_name}</addrset-name></elements>"
        )

    # Attach new sublists to grouplist
    logger.info(f"Attaching sublists to grouplist")
    time_now = strftime("%H:%M %d.%m.%Y", time.localtime())
    group_body = (
        f"<addr-group><vsys>{fw_vsys}</vsys><name>{name}</name><desc>changed at {time_now}</desc>"
        + "".join(group_elements)
        + "</addr-group>"
    )
    answer_modify_desk = fw_send_request(api_token,
                                         fw_address,
                                         fw_api_port,
                                         "PATCH",
                                         f"/restconf/data/huawei-address-set:address-set/addr-group={fw_vsys},{name}",
                                         group_body)
    if answer_modify_desk and 200 <= answer_modify_desk[0] < 300:
        logger.success(f"Sublists attached to dynamic list group \"{name}\" successfully!")
    else:
        logger.error(f"Sublists were not attached to dynamic list group \"{name}\"!")