From 6821954c77982415c1e2e5ea5bd684825e3cbabd Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Thu, 30 Nov 2023 13:24:33 -0500 Subject: [PATCH] better handling of non-HTTP URIs --- bbot/modules/internal/excavate.py | 14 ++--- bbot/test/conftest.py | 2 +- .../module_tests/test_module_excavate.py | 54 +++++++++++++++++-- 3 files changed, 60 insertions(+), 10 deletions(-) diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 04d8000fa..e27e6d228 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -149,6 +149,8 @@ def report(self, result, name, event, **kwargs): host, port = self.excavate.helpers.split_host_port(parsed_uri.netloc) # Handle non-HTTP URIs (ftp, s3, etc.) if not "http" in parsed_uri.scheme.lower(): + # these findings are pretty mundane so don't bother with them if they aren't in scope + abort_if = lambda e: e.scope_distance > 0 event_data = {"host": str(host), "description": f"Non-HTTP URI: {result}"} parsed_url = getattr(event, "parsed", None) if parsed_url: @@ -157,11 +159,16 @@ def report(self, result, name, event, **kwargs): event_data, "FINDING", source=event, + abort_if=abort_if, ) + protocol_data = {"protocol": parsed_uri.scheme, "host": str(host)} + if port: + protocol_data["port"] = port self.excavate.emit_event( - {"protocol": parsed_uri.scheme, "host": str(host)}, + protocol_data, "PROTOCOL", source=event, + abort_if=abort_if, ) return @@ -340,7 +347,6 @@ async def handle_event(self, event): web_spider_distance = getattr(event, "web_spider_distance", 0) num_redirects = max(getattr(event, "num_redirects", 0), web_spider_distance) location = event.data.get("location", "") - host = event.host # if it's a redirect if location: # get the url scheme @@ -361,10 +367,6 @@ async def handle_event(self, event): self.emit_event(url_event) else: self.verbose(f"Exceeded max HTTP redirects ({self.max_redirects}): {location}") - elif scheme: - # we ran into a scheme that's not HTTP or HTTPS - data = {"host": host, "description": f"Non-standard URI scheme: {scheme}://", "url": location} - self.emit_event(data, "FINDING", event) body = self.helpers.recursive_decode(event.data.get("body", "")) # Cloud extractors diff --git a/bbot/test/conftest.py b/bbot/test/conftest.py index 4dcf8ed21..15ef7ecc2 100644 --- a/bbot/test/conftest.py +++ b/bbot/test/conftest.py @@ -18,7 +18,7 @@ def pytest_sessionfinish(session, exitstatus): logger.removeHandler(handler) # Wipe out BBOT home dir - shutil.rmtree("/tmp/.bbot_test", ignore_errors=True) + # shutil.rmtree("/tmp/.bbot_test", ignore_errors=True) yield diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 2d65dde4d..4ca750b4e 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -150,7 +150,7 @@ def check(self, module_test, events): class TestExcavateRedirect(TestExcavate): - targets = ["http://127.0.0.1:8888/", "http://127.0.0.1:8888/relative/"] + targets = ["http://127.0.0.1:8888/", "http://127.0.0.1:8888/relative/", "http://127.0.0.1:8888/nonhttpredirect/"] config_overrides = {"scope_report_distance": 1} async def setup_before_prep(self, module_test): @@ -161,11 +161,59 @@ async def setup_before_prep(self, module_test): module_test.httpserver.expect_request("/relative/").respond_with_data( "", status=302, headers={"Location": "./owa/"} ) + module_test.httpserver.expect_request("/relative/owa/").respond_with_data( + "ftp://127.0.0.1:2121\nsmb://127.0.0.1\nssh://127.0.0.2" + ) + module_test.httpserver.expect_request("/nonhttpredirect/").respond_with_data( + "", status=302, headers={"Location": "awb://127.0.0.1:7777"} + ) module_test.httpserver.no_handler_status_code = 404 def check(self, module_test, events): - assert any(e.data == "https://www.test.notreal/yep" for e in events) - assert any(e.data == "http://127.0.0.1:8888/relative/owa/" for e in events) + assert 1 == len( + [ + e + for e in events + if e.type == "URL_UNVERIFIED" and e.data == "https://www.test.notreal/yep" and e.scope_distance == 1 + ] + ) + assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/relative/owa/"]) + assert 1 == len( + [ + e + for e in events + if e.type == "FINDING" and e.data["description"] == "Non-HTTP URI: awb://127.0.0.1:7777" + ] + ) + assert 1 == len( + [ + e + for e in events + if e.type == "PROTOCOL" and e.data["protocol"] == "AWB" and e.data.get("port", 0) == 7777 + ] + ) + assert 1 == len( + [ + e + for e in events + if e.type == "FINDING" and e.data["description"] == "Non-HTTP URI: ftp://127.0.0.1:2121" + ] + ) + assert 1 == len( + [ + e + for e in events + if e.type == "PROTOCOL" and e.data["protocol"] == "FTP" and e.data.get("port", 0) == 2121 + ] + ) + assert 1 == len( + [e for e in events if e.type == "FINDING" and e.data["description"] == "Non-HTTP URI: smb://127.0.0.1"] + ) + assert 1 == len( + [e for e in events if e.type == "PROTOCOL" and e.data["protocol"] == "SMB" and not "port" in e.data] + ) + assert 0 == len([e for e in events if e.type == "FINDING" and "ssh://127.0.0.1" in e.data["description"]]) + assert 0 == len([e for e in events if e.type == "PROTOCOL" and e.data["protocol"] == "SSH"]) class TestExcavateMaxLinksPerPage(TestExcavate):