Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mock dns test cleanup #1043

Merged
merged 7 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions bbot/core/helpers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -1040,28 +1040,3 @@ def _get_dummy_module(self, name):
dummy_module = self.parent_helper._make_dummy_module(name=name, _type="DNS")
self._dummy_modules[name] = dummy_module
return dummy_module

def mock_dns(self, dns_dict):
if self._orig_resolve_raw is None:
self._orig_resolve_raw = self.resolve_raw

async def mock_resolve_raw(query, **kwargs):
results = []
errors = []
types = self._parse_rdtype(kwargs.get("type", ["A", "AAAA"]))
for t in types:
with suppress(KeyError):
results += self._mock_table[(query, t)]
return results, errors

for (query, rdtype), answers in dns_dict.items():
if isinstance(answers, str):
answers = [answers]
for answer in answers:
rdata = dns.rdata.from_text("IN", rdtype, answer)
try:
self._mock_table[(query, rdtype)].append((rdtype, rdata))
except KeyError:
self._mock_table[(query, rdtype)] = [(rdtype, [rdata])]

self.resolve_raw = mock_resolve_raw
65 changes: 65 additions & 0 deletions bbot/test/bbot_fixtures.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import dns
import sys
import pytest
import asyncio # noqa
Expand Down Expand Up @@ -252,3 +253,67 @@ def install_all_python_deps():
for module in module_loader.preloaded().values():
deps_pip.update(set(module.get("deps", {}).get("pip", [])))
subprocess.run([sys.executable, "-m", "pip", "install"] + list(deps_pip))


class MockResolver:
import dns

def __init__(self, mock_data=None):
self.mock_data = mock_data if mock_data else {}
self.nameservers = ["127.0.0.1"]

async def resolve_address(self, ipaddr, *args, **kwargs):
modified_kwargs = {}
modified_kwargs.update(kwargs)
modified_kwargs["rdtype"] = "PTR"
return await self.resolve(str(dns.reversename.from_address(ipaddr)), *args, **modified_kwargs)

def create_dns_response(self, query_name, rdtype):
query_name = query_name.strip(".")
answers = self.mock_data.get(query_name, {}).get(rdtype, [])
if not answers:
raise self.dns.resolver.NXDOMAIN(f"No answer found for {query_name} {rdtype}")

message_text = f"""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
;QUESTION
{query_name}. IN {rdtype}
;ANSWER"""
for answer in answers:
message_text += f"\n{query_name}. 1 IN {rdtype} {answer}"

message_text += "\n;AUTHORITY\n;ADDITIONAL\n"
message = self.dns.message.from_text(message_text)
return message

async def resolve(self, query_name, rdtype=None):
if rdtype is None:
rdtype = "A"
elif isinstance(rdtype, str):
rdtype = rdtype.upper()
else:
rdtype = str(rdtype.name).upper()

domain_name = self.dns.name.from_text(query_name)
rdtype_obj = self.dns.rdatatype.from_text(rdtype)

if "_NXDOMAIN" in self.mock_data and query_name in self.mock_data["_NXDOMAIN"]:
# Simulate the NXDOMAIN exception
raise self.dns.resolver.NXDOMAIN

try:
response = self.create_dns_response(query_name, rdtype)
answer = self.dns.resolver.Answer(domain_name, rdtype_obj, self.dns.rdataclass.IN, response)
return answer
except self.dns.resolver.NXDOMAIN:
return []


@pytest.fixture()
def mock_dns():
def _mock_dns(scan, mock_data):
scan.helpers.dns.resolver = MockResolver(mock_data)

return _mock_dns
67 changes: 0 additions & 67 deletions bbot/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,70 +179,3 @@ def proxy_server():
# Stop the server.
server.shutdown()
server_thread.join()


class MockResolver:
import dns

def __init__(self, mock_data=None):
self.mock_data = mock_data if mock_data else {}
self.nameservers = ["127.0.0.1"]

async def resolve_address(self, host):
try:
from dns.asyncresolver import resolve_address

result = await resolve_address(host)
return result
except ImportError:
raise ImportError("dns.asyncresolver.Resolver.resolve_address not found")

def create_dns_response(self, query_name, rdtype):
answers = self.mock_data.get(query_name, {}).get(rdtype, [])
if not answers:
raise self.dns.resolver.NXDOMAIN(f"No answer found for {query_name} {rdtype}")

