From 5170e874866772292274c6035b8e815379446b55 Mon Sep 17 00:00:00 2001 From: liquidsec Date: Sun, 15 Oct 2023 17:29:18 -0400 Subject: [PATCH 1/3] excavate CSP submodule functional --- bbot/modules/internal/excavate.py | 25 ++++++++++++++++--- .../module_tests/test_module_excavate.py | 12 +++++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 51b8a4dc2..56cb92625 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -4,7 +4,7 @@ import jwt as j from urllib.parse import urljoin -from bbot.core.helpers.regexes import _email_regex +from bbot.core.helpers.regexes import _email_regex, dns_name_regex from bbot.modules.internal.base import BaseInternalModule @@ -36,6 +36,25 @@ def report(self, result, name, event): pass +class CSPExtractor(BaseExtractor): + regexes = {"CSP": r"(?i)(?m)Content-Security-Policy:.+$"} + + def extract_domains(self, csp): + domains = dns_name_regex.findall(csp) + unique_domains = set(domains) + return unique_domains + + async def search(self, content, event, **kwargs): + results = set() + async for csp, name in self._search(content, event, **kwargs): + extracted_domains = self.extract_domains(csp) + for domain in extracted_domains: + self.report(domain, event, **kwargs) + + def report(self, domain, event, **kwargs): + self.excavate.emit_event(domain, "DNS_NAME_UNRESOLVED", source=event) + + class HostnameExtractor(BaseExtractor): regexes = {} @@ -297,6 +316,7 @@ class excavate(BaseInternalModule): scope_distance_modifier = None async def setup(self): + self.csp = CSPExtractor(self) self.hostname = HostnameExtractor(self) self.url = URLExtractor(self) self.email = EmailExtractor(self) @@ -306,7 +326,6 @@ async def setup(self): self.serialization = SerializationExtractor(self) self.functionality = FunctionalityExtractor(self) self.max_redirects = self.scan.config.get("http_max_redirects", 5) - return True async def search(self, source, extractors, event, **kwargs): @@ -369,7 +388,7 @@ async def handle_event(self, event): headers = self.helpers.recursive_decode(event.data.get("raw_header", "")) await self.search( headers, - [self.hostname, self.url, self.email, self.error_extractor, self.jwt, self.serialization], + [self.hostname, self.url, self.email, self.error_extractor, self.jwt, self.serialization, self.csp], event, consider_spider_danger=False, ) diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 1d74aa2ba..18d453b03 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -209,3 +209,15 @@ def check(self, module_test, events): url_data = [e.data for e in url_events if "spider-danger" not in e.tags] assert "http://127.0.0.1:8888/10" in url_data assert "http://127.0.0.1:8888/11" not in url_data + + +class TestExcavateCSP(TestExcavate): + csp_test_header = "default-src 'self'; script-src fake.domain.com; object-src 'none';" + + async def setup_before_prep(self, module_test): + expect_args = {"method": "GET", "uri": "/"} + respond_args = {"headers": {"Content-Security-Policy": self.csp_test_header}} + module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) + + def check(self, module_test, events): + assert any(e.data == "fake.domain.com" for e in events) From 864c933f166b9c221830f23a2f8c55ab0b644134 Mon Sep 17 00:00:00 2001 From: liquidsec Date: Sun, 15 Oct 2023 17:36:41 -0400 Subject: [PATCH 2/3] removed uncessary set --- bbot/modules/internal/excavate.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 56cb92625..296cb5a59 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -45,7 +45,6 @@ def extract_domains(self, csp): return unique_domains async def search(self, content, event, **kwargs): - results = set() async for csp, name in self._search(content, event, **kwargs): extracted_domains = self.extract_domains(csp) for domain in extracted_domains: From 663cf444978585561f6ebfec5e0f55ef366f09e0 Mon Sep 17 00:00:00 2001 From: liquidsec Date: Thu, 19 Oct 2023 11:27:47 -0400 Subject: [PATCH 3/3] emitting DNS_NAME instead --- bbot/modules/internal/excavate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 296cb5a59..95a5a848f 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -51,7 +51,7 @@ async def search(self, content, event, **kwargs): self.report(domain, event, **kwargs) def report(self, domain, event, **kwargs): - self.excavate.emit_event(domain, "DNS_NAME_UNRESOLVED", source=event) + self.excavate.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"]) class HostnameExtractor(BaseExtractor):