-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
146 lines (119 loc) · 5.07 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from concurrent.futures.thread import ThreadPoolExecutor
import socket
import sys
import time
import argparse
import concurrent.futures
class PScan:
def __init__(self):
self.portlist_raw_string = ""
self.remote_host = ""
self.remote_host_fqdn = ""
self.remote_ip = ""
self.number_of_open_ports = 0
def get_ports(self, portlist_raw_string):
"""
Take a string with ports and splits into single port, then calculates the range of ports to check
"""
range_min_max = []
inflated_port_list = []
portlist_raw_list = portlist_raw_string.split(',')
# for every port number in the list
for port in portlist_raw_list:
if port != '':
# if the dash symbol is present, it's a range
if (port.find("-") != -1):
# adding the range of ports in a list
range_min_max.append(port.split('-'))
else:
# if the port doesn't contain letters
if port.isdigit():
port = int(port)
if port >= 0 and port <= 65535:
inflated_port_list.append(port)
# for every range to create
for range_to_create in range_min_max:
min, max = int(range_to_create[0]), int(range_to_create[1])
for port in range(min, max+1):
if port >= 0 and port <= 65535:
inflated_port_list.append(port)
# remove duplicates and sort the list
inflated_port_list = sorted(set(inflated_port_list))
return inflated_port_list
def scan_port(self, remote_host, port):
"""
New function to scan ports
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
# if port is open
if not sock.connect_ex((remote_host, port)):
try:
# get the service name for the port
serviceName = socket.getservbyport(port, "tcp")
except:
serviceName = ""
sock.close()
self.number_of_open_ports += 1
print(port, "\t", serviceName)
def scan_host(self, remote_host, ports_to_scan):
"""
Scans a host to check if the given ports are open
"""
# trying to obtain the ip address
try:
ip = socket.gethostbyname(remote_host)
except:
print("Error: ip invalid or can't resolve the host name in IP address!")
sys.exit()
# trying to obtain the FQDN
try:
fqdn = socket.getfqdn(remote_host)
except:
fqdn = remote_host
print("Starting port scan of host: {} ({})".format(remote_host, fqdn))
# this is to get the execution time
startTime = time.time()
# using multithreading to scan multiple ports simultaneously
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
{executor.submit(self.scan_port, remote_host, port)
: port for port in ports_to_scan}
# i wait for all the threads to complete
executor.shutdown(wait=True)
# calculating execution time
executionTime = round((time.time() - startTime), 2)
# printing some info
print("\nScan finished in {} seconds".format(executionTime))
print("Found {} open ports!".format(self.number_of_open_ports))
def initialize(self):
self.ports_to_scan = self.get_ports(self.portlist_raw_string)
if len(self.ports_to_scan):
self.scan_host(self.remote_host, self.ports_to_scan)
def parse_args(self):
parser_usage = '''main.py -p 21 192.168.1.1
main.py -p 21,80-90 192.168.1.1
main.py --port 21 192.168.1.1
main.py --port 21,80-90 192.168.1.1'''
parser = argparse.ArgumentParser(
description="A simple port scanner tool", usage=parser_usage)
parser.add_argument(
"ipaddress", help="The IP address you want to scan")
parser.add_argument(
"-p", "--port", help="A list of ports to scan", required=True, dest="ports_to_scan", action="store")
parser.add_argument(
"-t", "--timeout", help="Timeout to check if port is open (Default 0.1)", required=False, dest="timeout", action="store", default=0.1)
parser.add_argument(
"-w", "--workers", help="Maximum number of workers to use for multithreading (Default 1000)", required=False, dest="max_workers", action="store", default=1000)
# printing help if no argument given
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
arguments = parser.parse_args()
self.remote_host = arguments.ipaddress
self.portlist_raw_string = arguments.ports_to_scan
self.timeout = arguments.timeout
self.max_workers = int(arguments.max_workers)
if __name__ == '__main__':
pscan = PScan()
pscan.parse_args()
pscan.initialize()