message_text = f"""id 1234
opcode QUERY
rcode NOERROR
flags QR AA RD
;QUESTION
{query_name}. IN {rdtype}
;ANSWER"""
for answer in answers:
message_text += f"\n{query_name}. 1 IN {rdtype} {answer}"

message_text += "\n;AUTHORITY\n;ADDITIONAL\n"
message = self.dns.message.from_text(message_text)
return message

async def resolve(self, query_name, rdtype=None):
if rdtype is None:
rdtype = "A"
elif isinstance(rdtype, str):
rdtype = rdtype.upper()
else:
rdtype = str(rdtype.name).upper()

domain_name = self.dns.name.from_text(query_name)
rdtype_obj = self.dns.rdatatype.from_text(rdtype)

if "_NXDOMAIN" in self.mock_data and query_name in self.mock_data["_NXDOMAIN"]:
# Simulate the NXDOMAIN exception
raise self.dns.resolver.NXDOMAIN

try:
response = self.create_dns_response(query_name, rdtype)
answer = self.dns.resolver.Answer(domain_name, rdtype_obj, self.dns.rdataclass.IN, response)
return answer
except self.dns.resolver.NXDOMAIN:
return []


@pytest.fixture()
def configure_mock_resolver(monkeypatch):
def _configure(mock_data):
mock_resolver = MockResolver(mock_data)
return mock_resolver

return _configure
10 changes: 7 additions & 3 deletions bbot/test/test_step_1/test_dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


@pytest.mark.asyncio
async def test_dns(bbot_scanner, bbot_config):
async def test_dns(bbot_scanner, bbot_config, mock_dns):
scan = bbot_scanner("1.1.1.1", config=bbot_config)
helpers = scan.helpers

