-
Notifications
You must be signed in to change notification settings - Fork 561
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #896 from domwhewell-sage/dastardly-scanner
Dastardly scanner
- Loading branch information
Showing
5 changed files
with
230 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
from lxml import etree | ||
from bbot.modules.base import BaseModule | ||
|
||
|
||
class dastardly(BaseModule): | ||
watched_events = ["HTTP_RESPONSE"] | ||
produced_events = ["FINDING", "VULNERABILITY"] | ||
flags = ["active", "aggressive", "slow", "web-thorough"] | ||
meta = {"description": "Lightweight web application security scanner"} | ||
|
||
deps_pip = ["lxml~=4.9.2"] | ||
deps_ansible = [ | ||
{ | ||
"name": "Check if Docker is already installed", | ||
"command": "docker --version", | ||
"register": "docker_installed", | ||
"ignore_errors": True, | ||
}, | ||
{ | ||
"name": "Install Docker (Non-Debian)", | ||
"package": {"name": "docker", "state": "present"}, | ||
"become": True, | ||
"when": "ansible_facts['os_family'] != 'Debian' and docker_installed.rc != 0", | ||
}, | ||
{ | ||
"name": "Install Docker (Debian)", | ||
"package": { | ||
"name": "docker.io", | ||
"state": "present", | ||
}, | ||
"become": True, | ||
"when": "ansible_facts['os_family'] == 'Debian' and docker_installed.rc != 0", | ||
}, | ||
] | ||
per_host_only = True | ||
|
||
async def setup(self): | ||
await self.helpers.run("systemctl", "start", "docker", sudo=True) | ||
await self.helpers.run("docker", "pull", "public.ecr.aws/portswigger/dastardly:latest", sudo=True) | ||
self.output_dir = self.scan.home / "dastardly" | ||
self.helpers.mkdir(self.output_dir) | ||
return True | ||
|
||
async def filter_event(self, event): | ||
# Reject redirects. This helps to avoid scanning the same site twice. | ||
is_redirect = str(event.data["status_code"]).startswith("30") | ||
if is_redirect: | ||
return False, "URL is a redirect" | ||
return True | ||
|
||
async def handle_event(self, event): | ||
host = event.parsed._replace(path="/").geturl() | ||
self.verbose(f"Running Dastardly scan against {host}") | ||
command, output_file = self.construct_command(host) | ||
finished_proc = await self.helpers.run(command, sudo=True) | ||
self.debug(f'dastardly stdout: {getattr(finished_proc, "stdout", "")}') | ||
self.debug(f'dastardly stderr: {getattr(finished_proc, "stderr", "")}') | ||
for testsuite in self.parse_dastardly_xml(output_file): | ||
url = testsuite.endpoint | ||
for testcase in testsuite.testcases: | ||
for failure in testcase.failures: | ||
if failure.severity == "Info": | ||
self.emit_event( | ||
{ | ||
"host": str(event.host), | ||
"url": url, | ||
"description": failure.instance, | ||
}, | ||
"FINDING", | ||
event, | ||
) | ||
else: | ||
self.emit_event( | ||
{ | ||
"severity": failure.severity, | ||
"host": str(event.host), | ||
"url": url, | ||
"description": failure.instance, | ||
}, | ||
"VULNERABILITY", | ||
event, | ||
) | ||
|
||
def construct_command(self, target): | ||
date_time = self.helpers.make_date() | ||
file_name = self.helpers.tagify(target) | ||
temp_path = self.output_dir / f"{date_time}_{file_name}.xml" | ||
command = [ | ||
"docker", | ||
"run", | ||
"--user", | ||
"0", | ||
"--rm", | ||
"-v", | ||
f"{self.output_dir}:/dastardly", | ||
"-e", | ||
f"BURP_START_URL={target}", | ||
"-e", | ||
f"BURP_REPORT_FILE_PATH=/dastardly/{temp_path.name}", | ||
"public.ecr.aws/portswigger/dastardly:latest", | ||
] | ||
return command, temp_path | ||
|
||
def parse_dastardly_xml(self, xml_file): | ||
try: | ||
with open(xml_file, "rb") as f: | ||
et = etree.parse(f) | ||
for testsuite in et.iter("testsuite"): | ||
yield TestSuite(testsuite) | ||
except Exception as e: | ||
self.warning(f"Error parsing Dastardly XML at {xml_file}: {e}") | ||
|
||
|
||
class Failure: | ||
def __init__(self, xml): | ||
self.etree = xml | ||
|
||
# instance information | ||
self.instance = self.etree.attrib.get("message", "") | ||
self.severity = self.etree.attrib.get("type", "") | ||
self.text = self.etree.text | ||
|
||
|
||
class TestCase: | ||
def __init__(self, xml): | ||
self.etree = xml | ||
|
||
# title information | ||
self.title = self.etree.attrib.get("name", "") | ||
|
||
# findings / failures(as dastardly names them) | ||
self.failures = [] | ||
for failure in self.etree.findall("failure"): | ||
self.failures.append(Failure(failure)) | ||
|
||
|
||
class TestSuite: | ||
def __init__(self, xml): | ||
self.etree = xml | ||
|
||
# endpoint information | ||
self.endpoint = self.etree.attrib.get("name", "") | ||
|
||
# test cases | ||
self.testcases = [] | ||
for testcase in self.etree.findall("testcase"): | ||
self.testcases.append(TestCase(testcase)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64 changes: 64 additions & 0 deletions
64
bbot/test/test_step_2/module_tests/test_module_dastardly.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
import json | ||
from werkzeug import Response | ||
|
||
from .base import ModuleTestBase | ||
|
||
|
||
class TestDastardly(ModuleTestBase): | ||
targets = ["http://127.0.0.1:5556/"] | ||
modules_overrides = ["httpx", "dastardly"] | ||
|
||
web_response = """<!DOCTYPE html> | ||
<html> | ||
<body> | ||
<a href="/test?test=yes">visit this<a/> | ||
</body> | ||
</html>""" | ||
|
||
def xss_handler(self, request): | ||
response = f"""<!DOCTYPE html> | ||
<html> | ||
<head> | ||
<title>Email Form</title> | ||
</head> | ||
<body> | ||
{request.args.get("test", "")} | ||
</body> | ||
</html>""" | ||
return Response(response, content_type="text/html") | ||
|
||
async def get_docker_ip(self, module_test): | ||
docker_ip = "172.17.0.1" | ||
try: | ||
ip_output = await module_test.scan.helpers.run(["ip", "-j", "-4", "a", "show", "dev", "docker0"]) | ||
interface_json = json.loads(ip_output.stdout) | ||
docker_ip = interface_json[0]["addr_info"][0]["local"] | ||
except Exception: | ||
pass | ||
return docker_ip | ||
|
||
async def setup_after_prep(self, module_test): | ||
httpserver = module_test.request_fixture.getfixturevalue("bbot_httpserver_allinterfaces") | ||
httpserver.expect_request("/").respond_with_data(self.web_response) | ||
httpserver.expect_request("/test").respond_with_handler(self.xss_handler) | ||
|
||
# get docker IP | ||
docker_ip = await self.get_docker_ip(module_test) | ||
module_test.scan.target.add_target(docker_ip) | ||
|
||
# replace 127.0.0.1 with docker host IP to allow dastardly access to local http server | ||
old_filter_event = module_test.module.filter_event | ||
|
||
def new_filter_event(event): | ||
self.new_url = f"http://{docker_ip}:5556/" | ||
event.data["url"] = self.new_url | ||
event.parsed = module_test.scan.helpers.urlparse(self.new_url) | ||
return old_filter_event(event) | ||
|
||
module_test.monkeypatch.setattr(module_test.module, "filter_event", new_filter_event) | ||
|
||
def check(self, module_test, events): | ||
assert 1 == len([e for e in events if e.type == "VULNERABILITY"]) | ||
assert 1 == len( | ||
[e for e in events if e.type == "VULNERABILITY" and f"{self.new_url}test" in e.data["description"]] | ||
) |