diff --git a/netprobe.py b/netprobe.py index b5674de..a53d5bf 100644 --- a/netprobe.py +++ b/netprobe.py @@ -1,5 +1,6 @@ +#!/usr/bin/env python3 + import argparse -import socket from rich.console import Console from rich.table import Table from scapy.all import ARP, Ether, srp @@ -10,11 +11,14 @@ console = Console() def get_arguments(): - parser = argparse.ArgumentParser(description='Network Scanner Tool') - parser.add_argument("-t", "--target", metavar="", help="Target IP address or subnet (default: 192.168.1.0/24)", default="192.168.1.0/24") - parser.add_argument("-i", "--interface", metavar="", help="Interface to use (default: None)", default=None) + parser = argparse.ArgumentParser(description='NetProbe: Network Scanner Tool') + parser.add_argument("-t", "--target", metavar="", help="Target IP address or subnet (default: 192.168.1.0/24)", default="192.168.1.0/24", nargs='+', required=True) + parser.add_argument("-i", "--interface", metavar="", help="Interface to use (default: None)", default=None, nargs='+', required=True) parser.add_argument("-l", "--live", action="store_true", help="Enable live tracking of devices") parser.add_argument("-o", "--output", metavar="", help="Output file to save the results") + parser.add_argument("-m", "--manufacturer", metavar="", help="Filter by manufacturer (e.g., 'Apple')") + parser.add_argument("-r", "--ip-range", metavar="", help="Filter by IP range (e.g., '192.168.1.0/24')") + parser.add_argument("-s", "--scan-rate", metavar="", help="Scan rate in seconds (default: 5)", type=int, default=5) return parser.parse_args() def scan(target, iface): @@ -37,7 +41,8 @@ def scan(target, iface): for sent, received in result: mac_address = received.hwsrc manufacturer = get_device_info(mac_address) - devices.append({'ip': received.psrc, 'mac': mac_address, 'manufacturer': manufacturer}) + packet_size = len(sent) + len(received) + devices.append({'ip': received.psrc, 'mac': mac_address, 'manufacturer': manufacturer, 'packet_size': packet_size}) return devices @@ -54,30 +59,48 @@ def get_device_info(mac_address): return manufacturer +def apply_filters(devices, manufacturer_filter, ip_range_filter): + filtered_devices = [] + for device in devices: + if (not manufacturer_filter or device['manufacturer'].lower() == manufacturer_filter.lower()) and (not ip_range_filter or ip_in_range(device['ip'], ip_range_filter)): + filtered_devices.append(device) + return filtered_devices + +def ip_in_range(ip, ip_range): + try: + ip_network = ip_range.split('/')[0] + subnet_mask = int(ip_range.split('/')[1]) + return ipaddress.ip_address(ip) in ipaddress.ip_network(ip_network, strict=False) + except ValueError: + return False -def display_live_updates(target, iface, output_file=None): +def display_live_updates(targets, interfaces, output_file=None, manufacturer_filter=None, ip_range_filter=None, scan_rate=5): known_devices = [] while True: - devices = scan(target, iface) + for target in targets: + for iface in interfaces: + devices = scan(target, iface) - # Save newly discovered devices - new_devices = [d for d in devices if d not in known_devices] - if new_devices and output_file: - save_results(new_devices, output_file) - known_devices.extend(new_devices) + # Save newly discovered devices + new_devices = [d for d in devices if d not in known_devices] + if new_devices and output_file: + save_results(new_devices, output_file) + known_devices.extend(new_devices) - console.clear() - table = Table(show_header=True, header_style="bold magenta") - table.add_column("IP Address", style="bold red") - table.add_column("MAC Address", style="bold blue") - table.add_column("Manufacturer", style="bold yellow") + filtered_devices = apply_filters(devices, manufacturer_filter, ip_range_filter) - for device in devices: - table.add_row(device['ip'], device['mac'], device['manufacturer']) + console.clear() + table = Table(show_header=True, header_style="bold magenta") + table.add_column("IP Address", style="bold red") + table.add_column("MAC Address", style="bold blue") + table.add_column("Packet Size", style="bold green") + table.add_column("Manufacturer", style="bold yellow") - console.print(table) - time.sleep(5) + for device in filtered_devices: + table.add_row(device['ip'], device['mac'], str(device['packet_size']), device['manufacturer']) + console.print(table) + time.sleep(scan_rate) def save_results(devices, output_file): with open(output_file, 'a') as f: # 'a' for append mode @@ -85,35 +108,44 @@ def save_results(devices, output_file): f.write(f"IP Address: {device['ip']}\n") f.write(f"MAC Address: {device['mac']}\n") f.write(f"Manufacturer: {device['manufacturer']}\n") + f.write(f"Packet Size: {device['packet_size']} bytes\n") f.write("\n") - def main(): try: args = get_arguments() - target = args.target - iface = args.interface + targets = args.target + interfaces = args.interface live_tracking = args.live output_file = args.output + manufacturer_filter = args.manufacturer + ip_range_filter = args.ip_range + scan_rate = args.scan_rate - devices = scan(target, iface) + devices = [] + for target in targets: + for iface in interfaces: + devices.extend(scan(target, iface)) table = Table(show_header=True, header_style="bold magenta") table.add_column("IP Address", style="bold red") table.add_column("MAC Address", style="bold blue") + table.add_column("Packet Size", style="bold green") table.add_column("Manufacturer", style="bold yellow") - for device in devices: - table.add_row(device['ip'], device['mac'], device['manufacturer']) + filtered_devices = apply_filters(devices, manufacturer_filter, ip_range_filter) + + for device in filtered_devices: + table.add_row(device['ip'], device['mac'], str(device['packet_size']), device['manufacturer']) console.print(table) if output_file: - save_results(devices, output_file) + save_results(filtered_devices, output_file) console.print(f"\n[bold green]Results saved to: {output_file}") if live_tracking: - t = threading.Thread(target=display_live_updates, args=(target, iface, output_file)) + t = threading.Thread(target=display_live_updates, args=(targets, interfaces, output_file, manufacturer_filter, ip_range_filter, scan_rate)) t.daemon = True # This makes sure the thread will exit when the main program exits t.start() while t.is_alive(): # This loop will keep the main thread running while the display thread is running @@ -121,6 +153,5 @@ def main(): except KeyboardInterrupt: console.print("\n[bold red]Program interrupted by user. Exiting...") - if __name__ == "__main__": main()