diff --git a/bbot/modules/output/emails.py b/bbot/modules/output/emails.py new file mode 100644 index 0000000000..029bc5aca7 --- /dev/null +++ b/bbot/modules/output/emails.py @@ -0,0 +1,25 @@ +from bbot.modules.base import BaseModule +from bbot.modules.output.human import Human + + +class Emails(Human): + watched_events = ["EMAIL_ADDRESS"] + flags = ["email-enum"] + meta = {"description": "Output any email addresses found belonging to the target domain"} + options = {"output_file": ""} + options_desc = {"output_file": "Output to file"} + in_scope_only = True + + output_filename = "emails.txt" + + def _scope_distance_check(self, event): + return BaseModule._scope_distance_check(self, event) + + async def handle_event(self, event): + if self.file is not None: + self.file.write(f"{event.data}\n") + self.file.flush() + + async def report(self): + if getattr(self, "_file", None) is not None: + self.info(f"Saved email addresses to {self.output_file}") diff --git a/bbot/modules/output/subdomains.py b/bbot/modules/output/subdomains.py index 819018d65a..49dea2db8f 100644 --- a/bbot/modules/output/subdomains.py +++ b/bbot/modules/output/subdomains.py @@ -1,5 +1,3 @@ -from contextlib import suppress - from bbot.modules.base import BaseModule from bbot.modules.output.human import Human @@ -32,11 +30,6 @@ async def handle_event(self, event): self.file.write(f"{event.data}\n") self.file.flush() - async def cleanup(self): - if getattr(self, "_file", None) is not None: - with suppress(Exception): - self.file.close() - async def report(self): if getattr(self, "_file", None) is not None: self.info(f"Saved subdomains to {self.output_file}") diff --git a/bbot/test/test_step_2/module_tests/test_module_emails.py b/bbot/test/test_step_2/module_tests/test_module_emails.py new file mode 100644 index 0000000000..44b9ab0787 --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_emails.py @@ -0,0 +1,23 @@ +from .base import ModuleTestBase + + +class TestEmais(ModuleTestBase): + modules_overrides = ["emails", "emailformat", "skymem"] + + async def setup_before_prep(self, module_test): + module_test.httpx_mock.add_response( + url="https://www.email-format.com/d/blacklanternsecurity.com/", + text="
info@blacklanternsecurity.com
", + ) + module_test.httpx_mock.add_response( + url="https://www.skymem.info/srch?q=blacklanternsecurity.com", + text="info@blacklanternsecurity.com
", + ) + + def check(self, module_test, events): + assert 2 == len([e for e in events if e.data == "info@blacklanternsecurity.com"]) + email_file = module_test.scan.home / "emails.txt" + emails = open(email_file).read().splitlines() + # make sure deduping works as intended + assert len(emails) == 1 + assert set(emails) == {"info@blacklanternsecurity.com"}