diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index e27e6d228..002f77e95 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -67,8 +67,10 @@ def report(self, result, name, event, **kwargs): class URLExtractor(BaseExtractor): + url_path_regex = r"((?:\w|\d)(?:[\d\w-]+\.?)+(?::\d{1,5})?(?:/[-\w\.\(\)]+)*/?)" regexes = { - "fullurl": r"(?i)" + r"(\w{2,15})://((?:\w|\d)(?:[\d\w-]+\.?)+(?::\d{1,5})?(?:/[-\w\.\(\)]+)*/?)", + "fulluri": r"(?i)" + r"([a-z]\w{1,15})://" + url_path_regex, + "fullurl": r"(?i)" + r"(https?)://" + url_path_regex, "a-tag": r"]*?\s+)?href=([\"'])(.*?)\1", "script-tag": r"]*?\s+)?src=([\"'])(.*?)\1", } @@ -119,7 +121,7 @@ async def _search(self, content, event, **kwargs): # yield to event loop await self.excavate.helpers.sleep(0) for result in regex.findall(content): - if name == "fullurl": + if name.startswith("full"): protocol, other = result result = f"{protocol}://{other}" @@ -145,7 +147,14 @@ async def _search(self, content, event, **kwargs): yield result, name def report(self, result, name, event, **kwargs): - parsed_uri = self.excavate.helpers.urlparse(result) + parsed_uri = None + try: + parsed_uri = self.excavate.helpers.urlparse(result) + except Exception as e: + self.excavate.debug(f"Error parsing URI {result}: {e}") + netloc = getattr(parsed_uri, "netloc", None) + if netloc is None: + return host, port = self.excavate.helpers.split_host_port(parsed_uri.netloc) # Handle non-HTTP URIs (ftp, s3, etc.) if not "http" in parsed_uri.scheme.lower(): diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 4ca750b4e..0add8aea6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -269,3 +269,14 @@ async def setup_before_prep(self, module_test): def check(self, module_test, events): assert any(e.data == "test.asdf.fakedomain" for e in events) + + +class TestExcavateURL(TestExcavate): + async def setup_before_prep(self, module_test): + module_test.httpserver.expect_request("/").respond_with_data( + "SomeSMooshedDATAhttps://asdffoo.test.notreal/some/path" + ) + + def check(self, module_test, events): + assert any(e.data == "asdffoo.test.notreal" for e in events) + assert any(e.data == "https://asdffoo.test.notreal/some/path" for e in events)