-
Notifications
You must be signed in to change notification settings - Fork 0
/
skim_controller.py
executable file
·145 lines (130 loc) · 4.75 KB
/
skim_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
import skim_utils
#TODO: Get web server version and CVE check.
#wpscan etc
class SkimController:
    '''
    This is the Driver Class, its instantiation in main() drives execution of the application.

    Each step method returns True on success and False when the step failed
    (the failure is printed, not raised), so callers can test the result directly.
    '''

    def __init__(self):
        '''
        Controller Constructor - Set instance attributes here.
        '''
        # Size of the multiprocessing pool created by parallelize().
        self.processes: int = 32
        # NOTE(review): not read anywhere in this class; presumably a timeout in
        # seconds consumed by the requester - confirm.
        self.http_timeout: int = 60
        # Upper bound, in seconds, of the random delay director() sleeps before each request.
        self.staggering: int = 20
        # Output directory; also the location of the log file.
        self.basepath: str = "/opt/skim/output/"
        # File holding the list of domains handed to the reader in main().
        self.path_to_urls: str = "/opt/skim/master_list_external_domains.txt"
        self.logfile: str = self.basepath + "log.txt"

    def clean_and_print_banner(self) -> bool:
        '''
        Clean up and print the banner.

        Removes orphan files under self.basepath, clears the screen and shows
        the banner. Returns True on success, False if any step raised.
        '''
        try:
            import toolbag
            import skim_conf
            utils = skim_utils.SkimUitls()
            conf = skim_conf.Skim_conf()
            tb = toolbag.Toolbag()
            utils.remove_orphan_files(self.basepath)
            tb.clear_screen()
            conf.show_banner()
            return True
        except Exception as e:
            print("Error! in SkimController.clean_and_print_banner: " + str(e))
            return False

    def is_internet_available(self) -> bool:
        '''
        Check that internet connectivity is available.

        Returns the truthiness of SkimUitls.test_internet(), or False if the
        check itself raised.
        '''
        try:
            return bool(skim_utils.SkimUitls().test_internet())
        except Exception as e:
            print("Error! in SkimController.is_internet_available: " + str(e))
            return False

    def parallelize(self, clean_master_list) -> bool:
        '''
        Generate a Pool of processes, then map the elements in the domain list to the director function.

        Returns True once every element has been mapped, False if the pool
        raised an AssertionError. Other exceptions propagate to the caller.
        '''
        try:
            import multiprocessing
            # maxtasksperchild recycles each worker process after 40 tasks.
            with multiprocessing.Pool(int(self.processes), maxtasksperchild=40) as pool:
                pool.map(self.director, clean_master_list)
            return True
        except AssertionError as i:
            print("Error!! in SkimController.parallelize multi processing map: " + str(i))
            return False

    def director(self, url):
        '''
        Enforce staggering to avoid link flooding, then send to http requester.

        Sleeps a random whole number of seconds in [0, self.staggering] before
        handing url to SkimRequester.send_request. Errors are printed, never
        raised, so one failing url does not abort the pool's map.
        '''
        try:
            import random
            import time
            import skim_requester
            requester = skim_requester.SkimRequester()
            wait = random.randint(0, int(self.staggering))
            time.sleep(wait)
            requester.send_request(url)
        except Exception as e:
            print("Error! in SkimController.director method: " + str(e))

    def display_results(self) -> bool:
        '''
        Display the results and move the files into their own folder.

        Returns True on success, False if any step raised.
        '''
        try:
            import skim_cleaner
            clean = skim_cleaner.SkimCleaner()
            clean.print_counts()
            # Use the configured output directory rather than a second hard-coded copy of it.
            clean.move_files(self.basepath)
            return True
        except Exception as e:
            print("Error! in SkimController.display_results: " + str(e))
            return False

    def does_everything_exist(self) -> bool:
        '''
        Check that the output directory, the url list and the log file exist.

        The checks are delegated to SkimUitls.check_dir_exists and
        SkimUitls.check_file_exists (presumably these create what is missing -
        confirm against skim_utils). Each path and each check result is printed.
        Returns True only when all three checks are truthy, otherwise False.
        '''
        try:
            util = skim_utils.SkimUitls()
            print(self.basepath)
            print(self.path_to_urls)
            print(self.logfile)
            base = util.check_dir_exists(self.basepath)
            print(base)
            urls = util.check_file_exists(self.path_to_urls)
            print(urls)
            log = util.check_file_exists(self.logfile)
            print(log)
            checks = (
                (self.basepath, base),
                (self.path_to_urls, urls),
                (self.logfile, log),
            )
            failed = [path for path, ok in checks if not ok]
            if failed:
                # Name the offending paths so the logged error is actionable.
                raise IOError("check failed for: " + ", ".join(failed))
            return True
        except IOError as i:
            print("IO Error! - controller.does_everything_exist.Checking_files_creating_if_not_exists: " + str(i))
            return False
        except Exception as e:
            print("Error! - controller.does_everything_exist: " + str(e))
            return False
def main():
    '''
    Execution flow is controlled from here.

    Verifies internet connectivity and the required files/directories, then
    prints the banner, reads the domain list and fans the domains out to the
    worker pool. Exits with status 1 if either pre-flight check fails.
    '''
    controller = SkimController()
    lint = skim_utils.SkimUitls().lint
    # The method must be *called*: a bare bound-method reference is always
    # truthy, which made this connectivity check impossible to fail.
    if not controller.is_internet_available():
        lint("\nError! - Check internet connectivity\n")
        raise SystemExit(1)
    # check dirs and files exists, if not create them.
    if not controller.does_everything_exist():
        lint("\nIO Error! - File does not exist.")
        raise SystemExit(1)
    import skim_reader_io
    reader = skim_reader_io.SkimReader().fetch_domain_list
    controller.clean_and_print_banner()
    list_of_domains = reader(controller.path_to_urls)
    controller.parallelize(list_of_domains)
    # NOTE(review): SkimController.display_results() is never invoked from
    # here - confirm whether that is intentional.
# Run the application only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()