-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathnetlas_passive_scan.py
100 lines (77 loc) · 2.77 KB
/
netlas_passive_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import netlas
import time
import re
import json
import copy
import yaml
import sys
import argparse
# Results accumulated over the whole run, keyed by the host string read from
# the input file.
outputDict = dict()

# Placeholder Netlas.io API key; the script entry point replaces it when a key
# is supplied via -k/--key.
api_key = "YourAPIKey"
def createParser():
    """Build the command-line argument parser for the passive scan script.

    Options:
        -i / --input   Path to the input file (required).
        -o / --output  Output setting; defaults to 'yaml'.
        -k / --key     Netlas.io API key; defaults to the string 'None',
                       which callers treat as "no key supplied".

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    parser = argparse.ArgumentParser()
    # Required: otherwise parse_args() returns input=None and the failure only
    # surfaces later as an obscure TypeError when the path is opened.
    parser.add_argument('-i', '--input', required=True,
                        help='Path to input file')
    parser.add_argument(
        '-o', '--output', default='yaml',
        help='Output settings. Default: YAML in console. '
             'Custom: JSON in console ("json" argument).')
    parser.add_argument('-k', '--key', default='None',
                        help='Your Netlas.io API key')
    return parser
def portToKey(data):
    """Return the '<port>/<prot4>, ' key prefix for one response record.

    Args:
        data: mapping that provides 'port' and 'prot4' entries; both values
            are rendered with str().

    Returns:
        str: port and protocol joined by '/', followed by ', '.
    """
    parts = (data['port'], data['prot4'])
    return "/".join(map(str, parts)) + ", "
def oneResponse(data):
    """Summarise the tags and CVEs found in one response record.

    Extraction is best-effort: a missing or malformed 'tag' / 'cve' field
    contributes nothing, and an individual entry lacking an expected key is
    skipped without discarding the entries around it.

    Args:
        data: mapping for a single response. Optional 'tag' entry: a list of
            mappings with a 'name' key. Optional 'cve' entry: a list of
            mappings with 'name', 'base_score' and 'severity' keys.

    Returns:
        dict: contains 'Tags' (list of tag names) when at least one tag was
        found and 'CVEs' (list of '<name>, Score: <score>, Severity: <sev>'
        strings) when at least one CVE was found; empty otherwise.
    """
    lookupErrors = (KeyError, TypeError, IndexError)

    def entriesOf(key):
        # An absent field or an unexpected container type means "nothing to
        # report" rather than an error.
        try:
            entries = data[key]
        except lookupErrors:
            return []
        return entries if isinstance(entries, (list, tuple)) else []

    tagList = []
    for tag in entriesOf('tag'):
        try:
            tagList.append(tag['name'])
        except lookupErrors:
            continue

    cveList = []
    for cve in entriesOf('cve'):
        try:
            # str.format() stringifies each value, so a numeric base_score no
            # longer raises TypeError (which used to drop this CVE and every
            # later one silently).
            cveList.append("{}, Score: {}, Severity: {}".format(
                cve['name'], cve['base_score'], cve['severity']))
        except lookupErrors:
            continue

    responseDict = {}
    if tagList:
        responseDict["Tags"] = tagList
    if cveList:
        responseDict["CVEs"] = cveList
    return responseDict
if __name__ == '__main__':
    # Script entry point: read one host per line from the input file, query
    # Netlas for the responses recorded for that host, and print a per-host
    # summary (ports, tags, CVEs) as YAML or JSON.
    parser = createParser()
    namespace = parser.parse_args(sys.argv[1:])

    # The string 'None' is the parser's "no key supplied" default; any other
    # value overrides the module-level placeholder key.
    if namespace.key != 'None':
        api_key = namespace.key
    netlas_connection = netlas.Netlas(api_key=api_key)
    outputType = namespace.output

    # The context manager closes the input file even if an API call raises.
    with open(namespace.input, "r") as inputFile:
        for line in inputFile:
            host = line.strip()
            if not host:
                # A blank line would otherwise be sent as a bare "host:" query.
                continue

            sQuery = "host:" + host
            cnt_of_res = netlas_connection.count(query=sQuery, datatype='response')
            if cnt_of_res['count'] != 0:
                downloaded_query = netlas_connection.download(
                    query=sQuery, datatype='response', size=cnt_of_res['count'])

                # Each downloaded item is a JSON document whose 'data' object
                # describes one response for this host.
                domainDict = {}
                for query_res in downloaded_query:
                    data = json.loads(query_res)['data']
                    domainDict[portToKey(data) + data['uri']] = oneResponse(data)

                # domainDict is built fresh for every host, so it can be
                # stored and printed without copying.
                outputDict[host] = domainDict
                localOutputDict = {host: domainDict}

                # Results are flushed per host so output appears as the scan
                # progresses.
                if outputType == "yaml":
                    print(yaml.dump(localOutputDict, sort_keys=False), flush=True)
                elif outputType == "json":
                    print(json.dumps(localOutputDict), flush=True)

            # Pause after every queried host (presumably to stay within the
            # API rate limit — confirm against the account's quota).
            time.sleep(1)