Skip to content

Commit

Permalink
Merge pull request #1043 from blacklanternsecurity/mock-dns-cleanup
Browse files Browse the repository at this point in the history
Mock dns test cleanup
  • Loading branch information
liquidsec authored Feb 8, 2024
2 parents 0097506 + 500930a commit 036b1d7
Show file tree
Hide file tree
Showing 19 changed files with 192 additions and 210 deletions.
25 changes: 0 additions & 25 deletions bbot/core/helpers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,28 +1040,3 @@ def _get_dummy_module(self, name):
dummy_module = self.parent_helper._make_dummy_module(name=name, _type="DNS")
self._dummy_modules[name] = dummy_module
return dummy_module

def mock_dns(self, dns_dict):
if self._orig_resolve_raw is None:
self._orig_resolve_raw = self.resolve_raw

async def mock_resolve_raw(query, **kwargs):
results = []
errors = []
types = self._parse_rdtype(kwargs.get("type", ["A", "AAAA"]))
for t in types:
with suppress(KeyError):
results += self._mock_table[(query, t)]
return results, errors

for (query, rdtype), answers in dns_dict.items():
if isinstance(answers, str):
answers = [answers]
for answer in answers:
rdata = dns.rdata.from_text("IN", rdtype, answer)
try:
self._mock_table[(query, rdtype)].append((rdtype, rdata))
except KeyError:
self._mock_table[(query, rdtype)] = [(rdtype, [rdata])]

self.resolve_raw = mock_resolve_raw
65 changes: 65 additions & 0 deletions bbot/test/bbot_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import dns
import sys
import pytest
import asyncio # noqa
Expand Down Expand Up @@ -252,3 +253,67 @@ def install_all_python_deps():
for module in module_loader.preloaded().values():
deps_pip.update(set(module.get("deps", {}).get("pip", [])))
subprocess.run([sys.executable, "-m", "pip", "install"] + list(deps_pip))


class MockResolver:
import dns

def __init__(self, mock_data=None):
self.mock_data = mock_data if mock_data else {}
self.nameservers = ["127.0.0.1"]

async def resolve_address(self, ipaddr, *args, **kwargs):
modified_kwargs = {}
modified_kwargs.update(kwargs)
modified_kwargs["rdtype"] = "PTR"
return await self.resolve(str(dns.reversename.from_address(ipaddr)), *args, **modified_kwargs)

def create_dns_response(self, query_name, rdtype):
query_name = query_name.strip(".")
answers = self.mock_data.get(query_name, {}).get(rdtype, [])
if not answers:
raise self.dns.resolver.NXDOMAIN(f"No answer found for {query_name} {rdtype}")

message_text = f"""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
;QUESTION
{query_name}. IN {rdtype}
;ANSWER"""
for answer in answers:
message_text += f"\n{query_name}. 1 IN {rdtype} {answer}"

message_text += "\n;AUTHORITY\n;ADDITIONAL\n"
message = self.dns.message.from_text(message_text)
return message

async def resolve(self, query_name, rdtype=None):
if rdtype is None:
rdtype = "A"
elif isinstance(rdtype, str):
rdtype = rdtype.upper()
else:
rdtype = str(rdtype.name).upper()

domain_name = self.dns.name.from_text(query_name)
rdtype_obj = self.dns.rdatatype.from_text(rdtype)

if "_NXDOMAIN" in self.mock_data and query_name in self.mock_data["_NXDOMAIN"]:
# Simulate the NXDOMAIN exception
raise self.dns.resolver.NXDOMAIN

try:
response = self.create_dns_response(query_name, rdtype)
answer = self.dns.resolver.Answer(domain_name, rdtype_obj, self.dns.rdataclass.IN, response)
return answer
except self.dns.resolver.NXDOMAIN:
return []


@pytest.fixture()
def mock_dns():
def _mock_dns(scan, mock_data):
scan.helpers.dns.resolver = MockResolver(mock_data)

return _mock_dns
67 changes: 0 additions & 67 deletions bbot/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,70 +179,3 @@ def proxy_server():
# Stop the server.
server.shutdown()
server_thread.join()


class MockResolver:
import dns

def __init__(self, mock_data=None):
self.mock_data = mock_data if mock_data else {}
self.nameservers = ["127.0.0.1"]

async def resolve_address(self, host):
try:
from dns.asyncresolver import resolve_address

