Skip to content

Commit

Permalink
add per_domain_only module attribute (cleaning up module inheritance)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechromancer committed Sep 20, 2023
1 parent 89931dc commit 499c85a
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 56 deletions.
2 changes: 2 additions & 0 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,8 @@ def split_domain(hostname):
Notes:
- Utilizes the `tldextract` function to first break down the hostname.
"""
if is_ip(hostname):
return ("", hostname)
parsed = tldextract(hostname)
subdomain = parsed.subdomain
domain = parsed.registered_domain
Expand Down
5 changes: 3 additions & 2 deletions bbot/modules/azure_tenant.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import re
from contextlib import suppress

from bbot.modules.templates.root_domains import root_domains
from bbot.modules.base import BaseModule


class azure_tenant(root_domains):
class azure_tenant(BaseModule):
watched_events = ["DNS_NAME"]
produced_events = ["DNS_NAME"]
flags = ["affiliates", "subdomain-enum", "cloud-enum", "passive", "safe"]
meta = {"description": "Query Azure for tenant sister domains"}

base_url = "https://autodiscover-s.outlook.com"
in_scope_only = True
per_domain_only = True

async def setup(self):
self.processed = set()
Expand Down
36 changes: 33 additions & 3 deletions bbot/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class BaseModule:
suppress_dupes (bool): Whether to suppress outgoing duplicate events. Default is True.
per_host_only (bool): Limit the module to only scanning once per host. Default is False.
per_host_only (bool): Limit the module to only scanning once per host:port. Default is False.
per_domain_only (bool): Limit the module to only scanning once per domain. Default is False.
scope_distance_modifier (int, None): Modifies scope distance acceptance for events. Default is 0.
```
Expand Down Expand Up @@ -87,6 +89,7 @@ class BaseModule:
accept_dupes = False
suppress_dupes = True
per_host_only = False
per_domain_only = False
scope_distance_modifier = 0
target_only = False
in_scope_only = False
Expand Down Expand Up @@ -715,10 +718,18 @@ async def _event_postcheck(self, event):
return False, msg

if self.per_host_only:
if self.get_per_host_hash(event) in self._per_host_tracker:
_hash = self.get_per_host_hash(event)
if _hash in self._per_host_tracker:
return False, "per_host_only enabled and already seen host"
else:
self._per_host_tracker.add(self.get_per_host_hash(event))
self._per_host_tracker.add(_hash)

if self.per_domain_only:
_hash = self.get_per_domain_hash(event)
if _hash in self._per_host_tracker:
return False, "per_domain_only enabled and already seen domain"
else:
self._per_host_tracker.add(_hash)

if self._type == "output" and not event._stats_recorded:
event._stats_recorded = True
Expand Down Expand Up @@ -879,6 +890,25 @@ def get_per_host_hash(self, event):
to_hash = f"{parsed.scheme}://{parsed.netloc}/"
return hash(to_hash)

def get_per_domain_hash(self, event):
"""
Computes a per-domain hash value for a given event. This method may be optionally overridden in subclasses.
Events whose hosts share the same root (registered) domain receive the same hash value,
which lets `_event_postcheck` reject repeat events when `per_domain_only` is enabled.
Only `event.host` is consulted; the host is passed through `self.helpers.split_domain()`
and the subdomain portion is discarded. For IP-address hosts, `split_domain()` returns
the address itself as the "domain", so each distinct IP gets its own hash.
Args:
event (Event): The event whose `host` attribute is reduced to its root domain.
Returns:
int: The result of Python's built-in `hash()` on the root domain.
Examples:
Illustrative only (no output shown; `www.example.com` and `mail.example.com`
would both reduce to `example.com` and therefore hash identically):
>>> event = self.make_event("https://www.example.com:8443")
>>> self.get_per_domain_hash(event)
"""
# split_domain() returns (subdomain, domain); only the domain part matters here
_, domain = self.helpers.split_domain(event.host)
return hash(domain)

@property
def name(self):
return str(self._name)
Expand Down
5 changes: 3 additions & 2 deletions bbot/modules/emailformat.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from bbot.modules.templates.root_domains import root_domains
from bbot.modules.base import BaseModule


class emailformat(root_domains):
class emailformat(BaseModule):
watched_events = ["DNS_NAME"]
produced_events = ["EMAIL_ADDRESS"]
flags = ["passive", "email-enum", "safe"]
meta = {"description": "Query email-format.com for email addresses"}
in_scope_only = False
per_domain_only = True

base_url = "https://www.email-format.com"

Expand Down
39 changes: 0 additions & 39 deletions bbot/modules/templates/root_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,42 +28,3 @@ async def filter_event(self, event):
return False
self.processed.add(hash(domain))
return True

async def handle_event(self, event):
_, query = self.helpers.split_domain(event.data)
for domain, _ in await self.query(query):
self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"])
# todo: registrar?

async def query(self, query):
results = set()
url = f"{self.base_url}/reversewhois/?q={query}"
r = await self.helpers.request(url)
status_code = getattr(r, "status_code", 0)
if status_code not in (200,):
self.verbose(f"Error retrieving reverse whois results (status code: {status_code})")

content = getattr(r, "content", b"")
from bs4 import BeautifulSoup

html = BeautifulSoup(content, "html.parser")
found = set()
for table_row in html.findAll("tr"):
table_cells = table_row.findAll("td")
# make double-sure we're in the right table by checking the date field
try:
if self.date_regex.match(table_cells[1].text.strip()):
# domain == first cell
domain = table_cells[0].text.strip().lower()
# registrar == last cell
registrar = table_cells[-1].text.strip()
if domain and not domain == query:
result = (domain, registrar)
result_hash = hash(result)
if result_hash not in found:
found.add(result_hash)
results.add(result)
except IndexError:
self.debug(f"Invalid row {str(table_row)[:40]}...")
continue
return results
12 changes: 2 additions & 10 deletions bbot/modules/viewdns.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class viewdns(BaseModule):
"""
Used as a base for modules that only act on root domains and not individual hostnames
Todo: Also retrieve registrar?
"""

watched_events = ["DNS_NAME"]
Expand All @@ -16,25 +16,17 @@ class viewdns(BaseModule):
}
base_url = "https://viewdns.info"
in_scope_only = True
per_domain_only = True
_qsize = 1

async def setup(self):
self.processed = set()
self.date_regex = re.compile(r"\d{4}-\d{2}-\d{2}")
return True

async def filter_event(self, event):
_, domain = self.helpers.split_domain(event.data)
if hash(domain) in self.processed:
return False
self.processed.add(hash(domain))
return True

async def handle_event(self, event):
_, query = self.helpers.split_domain(event.data)
for domain, _ in await self.query(query):
self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"])
# todo: registrar?

async def query(self, query):
results = set()
Expand Down
2 changes: 2 additions & 0 deletions bbot/test/test_step_1/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_config, bbot_https
assert helpers.split_domain("www.test.notreal") == ("www", "test.notreal")
assert helpers.split_domain("test.notreal") == ("", "test.notreal")
assert helpers.split_domain("notreal") == ("", "notreal")
assert helpers.split_domain("192.168.0.1") == ("", "192.168.0.1")
assert helpers.split_domain("dead::beef") == ("", "dead::beef")

assert helpers.split_host_port("https://evilcorp.co.uk") == ("evilcorp.co.uk", 443)
assert helpers.split_host_port("http://evilcorp.co.uk:666") == ("evilcorp.co.uk", 666)
Expand Down
41 changes: 41 additions & 0 deletions bbot/test/test_step_1/test_modules_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,47 @@ async def test_modules_basic_perhostonly(scan, helpers, events, bbot_config, bbo
assert valid_1 == True
assert valid_2 == False
assert hash("http://evilcorp.com/") in module._per_host_tracker
assert reason_2 == "per_host_only enabled and already seen host"

else:
assert valid_1 == True
assert valid_2 == True


@pytest.mark.asyncio
async def test_modules_basic_perdomainonly(scan, helpers, events, bbot_config, bbot_scanner, httpx_mock, monkeypatch):
per_domain_scan = bbot_scanner(
"evilcorp.com",
modules=list(set(available_modules + available_internal_modules)),
config=bbot_config,
)

await per_domain_scan.load_modules()
await per_domain_scan.setup_modules()
per_domain_scan.status = "RUNNING"

# ensure that multiple events under the same root domain (e.g. www.evilcorp.com and mail.evilcorp.com) are blocked, and check that the domain hash is recorded in the per-host tracker

for module_name, module in sorted(per_domain_scan.modules.items()):
monkeypatch.setattr(module, "filter_event", BaseModule(per_domain_scan).filter_event)

if "URL" in module.watched_events:
url_1 = per_domain_scan.make_event(
"http://www.evilcorp.com/1", event_type="URL", source=per_domain_scan.root_event, tags=["status-200"]
)
url_1.set_scope_distance(0)
url_2 = per_domain_scan.make_event(
"http://mail.evilcorp.com/2", event_type="URL", source=per_domain_scan.root_event, tags=["status-200"]
)
url_2.set_scope_distance(0)
valid_1, reason_1 = await module._event_postcheck(url_1)
valid_2, reason_2 = await module._event_postcheck(url_2)

if module.per_domain_only == True:
assert valid_1 == True
assert valid_2 == False
assert hash("evilcorp.com") in module._per_host_tracker
assert reason_2 == "per_domain_only enabled and already seen domain"

else:
assert valid_1 == True
Expand Down

0 comments on commit 499c85a

Please sign in to comment.