-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtopo_discovery.py
114 lines (95 loc) · 3.61 KB
/
topo_discovery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import argparse
import csv
import io
import os

import routeros_api
from pydantic import BaseModel
class Neighbor(BaseModel):
    """One device entry produced by the RoMON discovery in this file.

    Every field is declared as a string. ``get_romon_discovery`` builds one
    instance per row of the ``tool/romon/discover`` reply plus one synthetic
    entry for the queried router itself; ``create_csv_file`` writes the
    fields out as CSV columns.
    """

    # Set to the same value as ``address`` by get_romon_discovery.
    id: str
    # Reply key 'address'; "00:00:00:00:00:00" for the synthetic root entry.
    address: str
    # Reply key 'cost'; "0" for the root entry.
    cost: str
    # Reply key 'hops'; "0" for the root entry.
    hops: str
    # Reply key 'path' with ',' replaced by '==='; empty for the root entry.
    path: str
    # Reply key 'l2mtu'; "0" for the root entry.
    l2mtu: str
    # Reply key 'identity' decoded as UTF-8; for the root entry, the 'name'
    # value returned by 'system/identity/print'.
    identity: str
    # Reply key 'version'; hard-coded to "6" for the root entry.
    version: str
    # Reply key 'board'; hard-coded to "RouterBoard" for the root entry.
    board: str
def get_romon_discovery(router_ip, router_user, router_pass, router_port, insecure='no'):
    """Run a RoMON discovery on a RouterOS device and return its neighbors.

    Opens a RouterOS API connection, calls ``tool/romon/discover`` with a
    duration of 20, turns every reply row into a ``Neighbor`` and appends one
    synthetic entry for the queried router itself (all-zero address, identity
    read from ``system/identity/print``).

    Args:
        router_ip: Host name or IP address of the router to query.
        router_user: API user name.
        router_pass: API password.
        router_port: API port; converted with ``int()``.
        insecure: ``'yes'`` connects without SSL; any other value connects
            with SSL. Certificate verification is disabled in both cases.

    Returns:
        The list of ``Neighbor`` objects after ``unique_neighbors`` has
        removed entries with a repeated address. Discovered devices come
        first, the synthetic root entry is appended last.

    Raises:
        IndexError: If ``system/identity/print`` returns no rows.
        Any exception raised by ``routeros_api`` or by ``Neighbor``
        validation is propagated after the connection has been closed.
    """
    # The two connection modes differ only in whether SSL is used.
    connection = routeros_api.RouterOsApiPool(
        router_ip,
        username=router_user,
        port=int(router_port),
        password=router_pass,
        use_ssl=insecure != 'yes',
        ssl_verify=False,
        plaintext_login=True,
    )
    api = connection.get_api()
    try:
        # Binary resources return raw bytes values, hence the explicit
        # decoding of 'path' and 'identity' below.
        discovered = api.get_binary_resource('/').call('tool/romon/discover', {'duration': '20'})
        devices = []
        for row in discovered:
            # Rewrite ',' to '===' so the path contains no commas; the CSV
            # export in this file is comma-delimited.
            path = row['path'].decode('utf-8').replace(',', '===')
            devices.append(
                Neighbor(
                    id=row['address'],
                    address=row['address'],
                    cost=row['cost'],
                    hops=row['hops'],
                    path=path,
                    l2mtu=row['l2mtu'],
                    identity=row['identity'].decode('utf-8', errors='ignore'),
                    version=row['version'],
                    board=row['board'],
                )
            )

        identity_rows = api.get_binary_resource('/').call('system/identity/print')
        root_identity = identity_rows[0]['name']
        # Decode the root identity the same way as neighbor identities so
        # undecodable bytes are dropped instead of being handed to the model
        # as raw bytes.
        if isinstance(root_identity, bytes):
            root_identity = root_identity.decode('utf-8', errors='ignore')

        # The queried router does not list itself in the discovery reply, so
        # add a placeholder entry for it.
        devices.append(
            Neighbor(
                id="00:00:00:00:00:00",
                address="00:00:00:00:00:00",
                cost="0",
                hops="0",
                path="",
                l2mtu="0",
                identity=root_identity,
                version="6",
                board="RouterBoard",
            )
        )
    finally:
        # Always release the API connection, including on errors.
        connection.disconnect()
    return unique_neighbors(devices)
def unique_neighbors(neighbors):
    """Return ``neighbors`` without repeated addresses.

    Args:
        neighbors: Iterable of objects exposing a hashable ``address``
            attribute.

    Returns:
        A new list holding the first object seen for each distinct
        ``address``, in the original order.
    """
    # A set gives O(1) membership checks; the result list keeps input order.
    seen_addresses = set()
    deduplicated = []
    for neighbor in neighbors:
        if neighbor.address in seen_addresses:
            continue
        seen_addresses.add(neighbor.address)
        deduplicated.append(neighbor)
    return deduplicated
def create_csv_file(neighbors, filename):
    """Write ``neighbors`` to ``filename`` as a comma-separated file.

    The file starts with the header
    ``address,identity,board,version,cost,hops,path,l2mtu`` followed by one
    row per neighbor. Fields containing a comma, quote or line break are
    quoted by the ``csv`` module, so such values no longer corrupt the row.
    Plain values are written exactly as before.

    Args:
        neighbors: Iterable of objects exposing the attributes named in the
            header.
        filename: Path of the file to create or overwrite.
    """
    columns = ('address', 'identity', 'board', 'version', 'cost', 'hops', 'path', 'l2mtu')
    # newline='' lets the csv writer control line endings itself;
    # lineterminator='\n' keeps the single-newline row endings this file
    # has always used.
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(columns)
        writer.writerows(
            [getattr(n, column) for column in columns] for n in neighbors
        )
def main():
    """Command-line entry point: discover RoMON neighbors and export them.

    Parses the command-line options, runs ``get_romon_discovery`` against
    the given router and writes the result to ``topos/<isp_name>.csv``,
    creating the ``topos`` directory when needed.

    ``--isp_name`` and ``--router_ip`` are mandatory. ``--router_port``,
    ``--router_user``, ``--router_pass`` and ``--insecure`` fall back to
    their defaults when omitted; previously the first three were marked
    required, which made their declared defaults unreachable.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--isp_name', type=str, required=True)
    parser.add_argument('--router_ip', type=str, required=True)
    parser.add_argument('--router_port', type=str, default='8729')
    parser.add_argument('--router_user', type=str, default='admin')
    parser.add_argument('--router_pass', type=str, default='')
    parser.add_argument('--insecure', type=str, required=False, default='no')
    args = parser.parse_args()

    neighbors = get_romon_discovery(
        args.router_ip,
        args.router_user,
        args.router_pass,
        args.router_port,
        args.insecure,
    )

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs('topos', exist_ok=True)
    create_csv_file(neighbors, f'topos/{args.isp_name}.csv')
# Run the discovery only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()