From f0f0fdc1aa17c52c14d2dbce9d361f089d1501b8 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Thu, 21 Dec 2023 20:23:19 -0500 Subject: [PATCH] wrote tests, shodan_port --> internetdb --- bbot/modules/internetdb.py | 121 ++++++++++++++++++ bbot/modules/shodan_dns.py | 10 +- bbot/modules/shodan_port.py | 109 ---------------- bbot/modules/templates/github.py | 6 +- bbot/modules/templates/shodan.py | 34 +++++ .../module_tests/test_module_internetdb.py | 55 ++++++++ .../module_tests/test_module_shodan_dns.py | 2 +- 7 files changed, 216 insertions(+), 121 deletions(-) create mode 100644 bbot/modules/internetdb.py delete mode 100644 bbot/modules/shodan_port.py create mode 100644 bbot/modules/templates/shodan.py create mode 100644 bbot/test/test_step_2/module_tests/test_module_internetdb.py diff --git a/bbot/modules/internetdb.py b/bbot/modules/internetdb.py new file mode 100644 index 000000000..a220867e1 --- /dev/null +++ b/bbot/modules/internetdb.py @@ -0,0 +1,121 @@ +from bbot.modules.base import BaseModule + + +class internetdb(BaseModule): + """ + Query IP in Shodan InternetDB, returning open ports, discovered technologies, and findings/vulnerabilities + API reference: https://internetdb.shodan.io/docs + + Example API response: + + { + "cpes": [ + "cpe:/a:microsoft:internet_information_services", + "cpe:/a:microsoft:outlook_web_access:15.0.1367", + ], + "hostnames": [ + "autodiscover.evilcorp.com", + "mail.evilcorp.com", + ], + "ip": "1.2.3.4", + "ports": [ + 25, + 80, + 443, + ], + "tags": [ + "starttls", + "self-signed", + "eol-os" + ], + "vulns": [ + "CVE-2021-26857", + "CVE-2021-26855" + ] + } + """ + + watched_events = ["IP_ADDRESS", "DNS_NAME"] + produced_events = ["TECHNOLOGY", "VULNERABILITY", "FINDING", "OPEN_TCP_PORT", "DNS_NAME"] + flags = ["passive", "safe", "portscan", "subdomain-enum"] + meta = {"description": "Query Shodan's InternetDB for open ports, hostnames, technologies, and vulnerabilities"} + + # limit outgoing queue size to help avoid rate limiting + _qsize = 1 + + base_url = "https://internetdb.shodan.io" + + async def setup(self): + self.processed = set() + return True + + async def filter_event(self, event): + ip = self.get_ip(event) + if ip: + ip_hash = hash(ip) + if ip_hash in self.processed: + return False, "IP was already processed" + self.processed.add(ip_hash) + return True + return False, "event had no valid IP addresses" + + async def handle_event(self, event): + ip = self.get_ip(event) + if ip is None: + return + url = f"{self.base_url}/{ip}" + r = await self.request_with_fail_count(url) + if r is None: + self.debug(f"No response for {event.data}") + return + try: + data = r.json() + except Exception as e: + self.verbose(f"Error parsing JSON response from {url}: {e}") + self.trace() + return + if data: + if r.status_code == 200: + self._parse_response(data=data, event=event) + elif r.status_code == 404: + detail = data.get("detail", "") + if detail: + self.debug(f"404 response for {url}: {detail}") + else: + err_data = data.get("type", "") + err_msg = data.get("msg", "") + self.verbose(f"Shodan error for {ip}: {err_data}: {err_msg}") + + def _parse_response(self, data: dict, event): + """Handles emitting events from returned JSON""" + data: dict # has keys: cpes, hostnames, ip, ports, tags, vulns + # ip is a string, ports is a list of ports, the rest is a list of strings + for hostname in data.get("hostnames", []): + self.emit_event(hostname, "DNS_NAME", source=event) + for cpe in data.get("cpes", []): + self.emit_event({"technology": cpe, "host": str(event.host)}, "TECHNOLOGY", source=event) + for port in data.get("ports", []): + self.emit_event(self.helpers.make_netloc(event.data, port), "OPEN_TCP_PORT", source=event) + for vuln in data.get("vulns", []): + self.emit_event( + {"description": f"Shodan reported verified vulnerability: {vuln}", "host": str(event.host)}, + "FINDING", + source=event, + ) + + def get_ip(self, event): + """ + Get the first available IP address from an event (IP_ADDRESS or DNS_NAME) + """ + if event.type == "IP_ADDRESS": + return event.host + elif event.type == "DNS_NAME": + # always try IPv4 first + ipv6 = [] + for host in event.resolved_hosts: + if self.helpers.is_ip(host, version=4): + return host + elif self.helpers.is_ip(host, version=6): + ipv6.append(host) + for ip in ipv6: + return ip diff --git a/bbot/modules/shodan_dns.py b/bbot/modules/shodan_dns.py index 7780120b6..4d19b237e 100644 --- a/bbot/modules/shodan_dns.py +++ b/bbot/modules/shodan_dns.py @@ -1,7 +1,7 @@ -from bbot.modules.templates.subdomain_enum import subdomain_enum_apikey +from bbot.modules.templates.shodan import shodan -class shodan_dns(subdomain_enum_apikey): +class shodan_dns(shodan): watched_events = ["DNS_NAME"] produced_events = ["DNS_NAME"] flags = ["subdomain-enum", "passive", "safe"] @@ -11,12 +11,6 @@ class shodan_dns(subdomain_enum_apikey): base_url = "https://api.shodan.io" - async def ping(self): - url = f"{self.base_url}/api-info?key={self.api_key}" - r = await self.request_with_fail_count(url) - resp_content = getattr(r, "text", "") - assert getattr(r, "status_code", 0) == 200, resp_content - async def request_url(self, query): url = f"{self.base_url}/dns/domain/{self.helpers.quote(query)}?key={self.api_key}" response = await self.request_with_fail_count(url) diff --git a/bbot/modules/shodan_port.py b/bbot/modules/shodan_port.py deleted file mode 100644 index a5b8a508c..000000000 --- a/bbot/modules/shodan_port.py +++ /dev/null @@ -1,109 +0,0 @@ -from bbot.modules.base import BaseModule - - -class shodan_port(BaseModule): - """ - Query IP in Shodan, returning open ports, discovered technologies, and findings/vulnerabilities - API reference: https://developer.shodan.io/api - """ - - watched_events = ["IP_ADDRESS"] - produced_events = ["TECHNOLOGY", "VULNERABILITY", "FINDING", "OPEN_TCP_PORT", "DNS_NAME"] - flags = ["passive", "safe", "portscan"] - meta = {"description": "Query Shodan for open ports", "auth_required": True} - options = {"api_key": ""} - options_desc = {"api_key": "Shodan API key"} - base_url = "https://api.shodan.io" - scope_distance_modifier = 1 - - @staticmethod - def _severity_lookup(f: float) -> str: - """ - Takes a CVSS v3 base score as input and returns the severity rating - """ - if f >= 9.0: - return "CRITICAL" - elif 9.0 > f >= 7.0: - return "HIGH" - elif 7.0 > f >= 4.0: - return "MEDIUM" - elif 4.0 > f > 0: - return "LOW" - return "NONE" - - def _parse_response(self, data: dict, event): - """Handles emiting events from returned JSON""" - data: list[dict] - found_domains: set[str] = set() - # Decrease scope distance for ports since ports are directly connected to the host - event.scope_distance = event.scope_distance - 1 - for datum in data: - if port_number := datum.get("port"): - netloc = self.helpers.make_netloc(event.data, port_number) - # If TCP, report up - if datum.get("transport") == "tcp": - self.emit_event(netloc, event_type="OPEN_TCP_PORT", source=event) - # Check for vulns - if vulns := datum.get("vulns"): - for cve, vuln_info in vulns.items(): - vuln_data = { - "cve": cve, - "cvss": vuln_info.get("cvss", 0), - "host": str(event.host), - "netloc": netloc, - "verified": vuln_info.get("verified", False), - "description": "", - "port": port_number, - "severity": shodan_port._severity_lookup(vuln_info["cvss"]), - } - if vuln_info["verified"]: - vuln_data["description"] = f"Shodan reported verified CVE {cve}" - self.emit_event( - vuln_data, - event_type="VULNERABILITY", - source=event, - severity=vuln_data["severity"], - ) - else: - vuln_data["description"] = f"Shodan reported unverified CVE {cve}" - self.emit_event( - vuln_data, - event_type="FINDING", - source=event, - ) - - # check for tech - os and cpe - if os := datum.get("os"): - self.emit_event({"technology": os, "host": str(event.host), "type": "os"}, "TECHNOLOGY", event) - if cpes := datum.get("cpe23"): - for cpe in cpes: - tech_data = {"technology": cpe, "host": str(event.host), "type": "cpe"} - self.emit_event(tech_data, "TECHNOLOGY", source=event) - # check for domains - if domains := datum.get("domains"): - for domain in domains: - found_domains.add(domain) - if hostnames := datum.get("hostnames"): - for hostname in hostnames: - found_domains.add(hostname) - for dns_name in found_domains: - self.emit_event(dns_name, event_type="DNS_NAME", source=event) - - async def handle_event(self, event): - url = f"{self.base_url}/shodan/host/{event.data}" - r = await self.helpers.request(f"{url}?key={self.api_key}") - if r is None: - self.debug(f"No response for {event.data}") - return - try: - json = r.json() - except Exception: - return - if data := json.get("data"): - # Iterate over each port. Inside the ports, we can get: - # domains, hostnames, os, transport (tcp or udp), vulns - self._parse_response(data=data, event=event) - - async def setup(self): - await super().setup() - return await self.require_api_key() diff --git a/bbot/modules/templates/github.py b/bbot/modules/templates/github.py index bf5b43c6f..8441c1009 100644 --- a/bbot/modules/templates/github.py +++ b/bbot/modules/templates/github.py @@ -21,15 +21,15 @@ async def setup(self): self.api_key = api_key self.headers = {"Authorization": f"token {self.api_key}"} break + if not self.api_key: + if self.auth_required: + return None, "No API key set" try: await self.ping() self.hugesuccess(f"API is ready") return True except Exception as e: return None, f"Error with API ({str(e).strip()})" - if not self.api_key: - if self.auth_required: - return None, "No API key set" return True async def ping(self): diff --git a/bbot/modules/templates/shodan.py b/bbot/modules/templates/shodan.py new file mode 100644 index 000000000..f439bfd9d --- /dev/null +++ b/bbot/modules/templates/shodan.py @@ -0,0 +1,34 @@ +from bbot.modules.templates.subdomain_enum import subdomain_enum + + +class shodan(subdomain_enum): + options = {"api_key": ""} + options_desc = {"api_key": "Shodan API key"} + + base_url = "https://api.shodan.io" + + async def setup(self): + await super().setup() + self.api_key = None + for module_name in ("shodan", "shodan_dns", "shodan_port"): + module_config = self.scan.config.get("modules", {}).get(module_name, {}) + api_key = module_config.get("api_key", "") + if api_key: + self.api_key = api_key + break + if not self.api_key: + if self.auth_required: + return None, "No API key set" + try: + await self.ping() + self.hugesuccess(f"API is ready") + return True + except Exception as e: + return None, f"Error with API ({str(e).strip()})" + return True + + async def ping(self): + url = f"{self.base_url}/api-info?key={self.api_key}" + r = await self.request_with_fail_count(url) + resp_content = getattr(r, "text", "") + assert getattr(r, "status_code", 0) == 200, resp_content diff --git a/bbot/test/test_step_2/module_tests/test_module_internetdb.py b/bbot/test/test_step_2/module_tests/test_module_internetdb.py new file mode 100644 index 000000000..b3ccde4ac --- /dev/null +++ b/bbot/test/test_step_2/module_tests/test_module_internetdb.py @@ -0,0 +1,55 @@ +from .base import ModuleTestBase + + +class TestInternetDB(ModuleTestBase): + config_overrides = {"dns_resolution": True} + + async def setup_before_prep(self, module_test): + module_test.scan.helpers.mock_dns( + { + ("blacklanternsecurity.com", "A"): "1.2.3.4", + ("autodiscover.blacklanternsecurity.com", "A"): "2.3.4.5", + ("mail.blacklanternsecurity.com", "A"): "3.4.5.6", + } + ) + + module_test.httpx_mock.add_response( + url="https://internetdb.shodan.io/1.2.3.4", + json={ + "cpes": [ + "cpe:/a:microsoft:internet_information_services", + "cpe:/a:microsoft:outlook_web_access:15.0.1367", + ], + "hostnames": [ + "autodiscover.blacklanternsecurity.com", + "mail.blacklanternsecurity.com", + ], + "ip": "1.2.3.4", + "ports": [ + 25, + 80, + 443, + ], + "tags": ["starttls", "self-signed", "eol-os"], + "vulns": ["CVE-2021-26857", "CVE-2021-26855"], + }, + ) + + def check(self, module_test, events): + assert 9 == len([e for e in events if str(e.module) == "internetdb"]) + assert 1 == len( + [e for e in events if e.type == "DNS_NAME" and e.data == "autodiscover.blacklanternsecurity.com"] + ) + assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "mail.blacklanternsecurity.com"]) + assert 3 == len([e for e in events if e.type == "OPEN_TCP_PORT" and str(e.module) == "internetdb"]) + assert 1 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "blacklanternsecurity.com:443"]) + assert 2 == len([e for e in events if e.type == "FINDING" and str(e.module) == "internetdb"]) + assert 1 == len([e for e in events if e.type == "FINDING" and "CVE-2021-26857" in e.data["description"]]) + assert 2 == len([e for e in events if e.type == "TECHNOLOGY" and str(e.module) == "internetdb"]) + assert 1 == len( + [ + e + for e in events + if e.type == "TECHNOLOGY" and e.data["technology"] == "cpe:/a:microsoft:outlook_web_access:15.0.1367" + ] + ) diff --git a/bbot/test/test_step_2/module_tests/test_module_shodan_dns.py b/bbot/test/test_step_2/module_tests/test_module_shodan_dns.py index 90a93fe9b..448158572 100644 --- a/bbot/test/test_step_2/module_tests/test_module_shodan_dns.py +++ b/bbot/test/test_step_2/module_tests/test_module_shodan_dns.py @@ -2,7 +2,7 @@ class TestShodan_DNS(ModuleTestBase): - config_overrides = {"modules": {"shodan_dns": {"api_key": "asdf"}}} + config_overrides = {"modules": {"shodan": {"api_key": "asdf"}}} async def setup_before_prep(self, module_test): module_test.httpx_mock.add_response(