From 3a3d22d6f3cdd691a5471556c090c4e16c26a82f Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Tue, 19 Sep 2023 13:45:24 -0400 Subject: [PATCH] add per_domain_only module attribute (cleaning up module inheritance) --- bbot/core/helpers/misc.py | 2 + bbot/modules/azure_tenant.py | 5 ++- bbot/modules/base.py | 36 ++++++++++++++++-- bbot/modules/emailformat.py | 5 ++- bbot/modules/templates/root_domains.py | 39 -------------------- bbot/modules/viewdns.py | 12 +----- bbot/test/test_step_1/test_helpers.py | 2 + bbot/test/test_step_1/test_modules_basic.py | 41 +++++++++++++++++++++ 8 files changed, 86 insertions(+), 56 deletions(-) diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index d90c5e1635..942eaabdda 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -407,6 +407,8 @@ def split_domain(hostname): Notes: - Utilizes the `tldextract` function to first break down the hostname. """ + if is_ip(hostname): + return ("", hostname) parsed = tldextract(hostname) subdomain = parsed.subdomain domain = parsed.registered_domain diff --git a/bbot/modules/azure_tenant.py b/bbot/modules/azure_tenant.py index b9ada3d186..4fcf9d7d92 100644 --- a/bbot/modules/azure_tenant.py +++ b/bbot/modules/azure_tenant.py @@ -1,10 +1,10 @@ import re from contextlib import suppress -from bbot.modules.templates.root_domains import root_domains +from bbot.modules.base import BaseModule -class azure_tenant(root_domains): +class azure_tenant(BaseModule): watched_events = ["DNS_NAME"] produced_events = ["DNS_NAME"] flags = ["affiliates", "subdomain-enum", "cloud-enum", "passive", "safe"] @@ -12,6 +12,7 @@ class azure_tenant(root_domains): base_url = "https://autodiscover-s.outlook.com" in_scope_only = True + per_domain_only = True async def setup(self): self.processed = set() diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 8b36d6b9d5..2470f060b1 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -33,7 +33,9 @@ class BaseModule: suppress_dupes (bool): Whether to suppress outgoing duplicate events. Default is True. - per_host_only (bool): Limit the module to only scanning once per host. Default is False. + per_host_only (bool): Limit the module to only scanning once per host:port. Default is False. + + per_domain_only (bool): Limit the module to only scanning once per domain. Default is False. scope_distance_modifier (int, None): Modifies scope distance acceptance for events. Default is 0. ``` @@ -87,6 +89,7 @@ class BaseModule: accept_dupes = False suppress_dupes = True per_host_only = False + per_domain_only = False scope_distance_modifier = 0 target_only = False in_scope_only = False @@ -715,10 +718,18 @@ async def _event_postcheck(self, event): return False, msg if self.per_host_only: - if self.get_per_host_hash(event) in self._per_host_tracker: + _hash = self.get_per_host_hash(event) + if _hash in self._per_host_tracker: return False, "per_host_only enabled and already seen host" else: - self._per_host_tracker.add(self.get_per_host_hash(event)) + self._per_host_tracker.add(_hash) + + if self.per_domain_only: + _hash = self.get_per_domain_hash(event) + if _hash in self._per_host_tracker: + return False, "per_domain_only enabled and already seen domain" + else: + self._per_host_tracker.add(_hash) if self._type == "output" and not event._stats_recorded: event._stats_recorded = True @@ -879,6 +890,25 @@ def get_per_host_hash(self, event): to_hash = f"{parsed.scheme}://{parsed.netloc}/" return hash(to_hash) + def get_per_domain_hash(self, event): + """ + Computes a per-domain hash value for a given event. This method may be optionally overridden in subclasses. + + Events with the same root domain will receive the same hash value. + + Args: + event (Event): The event object containing host, port, or parsed URL information. + + Returns: + int: The hash value computed for the domain. + + Examples: + >>> event = self.make_event("https://www.example.com:8443") + >>> self.get_per_domain_hash(event) + """ + _, domain = self.helpers.split_domain(event.host) + return hash(domain) + @property def name(self): return str(self._name) diff --git a/bbot/modules/emailformat.py b/bbot/modules/emailformat.py index 3817cb3f36..3fd47ee2db 100644 --- a/bbot/modules/emailformat.py +++ b/bbot/modules/emailformat.py @@ -1,12 +1,13 @@ -from bbot.modules.templates.root_domains import root_domains +from bbot.modules.base import BaseModule -class emailformat(root_domains): +class emailformat(BaseModule): watched_events = ["DNS_NAME"] produced_events = ["EMAIL_ADDRESS"] flags = ["passive", "email-enum", "safe"] meta = {"description": "Query email-format.com for email addresses"} in_scope_only = False + per_domain_only = True base_url = "https://www.email-format.com" diff --git a/bbot/modules/templates/root_domains.py b/bbot/modules/templates/root_domains.py index a1eaf8c99c..e852f81a23 100644 --- a/bbot/modules/templates/root_domains.py +++ b/bbot/modules/templates/root_domains.py @@ -28,42 +28,3 @@ async def filter_event(self, event): return False self.processed.add(hash(domain)) return True - - async def handle_event(self, event): - _, query = self.helpers.split_domain(event.data) - for domain, _ in await self.query(query): - self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"]) - # todo: registrar? - - async def query(self, query): - results = set() - url = f"{self.base_url}/reversewhois/?q={query}" - r = await self.helpers.request(url) - status_code = getattr(r, "status_code", 0) - if status_code not in (200,): - self.verbose(f"Error retrieving reverse whois results (status code: {status_code})") - - content = getattr(r, "content", b"") - from bs4 import BeautifulSoup - - html = BeautifulSoup(content, "html.parser") - found = set() - for table_row in html.findAll("tr"): - table_cells = table_row.findAll("td") - # make double-sure we're in the right table by checking the date field - try: - if self.date_regex.match(table_cells[1].text.strip()): - # domain == first cell - domain = table_cells[0].text.strip().lower() - # registrar == last cell - registrar = table_cells[-1].text.strip() - if domain and not domain == query: - result = (domain, registrar) - result_hash = hash(result) - if result_hash not in found: - found.add(result_hash) - results.add(result) - except IndexError: - self.debug(f"Invalid row {str(table_row)[:40]}...") - continue - return results diff --git a/bbot/modules/viewdns.py b/bbot/modules/viewdns.py index 4fbfb08f19..c2a5e44317 100644 --- a/bbot/modules/viewdns.py +++ b/bbot/modules/viewdns.py @@ -5,7 +5,7 @@ class viewdns(BaseModule): """ - Used as a base for modules that only act on root domains and not individual hostnames + Todo: Also retrieve registrar? """ watched_events = ["DNS_NAME"] @@ -16,25 +16,17 @@ class viewdns(BaseModule): } base_url = "https://viewdns.info" in_scope_only = True + per_domain_only = True _qsize = 1 async def setup(self): - self.processed = set() self.date_regex = re.compile(r"\d{4}-\d{2}-\d{2}") return True - async def filter_event(self, event): - _, domain = self.helpers.split_domain(event.data) - if hash(domain) in self.processed: - return False - self.processed.add(hash(domain)) - return True - async def handle_event(self, event): _, query = self.helpers.split_domain(event.data) for domain, _ in await self.query(query): self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"]) - # todo: registrar? async def query(self, query): results = set() diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index f3f35b7abe..f27f4d93b5 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -113,6 +113,8 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_config, bbot_https assert helpers.split_domain("www.test.notreal") == ("www", "test.notreal") assert helpers.split_domain("test.notreal") == ("", "test.notreal") assert helpers.split_domain("notreal") == ("", "notreal") + assert helpers.split_domain("192.168.0.1") == ("", "192.168.0.1") + assert helpers.split_domain("dead::beef") == ("", "dead::beef") assert helpers.split_host_port("https://evilcorp.co.uk") == ("evilcorp.co.uk", 443) assert helpers.split_host_port("http://evilcorp.co.uk:666") == ("evilcorp.co.uk", 666) diff --git a/bbot/test/test_step_1/test_modules_basic.py b/bbot/test/test_step_1/test_modules_basic.py index e14f8c4022..77c25a7a15 100644 --- a/bbot/test/test_step_1/test_modules_basic.py +++ b/bbot/test/test_step_1/test_modules_basic.py @@ -201,6 +201,47 @@ async def test_modules_basic_perhostonly(scan, helpers, events, bbot_config, bbo assert valid_1 == True assert valid_2 == False assert hash("http://evilcorp.com/") in module._per_host_tracker + assert reason_2 == "per_host_only enabled and already seen host" + + else: + assert valid_1 == True + assert valid_2 == True + + +@pytest.mark.asyncio +async def test_modules_basic_perdomainonly(scan, helpers, events, bbot_config, bbot_scanner, httpx_mock, monkeypatch): + per_domain_scan = bbot_scanner( + "evilcorp.com", + modules=list(set(available_modules + available_internal_modules)), + config=bbot_config, + ) + + await per_domain_scan.load_modules() + await per_domain_scan.setup_modules() + per_domain_scan.status = "RUNNING" + + # ensure that multiple events to the same "host" (schema + host) are blocked and check the per host tracker + + for module_name, module in sorted(per_domain_scan.modules.items()): + monkeypatch.setattr(module, "filter_event", BaseModule(per_domain_scan).filter_event) + + if "URL" in module.watched_events: + url_1 = per_domain_scan.make_event( + "http://www.evilcorp.com/1", event_type="URL", source=per_domain_scan.root_event, tags=["status-200"] + ) + url_1.set_scope_distance(0) + url_2 = per_domain_scan.make_event( + "http://mail.evilcorp.com/2", event_type="URL", source=per_domain_scan.root_event, tags=["status-200"] + ) + url_2.set_scope_distance(0) + valid_1, reason_1 = await module._event_postcheck(url_1) + valid_2, reason_2 = await module._event_postcheck(url_2) + + if module.per_domain_only == True: + assert valid_1 == True + assert valid_2 == False + assert hash("evilcorp.com") in module._per_host_tracker + assert reason_2 == "per_domain_only enabled and already seen domain" else: assert valid_1 == True