-
Notifications
You must be signed in to change notification settings - Fork 561
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
wrote tests, shodan_port --> internetdb
- Loading branch information
1 parent
899534c
commit f0f0fdc
Showing
7 changed files
with
216 additions
and
121 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
from bbot.modules.base import BaseModule | ||
|
||
|
||
class internetdb(BaseModule): | ||
""" | ||
Query IP in Shodan InternetDB, returning open ports, discovered technologies, and findings/vulnerabilities | ||
API reference: https://internetdb.shodan.io/docs | ||
Example API response: | ||
{ | ||
"cpes": [ | ||
"cpe:/a:microsoft:internet_information_services", | ||
"cpe:/a:microsoft:outlook_web_access:15.0.1367", | ||
], | ||
"hostnames": [ | ||
"autodiscover.evilcorp.com", | ||
"mail.evilcorp.com", | ||
], | ||
"ip": "1.2.3.4", | ||
"ports": [ | ||
25, | ||
80, | ||
443, | ||
], | ||
"tags": [ | ||
"starttls", | ||
"self-signed", | ||
"eol-os" | ||
], | ||
"vulns": [ | ||
"CVE-2021-26857", | ||
"CVE-2021-26855" | ||
] | ||
} | ||
""" | ||
|
||
watched_events = ["IP_ADDRESS", "DNS_NAME"] | ||
produced_events = ["TECHNOLOGY", "VULNERABILITY", "FINDING", "OPEN_TCP_PORT", "DNS_NAME"] | ||
flags = ["passive", "safe", "portscan", "subdomain-enum"] | ||
meta = {"description": "Query Shodan's InternetDB for open ports, hostnames, technologies, and vulnerabilities"} | ||
|
||
# limit outgoing queue size to help avoid rate limiting | ||
_qsize = 1 | ||
|
||
base_url = "https://internetdb.shodan.io" | ||
|
||
async def setup(self): | ||
self.processed = set() | ||
return True | ||
|
||
async def filter_event(self, event): | ||
ip = self.get_ip(event) | ||
if ip: | ||
ip_hash = hash(ip) | ||
if ip_hash in self.processed: | ||
return False, "IP was already processed" | ||
self.processed.add(ip_hash) | ||
return True | ||
return False, "event had no valid IP addresses" | ||
|
||
async def handle_event(self, event): | ||
ip = self.get_ip(event) | ||
if ip is None: | ||
return | ||
url = f"{self.base_url}/{ip}" | ||
r = await self.request_with_fail_count(url) | ||
if r is None: | ||
self.debug(f"No response for {event.data}") | ||
return | ||
try: | ||
data = r.json() | ||
except Exception as e: | ||
self.verbose(f"Error parsing JSON response from {url}: {e}") | ||
self.trace() | ||
return | ||
if data: | ||
if r.status_code == 200: | ||
self._parse_response(data=data, event=event) | ||
elif r.status_code == 404: | ||
detail = data.get("detail", "") | ||
if detail: | ||
self.debug(f"404 response for {url}: {detail}") | ||
else: | ||
err_data = data.get("type", "") | ||
err_msg = data.get("msg", "") | ||
self.verbose(f"Shodan error for {ip}: {err_data}: {err_msg}") | ||
|
||
def _parse_response(self, data: dict, event): | ||
"""Handles emitting events from returned JSON""" | ||
data: dict # has keys: cpes, hostnames, ip, ports, tags, vulns | ||
# ip is a string, ports is a list of ports, the rest is a list of strings | ||
for hostname in data.get("hostnames", []): | ||
self.emit_event(hostname, "DNS_NAME", source=event) | ||
for cpe in data.get("cpes", []): | ||
self.emit_event({"technology": cpe, "host": str(event.host)}, "TECHNOLOGY", source=event) | ||
for port in data.get("ports", []): | ||
self.emit_event(self.helpers.make_netloc(event.data, port), "OPEN_TCP_PORT", source=event) | ||
for vuln in data.get("vulns", []): | ||
self.emit_event( | ||
{"description": f"Shodan reported verified vulnerability: {vuln}", "host": str(event.host)}, | ||
"FINDING", | ||
source=event, | ||
) | ||
|
||
def get_ip(self, event): | ||
""" | ||
Get the first available IP address from an event (IP_ADDRESS or DNS_NAME) | ||
""" | ||
if event.type == "IP_ADDRESS": | ||
return event.host | ||
elif event.type == "DNS_NAME": | ||
# always try IPv4 first | ||
ipv6 = [] | ||
for host in event.resolved_hosts: | ||
if self.helpers.is_ip(host, version=4): | ||
return host | ||
elif self.helpers.is_ip(host, version=6): | ||
ipv6.append(host) | ||
for ip in ipv6: | ||
return ip |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from bbot.modules.templates.subdomain_enum import subdomain_enum | ||
|
||
|
||
class shodan(subdomain_enum): | ||
options = {"api_key": ""} | ||
options_desc = {"api_key": "Shodan API key"} | ||
|
||
base_url = "https://api.shodan.io" | ||
|
||
async def setup(self): | ||
await super().setup() | ||
self.api_key = None | ||
for module_name in ("shodan", "shodan_dns", "shodan_port"): | ||
module_config = self.scan.config.get("modules", {}).get(module_name, {}) | ||
api_key = module_config.get("api_key", "") | ||
if api_key: | ||
self.api_key = api_key | ||
break | ||
if not self.api_key: | ||
if self.auth_required: | ||
return None, "No API key set" | ||
try: | ||
await self.ping() | ||
self.hugesuccess(f"API is ready") | ||
return True | ||
except Exception as e: | ||
return None, f"Error with API ({str(e).strip()})" | ||
return True | ||
|
||
async def ping(self): | ||
url = f"{self.base_url}/api-info?key={self.api_key}" | ||
r = await self.request_with_fail_count(url) | ||
resp_content = getattr(r, "text", "") | ||
assert getattr(r, "status_code", 0) == 200, resp_content |
55 changes: 55 additions & 0 deletions
55
bbot/test/test_step_2/module_tests/test_module_internetdb.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from .base import ModuleTestBase | ||
|
||
|
||
class TestInternetDB(ModuleTestBase): | ||
config_overrides = {"dns_resolution": True} | ||
|
||
async def setup_before_prep(self, module_test): | ||
module_test.scan.helpers.mock_dns( | ||
{ | ||
("blacklanternsecurity.com", "A"): "1.2.3.4", | ||
("autodiscover.blacklanternsecurity.com", "A"): "2.3.4.5", | ||
("mail.blacklanternsecurity.com", "A"): "3.4.5.6", | ||
} | ||
) | ||
|
||
module_test.httpx_mock.add_response( | ||
url="https://internetdb.shodan.io/1.2.3.4", | ||
json={ | ||
"cpes": [ | ||
"cpe:/a:microsoft:internet_information_services", | ||
"cpe:/a:microsoft:outlook_web_access:15.0.1367", | ||
], | ||
"hostnames": [ | ||
"autodiscover.blacklanternsecurity.com", | ||
"mail.blacklanternsecurity.com", | ||
], | ||
"ip": "1.2.3.4", | ||
"ports": [ | ||
25, | ||
80, | ||
443, | ||
], | ||
"tags": ["starttls", "self-signed", "eol-os"], | ||
"vulns": ["CVE-2021-26857", "CVE-2021-26855"], | ||
}, | ||
) | ||
|
||
def check(self, module_test, events): | ||
assert 9 == len([e for e in events if str(e.module) == "internetdb"]) | ||
assert 1 == len( | ||
[e for e in events if e.type == "DNS_NAME" and e.data == "autodiscover.blacklanternsecurity.com"] | ||
) | ||
assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "mail.blacklanternsecurity.com"]) | ||
assert 3 == len([e for e in events if e.type == "OPEN_TCP_PORT" and str(e.module) == "internetdb"]) | ||
assert 1 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "blacklanternsecurity.com:443"]) | ||
assert 2 == len([e for e in events if e.type == "FINDING" and str(e.module) == "internetdb"]) | ||
assert 1 == len([e for e in events if e.type == "FINDING" and "CVE-2021-26857" in e.data["description"]]) | ||
assert 2 == len([e for e in events if e.type == "TECHNOLOGY" and str(e.module) == "internetdb"]) | ||
assert 1 == len( | ||
[ | ||
e | ||
for e in events | ||
if e.type == "TECHNOLOGY" and e.data["technology"] == "cpe:/a:microsoft:outlook_web_access:15.0.1367" | ||
] | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters