From 9af5ad83de83d83795914cb06857f980db358401 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Wed, 24 Jan 2024 08:50:15 -0500 Subject: [PATCH] gowitness - don't visit out-of-scope URLs (except social media pages) --- bbot/core/event/base.py | 1 + bbot/modules/gowitness.py | 26 ++++++---- .../module_tests/test_module_gowitness.py | 47 +++++++++++++------ 3 files changed, 50 insertions(+), 24 deletions(-) diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index 54b9dae02..f46faad01 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -1165,6 +1165,7 @@ class SOCIAL(DictEvent): class WEBSCREENSHOT(DictHostEvent): _always_emit = True + _quick_emit = True class AZURE_TENANT(DictEvent): diff --git a/bbot/modules/gowitness.py b/bbot/modules/gowitness.py index 5592eaa65..a335bf021 100644 --- a/bbot/modules/gowitness.py +++ b/bbot/modules/gowitness.py @@ -7,7 +7,7 @@ class gowitness(BaseModule): - watched_events = ["URL"] + watched_events = ["URL", "SOCIAL"] produced_events = ["WEBSCREENSHOT", "URL", "URL_UNVERIFIED", "TECHNOLOGY"] flags = ["active", "safe", "web-screenshots"] meta = {"description": "Take screenshots of webpages"} @@ -76,9 +76,8 @@ class gowitness(BaseModule): }, ] _batch_size = 100 - # visit up to and including the scan's configured search distance plus one - # this is one hop further than the default - scope_distance_modifier = 1 + # gowitness accepts SOCIAL events up to distance 2, otherwise it is in-scope-only + scope_distance_modifier = 2 async def setup(self): self.timeout = self.config.get("timeout", 10) @@ -120,12 +119,21 @@ async def filter_event(self, event): # ignore events from self if event.type == "URL" and event.module == self: return False, "event is from self" + # Accept out-of-scope SOCIAL pages, but not URLs + if event.scope_distance > 0: + if event.type != "SOCIAL": + return False, "event is not in-scope" return True async def handle_batch(self, *events): self.prep() - stdin = "\n".join([str(e.data) for e in events]) - events = {e.data: e for e in events} + event_dict = {} + for e in events: + key = e.data + if e.type == "SOCIAL": + key = e.data["url"] + event_dict[key] = e + stdin = "\n".join(list(event_dict)) async for line in self.helpers.run_live(self.command, input=stdin): self.debug(line) @@ -136,7 +144,7 @@ async def handle_batch(self, *events): final_url = screenshot["final_url"] filename = screenshot["filename"] webscreenshot_data = {"filename": filename, "url": final_url} - source_event = events[url] + source_event = event_dict[url] await self.emit_event(webscreenshot_data, "WEBSCREENSHOT", source=source_event) # emit URLs @@ -147,7 +155,7 @@ async def handle_batch(self, *events): _id = row["url_id"] source_url = self.screenshots_taken[_id] - source_event = events[source_url] + source_event = event_dict[source_url] if self.helpers.is_spider_danger(source_event, url): tags.append("spider-danger") if url and url.startswith("http"): @@ -157,7 +165,7 @@ async def handle_batch(self, *events): for _, row in self.new_technologies.items(): source_id = row["url_id"] source_url = self.screenshots_taken[source_id] - source_event = events[source_url] + source_event = event_dict[source_url] technology = row["value"] tech_data = {"technology": technology, "url": source_url, "host": str(source_event.host)} await self.emit_event(tech_data, "TECHNOLOGY", source=source_event) diff --git a/bbot/test/test_step_2/module_tests/test_module_gowitness.py b/bbot/test/test_step_2/module_tests/test_module_gowitness.py index 35e0db799..89fed5da5 100644 --- a/bbot/test/test_step_2/module_tests/test_module_gowitness.py +++ b/bbot/test/test_step_2/module_tests/test_module_gowitness.py @@ -3,7 +3,7 @@ class TestGowitness(ModuleTestBase): targets = ["127.0.0.1:8888"] - modules_overrides = ["gowitness", "httpx"] + modules_overrides = ["gowitness", "httpx", "social", "excavate"] import shutil from pathlib import Path @@ -14,6 +14,7 @@ class TestGowitness(ModuleTestBase): async def setup_after_prep(self, module_test): respond_args = { "response_data": """BBOT is life + @@ -21,21 +22,37 @@ async def setup_after_prep(self, module_test): "headers": {"Server": "Apache/2.4.41 (Ubuntu)"}, } module_test.set_expect_requests(respond_args=respond_args) + request_args = dict(uri="/blacklanternsecurity") + respond_args = dict(response_data="blacklanternsecurity github") + module_test.set_expect_requests(request_args, respond_args) + + # monkeypatch social + old_emit_event = module_test.scan.modules["social"].emit_event + + async def new_emit_event(event_data, event_type, **kwargs): + if event_data["url"] == "https://github.com/blacklanternsecurity": + event_data["url"] = event_data["url"].replace("https://github.com", "http://127.0.0.1:8888") + await old_emit_event(event_data, event_type, **kwargs) + + module_test.monkeypatch.setattr(module_test.scan.modules["social"], "emit_event", new_emit_event) def check(self, module_test, events): screenshots_path = self.home_dir / "scans" / module_test.scan.name / "gowitness" / "screenshots" screenshots = list(screenshots_path.glob("*.png")) - assert screenshots, f"No .png files found at {screenshots_path}" - url = False - webscreenshot = False - technology = False - for event in events: - if event.type == "URL_UNVERIFIED": - url = True - elif event.type == "WEBSCREENSHOT": - webscreenshot = True - elif event.type == "TECHNOLOGY": - technology = True - assert url, "No URL emitted" - assert webscreenshot, "No WEBSCREENSHOT emitted" - assert technology, "No TECHNOLOGY emitted" + assert ( + len(screenshots) == 2 + ), f"{len(screenshots):,} .png files found at {screenshots_path}, should have been 2" + assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/"]) + assert 1 == len( + [e for e in events if e.type == "SOCIAL" and e.data["url"] == "http://127.0.0.1:8888/blacklanternsecurity"] + ) + assert 2 == len([e for e in events if e.type == "WEBSCREENSHOT"]) + assert 1 == len([e for e in events if e.type == "WEBSCREENSHOT" and e.data["url"] == "http://127.0.0.1:8888/"]) + assert 1 == len( + [ + e + for e in events + if e.type == "WEBSCREENSHOT" and e.data["url"] == "http://127.0.0.1:8888/blacklanternsecurity" + ] + ) + assert len([e for e in events if e.type == "TECHNOLOGY"])