result = await resolve_address(host)
return result
except ImportError:
raise ImportError("dns.asyncresolver.Resolver.resolve_address not found")

def create_dns_response(self, query_name, rdtype):
answers = self.mock_data.get(query_name, {}).get(rdtype, [])
if not answers:
raise self.dns.resolver.NXDOMAIN(f"No answer found for {query_name} {rdtype}")

message_text = f"""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
;QUESTION
{query_name}. IN {rdtype}
;ANSWER"""
for answer in answers:
message_text += f"\n{query_name}. 1 IN {rdtype} {answer}"

message_text += "\n;AUTHORITY\n;ADDITIONAL\n"
message = self.dns.message.from_text(message_text)
return message

async def resolve(self, query_name, rdtype=None):
if rdtype is None:
rdtype = "A"
elif isinstance(rdtype, str):
rdtype = rdtype.upper()
else:
rdtype = str(rdtype.name).upper()

domain_name = self.dns.name.from_text(query_name)
rdtype_obj = self.dns.rdatatype.from_text(rdtype)

if "_NXDOMAIN" in self.mock_data and query_name in self.mock_data["_NXDOMAIN"]:
# Simulate the NXDOMAIN exception
raise self.dns.resolver.NXDOMAIN

try:
response = self.create_dns_response(query_name, rdtype)
answer = self.dns.resolver.Answer(domain_name, rdtype_obj, self.dns.rdataclass.IN, response)
return answer
except self.dns.resolver.NXDOMAIN:
return []


@pytest.fixture()
def configure_mock_resolver(monkeypatch):
def _configure(mock_data):
mock_resolver = MockResolver(mock_data)
return mock_resolver

return _configure
10 changes: 7 additions & 3 deletions bbot/test/test_step_1/test_dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


@pytest.mark.asyncio
async def test_dns(bbot_scanner, bbot_config):
async def test_dns(bbot_scanner, bbot_config, mock_dns):
scan = bbot_scanner("1.1.1.1", config=bbot_config)
helpers = scan.helpers

