From 13997435b308ca27b86f98cb65b7e46814139e91 Mon Sep 17 00:00:00 2001 From: github-actions Date: Sat, 16 Nov 2024 23:51:34 -0500 Subject: [PATCH] add nmap xml output --- bbot/cli.py | 14 +- bbot/modules/base.py | 2 +- bbot/modules/internal/speculate.py | 2 +- bbot/modules/output/nmap_xml.py | 161 ++++++++++++++++++ bbot/modules/output/stdout.py | 2 +- bbot/scanner/preset/args.py | 18 +- bbot/test/test_step_1/test_modules_basic.py | 20 +-- .../module_tests/test_module_nmap_xml.py | 85 +++++++++ 8 files changed, 282 insertions(+), 22 deletions(-) create mode 100644 bbot/modules/output/nmap_xml.py create mode 100644 bbot/test/test_step_2/module_tests/test_module_nmap_xml.py diff --git a/bbot/cli.py b/bbot/cli.py index 877f2bcaa..4ffd3398b 100755 --- a/bbot/cli.py +++ b/bbot/cli.py @@ -80,7 +80,7 @@ async def _main(): return # if we're listing modules or their options - if options.list_modules or options.list_module_options: + if options.list_modules or options.list_output_modules or options.list_module_options: # if no modules or flags are specified, enable everything if not (options.modules or options.output_modules or options.flags): @@ -99,7 +99,17 @@ async def _main(): print("") print("### MODULES ###") print("") - for row in preset.module_loader.modules_table(preset.modules).splitlines(): + modules = sorted(set(preset.scan_modules + preset.internal_modules)) + for row in preset.module_loader.modules_table(modules).splitlines(): + print(row) + return + + # --list-output-modules + if options.list_output_modules: + print("") + print("### OUTPUT MODULES ###") + print("") + for row in preset.module_loader.modules_table(preset.output_modules).splitlines(): print(row) return diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 956d59c98..93593b9a3 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -51,7 +51,7 @@ class BaseModule: target_only (bool): Accept only the initial target event(s). Default is False. - in_scope_only (bool): Accept only explicitly in-scope events. Default is False. + in_scope_only (bool): Accept only explicitly in-scope events, regardless of the scan's search distance. Default is False. options (Dict): Customizable options for the module, e.g., {"api_key": ""}. Empty dict by default. diff --git a/bbot/modules/internal/speculate.py b/bbot/modules/internal/speculate.py index 0664f954a..260a85aa5 100644 --- a/bbot/modules/internal/speculate.py +++ b/bbot/modules/internal/speculate.py @@ -149,7 +149,7 @@ async def handle_event(self, event): # don't act on unresolved DNS_NAMEs usable_dns = False if event.type == "DNS_NAME": - if self.dns_disable or ("a-record" in event.tags or "aaaa-record" in event.tags): + if self.dns_disable or event.resolved_hosts: usable_dns = True if event.type == "IP_ADDRESS" or usable_dns: diff --git a/bbot/modules/output/nmap_xml.py b/bbot/modules/output/nmap_xml.py new file mode 100644 index 000000000..5279a5a3c --- /dev/null +++ b/bbot/modules/output/nmap_xml.py @@ -0,0 +1,161 @@ +import sys +from xml.dom import minidom +from datetime import datetime +from xml.etree.ElementTree import Element, SubElement, tostring + +from bbot import __version__ +from bbot.modules.output.base import BaseOutputModule + + +class NmapHost: + __slots__ = ["hostnames", "open_ports"] + + def __init__(self): + self.hostnames = set() + # a dict of {port: {protocol: banner}} + self.open_ports = dict() + + +class Nmap_XML(BaseOutputModule): + watched_events = ["OPEN_TCP_PORT", "DNS_NAME", "IP_ADDRESS", "PROTOCOL"] + meta = {"description": "Output to Nmap XML", "created_date": "2024-11-16", "author": "@TheTechromancer"} + output_filename = "output.nmap.xml" + in_scope_only = True + + async def setup(self): + self.hosts = {} + self._prep_output_dir(self.output_filename) + return True + + async def handle_event(self, event): + event_host = event.host + + # we always record by IP + ips = [] + for ip in event.resolved_hosts: + try: + ips.append(self.helpers.make_ip_type(ip)) + except ValueError: + continue + if not ips and self.helpers.is_ip(event_host): + ips = [event_host] + + for ip in ips: + try: + nmap_host = self.hosts[ip] + except KeyError: + nmap_host = NmapHost() + self.hosts[ip] = nmap_host + + event_port = getattr(event, "port", None) + if event.type == "OPEN_TCP_PORT": + if event_port not in nmap_host.open_ports: + nmap_host.open_ports[event.port] = {} + elif event.type == "PROTOCOL": + if event_port is not None: + try: + existing_services = nmap_host.open_ports[event.port] + except KeyError: + existing_services = {} + nmap_host.open_ports[event.port] = existing_services + protocol = event.data["protocol"].lower() + if protocol not in existing_services: + existing_services[protocol] = event.data.get("banner", None) + + if self.helpers.is_ip(event_host): + if str(event.module) == "PTR": + nmap_host.hostnames.add(event.parent.data) + else: + nmap_host.hostnames.add(event_host) + + async def report(self): + scan_start_time = str(int(self.scan.start_time.timestamp())) + scan_start_time_str = self.scan.start_time.strftime("%a %b %d %H:%M:%S %Y") + scan_end_time = datetime.now() + scan_end_time_str = scan_end_time.strftime("%a %b %d %H:%M:%S %Y") + scan_end_time_timestamp = str(scan_end_time.timestamp()) + scan_duration = scan_end_time - self.scan.start_time + num_hosts_up = len(self.hosts) + + # Create the root element + nmaprun = Element( + "nmaprun", + { + "scanner": "bbot", + "args": " ".join(sys.argv), + "start": scan_start_time, + "startstr": scan_start_time_str, + "version": str(__version__), + "xmloutputversion": "1.05", + }, + ) + + ports_scanned = [] + speculate_module = self.scan.modules.get("speculate", None) + if speculate_module is not None: + ports_scanned = speculate_module.ports + portscan_module = self.scan.modules.get("portscan", None) + if portscan_module is not None: + ports_scanned = self.helpers.parse_port_string(str(portscan_module.ports)) + num_ports_scanned = len(sorted(ports_scanned)) + ports_scanned = ",".join(str(x) for x in sorted(ports_scanned)) + + # Add scaninfo + SubElement( + nmaprun, + "scaninfo", + {"type": "syn", "protocol": "tcp", "numservices": str(num_ports_scanned), "services": ports_scanned}, + ) + + # Add host information + for ip, nmap_host in self.hosts.items(): + hostnames = sorted(nmap_host.hostnames) + ports = sorted(nmap_host.open_ports) + + host_elem = SubElement(nmaprun, "host") + SubElement(host_elem, "status", {"state": "up", "reason": "user-set", "reason_ttl": "0"}) + SubElement(host_elem, "address", {"addr": str(ip), "addrtype": f"ipv{ip.version}"}) + + if hostnames: + hostnames_elem = SubElement(host_elem, "hostnames") + for hostname in hostnames: + SubElement(hostnames_elem, "hostname", {"name": hostname, "type": "user"}) + + ports = SubElement(host_elem, "ports") + for port, protocols in nmap_host.open_ports.items(): + port_elem = SubElement(ports, "port", {"protocol": "tcp", "portid": str(port)}) + SubElement(port_elem, "state", {"state": "open", "reason": "syn-ack", "reason_ttl": "0"}) + # + for protocol, banner in protocols.items(): + attrs = {"name": protocol, "method": "probed", "conf": "10"} + if banner is not None: + attrs["product"] = banner + attrs["extrainfo"] = banner + SubElement(port_elem, "service", attrs) + + # Add runstats + runstats = SubElement(nmaprun, "runstats") + SubElement( + runstats, + "finished", + { + "time": scan_end_time_timestamp, + "timestr": scan_end_time_str, + "summary": f"BBOT done at {scan_end_time_str}; {num_hosts_up} scanned in {scan_duration} seconds", + "elapsed": str(scan_duration.total_seconds()), + "exit": "success", + }, + ) + SubElement(runstats, "hosts", {"up": str(num_hosts_up), "down": "0", "total": str(num_hosts_up)}) + + # make backup of the file + self.helpers.backup_file(self.output_file) + + # Pretty-format the XML + rough_string = tostring(nmaprun, encoding="utf-8") + reparsed = minidom.parseString(rough_string) + pretty_xml = reparsed.toprettyxml(indent=" ") + + with open(self.output_file, "w") as f: + f.write(pretty_xml) + self.info(f"Saved Nmap XML output to {self.output_file}") diff --git a/bbot/modules/output/stdout.py b/bbot/modules/output/stdout.py index 6e4ccf5be..33ffad5da 100644 --- a/bbot/modules/output/stdout.py +++ b/bbot/modules/output/stdout.py @@ -6,7 +6,7 @@ class Stdout(BaseOutputModule): watched_events = ["*"] - meta = {"description": "Output to text"} + meta = {"description": "Output to text", "created_date": "2024-04-03", "author": "@TheTechromancer"} options = {"format": "text", "event_types": [], "event_fields": [], "in_scope_only": False, "accept_dupes": True} options_desc = { "format": "Which text format to display, choices: text,json", diff --git a/bbot/scanner/preset/args.py b/bbot/scanner/preset/args.py index 986fd909f..7c10e35da 100644 --- a/bbot/scanner/preset/args.py +++ b/bbot/scanner/preset/args.py @@ -53,6 +53,11 @@ class BBOTArgs: "", "bbot -l", ), + ( + "List output modules", + "", + "bbot -lo", + ), ( "List presets", "", @@ -278,12 +283,6 @@ def create_parser(self, *args, **kwargs): ) output = p.add_argument_group(title="Output") - output.add_argument( - "-o", - "--output-dir", - help="Directory to output scan results", - metavar="DIR", - ) output.add_argument( "-om", "--output-modules", @@ -292,6 +291,13 @@ def create_parser(self, *args, **kwargs): help=f'Output module(s). Choices: {",".join(self.preset.module_loader.output_module_choices)}', metavar="MODULE", ) + output.add_argument("-lo", "--list-output-modules", action="store_true", help="List available output modules") + output.add_argument( + "-o", + "--output-dir", + help="Directory to output scan results", + metavar="DIR", + ) output.add_argument("--json", "-j", action="store_true", help="Output scan data in JSON format") output.add_argument("--brief", "-br", action="store_true", help="Output only the data itself") output.add_argument("--event-types", nargs="+", default=[], help="Choose which event types to display") diff --git a/bbot/test/test_step_1/test_modules_basic.py b/bbot/test/test_step_1/test_modules_basic.py index 3051f8de1..4c793e993 100644 --- a/bbot/test/test_step_1/test_modules_basic.py +++ b/bbot/test/test_step_1/test_modules_basic.py @@ -159,17 +159,15 @@ async def test_modules_basic_checks(events, httpx_mock): assert not ( "web-basic" in flags and "web-thorough" in flags ), f'module "{module_name}" should have either "web-basic" or "web-thorough" flags, not both' - meta = preloaded.get("meta", {}) - # make sure every module has a description - assert meta.get("description", ""), f"{module_name} must have a description" - # make sure every module has an author - assert meta.get("author", ""), f"{module_name} must have an author" - # make sure every module has a created date - created_date = meta.get("created_date", "") - assert created_date, f"{module_name} must have a created date" - assert created_date_regex.match( - created_date - ), f"{module_name}'s created_date must match the format YYYY-MM-DD" + meta = preloaded.get("meta", {}) + # make sure every module has a description + assert meta.get("description", ""), f"{module_name} must have a description" + # make sure every module has an author + assert meta.get("author", ""), f"{module_name} must have an author" + # make sure every module has a created date + created_date = meta.get("created_date", "") + assert created_date, f"{module_name} must have a created date" + assert created_date_regex.match(created_date), f"{module_name}'s created_date must match the format YYYY-MM-DD" # attribute checks watched_events = preloaded.get("watched_events") diff --git a/bbot/test/test_step_2/module_tests/test_module_nmap_xml.py b/bbot/test/test_step_2/module_tests/test_module_nmap_xml.py new file mode 100644 index 000000000..b88595be0 --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_nmap_xml.py @@ -0,0 +1,85 @@ +import xml.etree.ElementTree as ET + +from bbot.modules.base import BaseModule +from .base import ModuleTestBase + + +class TestNmap_XML(ModuleTestBase): + modules_overrides = ["nmap_xml", "speculate"] + targets = ["blacklanternsecurity.com", "127.0.0.3"] + config_overrides = {"dns": {"minimal": False}} + + class DummyModule(BaseModule): + watched_events = ["OPEN_TCP_PORT"] + _name = "dummy_module" + + async def handle_event(self, event): + if event.port == 80: + await self.emit_event( + {"host": str(event.host), "port": event.port, "protocol": "http", "banner": "Apache"}, + "PROTOCOL", + parent=event, + ) + elif event.port == 443: + await self.emit_event( + {"host": str(event.host), "port": event.port, "protocol": "https"}, "PROTOCOL", parent=event + ) + + async def setup_before_prep(self, module_test): + self.dummy_module = self.DummyModule(module_test.scan) + module_test.scan.modules["dummy_module"] = self.dummy_module + await module_test.mock_dns( + { + "blacklanternsecurity.com": {"A": ["127.0.0.1", "127.0.0.2"]}, + "3.0.0.127.in-addr.arpa": {"PTR": ["www.blacklanternsecurity.com"]}, + "www.blacklanternsecurity.com": {"A": ["127.0.0.1"]}, + } + ) + + def check(self, module_test, events): + nmap_xml_file = module_test.scan.modules["nmap_xml"].output_file + nmap_xml = open(nmap_xml_file).read() + + # Parse the XML + root = ET.fromstring(nmap_xml) + + # Expected IP addresses + expected_ips = {"127.0.0.1", "127.0.0.2", "127.0.0.3"} + found_ips = set() + + # Iterate over each host in the XML + for host in root.findall("host"): + # Get the IP address + address = host.find("address").get("addr") + found_ips.add(address) + + # Get hostnames if available + hostnames = sorted([hostname.get("name") for hostname in host.findall(".//hostname")]) + + # Get open ports and services + ports = [] + for port in host.findall(".//port"): + port_id = port.get("portid") + state = port.find("state").get("state") + if state == "open": + service_name = port.find("service").get("name") + service_product = port.find("service").get("product", "") + service_extrainfo = port.find("service").get("extrainfo", "") + ports.append((port_id, service_name, service_product, service_extrainfo)) + + # Sort ports for consistency + ports.sort() + + # Assertions + if address == "127.0.0.1": + assert hostnames == ["blacklanternsecurity.com", "www.blacklanternsecurity.com"] + assert ports == sorted([("80", "http", "Apache", "Apache"), ("443", "https", "", "")]) + elif address == "127.0.0.2": + assert hostnames == sorted(["blacklanternsecurity.com"]) + assert ports == sorted([("80", "http", "Apache", "Apache"), ("443", "https", "", "")]) + elif address == "127.0.0.3": + assert hostnames == [] # No hostnames for this IP + assert ports == sorted([("80", "http", "Apache", "Apache"), ("443", "https", "", "")]) + + # Assert that all expected IPs were found + assert found_ips == expected_ips