diff --git a/bbot/core/flags.py b/bbot/core/flags.py index 63c90fa4c..d8dbf8566 100644 --- a/bbot/core/flags.py +++ b/bbot/core/flags.py @@ -2,6 +2,7 @@ "active": "Makes active connections to target systems", "affiliates": "Discovers affiliated hostnames/domains", "aggressive": "Generates a large amount of network traffic", + "baddns": "Runs all modules from the DNS auditing tool BadDNS", "cloud-enum": "Enumerates cloud resources", "deadly": "Highly aggressive", "email-enum": "Enumerates email addresses", diff --git a/bbot/core/helpers/dns.py b/bbot/core/helpers/dns.py index ecfc71b94..6cbaf9f8e 100644 --- a/bbot/core/helpers/dns.py +++ b/bbot/core/helpers/dns.py @@ -1025,28 +1025,3 @@ def _get_dummy_module(self, name): dummy_module.suppress_dupes = False self._dummy_modules[name] = dummy_module return dummy_module - - def mock_dns(self, dns_dict): - if self._orig_resolve_raw is None: - self._orig_resolve_raw = self.resolve_raw - - async def mock_resolve_raw(query, **kwargs): - results = [] - errors = [] - types = self._parse_rdtype(kwargs.get("type", ["A", "AAAA"])) - for t in types: - with suppress(KeyError): - results += self._mock_table[(query, t)] - return results, errors - - for (query, rdtype), answers in dns_dict.items(): - if isinstance(answers, str): - answers = [answers] - for answer in answers: - rdata = dns.rdata.from_text("IN", rdtype, answer) - try: - self._mock_table[(query, rdtype)].append((rdtype, rdata)) - except KeyError: - self._mock_table[(query, rdtype)] = [(rdtype, [rdata])] - - self.resolve_raw = mock_resolve_raw diff --git a/bbot/core/helpers/names_generator.py b/bbot/core/helpers/names_generator.py index 16432cb0e..3e16b446a 100644 --- a/bbot/core/helpers/names_generator.py +++ b/bbot/core/helpers/names_generator.py @@ -93,6 +93,7 @@ "giddy", "glowering", "glutinous", + "golden", "gothic", "grievous", "gummy", @@ -431,6 +432,7 @@ "gollum", "grace", "gregory", + "gus", "hagrid", "hannah", "harold", diff --git a/bbot/modules/baddns.py b/bbot/modules/baddns.py new file mode 100644 index 000000000..9abfebc84 --- /dev/null +++ b/bbot/modules/baddns.py @@ -0,0 +1,86 @@ +from baddns.base import get_all_modules +from baddns.lib.loader import load_signatures +from .base import BaseModule + +import asyncio +import logging +from bbot.core.logger.logger import include_logger + +include_logger(logging.getLogger("baddns")) + + +class baddns(BaseModule): + watched_events = ["DNS_NAME", "DNS_NAME_UNRESOLVED"] + produced_events = ["FINDING", "VULNERABILITY"] + flags = ["active", "safe", "web-basic", "baddns", "cloud-enum", "subdomain-hijack"] + meta = {"description": "Check hosts for domain/subdomain takeovers"} + options = {"custom_nameservers": [], "only_high_confidence": False} + options_desc = { + "custom_nameservers": "Force BadDNS to use a list of custom nameservers", + "only_high_confidence": "Do not emit low-confidence or generic detections", + } + max_event_handlers = 8 + deps_pip = ["baddns~=1.1.0"] + + def select_modules(self): + selected_modules = [] + for m in get_all_modules(): + if m.name in ["CNAME", "NS", "MX", "references", "TXT"]: + selected_modules.append(m) + return selected_modules + + async def setup(self): + self.custom_nameservers = self.config.get("custom_nameservers", []) or None + if self.custom_nameservers: + self.custom_nameservers = self.helpers.chain_lists(self.custom_nameservers) + self.only_high_confidence = self.config.get("only_high_confidence", False) + self.signatures = load_signatures() + return True + + async def handle_event(self, event): + + tasks = [] + for ModuleClass in self.select_modules(): + module_instance = ModuleClass( + event.data, + http_client_class=self.scan.helpers.web.AsyncClient, + dns_client=self.scan.helpers.dns.resolver, + custom_nameservers=self.custom_nameservers, + signatures=self.signatures, + ) + tasks.append((module_instance, asyncio.create_task(module_instance.dispatch()))) + + for module_instance, task in tasks: + if await task: + results = module_instance.analyze() + if results and len(results) > 0: + for r in results: + r_dict = r.to_dict() + + if r_dict["confidence"] in ["CONFIRMED", "PROBABLE"]: + data = { + "severity": "MEDIUM", + "description": f"{r_dict['description']}. Confidence: [{r_dict['confidence']}] Signature: [{r_dict['signature']}] Indicator: [{r_dict['indicator']}] Trigger: [{r_dict['trigger']}] baddns Module: [{r_dict['module']}]", + "host": str(event.host), + } + await self.emit_event( + data, "VULNERABILITY", event, tags=[f"baddns-{module_instance.name.lower()}"] + ) + + elif r_dict["confidence"] in ["UNLIKELY", "POSSIBLE"] and not self.only_high_confidence: + data = { + "description": f"{r_dict['description']} Confidence: [{r_dict['confidence']}] Signature: [{r_dict['signature']}] Indicator: [{r_dict['indicator']}] Trigger: [{r_dict['trigger']}] baddns Module: [{r_dict['module']}]", + "host": str(event.host), + } + await self.emit_event( + data, "FINDING", event, tags=[f"baddns-{module_instance.name.lower()}"] + ) + else: + self.warning(f"Got unrecognized confidence level: {r['confidence']}") + + found_domains = r_dict.get("found_domains", None) + if found_domains: + for found_domain in found_domains: + await self.emit_event( + found_domain, "DNS_NAME", event, tags=[f"baddns-{module_instance.name.lower()}"] + ) diff --git a/bbot/modules/baddns_zone.py b/bbot/modules/baddns_zone.py new file mode 100644 index 000000000..a42fe2e21 --- /dev/null +++ b/bbot/modules/baddns_zone.py @@ -0,0 +1,34 @@ +from baddns.base import get_all_modules +from .baddns import baddns as baddns_module + +import logging +from bbot.core.logger.logger import include_logger + +include_logger(logging.getLogger("baddns_zone")) + + +class baddns_zone(baddns_module): + watched_events = ["DNS_NAME"] + produced_events = ["FINDING", "VULNERABILITY"] + flags = ["active", "safe", "subdomain-enum", "baddns", "cloud-enum"] + meta = {"description": "Check hosts for DNS zone transfers and NSEC walks"} + options = {"custom_nameservers": [], "only_high_confidence": False} + options_desc = { + "custom_nameservers": "Force BadDNS to use a list of custom nameservers", + "only_high_confidence": "Do not emit low-confidence or generic detections", + } + max_event_handlers = 8 + deps_pip = ["baddns~=1.1.0"] + + def select_modules(self): + selected_modules = [] + for m in get_all_modules(): + if m.name in ["NSEC", "zonetransfer"]: + selected_modules.append(m) + return selected_modules + + # minimize nsec records feeding back into themselves + async def filter_event(self, event): + if "baddns-nsec" in event.tags or "baddns-nsec" in event.source.tags: + return False + return True diff --git a/bbot/modules/dnszonetransfer.py b/bbot/modules/dnszonetransfer.py deleted file mode 100644 index 0a48526dc..000000000 --- a/bbot/modules/dnszonetransfer.py +++ /dev/null @@ -1,70 +0,0 @@ -import dns.zone -import dns.query - -from bbot.modules.base import BaseModule - - -class dnszonetransfer(BaseModule): - flags = ["subdomain-enum", "active", "safe"] - watched_events = ["DNS_NAME"] - produced_events = ["DNS_NAME"] - meta = {"description": "Attempt DNS zone transfers"} - options = {"timeout": 10} - options_desc = {"timeout": "Max seconds to wait before timing out"} - _max_event_handlers = 5 - suppress_dupes = False - - async def setup(self): - self.timeout = self.config.get("timeout", 10) - return True - - async def filter_event(self, event): - if any([x in event.tags for x in ("ns-record", "soa-record")]): - return True - return False - - async def handle_event(self, event): - domain = event.data - self.debug("Finding nameservers with NS/SOA query") - nameservers = list(await self.helpers.resolve(event.data, type=("NS", "SOA"))) - nameserver_ips = set() - for n in nameservers: - nameserver_ips.update(await self.helpers.resolve(n)) - self.debug(f"Found {len(nameservers):} nameservers for domain {domain}") - for nameserver in nameserver_ips: - if self.scan.stopping: - break - try: - self.debug(f"Attempting zone transfer against {nameserver} for domain {domain}") - zone = await self.scan.run_in_executor(self.zone_transfer, nameserver, domain) - except Exception as e: - self.debug(f"Error retrieving zone for {domain}: {e}") - continue - self.hugesuccess(f"Successful zone transfer against {nameserver} for domain {domain}!") - finding_description = f"Successful DNS zone transfer against {nameserver} for {domain}" - await self.emit_event( - {"host": str(event.host), "description": finding_description}, "FINDING", source=event - ) - for name, ttl, rdata in zone.iterate_rdatas(): - if str(name) == "@": - parent_data = domain - else: - parent_data = f"{name}.{domain}" - parent_event = self.make_event(parent_data, "DNS_NAME", event) - if not parent_event or parent_event == event: - parent_event = event - else: - await self.emit_event(parent_event) - for rdtype, t in self.helpers.dns.extract_targets(rdata): - if not self.helpers.is_ip(t): - t = f"{t}.{domain}" - module = self.helpers.dns._get_dummy_module(rdtype) - child_event = self.scan.make_event(t, "DNS_NAME", parent_event, module=module) - await self.emit_event(child_event) - else: - self.debug(f"No data returned by {nameserver} for domain {domain}") - - def zone_transfer(self, nameserver, domain): - xfr_answer = dns.query.xfr(nameserver, domain, timeout=self.timeout, lifetime=self.timeout) - zone = dns.zone.from_xfr(xfr_answer) - return zone diff --git a/bbot/modules/nsec.py b/bbot/modules/nsec.py deleted file mode 100644 index 7d313c140..000000000 --- a/bbot/modules/nsec.py +++ /dev/null @@ -1,46 +0,0 @@ -from bbot.modules.base import BaseModule - - -class NSEC(BaseModule): - watched_events = ["DNS_NAME"] - produced_events = ["DNS_NAME"] - flags = ["subdomain-enum", "passive", "safe"] - meta = {"description": "Enumerate subdomains by NSEC-walking"} - _max_event_handlers = 5 - - async def filter_event(self, event): - if "ns-record" in event.tags: - return True - return False - - async def handle_event(self, event): - emitted_finding = False - async for result in self.nsec_walk(event.data): - if not emitted_finding: - emitted_finding = True - await self.emit_event( - {"host": event.data, "description": f"DNSSEC NSEC Zone Walking Enabled for domain: {event.data}"}, - "FINDING", - source=event, - ) - await self.emit_event(result, "DNS_NAME", source=event) - - async def get_nsec_record(self, domain): - domain = domain.replace("\\000.", "") - try: - for result in await self.helpers.resolve(domain, type="NSEC"): - return str(result) - except Exception as e: - self.warning(f"Error getting NSEC record for {domain}: {e}") - - async def nsec_walk(self, domain): - encountered = set() - current_domain = domain - while 1: - next_domain = await self.get_nsec_record(current_domain) - if next_domain is None or next_domain in encountered: - break - encountered.add(next_domain) - if not next_domain.startswith("\\"): - yield next_domain - current_domain = next_domain diff --git a/bbot/modules/subdomain_hijack.py b/bbot/modules/subdomain_hijack.py deleted file mode 100644 index 8c2611065..000000000 --- a/bbot/modules/subdomain_hijack.py +++ /dev/null @@ -1,137 +0,0 @@ -import re -import json -import httpx - -from bbot.modules.base import BaseModule -from bbot.core.helpers.misc import tldextract - - -class subdomain_hijack(BaseModule): - flags = ["subdomain-hijack", "subdomain-enum", "cloud-enum", "safe", "active", "web-basic", "web-thorough"] - watched_events = ["DNS_NAME", "DNS_NAME_UNRESOLVED"] - produced_events = ["FINDING"] - meta = {"description": "Detect hijackable subdomains"} - options = { - "fingerprints": "https://raw.githubusercontent.com/EdOverflow/can-i-take-over-xyz/master/fingerprints.json" - } - options_desc = {"fingerprints": "URL or path to fingerprints.json"} - scope_distance_modifier = 3 - _max_event_handlers = 5 - - async def setup(self): - fingerprints_url = self.config.get("fingerprints") - fingerprints_file = await self.helpers.wordlist(fingerprints_url) - with open(fingerprints_file) as f: - fingerprints = json.load(f) - self.fingerprints = [] - for f in fingerprints: - try: - f = Fingerprint(f) - except Exception as e: - self.warning(f"Error instantiating fingerprint: {e}") - continue - if not (f.domains and f.vulnerable and f.fingerprint and f.cicd_pass): - self.debug(f"Skipping fingerprint: {f}") - continue - self.debug(f"Processed fingerprint: {f}") - self.fingerprints.append(f) - if not self.fingerprints: - return None, "No valid fingerprints" - self.debug(f"Successfully processed {len(self.fingerprints):,} fingerprints") - return True - - async def handle_event(self, event): - hijackable, reason = await self.check_subdomain(event) - if hijackable: - source_hosts = [] - e = event - while 1: - host = getattr(e, "host", "") - if host: - if e not in source_hosts: - source_hosts.append(e) - e = e.get_source() - else: - break - - url = f"https://{event.host}" - description = f'Hijackable Subdomain "{event.data}": {reason}' - source_hosts = source_hosts[::-1] - if source_hosts: - source_hosts_str = str(source_hosts[0].host) - for e in source_hosts[1:]: - source_hosts_str += f" -[{e.module.name}]-> {e.host}" - description += f" ({source_hosts_str})" - await self.emit_event( - {"host": event.host, "url": url, "description": description}, "FINDING", source=event - ) - else: - self.debug(reason) - - async def check_subdomain(self, event): - for f in self.fingerprints: - for domain in f.domains: - self_matches = self.helpers.host_in_host(event.data, domain) - child_matches = any(self.helpers.host_in_host(domain, h) for h in event.resolved_hosts) - if event.type == "DNS_NAME_UNRESOLVED": - if self_matches and f.nxdomain: - return True, "NXDOMAIN" - else: - if self_matches or child_matches: - for scheme in ("https", "http"): - if self.scan.stopping: - return False, "Scan cancelled" - # first, try base request - url = f"{scheme}://{event.data}" - match, reason = await self._verify_fingerprint(f, url, cache_for=60 * 60 * 24) - if match: - return match, reason - # next, try subdomain -[CNAME]-> other_domain - url = f"{scheme}://{domain}" - headers = {"Host": event.data} - match, reason = await self._verify_fingerprint(f, url, headers=headers) - if match: - return match, reason - return False, f'Subdomain "{event.data}" not hijackable' - - async def _verify_fingerprint(self, fingerprint, *args, **kwargs): - kwargs["raise_error"] = True - kwargs["timeout"] = 10 - kwargs["retries"] = 0 - if fingerprint.http_status is not None: - kwargs["allow_redirects"] = False - try: - r = await self.helpers.request(*args, **kwargs) - if fingerprint.http_status is not None and r.status_code == fingerprint.http_status: - return True, f"HTTP status == {fingerprint.http_status}" - text = getattr(r, "text", "") - if ( - not fingerprint.nxdomain - and not fingerprint.http_status - and fingerprint.fingerprint_regex.findall(text) - ): - return True, "Fingerprint match" - except httpx.RequestError as e: - if fingerprint.nxdomain and "Name or service not known" in str(e): - return True, f"NXDOMAIN" - return False, "No match" - - -class Fingerprint: - def __init__(self, fingerprint): - assert isinstance(fingerprint, dict), "fingerprint must be a dictionary" - self.engine = fingerprint.get("service") - self.cnames = fingerprint.get("cname", []) - self.domains = list(set([tldextract(c).registered_domain for c in self.cnames])) - self.http_status = fingerprint.get("http_status", None) - self.nxdomain = fingerprint.get("nxdomain", False) - self.vulnerable = fingerprint.get("vulnerable", False) - self.fingerprint = fingerprint.get("fingerprint", "") - self.cicd_pass = fingerprint.get("cicd_pass", False) - try: - self.fingerprint_regex = re.compile(self.fingerprint, re.MULTILINE) - except re.error: - self.fingerprint_regex = re.compile(re.escape(self.fingerprint), re.MULTILINE) - - def __str__(self): - return f"{self.engine}: {self.fingerprint} (cnames: {self.cnames}, vulnerable: {self.vulnerable}, cicd_pass: {self.cicd_pass})" diff --git a/bbot/test/bbot_fixtures.py b/bbot/test/bbot_fixtures.py index a08ef3ece..7170e5a70 100644 --- a/bbot/test/bbot_fixtures.py +++ b/bbot/test/bbot_fixtures.py @@ -1,4 +1,5 @@ import os +import dns import sys import pytest import asyncio # noqa @@ -252,3 +253,67 @@ def install_all_python_deps(): for module in module_loader.preloaded().values(): deps_pip.update(set(module.get("deps", {}).get("pip", []))) subprocess.run([sys.executable, "-m", "pip", "install"] + list(deps_pip)) + + +class MockResolver: + import dns + + def __init__(self, mock_data=None): + self.mock_data = mock_data if mock_data else {} + self.nameservers = ["127.0.0.1"] + + async def resolve_address(self, ipaddr, *args, **kwargs): + modified_kwargs = {} + modified_kwargs.update(kwargs) + modified_kwargs["rdtype"] = "PTR" + return await self.resolve(str(dns.reversename.from_address(ipaddr)), *args, **modified_kwargs) + + def create_dns_response(self, query_name, rdtype): + query_name = query_name.strip(".") + answers = self.mock_data.get(query_name, {}).get(rdtype, []) + if not answers: + raise self.dns.resolver.NXDOMAIN(f"No answer found for {query_name} {rdtype}") + + message_text = f"""id 1234 +opcode QUERY +rcode NOERROR +flags QR AA RD +;QUESTION +{query_name}. IN {rdtype} +;ANSWER""" + for answer in answers: + message_text += f"\n{query_name}. 1 IN {rdtype} {answer}" + + message_text += "\n;AUTHORITY\n;ADDITIONAL\n" + message = self.dns.message.from_text(message_text) + return message + + async def resolve(self, query_name, rdtype=None): + if rdtype is None: + rdtype = "A" + elif isinstance(rdtype, str): + rdtype = rdtype.upper() + else: + rdtype = str(rdtype.name).upper() + + domain_name = self.dns.name.from_text(query_name) + rdtype_obj = self.dns.rdatatype.from_text(rdtype) + + if "_NXDOMAIN" in self.mock_data and query_name in self.mock_data["_NXDOMAIN"]: + # Simulate the NXDOMAIN exception + raise self.dns.resolver.NXDOMAIN + + try: + response = self.create_dns_response(query_name, rdtype) + answer = self.dns.resolver.Answer(domain_name, rdtype_obj, self.dns.rdataclass.IN, response) + return answer + except self.dns.resolver.NXDOMAIN: + return [] + + +@pytest.fixture() +def mock_dns(): + def _mock_dns(scan, mock_data): + scan.helpers.dns.resolver = MockResolver(mock_data) + + return _mock_dns diff --git a/bbot/test/test_step_1/test_dns.py b/bbot/test/test_step_1/test_dns.py index debaed2b9..37ddf86a8 100644 --- a/bbot/test/test_step_1/test_dns.py +++ b/bbot/test/test_step_1/test_dns.py @@ -2,7 +2,7 @@ @pytest.mark.asyncio -async def test_dns(bbot_scanner, bbot_config): +async def test_dns(bbot_scanner, bbot_config, mock_dns): scan = bbot_scanner("1.1.1.1", config=bbot_config) helpers = scan.helpers @@ -85,8 +85,12 @@ async def test_dns(bbot_scanner, bbot_config): dns_config = OmegaConf.create({"dns_resolution": True}) dns_config = OmegaConf.merge(bbot_config, dns_config) scan2 = bbot_scanner("evilcorp.com", config=dns_config) - scan2.helpers.dns.mock_dns( - {("evilcorp.com", "TXT"): '"v=spf1 include:cloudprovider.com ~all"', ("cloudprovider.com", "A"): "1.2.3.4"} + mock_dns( + scan2, + { + "evilcorp.com": {"TXT": ['"v=spf1 include:cloudprovider.com ~all"']}, + "cloudprovider.com": {"A": ["1.2.3.4"]}, + }, ) events = [e async for e in scan2.async_start()] assert 1 == len( diff --git a/bbot/test/test_step_1/test_manager_deduplication.py b/bbot/test/test_step_1/test_manager_deduplication.py index e046988ae..e8ec69edc 100644 --- a/bbot/test/test_step_1/test_manager_deduplication.py +++ b/bbot/test/test_step_1/test_manager_deduplication.py @@ -3,7 +3,7 @@ @pytest.mark.asyncio -async def test_manager_deduplication(bbot_config, bbot_scanner): +async def test_manager_deduplication(bbot_config, bbot_scanner, mock_dns): class DefaultModule(BaseModule): _name = "default_module" @@ -62,7 +62,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs) scan.modules["per_hostport_only"] = per_hostport_only scan.modules["per_domain_only"] = per_domain_only if _dns_mock: - scan.helpers.dns.mock_dns(_dns_mock) + mock_dns(scan, _dns_mock) if scan_callback is not None: scan_callback(scan) return ( @@ -76,12 +76,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs) ) dns_mock_chain = { - ("default_module.test.notreal", "A"): "127.0.0.3", - ("everything_module.test.notreal", "A"): "127.0.0.4", - ("no_suppress_dupes.test.notreal", "A"): "127.0.0.5", - ("accept_dupes.test.notreal", "A"): "127.0.0.6", - ("per_hostport_only.test.notreal", "A"): "127.0.0.7", - ("per_domain_only.test.notreal", "A"): "127.0.0.8", + "default_module.test.notreal": {"A": ["127.0.0.3"]}, + "everything_module.test.notreal": {"A": ["127.0.0.4"]}, + "no_suppress_dupes.test.notreal": {"A": ["127.0.0.5"]}, + "accept_dupes.test.notreal": {"A": ["127.0.0.6"]}, + "per_hostport_only.test.notreal": {"A": ["127.0.0.7"]}, + "per_domain_only.test.notreal": {"A": ["127.0.0.8"]}, } # dns search distance = 1, report distance = 0 diff --git a/bbot/test/test_step_1/test_manager_scope_accuracy.py b/bbot/test/test_step_1/test_manager_scope_accuracy.py index a927da8a3..de9ffd72c 100644 --- a/bbot/test/test_step_1/test_manager_scope_accuracy.py +++ b/bbot/test/test_step_1/test_manager_scope_accuracy.py @@ -31,7 +31,7 @@ def bbot_other_httpservers(): @pytest.mark.asyncio -async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl): +async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl, mock_dns): """ This test ensures that BBOT correctly handles different scope distance settings. It performs these tests for normal modules, output modules, and their graph variants, @@ -102,7 +102,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs) scan.modules["dummy_graph_output_module"] = dummy_graph_output_module scan.modules["dummy_graph_batch_output_module"] = dummy_graph_batch_output_module if _dns_mock: - scan.helpers.dns.mock_dns(_dns_mock) + mock_dns(scan, _dns_mock) if scan_callback is not None: scan_callback(scan) return ( @@ -114,12 +114,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs) ) dns_mock_chain = { - ("test.notreal", "A"): "127.0.0.66", - ("127.0.0.66", "PTR"): "test.notrealzies", - ("test.notrealzies", "CNAME"): "www.test.notreal", - ("www.test.notreal", "A"): "127.0.0.77", - ("127.0.0.77", "PTR"): "test2.notrealzies", - ("test2.notrealzies", "A"): "127.0.0.88", + "test.notreal": {"A": ["127.0.0.66"]}, + "66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]}, + "test.notrealzies": {"CNAME": ["www.test.notreal"]}, + "www.test.notreal": {"A": ["127.0.0.77"]}, + "77.0.0.127.in-addr.arpa": {"PTR": ["test2.notrealzies"]}, + "test2.notrealzies": {"A": ["127.0.0.88"]}, } # dns search distance = 1, report distance = 0 @@ -240,9 +240,9 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs) assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"]) dns_mock_chain = { - ("test.notreal", "A"): "127.0.0.66", - ("127.0.0.66", "PTR"): "test.notrealzies", - ("test.notrealzies", "A"): "127.0.0.77", + "test.notreal": {"A": ["127.0.0.66"]}, + "66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]}, + "test.notrealzies": {"A": ["127.0.0.77"]}, } class DummyVulnModule(BaseModule): @@ -667,7 +667,7 @@ def custom_setup(scan): "127.0.0.0/31", modules=["speculate", "sslcert"], _config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}}, - _dns_mock={("www.bbottest.notreal", "A"): "127.0.1.0", ("test.notreal", "A"): "127.0.0.1"}, + _dns_mock={"www.bbottest.notreal": {"A": ["127.0.1.0"]}, "test.notreal": {"A": ["127.0.0.1"]}}, ) assert len(events) == 6 @@ -724,7 +724,7 @@ def custom_setup(scan): modules=["speculate", "sslcert"], whitelist=["127.0.1.0"], _config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}}, - _dns_mock={("www.bbottest.notreal", "A"): "127.0.0.1", ("test.notreal", "A"): "127.0.1.0"}, + _dns_mock={"www.bbottest.notreal": {"A": ["127.0.0.1"]}, "test.notreal": {"A": ["127.0.1.0"]}}, ) assert len(events) == 3 @@ -770,7 +770,7 @@ def custom_setup(scan): @pytest.mark.asyncio -async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog): +async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog, mock_dns): bbot_httpserver.expect_request(uri="/").respond_with_data(response_data="") @@ -784,9 +784,9 @@ async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, cap whitelist=["127.0.0.0/29", "test.notreal"], blacklist=["127.0.0.64/29"], ) - scan.helpers.dns.mock_dns({ - ("www-prod.test.notreal", "A"): "127.0.0.66", - ("www-dev.test.notreal", "A"): "127.0.0.22", + mock_dns(scan, { + "www-prod.test.notreal": {"A": ["127.0.0.66"]}, + "www-dev.test.notreal": {"A": ["127.0.0.22"]}, }) events = [e async for e in scan.async_start()] diff --git a/bbot/test/test_step_1/test_modules_basic.py b/bbot/test/test_step_1/test_modules_basic.py index 7a25bb4ea..8784923ed 100644 --- a/bbot/test/test_step_1/test_modules_basic.py +++ b/bbot/test/test_step_1/test_modules_basic.py @@ -303,7 +303,7 @@ async def test_modules_basic_perdomainonly(scan, helpers, events, bbot_config, b @pytest.mark.asyncio -async def test_modules_basic_stats(helpers, events, bbot_config, bbot_scanner, httpx_mock, monkeypatch): +async def test_modules_basic_stats(helpers, events, bbot_config, bbot_scanner, httpx_mock, monkeypatch, mock_dns): from bbot.modules.base import BaseModule class dummy(BaseModule): @@ -311,23 +311,50 @@ class dummy(BaseModule): watched_events = ["*"] async def handle_event(self, event): + # quick emit events like FINDINGS behave differently than normal ones + # hosts are not speculated from them await self.emit_event( {"host": "www.evilcorp.com", "url": "http://www.evilcorp.com", "description": "asdf"}, "FINDING", event ) + await self.emit_event("https://asdf.evilcorp.com", "URL", event, tags=["status-200"]) scan = bbot_scanner( "evilcorp.com", + modules=["speculate"], config=bbot_config, force_start=True, ) - scan.helpers.dns.mock_dns({("evilcorp.com", "A"): "127.0.254.1", ("www.evilcorp.com", "A"): "127.0.254.2"}) + mock_dns( + scan, + { + "evilcorp.com": {"A": ["127.0.254.1"]}, + "www.evilcorp.com": {"A": ["127.0.254.2"]}, + "asdf.evilcorp.com": {"A": ["127.0.254.3"]}, + }, + ) scan.modules["dummy"] = dummy(scan) events = [e async for e in scan.async_start()] - assert len(events) == 3 - - assert set(scan.stats.module_stats) == {"dummy", "python", "TARGET"} + assert len(events) == 6 + assert 1 == len([e for e in events if e.type == "SCAN"]) + assert 2 == len([e for e in events if e.type == "DNS_NAME"]) + assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "evilcorp.com"]) + # the reason we don't have a DNS_NAME for www.evilcorp.com is because FINDING.quick_emit = True + assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.evilcorp.com"]) + assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "asdf.evilcorp.com"]) + assert 1 == len([e for e in events if e.type == "ORG_STUB" and e.data == "evilcorp"]) + assert 1 == len([e for e in events if e.type == "FINDING"]) + + assert scan.stats.events_emitted_by_type == { + "SCAN": 1, + "DNS_NAME": 2, + "URL": 1, + "FINDING": 1, + "ORG_STUB": 1, + } + + assert set(scan.stats.module_stats) == {"host", "speculate", "python", "dummy", "TARGET"} target_stats = scan.stats.module_stats["TARGET"] assert target_stats.emitted == {"SCAN": 1, "DNS_NAME": 1} @@ -338,19 +365,17 @@ async def handle_event(self, event): assert target_stats.consumed_total == 0 dummy_stats = scan.stats.module_stats["dummy"] - assert dummy_stats.emitted == {"FINDING": 1} - assert dummy_stats.emitted_total == 1 - assert dummy_stats.produced == {"FINDING": 1} - assert dummy_stats.produced_total == 1 - assert dummy_stats.consumed == {"SCAN": 1, "DNS_NAME": 1} - assert dummy_stats.consumed_total == 2 + assert dummy_stats.emitted == {"FINDING": 1, "URL": 1} + assert dummy_stats.emitted_total == 2 + assert dummy_stats.produced == {"FINDING": 1, "URL": 1} + assert dummy_stats.produced_total == 2 + assert dummy_stats.consumed == {"DNS_NAME": 2, "OPEN_TCP_PORT": 1, "SCAN": 1, "URL": 1} + assert dummy_stats.consumed_total == 5 python_stats = scan.stats.module_stats["python"] assert python_stats.emitted == {} assert python_stats.emitted_total == 0 assert python_stats.produced == {} assert python_stats.produced_total == 0 - assert python_stats.consumed == {"SCAN": 1, "FINDING": 1, "DNS_NAME": 1} - assert python_stats.consumed_total == 3 - - assert scan.stats.events_emitted_by_type == {"SCAN": 1, "FINDING": 1, "DNS_NAME": 1} + assert python_stats.consumed == {"DNS_NAME": 2, "FINDING": 1, "ORG_STUB": 1, "SCAN": 1, "URL": 1} + assert python_stats.consumed_total == 6 diff --git a/bbot/test/test_step_1/test_scan.py b/bbot/test/test_step_1/test_scan.py index fde8dc368..9aa89ffe2 100644 --- a/bbot/test/test_step_1/test_scan.py +++ b/bbot/test/test_step_1/test_scan.py @@ -9,6 +9,7 @@ async def test_scan( neograph, monkeypatch, bbot_scanner, + mock_dns, ): scan0 = bbot_scanner( "1.1.1.1/31", @@ -55,15 +56,15 @@ async def test_scan( assert not scan2.in_scope("1.0.0.1") dns_table = { - ("1.1.1.1", "PTR"): "one.one.one.one", - ("one.one.one.one", "A"): "1.1.1.1", + "1.1.1.1.in-addr.arpa": {"PTR": ["one.one.one.one"]}, + "one.one.one.one": {"A": ["1.1.1.1"]}, } # make sure DNS resolution works dns_config = OmegaConf.create({"dns_resolution": True}) dns_config = OmegaConf.merge(bbot_config, dns_config) scan4 = bbot_scanner("1.1.1.1", config=dns_config) - scan4.helpers.dns.mock_dns(dns_table) + mock_dns(scan4, dns_table) events = [] async for event in scan4.async_start(): events.append(event) @@ -74,7 +75,7 @@ async def test_scan( no_dns_config = OmegaConf.create({"dns_resolution": False}) no_dns_config = OmegaConf.merge(bbot_config, no_dns_config) scan5 = bbot_scanner("1.1.1.1", config=no_dns_config) - scan5.helpers.dns.mock_dns(dns_table) + mock_dns(scan5, dns_table) events = [] async for event in scan5.async_start(): events.append(event) diff --git a/bbot/test/test_step_2/module_tests/base.py b/bbot/test/test_step_2/module_tests/base.py index fa8d0a2d3..e92f3a63c 100644 --- a/bbot/test/test_step_2/module_tests/base.py +++ b/bbot/test/test_step_2/module_tests/base.py @@ -8,7 +8,7 @@ from bbot.scanner import Scanner from bbot.modules import module_loader from bbot.core.helpers.misc import rand_string -from ...bbot_fixtures import test_config +from ...bbot_fixtures import test_config, MockResolver log = logging.getLogger("bbot.test.modules") @@ -94,13 +94,15 @@ def set_expect_requests(self, expect_args={}, respond_args={}): def set_expect_requests_handler(self, expect_args=None, request_handler=None): self.httpserver.expect_request(expect_args).respond_with_handler(request_handler) + def mock_dns(self, mock_data, scan=None): + if scan is None: + scan = self.scan + scan.helpers.dns.resolver = MockResolver(mock_data) + @property def module(self): return self.scan.modules[self.name] - def mock_record(self, *args, **kwargs): - return MockRecord(*args, **kwargs) - @pytest_asyncio.fixture async def module_test(self, httpx_mock, bbot_httpserver, bbot_httpserver_ssl, monkeypatch, request): module_test = self.ModuleTest(self, httpx_mock, bbot_httpserver, bbot_httpserver_ssl, monkeypatch, request) diff --git a/bbot/test/test_step_2/module_tests/test_module_affiliates.py b/bbot/test/test_step_2/module_tests/test_module_affiliates.py index fc9bd1c58..4afd4cd29 100644 --- a/bbot/test/test_step_2/module_tests/test_module_affiliates.py +++ b/bbot/test/test_step_2/module_tests/test_module_affiliates.py @@ -6,12 +6,11 @@ class TestAffiliates(ModuleTestBase): config_overrides = {"dns_resolution": True} async def setup_before_prep(self, module_test): - module_test.scan.helpers.dns.mock_dns( + module_test.mock_dns( { - ("8.8.8.8", "PTR"): "dns.google", - ("dns.google", "A"): "8.8.8.8", - ("dns.google", "NS"): "ns1.zdns.google", - ("ns1.zdns.google", "A"): "1.2.3.4", + "8.8.8.8.in-addr.arpa": {"PTR": ["dns.google"]}, + "dns.google": {"A": ["8.8.8.8"], "NS": ["ns1.zdns.google"]}, + "ns1.zdns.google": {"A": ["1.2.3.4"]}, } ) diff --git a/bbot/test/test_step_2/module_tests/test_module_aggregate.py b/bbot/test/test_step_2/module_tests/test_module_aggregate.py index 59f5370b3..7a41fe022 100644 --- a/bbot/test/test_step_2/module_tests/test_module_aggregate.py +++ b/bbot/test/test_step_2/module_tests/test_module_aggregate.py @@ -5,7 +5,7 @@ class TestAggregate(ModuleTestBase): config_overrides = {"dns_resolution": True, "scope_report_distance": 1} async def setup_before_prep(self, module_test): - module_test.scan.helpers.dns.mock_dns({("blacklanternsecurity.com", "A"): "1.2.3.4"}) + module_test.mock_dns({"blacklanternsecurity.com": {"A": ["1.2.3.4"]}}) def check(self, module_test, events): filename = next(module_test.scan.home.glob("scan-stats-table*.txt")) diff --git a/bbot/test/test_step_2/module_tests/test_module_asset_inventory.py b/bbot/test/test_step_2/module_tests/test_module_asset_inventory.py index cc4399266..b63de31ec 100644 --- a/bbot/test/test_step_2/module_tests/test_module_asset_inventory.py +++ b/bbot/test/test_step_2/module_tests/test_module_asset_inventory.py @@ -8,10 +8,10 @@ class TestAsset_Inventory(ModuleTestBase): modules_overrides = ["asset_inventory", "nmap", "sslcert"] async def setup_before_prep(self, module_test): - module_test.scan.helpers.dns.mock_dns( + module_test.mock_dns( { - ("127.0.0.1", "PTR"): "www.bbottest.notreal", - ("www.bbottest.notreal", "A"): "127.0.0.1", + "1.0.0.127.in-addr.arpa": {"PTR": ["www.bbottest.notreal"]}, + "www.bbottest.notreal": {"A": ["127.0.0.1"]}, } ) diff --git a/bbot/test/test_step_2/module_tests/test_module_baddns.py b/bbot/test/test_step_2/module_tests/test_module_baddns.py new file mode 100644 index 000000000..57cca7d5f --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_baddns.py @@ -0,0 +1,67 @@ +from .base import ModuleTestBase + + +class BaseTestBaddns(ModuleTestBase): + modules_overrides = ["baddns"] + targets = ["bad.dns"] + config_overrides = {"dns_resolution": True} + + async def dispatchWHOIS(x): + return None + + def select_modules(self): + from baddns.base import get_all_modules + + selected_modules = [] + for m in get_all_modules(): + if m.name in ["CNAME"]: + selected_modules.append(m) + return selected_modules + + +class TestBaddns_cname_nxdomain(BaseTestBaddns): + async def setup_after_prep(self, module_test): + from bbot.modules import baddns as baddns_module + from baddns.lib.whoismanager import WhoisManager + + module_test.mock_dns( + {"bad.dns": {"CNAME": ["baddns.azurewebsites.net."]}, "_NXDOMAIN": ["baddns.azurewebsites.net"]} + ) + module_test.monkeypatch.setattr(baddns_module.baddns, "select_modules", self.select_modules) + module_test.monkeypatch.setattr(WhoisManager, "dispatchWHOIS", self.dispatchWHOIS) + + def check(self, module_test, events): + assert any(e.data == "baddns.azurewebsites.net" for e in events), "CNAME detection failed" + assert any(e.type == "VULNERABILITY" for e in events), "Failed to emit VULNERABILITY" + assert any("baddns-cname" in e.tags for e in events), "Failed to add baddns tag" + + +class TestBaddns_cname_signature(BaseTestBaddns): + targets = ["bad.dns:8888"] + modules_overrides = ["baddns", "speculate"] + + async def setup_after_prep(self, module_test): + from bbot.modules import baddns as baddns_module + from baddns.base import BadDNS_base + from baddns.lib.whoismanager import WhoisManager + + def set_target(self, target): + return "127.0.0.1:8888" + + expect_args = {"method": "GET", "uri": "/"} + respond_args = {"response_data": "