Expand Down Expand Up @@ -85,8 +85,12 @@ async def test_dns(bbot_scanner, bbot_config):
dns_config = OmegaConf.create({"dns_resolution": True})
dns_config = OmegaConf.merge(bbot_config, dns_config)
scan2 = bbot_scanner("evilcorp.com", config=dns_config)
scan2.helpers.dns.mock_dns(
{("evilcorp.com", "TXT"): '"v=spf1 include:cloudprovider.com ~all"', ("cloudprovider.com", "A"): "1.2.3.4"}
mock_dns(
scan2,
{
"evilcorp.com": {"TXT": ['"v=spf1 include:cloudprovider.com ~all"']},
"cloudprovider.com": {"A": ["1.2.3.4"]},
},
)
events = [e async for e in scan2.async_start()]
assert 1 == len(
Expand Down
16 changes: 8 additions & 8 deletions bbot/test/test_step_1/test_manager_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


@pytest.mark.asyncio
async def test_manager_deduplication(bbot_config, bbot_scanner):
async def test_manager_deduplication(bbot_config, bbot_scanner, mock_dns):

class DefaultModule(BaseModule):
_name = "default_module"
Expand Down Expand Up @@ -62,7 +62,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
scan.modules["per_hostport_only"] = per_hostport_only
scan.modules["per_domain_only"] = per_domain_only
if _dns_mock:
scan.helpers.dns.mock_dns(_dns_mock)
mock_dns(scan, _dns_mock)
if scan_callback is not None:
scan_callback(scan)
return (
Expand All @@ -76,12 +76,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
)

dns_mock_chain = {
("default_module.test.notreal", "A"): "127.0.0.3",
("everything_module.test.notreal", "A"): "127.0.0.4",
("no_suppress_dupes.test.notreal", "A"): "127.0.0.5",
("accept_dupes.test.notreal", "A"): "127.0.0.6",
("per_hostport_only.test.notreal", "A"): "127.0.0.7",
("per_domain_only.test.notreal", "A"): "127.0.0.8",
"default_module.test.notreal": {"A": ["127.0.0.3"]},
"everything_module.test.notreal": {"A": ["127.0.0.4"]},
"no_suppress_dupes.test.notreal": {"A": ["127.0.0.5"]},
"accept_dupes.test.notreal": {"A": ["127.0.0.6"]},
"per_hostport_only.test.notreal": {"A": ["127.0.0.7"]},
"per_domain_only.test.notreal": {"A": ["127.0.0.8"]},
}

# dns search distance = 1, report distance = 0
Expand Down
34 changes: 17 additions & 17 deletions bbot/test/test_step_1/test_manager_scope_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def bbot_other_httpservers():


@pytest.mark.asyncio
async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl):
async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl, mock_dns):
"""
This test ensures that BBOT correctly handles different scope distance settings.
It performs these tests for normal modules, output modules, and their graph variants,
Expand Down Expand Up @@ -102,7 +102,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
scan.modules["dummy_graph_output_module"] = dummy_graph_output_module
scan.modules["dummy_graph_batch_output_module"] = dummy_graph_batch_output_module
if _dns_mock:
scan.helpers.dns.mock_dns(_dns_mock)
mock_dns(scan, _dns_mock)
if scan_callback is not None:
scan_callback(scan)
return (
Expand All @@ -114,12 +114,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
)

dns_mock_chain = {
("test.notreal", "A"): "127.0.0.66",
("127.0.0.66", "PTR"): "test.notrealzies",
("test.notrealzies", "CNAME"): "www.test.notreal",
("www.test.notreal", "A"): "127.0.0.77",
("127.0.0.77", "PTR"): "test2.notrealzies",
("test2.notrealzies", "A"): "127.0.0.88",
"test.notreal": {"A": ["127.0.0.66"]},
"66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]},
"test.notrealzies": {"CNAME": ["www.test.notreal"]},
"www.test.notreal": {"A": ["127.0.0.77"]},
"77.0.0.127.in-addr.arpa": {"PTR": ["test2.notrealzies"]},
"test2.notrealzies": {"A": ["127.0.0.88"]},
}

# dns search distance = 1, report distance = 0
Expand Down Expand Up @@ -240,9 +240,9 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])

dns_mock_chain = {
("test.notreal", "A"): "127.0.0.66",
("127.0.0.66", "PTR"): "test.notrealzies",
("test.notrealzies", "A"): "127.0.0.77",
"test.notreal": {"A": ["127.0.0.66"]},
"66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]},
"test.notrealzies": {"A": ["127.0.0.77"]},
}

class DummyVulnModule(BaseModule):
Expand Down Expand Up @@ -667,7 +667,7 @@ def custom_setup(scan):
"127.0.0.0/31",
modules=["speculate", "sslcert"],
_config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}},
_dns_mock={("www.bbottest.notreal", "A"): "127.0.1.0", ("test.notreal", "A"): "127.0.0.1"},
_dns_mock={"www.bbottest.notreal": {"A": ["127.0.1.0"]}, "test.notreal": {"A": ["127.0.0.1"]}},
)

assert len(events) == 6
Expand Down Expand Up @@ -727,7 +727,7 @@ def custom_setup(scan):
modules=["speculate", "sslcert"],
whitelist=["127.0.1.0"],
_config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}},
_dns_mock={("www.bbottest.notreal", "A"): "127.0.0.1", ("test.notreal", "A"): "127.0.1.0"},
_dns_mock={"www.bbottest.notreal": {"A": ["127.0.0.1"]}, "test.notreal": {"A": ["127.0.1.0"]}},
)

assert len(events) == 3
Expand Down Expand Up @@ -777,7 +777,7 @@ def custom_setup(scan):


@pytest.mark.asyncio
async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog):
async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog, mock_dns):

bbot_httpserver.expect_request(uri="/").respond_with_data(response_data="<a href='http://www-prod.test.notreal:8888'/><a href='http://www-dev.test.notreal:8888'/>")

Expand All @@ -791,9 +791,9 @@ async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, cap
whitelist=["127.0.0.0/29", "test.notreal"],
blacklist=["127.0.0.64/29"],
)
scan.helpers.dns.mock_dns({
("www-prod.test.notreal", "A"): "127.0.0.66",
("www-dev.test.notreal", "A"): "127.0.0.22",
mock_dns(scan, {
"www-prod.test.notreal": {"A": ["127.0.0.66"]},
"www-dev.test.notreal": {"A": ["127.0.0.22"]},
})

events = [e async for e in scan.async_start()]
Expand Down
Loading

0 comments on commit 036b1d7

Please sign in to comment.