Skip to content

Commit

Permalink
Merge pull request #1972 from blacklanternsecurity/fix-wildcard-dedup
Browse files Browse the repository at this point in the history
Fix wildcard deduplication
  • Loading branch information
TheTechromancer authored Nov 18, 2024
2 parents 6b9f07f + 0c5220e commit e3e321a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
13 changes: 12 additions & 1 deletion bbot/modules/internal/dnsresolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,16 @@ async def handle_event(self, event, **kwargs):
await self.resolve_event(main_host_event, types=non_minimal_rdtypes)
# check for wildcards if the event is within the scan's search distance
if new_event and main_host_event.scope_distance <= self.scan.scope_search_distance:
await self.handle_wildcard_event(main_host_event)
event_data_changed = await self.handle_wildcard_event(main_host_event)
if event_data_changed:
# since data has changed, we check again whether it's a duplicate
if self.scan.ingress_module.is_incoming_duplicate(event, add=True):
if not event._graph_important:
return False, "event was already emitted by its module"
else:
self.debug(
f"Event {event} was already emitted by its module, but it's graph-important so it gets a pass"
)

# if there weren't any DNS children and it's not an IP address, tag as unresolved
if not main_host_event.raw_dns_records and not event_is_ip:
Expand Down Expand Up @@ -152,6 +161,8 @@ async def handle_wildcard_event(self, event):
if wildcard_data != event.data:
self.debug(f'Wildcard detected, changing event.data "{event.data}" --> "{wildcard_data}"')
event.data = wildcard_data
return True
return False

async def emit_dns_children(self, event):
for rdtype, children in event.dns_children.items():
Expand Down
36 changes: 36 additions & 0 deletions bbot/test/test_step_1/test_dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,42 @@ def custom_lookup(query, rdtype):
assert len(modified_wildcard_events) == 0


@pytest.mark.asyncio
async def test_wildcard_deduplication(bbot_scanner):

custom_lookup = """
def custom_lookup(query, rdtype):
if rdtype == "TXT" and query.strip(".").endswith("evilcorp.com"):
return {""}
"""

mock_data = {
"evilcorp.com": {"A": ["127.0.0.1"]},
}

from bbot.modules.base import BaseModule

class DummyModule(BaseModule):
watched_events = ["DNS_NAME"]
per_domain_only = True

async def handle_event(self, event):
for i in range(30):
await self.emit_event(f"www{i}.evilcorp.com", "DNS_NAME", parent=event)

# scan without omitted event type
scan = bbot_scanner(
"evilcorp.com", config={"dns": {"minimal": False, "wildcard_ignore": []}, "omit_event_types": []}
)
await scan.helpers.dns._mock_dns(mock_data, custom_lookup_fn=custom_lookup)
dummy_module = DummyModule(scan)
scan.modules["dummy_module"] = dummy_module
events = [e async for e in scan.async_start()]
dns_name_events = [e for e in events if e.type == "DNS_NAME"]
assert len(dns_name_events) == 2
assert 1 == len([e for e in dns_name_events if e.data == "_wildcard.evilcorp.com"])


@pytest.mark.asyncio
async def test_dns_raw_records(bbot_scanner):

Expand Down

0 comments on commit e3e321a

Please sign in to comment.