Expand Down Expand Up @@ -85,8 +85,12 @@ async def test_dns(bbot_scanner, bbot_config):
dns_config = OmegaConf.create({"dns_resolution": True})
dns_config = OmegaConf.merge(bbot_config, dns_config)
scan2 = bbot_scanner("evilcorp.com", config=dns_config)
scan2.helpers.dns.mock_dns(
{("evilcorp.com", "TXT"): '"v=spf1 include:cloudprovider.com ~all"', ("cloudprovider.com", "A"): "1.2.3.4"}
mock_dns(
scan2,
{
"evilcorp.com": {"TXT": ['"v=spf1 include:cloudprovider.com ~all"']},
"cloudprovider.com": {"A": ["1.2.3.4"]},
},
)
events = [e async for e in scan2.async_start()]
assert 1 == len(
Expand Down
16 changes: 8 additions & 8 deletions bbot/test/test_step_1/test_manager_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@


@pytest.mark.asyncio
async def test_manager_deduplication(bbot_config, bbot_scanner):
async def test_manager_deduplication(bbot_config, bbot_scanner, mock_dns):

class DefaultModule(BaseModule):
_name = "default_module"
Expand Down Expand Up @@ -62,7 +62,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
scan.modules["per_hostport_only"] = per_hostport_only
scan.modules["per_domain_only"] = per_domain_only
if _dns_mock:
scan.helpers.dns.mock_dns(_dns_mock)
mock_dns(scan, _dns_mock)
if scan_callback is not None:
scan_callback(scan)
return (
Expand All @@ -76,12 +76,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
)

dns_mock_chain = {
("default_module.test.notreal", "A"): "127.0.0.3",
("everything_module.test.notreal", "A"): "127.0.0.4",
("no_suppress_dupes.test.notreal", "A"): "127.0.0.5",
("accept_dupes.test.notreal", "A"): "127.0.0.6",
("per_hostport_only.test.notreal", "A"): "127.0.0.7",
("per_domain_only.test.notreal", "A"): "127.0.0.8",
"default_module.test.notreal": {"A": ["127.0.0.3"]},
"everything_module.test.notreal": {"A": ["127.0.0.4"]},
"no_suppress_dupes.test.notreal": {"A": ["127.0.0.5"]},
"accept_dupes.test.notreal": {"A": ["127.0.0.6"]},
"per_hostport_only.test.notreal": {"A": ["127.0.0.7"]},
"per_domain_only.test.notreal": {"A": ["127.0.0.8"]},
}

# dns search distance = 1, report distance = 0
Expand Down
34 changes: 17 additions & 17 deletions bbot/test/test_step_1/test_manager_scope_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def bbot_other_httpservers():


@pytest.mark.asyncio
async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl):
async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers, bbot_httpserver_ssl, mock_dns):
"""
This test ensures that BBOT correctly handles different scope distance settings.
It performs these tests for normal modules, output modules, and their graph variants,
Expand Down Expand Up @@ -102,7 +102,7 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
scan.modules["dummy_graph_output_module"] = dummy_graph_output_module
scan.modules["dummy_graph_batch_output_module"] = dummy_graph_batch_output_module
if _dns_mock:
scan.helpers.dns.mock_dns(_dns_mock)
mock_dns(scan, _dns_mock)
if scan_callback is not None:
scan_callback(scan)
return (
Expand All @@ -114,12 +114,12 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
)

dns_mock_chain = {
("test.notreal", "A"): "127.0.0.66",
("127.0.0.66", "PTR"): "test.notrealzies",
("test.notrealzies", "CNAME"): "www.test.notreal",
("www.test.notreal", "A"): "127.0.0.77",
("127.0.0.77", "PTR"): "test2.notrealzies",
("test2.notrealzies", "A"): "127.0.0.88",
"test.notreal": {"A": ["127.0.0.66"]},
"66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]},
"test.notrealzies": {"CNAME": ["www.test.notreal"]},
"www.test.notreal": {"A": ["127.0.0.77"]},
"77.0.0.127.in-addr.arpa": {"PTR": ["test2.notrealzies"]},
"test2.notrealzies": {"A": ["127.0.0.88"]},
}

# dns search distance = 1, report distance = 0
Expand Down Expand Up @@ -240,9 +240,9 @@ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs)
assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])

dns_mock_chain = {
("test.notreal", "A"): "127.0.0.66",
("127.0.0.66", "PTR"): "test.notrealzies",
("test.notrealzies", "A"): "127.0.0.77",
"test.notreal": {"A": ["127.0.0.66"]},
"66.0.0.127.in-addr.arpa": {"PTR": ["test.notrealzies"]},
"test.notrealzies": {"A": ["127.0.0.77"]},
}

class DummyVulnModule(BaseModule):
Expand Down Expand Up @@ -667,7 +667,7 @@ def custom_setup(scan):
"127.0.0.0/31",
modules=["speculate", "sslcert"],
_config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}},
_dns_mock={("www.bbottest.notreal", "A"): "127.0.1.0", ("test.notreal", "A"): "127.0.0.1"},
_dns_mock={"www.bbottest.notreal": {"A": ["127.0.1.0"]}, "test.notreal": {"A": ["127.0.0.1"]}},
)

assert len(events) == 6
Expand Down Expand Up @@ -727,7 +727,7 @@ def custom_setup(scan):
modules=["speculate", "sslcert"],
whitelist=["127.0.1.0"],
_config={"dns_resolution": False, "scope_report_distance": 0, "internal_modules": {"speculate": {"ports": "9999"}}},
_dns_mock={("www.bbottest.notreal", "A"): "127.0.0.1", ("test.notreal", "A"): "127.0.1.0"},
_dns_mock={"www.bbottest.notreal": {"A": ["127.0.0.1"]}, "test.notreal": {"A": ["127.0.1.0"]}},
)

assert len(events) == 3
Expand Down Expand Up @@ -777,7 +777,7 @@ def custom_setup(scan):


@pytest.mark.asyncio
async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog):
async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, caplog, mock_dns):

bbot_httpserver.expect_request(uri="/").respond_with_data(response_data="<a href='http://www-prod.test.notreal:8888'/><a href='http://www-dev.test.notreal:8888'/>")

Expand All @@ -791,9 +791,9 @@ async def test_manager_blacklist(bbot_config, bbot_scanner, bbot_httpserver, cap
whitelist=["127.0.0.0/29", "test.notreal"],
blacklist=["127.0.0.64/29"],
)
scan.helpers.dns.mock_dns({
("www-prod.test.notreal", "A"): "127.0.0.66",
("www-dev.test.notreal", "A"): "127.0.0.22",
mock_dns(scan, {
"www-prod.test.notreal": {"A": ["127.0.0.66"]},
"www-dev.test.notreal": {"A": ["127.0.0.22"]},
})

events = [e async for e in scan.async_start()]
Expand Down
Loading
Loading