Support multiple API keys per service #1810

Merged 13 commits on Oct 2, 2024
README.md (8 changes: 6 additions & 2 deletions)
@@ -303,13 +303,17 @@ For more information, see [Targets](https://www.blacklanternsecurity.com/bbot/St

Similar to Amass or Subfinder, BBOT supports API keys for various third-party services such as SecurityTrails, etc.

-The standard way to do this is to enter your API keys in **`~/.config/bbot/bbot.yml`**:
+The standard way to do this is to enter your API keys in **`~/.config/bbot/bbot.yml`**. Note that multiple API keys are allowed:
```yaml
modules:
  shodan_dns:
    api_key: 4f41243847da693a4f356c0486114bc6
  c99:
-    api_key: 21a270d5f59c9b05813a72bb41707266
+    # multiple API keys
+    api_key:
+      - 21a270d5f59c9b05813a72bb41707266
+      - ea8f243d9885cf8ce9876a580224fd3c
+      - 5bc6ed268ab6488270e496d3183a1a27
  virustotal:
    api_key: dd5f0eee2e4a99b71a939bded450b246
  securitytrails:
```
bbot/core/helpers/misc.py (12 changes: 12 additions & 0 deletions)
@@ -2788,3 +2788,15 @@ def top_tcp_ports(n, as_string=False):
    if as_string:
        return ",".join([str(s) for s in top_ports])
    return top_ports


+class SafeDict(dict):
+    def __missing__(self, key):
+        return "{" + key + "}"
+
+
+def safe_format(s, **kwargs):
+    """
+    Format string while ignoring unused keys (prevents KeyError)
+    """
+    return s.format_map(SafeDict(kwargs))
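For illustration, a minimal sketch of how this helper behaves (the URL is made up; the import path follows the file above): placeholders with no matching keyword argument are left intact rather than raising `KeyError`, so a URL template can be filled in piecemeal.

```python
from bbot.core.helpers.misc import safe_format

# "page" is substituted; "api_key" has no matching kwarg, so it survives untouched
url = safe_format("https://api.example.com/data?page={page}&key={api_key}", page=2)
print(url)  # https://api.example.com/data?page=2&key={api_key}

# plain "...".format(page=2) would raise KeyError("api_key") here
```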
bbot/core/helpers/web/web.py (61 changes: 0 additions & 61 deletions)
@@ -1,7 +1,6 @@
import re
import logging
import warnings
-import traceback
from pathlib import Path
from bs4 import BeautifulSoup

@@ -288,66 +287,6 @@ async def wordlist(self, path, lines=None, **kwargs):
                f.write(line)
        return truncated_filename

-    async def api_page_iter(self, url, page_size=100, json=True, next_key=None, **requests_kwargs):
-        """
-        An asynchronous generator function for iterating through paginated API data.
-
-        This function continuously makes requests to a specified API URL, incrementing the page number
-        or applying a custom pagination function, and yields the received data one page at a time.
-        It is well-suited for APIs that provide paginated results.
-
-        Args:
-            url (str): The initial API URL. Can contain placeholders for 'page', 'page_size', and 'offset'.
-            page_size (int, optional): The number of items per page. Defaults to 100.
-            json (bool, optional): If True, attempts to deserialize the response content to a JSON object. Defaults to True.
-            next_key (callable, optional): A function that takes the last page's data and returns the URL for the next page. Defaults to None.
-            **requests_kwargs: Arbitrary keyword arguments that will be forwarded to the HTTP request function.
-
-        Yields:
-            dict or httpx.Response: If 'json' is True, yields a dictionary containing the parsed JSON data. Otherwise, yields the raw HTTP response.
-
-        Note:
-            The loop will continue indefinitely unless manually stopped. Make sure to break out of the loop once the last page has been received.
-
-        Examples:
-            >>> agen = api_page_iter('https://api.example.com/data?page={page}&page_size={page_size}')
-            >>> try:
-            >>>     async for page in agen:
-            >>>         subdomains = page["subdomains"]
-            >>>         self.hugesuccess(subdomains)
-            >>>         if not subdomains:
-            >>>             break
-            >>> finally:
-            >>>     agen.aclose()
-        """
-        page = 1
-        offset = 0
-        result = None
-        while 1:
-            if result and callable(next_key):
-                try:
-                    new_url = next_key(result)
-                except Exception as e:
-                    log.debug(f"Failed to extract next page of results from {url}: {e}")
-                    log.debug(traceback.format_exc())
-            else:
-                new_url = url.format(page=page, page_size=page_size, offset=offset)
-            result = await self.request(new_url, **requests_kwargs)
-            if result is None:
-                log.verbose(f"api_page_iter() got no response for {url}")
-                break
-            try:
-                if json:
-                    result = result.json()
-                yield result
-            except Exception:
-                log.warning(f'Error in api_page_iter() for url: "{new_url}"')
-                log.trace(traceback.format_exc())
-                break
-            finally:
-                offset += page_size
-                page += 1

    async def curl(self, *args, **kwargs):
        """
        An asynchronous function that runs a cURL command with specified arguments and options.
bbot/modules/anubisdb.py (2 changes: 1 addition & 1 deletion)
@@ -20,7 +20,7 @@ class anubisdb(subdomain_enum):

    async def request_url(self, query):
        url = f"{self.base_url}/{self.helpers.quote(query)}"
-        return await self.request_with_fail_count(url)
+        return await self.api_request(url)

    def abort_if_pre(self, hostname):
        """
bbot/modules/base.py (173 changes: 148 additions & 25 deletions)
@@ -63,7 +63,7 @@ class BaseModule:

        batch_wait (int): Seconds to wait before force-submitting a batch. Default is 10.

-        failed_request_abort_threshold (int): Threshold for setting error state after failed HTTP requests (only takes effect when `request_with_fail_count()` is used. Default is 5.
+        api_failure_abort_threshold (int): Threshold for setting error state after failed HTTP requests (only takes effect when `api_request()` is used). Default is 5.

        _preserve_graph (bool): When set to True, accept events that may be duplicates but are necessary for construction of complete graph. Typically only enabled for output modules that need to maintain full chains of events, e.g. `neo4j` and `json`. Default is False.

@@ -103,7 +103,13 @@ class BaseModule:
    _module_threads = 1
    _batch_size = 1
    batch_wait = 10
-    failed_request_abort_threshold = 5
+
+    # API retries, etc.
+    _api_retries = 2
+    # disable the module after this many failed attempts in a row
+    _api_failure_abort_threshold = 3
+    # sleep for this many seconds after being rate limited
+    _429_sleep_interval = 30

    default_discovery_context = "{module} discovered {event.type}: {event.data}"

@@ -148,8 +154,10 @@ def __init__(self, scan):
        # string constant
        self._custom_filter_criteria_msg = "it did not meet custom filter criteria"

-        # track number of failures (for .request_with_fail_count())
-        self._request_failures = 0
+        self._api_keys = []
+
+        # track number of failures (for .api_request())
+        self._api_request_failures = 0

        self._tasks = []
        self._event_received = asyncio.Condition()
@@ -306,10 +314,37 @@ async def require_api_key(self):
                self.hugesuccess(f"API is ready")
                return True
            except Exception as e:
+                self.trace(traceback.format_exc())
                return None, f"Error with API ({str(e).strip()})"
        else:
            return None, "No API key set"

+    @property
+    def api_key(self):
+        if self._api_keys:
+            return self._api_keys[0]
+
+    @api_key.setter
+    def api_key(self, api_keys):
+        if isinstance(api_keys, str):
+            api_keys = [api_keys]
+        self._api_keys = list(api_keys)
+
+    def cycle_api_key(self):
+        if self._api_keys:
+            self.verbose(f"Cycling API key")
+            self._api_keys.insert(0, self._api_keys.pop())
+        else:
+            self.debug(f"No extra API keys to cycle")
+
+    @property
+    def api_retries(self):
+        return max(self._api_retries + 1, len(self._api_keys))
+
+    @property
+    def api_failure_abort_threshold(self):
+        return (self.api_retries * self._api_failure_abort_threshold) + 1
+
    async def ping(self):
        """Asynchronously checks the health of the configured API.

@@ -318,7 +353,7 @@ async def ping(self):
        Example Usage:
            In your implementation, if the API has a "/ping" endpoint:
                async def ping(self):
-                    r = await self.request_with_fail_count(f"{self.base_url}/ping")
+                    r = await self.api_request(f"{self.base_url}/ping")
                    resp_content = getattr(r, "text", "")
                    assert getattr(r, "status_code", 0) == 200, resp_content

@@ -1065,32 +1100,120 @@ async def run_process_live(self, *args, **kwargs):
        async for line in self.helpers.run_live(*args, **kwargs):
            yield line

-    async def request_with_fail_count(self, *args, **kwargs):
-        """Asynchronously perform an HTTP request while keeping track of consecutive failures.
-
-        This function wraps the `self.helpers.request` method, incrementing a failure counter if
-        the request returns None. When the failure counter exceeds `self.failed_request_abort_threshold`,
-        the module is set to an error state.
-
-        Args:
-            *args: Positional arguments to pass to `self.helpers.request`.
-            **kwargs: Keyword arguments to pass to `self.helpers.request`.
-
-        Returns:
-            Any: The response object or None if the request failed.
-
-        Raises:
-            None: Sets the module to an error state when the failure threshold is reached.
-        """
-        r = await self.helpers.request(*args, **kwargs)
-        if r is None:
-            self._request_failures += 1
-        else:
-            self._request_failures = 0
-        if self._request_failures >= self.failed_request_abort_threshold:
-            self.set_error_state(f"Setting error state due to {self._request_failures:,} failed HTTP requests")
-        return r
+    def prepare_api_request(self, url, kwargs):
+        """
+        Prepare an API request by adding the necessary authentication - header, bearer token, etc.
+        """
+        if self.api_key:
+            url = url.format(api_key=self.api_key)
+            if "headers" not in kwargs:
+                kwargs["headers"] = {}
+            kwargs["headers"]["Authorization"] = f"Bearer {self.api_key}"
+        return url, kwargs
+
+    async def api_request(self, *args, **kwargs):
+        """
+        Makes an HTTP request while automatically:
+            - avoiding rate limits (sleep/retry)
+            - cycling API keys
+            - cancelling after too many failed attempts
+        """
+        url = args[0] if args else kwargs.pop("url", "")
+
+        # loop until we have a successful request
+        for _ in range(self.api_retries):
+            if "headers" not in kwargs:
+                kwargs["headers"] = {}
+            new_url, kwargs = self.prepare_api_request(url, kwargs)
+            kwargs["url"] = new_url
+
+            r = await self.helpers.request(**kwargs)
+            success = False if r is None else r.is_success
+
+            if success:
+                self._api_request_failures = 0
+            else:
+                status_code = getattr(r, "status_code", 0)
+                self._api_request_failures += 1
+                if self._api_request_failures >= self.api_failure_abort_threshold:
+                    self.set_error_state(
+                        f"Setting error state due to {self._api_request_failures:,} failed HTTP requests"
+                    )
+                else:
+                    # sleep for a bit if we're being rate limited
+                    if status_code == 429:
+                        self.verbose(
+                            f"Sleeping for {self._429_sleep_interval:,} seconds due to rate limit (HTTP status: 429)"
+                        )
+                        await asyncio.sleep(self._429_sleep_interval)
+                    elif self._api_keys:
+                        # if request failed, cycle API keys and try again
+                        self.cycle_api_key()
+                        continue
+            break
+
+        return r
+
+    async def api_page_iter(self, url, page_size=100, json=True, next_key=None, **requests_kwargs):
+        """
+        An asynchronous generator function for iterating through paginated API data.
+
+        This function continuously makes requests to a specified API URL, incrementing the page number
+        or applying a custom pagination function, and yields the received data one page at a time.
+        It is well-suited for APIs that provide paginated results.
+
+        Args:
+            url (str): The initial API URL. Can contain placeholders for 'page', 'page_size', and 'offset'.
+            page_size (int, optional): The number of items per page. Defaults to 100.
+            json (bool, optional): If True, attempts to deserialize the response content to a JSON object. Defaults to True.
+            next_key (callable, optional): A function that takes the last page's data and returns the URL for the next page. Defaults to None.
+            **requests_kwargs: Arbitrary keyword arguments that will be forwarded to the HTTP request function.
+
+        Yields:
+            dict or httpx.Response: If 'json' is True, yields a dictionary containing the parsed JSON data. Otherwise, yields the raw HTTP response.
+
+        Note:
+            The loop will continue indefinitely unless manually stopped. Make sure to break out of the loop once the last page has been received.
+
+        Examples:
+            >>> agen = api_page_iter('https://api.example.com/data?page={page}&page_size={page_size}')
+            >>> try:
+            >>>     async for page in agen:
+            >>>         subdomains = page["subdomains"]
+            >>>         self.hugesuccess(subdomains)
+            >>>         if not subdomains:
+            >>>             break
+            >>> finally:
+            >>>     agen.aclose()
+        """
+        page = 1
+        offset = 0
+        result = None
+        while 1:
+            if result and callable(next_key):
+                try:
+                    new_url = next_key(result)
+                except Exception as e:
+                    self.debug(f"Failed to extract next page of results from {url}: {e}")
+                    self.debug(traceback.format_exc())
+            else:
+                new_url = self.helpers.safe_format(url, page=page, page_size=page_size, offset=offset)
+            result = await self.api_request(new_url, **requests_kwargs)
+            if result is None:
+                self.verbose(f"api_page_iter() got no response for {url}")
+                break
+            try:
+                if json:
+                    result = result.json()
+                yield result
+            except Exception:
+                self.warning(f'Error in api_page_iter() for url: "{new_url}"')
+                self.trace(traceback.format_exc())
+                break
+            finally:
+                offset += page_size
+                page += 1

    @property
    def preset(self):
        return self.scan.preset
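To make the retry arithmetic above concrete, here is a minimal standalone sketch (`KeyCycler` is a hypothetical name, not BBOT code) of the same scheme: with the default `_api_retries = 2` and three configured keys, each request gets `max(2 + 1, 3) = 3` attempts, the module aborts after `(3 * 3) + 1 = 10` consecutive failures, and every failed attempt rotates the key list so the next try authenticates with a different key.

```python
class KeyCycler:
    """Standalone sketch of BaseModule's key-cycling math (illustration only)."""

    _api_retries = 2                  # baseline retry count
    _api_failure_abort_threshold = 3  # multiplier for the abort threshold

    def __init__(self, api_keys):
        self._api_keys = list(api_keys)

    @property
    def api_key(self):
        # the active key is always the first one
        if self._api_keys:
            return self._api_keys[0]

    @property
    def api_retries(self):
        # retry at least _api_retries + 1 times, or once per configured key
        return max(self._api_retries + 1, len(self._api_keys))

    @property
    def api_failure_abort_threshold(self):
        # scale the abort threshold with the number of retries per request
        return (self.api_retries * self._api_failure_abort_threshold) + 1

    def cycle_api_key(self):
        # rotate the list so the next attempt authenticates with a fresh key
        self._api_keys.insert(0, self._api_keys.pop())


cycler = KeyCycler(["key1", "key2", "key3"])
print(cycler.api_retries)                  # 3
print(cycler.api_failure_abort_threshold)  # 10
cycler.cycle_api_key()
print(cycler.api_key)                      # key3
```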
bbot/modules/bevigil.py (9 changes: 6 additions & 3 deletions)
@@ -22,10 +22,13 @@ class bevigil(subdomain_enum_apikey):

    async def setup(self):
        self.api_key = self.config.get("api_key", "")
-        self.headers = {"X-Access-Token": self.api_key}
        self.urls = self.config.get("urls", False)
        return await super().setup()

+    def prepare_api_request(self, url, kwargs):
+        kwargs["headers"]["X-Access-Token"] = self.api_key
+        return url, kwargs
+
    async def ping(self):
        pass

@@ -54,11 +57,11 @@ async def handle_event(self, event):

    async def request_subdomains(self, query):
        url = f"{self.base_url}/{self.helpers.quote(query)}/subdomains/"
-        return await self.request_with_fail_count(url, headers=self.headers)
+        return await self.api_request(url)

    async def request_urls(self, query):
        url = f"{self.base_url}/{self.helpers.quote(query)}/urls/"
-        return await self.request_with_fail_count(url, headers=self.headers)
+        return await self.api_request(url)

    def parse_subdomains(self, r, query=None):
        results = set()
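The bevigil change above illustrates the extension point this PR introduces: a module that does not use the default `Authorization: Bearer` header overrides `prepare_api_request()` and sets whatever the service expects, while `api_request()` still provides key cycling, 429 sleeps, and failure accounting. A hedged sketch of the pattern for a made-up service (the class name, header, and endpoint are invented; the template import path matches the modules in this PR):

```python
from bbot.modules.templates.subdomain_enum import subdomain_enum_apikey


class exampleservice(subdomain_enum_apikey):
    base_url = "https://api.example.com/v1"

    def prepare_api_request(self, url, kwargs):
        # api_request() ensures kwargs["headers"] exists before calling this
        kwargs["headers"]["X-Example-Key"] = self.api_key
        return url, kwargs

    async def request_url(self, query):
        url = f"{self.base_url}/domains/{self.helpers.quote(query)}"
        # retries, key cycling, and rate-limit sleeps come for free
        return await self.api_request(url)
```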
bbot/modules/binaryedge.py (9 changes: 6 additions & 3 deletions)
@@ -21,18 +21,21 @@ class binaryedge(subdomain_enum_apikey):

    async def setup(self):
        self.max_records = self.config.get("max_records", 1000)
-        self.headers = {"X-Key": self.config.get("api_key", "")}
        return await super().setup()

+    def prepare_api_request(self, url, kwargs):
+        kwargs["headers"]["X-Key"] = self.api_key
+        return url, kwargs
+
    async def ping(self):
        url = f"{self.base_url}/user/subscription"
-        j = (await self.request_with_fail_count(url, headers=self.headers)).json()
+        j = (await self.api_request(url)).json()
        assert j.get("requests_left", 0) > 0

    async def request_url(self, query):
        # todo: host query (certs + services)
        url = f"{self.base_url}/query/domains/subdomain/{self.helpers.quote(query)}"
-        return await self.request_with_fail_count(url, headers=self.headers)
+        return await self.api_request(url)

    def parse_results(self, r, query):
        j = r.json()