diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py
index 51b8a4dc2..95a5a848f 100644
--- a/bbot/modules/internal/excavate.py
+++ b/bbot/modules/internal/excavate.py
@@ -4,7 +4,7 @@
 import jwt as j
 from urllib.parse import urljoin
 
-from bbot.core.helpers.regexes import _email_regex
+from bbot.core.helpers.regexes import _email_regex, dns_name_regex
 from bbot.modules.internal.base import BaseInternalModule
 
 
@@ -36,6 +36,24 @@ def report(self, result, name, event):
         pass
 
 
+class CSPExtractor(BaseExtractor):
+    regexes = {"CSP": r"(?i)(?m)Content-Security-Policy:.+$"}
+
+    def extract_domains(self, csp):
+        domains = dns_name_regex.findall(csp)
+        unique_domains = set(domains)
+        return unique_domains
+
+    async def search(self, content, event, **kwargs):
+        async for csp, name in self._search(content, event, **kwargs):
+            extracted_domains = self.extract_domains(csp)
+            for domain in extracted_domains:
+                self.report(domain, event, **kwargs)
+
+    def report(self, domain, event, **kwargs):
+        self.excavate.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate"])
+
+
 class HostnameExtractor(BaseExtractor):
     regexes = {}
 
@@ -297,6 +315,7 @@ class excavate(BaseInternalModule):
     scope_distance_modifier = None
 
     async def setup(self):
+        self.csp = CSPExtractor(self)
         self.hostname = HostnameExtractor(self)
         self.url = URLExtractor(self)
         self.email = EmailExtractor(self)
@@ -306,7 +325,6 @@ async def setup(self):
         self.serialization = SerializationExtractor(self)
         self.functionality = FunctionalityExtractor(self)
         self.max_redirects = self.scan.config.get("http_max_redirects", 5)
-
         return True
 
     async def search(self, source, extractors, event, **kwargs):
@@ -369,7 +387,7 @@ async def handle_event(self, event):
             headers = self.helpers.recursive_decode(event.data.get("raw_header", ""))
             await self.search(
                 headers,
-                [self.hostname, self.url, self.email, self.error_extractor, self.jwt, self.serialization],
+                [self.hostname, self.url, self.email, self.error_extractor, self.jwt, self.serialization, self.csp],
                 event,
                 consider_spider_danger=False,
             )
diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py
index 1d74aa2ba..18d453b03 100644
--- a/bbot/test/test_step_2/module_tests/test_module_excavate.py
+++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py
@@ -209,3 +209,15 @@ def check(self, module_test, events):
         url_data = [e.data for e in url_events if "spider-danger" not in e.tags]
         assert "http://127.0.0.1:8888/10" in url_data
         assert "http://127.0.0.1:8888/11" not in url_data
+
+
+class TestExcavateCSP(TestExcavate):
+    csp_test_header = "default-src 'self'; script-src fake.domain.com; object-src 'none';"
+
+    async def setup_before_prep(self, module_test):
+        expect_args = {"method": "GET", "uri": "/"}
+        respond_args = {"headers": {"Content-Security-Policy": self.csp_test_header}}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+    def check(self, module_test, events):
+        assert any(e.data == "fake.domain.com" for e in events)