diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 507b7ac54..dbd9d53e3 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -48,7 +48,7 @@ jobs:
poetry install
- name: Run tests
run: |
- poetry run pytest --exitfirst --reruns 2 -o timeout_func_only=true --timeout 1200 --disable-warnings --log-cli-level=INFO --cov-config=bbot/test/coverage.cfg --cov-report xml:cov.xml --cov=bbot .
+ poetry run pytest -vv --exitfirst --reruns 2 -o timeout_func_only=true --timeout 1200 --disable-warnings --log-cli-level=INFO --cov-config=bbot/test/coverage.cfg --cov-report xml:cov.xml --cov=bbot .
- name: Upload Debug Logs
uses: actions/upload-artifact@v3
with:
diff --git a/bbot/cli.py b/bbot/cli.py
index 877f2bcaa..4e2ce39a8 100755
--- a/bbot/cli.py
+++ b/bbot/cli.py
@@ -174,7 +174,7 @@ async def _main():
if sys.stdin.isatty():
# warn if any targets belong directly to a cloud provider
- for event in scan.target.events:
+ for event in scan.target.seeds.events:
if event.type == "DNS_NAME":
cloudcheck_result = scan.helpers.cloudcheck(event.host)
if cloudcheck_result:
diff --git a/bbot/core/engine.py b/bbot/core/engine.py
index f4c52a803..d8c58bfd8 100644
--- a/bbot/core/engine.py
+++ b/bbot/core/engine.py
@@ -641,7 +641,7 @@ async def finished_tasks(self, tasks, timeout=None):
except BaseException as e:
if isinstance(e, (TimeoutError, asyncio.exceptions.TimeoutError)):
self.log.warning(f"{self.name}: Timeout after {timeout:,} seconds in finished_tasks({tasks})")
- for task in tasks:
+ for task in list(tasks):
task.cancel()
self._await_cancelled_task(task)
else:
@@ -683,5 +683,5 @@ async def cancel_all_tasks(self):
for client_id in list(self.tasks):
await self.cancel_task(client_id)
for client_id, tasks in self.child_tasks.items():
- for task in tasks:
+ for task in list(tasks):
await self._await_cancelled_task(task)
diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py
index d185b1d74..ce627f695 100644
--- a/bbot/core/event/base.py
+++ b/bbot/core/event/base.py
@@ -341,6 +341,21 @@ def host_original(self):
return self.host
return self._host_original
+ @property
+ def host_filterable(self):
+ """
+ A string version of the event that's used for regex-based blacklisting.
+
+    For example, the user can specify "REGEX:.*\.evilcorp\.com" in their blacklist, and this regex
+ will be applied against this property.
+ """
+ parsed_url = getattr(self, "parsed_url", None)
+ if parsed_url is not None:
+ return parsed_url.geturl()
+ if self.host is not None:
+ return str(self.host)
+ return ""
+
@property
def port(self):
self.host
@@ -1114,8 +1129,7 @@ def __init__(self, *args, **kwargs):
class IP_RANGE(DnsEvent):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
- net = ipaddress.ip_network(self.data, strict=False)
- self.add_tag(f"ipv{net.version}")
+ self.add_tag(f"ipv{self.host.version}")
def sanitize_data(self, data):
return str(ipaddress.ip_network(str(data), strict=False))
@@ -1689,6 +1703,13 @@ def make_event(
if event_type == "USERNAME" and validators.soft_validate(data, "email"):
event_type = "EMAIL_ADDRESS"
tags.add("affiliate")
+ # Convert single-host IP_RANGE to IP_ADDRESS
+ if event_type == "IP_RANGE":
+ with suppress(Exception):
+ net = ipaddress.ip_network(data, strict=False)
+ if net.prefixlen == net.max_prefixlen:
+ event_type = "IP_ADDRESS"
+ data = net.network_address
event_class = globals().get(event_type, DefaultEvent)
diff --git a/bbot/core/helpers/bloom.py b/bbot/core/helpers/bloom.py
index 357c715c0..4a3508edf 100644
--- a/bbot/core/helpers/bloom.py
+++ b/bbot/core/helpers/bloom.py
@@ -64,8 +64,15 @@ def _fnv1a_hash(self, data):
hash = (hash * 0x01000193) % 2**32 # 16777619
return hash
- def __del__(self):
+ def close(self):
+ """Explicitly close the memory-mapped file."""
self.mmap_file.close()
+ def __del__(self):
+ try:
+ self.close()
+ except Exception:
+ pass
+
def __contains__(self, item):
return self.check(item)
diff --git a/bbot/core/helpers/dns/helpers.py b/bbot/core/helpers/dns/helpers.py
index c18a2c162..340af5a42 100644
--- a/bbot/core/helpers/dns/helpers.py
+++ b/bbot/core/helpers/dns/helpers.py
@@ -1,6 +1,6 @@
import logging
-from bbot.core.helpers.regexes import dns_name_regex
+from bbot.core.helpers.regexes import dns_name_extraction_regex
from bbot.core.helpers.misc import clean_dns_record, smart_decode
log = logging.getLogger("bbot.core.helpers.dns")
@@ -198,7 +198,7 @@ def add_result(rdtype, _record):
elif rdtype == "TXT":
for s in record.strings:
s = smart_decode(s)
- for match in dns_name_regex.finditer(s):
+ for match in dns_name_extraction_regex.finditer(s):
start, end = match.span()
host = s[start:end]
add_result(rdtype, host)
diff --git a/bbot/core/helpers/helper.py b/bbot/core/helpers/helper.py
index 9565c1623..6db4b6921 100644
--- a/bbot/core/helpers/helper.py
+++ b/bbot/core/helpers/helper.py
@@ -12,10 +12,11 @@
from .regex import RegexHelper
from .wordcloud import WordCloud
from .interactsh import Interactsh
-from ...scanner.target import Target
from .depsinstaller import DepsInstaller
from .async_helpers import get_event_loop
+from bbot.scanner.target import BaseTarget
+
log = logging.getLogger("bbot.core.helpers")
@@ -155,8 +156,8 @@ def clean_old_scans(self):
_filter = lambda x: x.is_dir() and self.regexes.scan_name_regex.match(x.name)
self.clean_old(self.scans_dir, keep=self.keep_old_scans, filter=_filter)
- def make_target(self, *events, **kwargs):
- return Target(*events, **kwargs)
+ def make_target(self, *targets, **kwargs):
+ return BaseTarget(*targets, scan=self.scan, **kwargs)
@property
def config(self):
diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py
index c416e54f9..1a5693296 100644
--- a/bbot/core/helpers/misc.py
+++ b/bbot/core/helpers/misc.py
@@ -586,17 +586,18 @@ def is_dns_name(d, include_local=True):
if include_local:
if bbot_regexes.hostname_regex.match(d):
return True
- if bbot_regexes.dns_name_regex.match(d):
+ if bbot_regexes.dns_name_validation_regex.match(d):
return True
return False
-def is_ip(d, version=None):
+def is_ip(d, version=None, include_network=False):
"""
Checks if the given string or object represents a valid IP address.
Args:
d (str or ipaddress.IPvXAddress): The IP address to check.
        version (int, optional): The IP version to validate (4 or 6). Default is None.
+        include_network (bool, optional): Whether to include network types (IPv4Network or IPv6Network). Defaults to False.
Returns:
@@ -612,12 +613,17 @@ def is_ip(d, version=None):
>>> is_ip('evilcorp.com')
False
"""
+ ip = None
try:
ip = ipaddress.ip_address(d)
- if version is None or ip.version == version:
- return True
except Exception:
- pass
+ if include_network:
+ try:
+ ip = ipaddress.ip_network(d, strict=False)
+ except Exception:
+ pass
+ if ip is not None and (version is None or ip.version == version):
+ return True
return False
diff --git a/bbot/core/helpers/regexes.py b/bbot/core/helpers/regexes.py
index 1fd513e5a..8d5d23b3a 100644
--- a/bbot/core/helpers/regexes.py
+++ b/bbot/core/helpers/regexes.py
@@ -40,7 +40,8 @@
# dns names with periods
_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+(?:[xX][nN]--)?[^\W_]{1,63}\.?"
-dns_name_regex = re.compile(_dns_name_regex, re.I)
+dns_name_extraction_regex = re.compile(_dns_name_regex, re.I)
+dns_name_validation_regex = re.compile(r"^" + _dns_name_regex + r"$", re.I)
# dns names without periods
_hostname_regex = r"(?!\w*\.\w+)\w(?:[\w-]{0,100}\w)?"
diff --git a/bbot/core/helpers/web/web.py b/bbot/core/helpers/web/web.py
index b05b2d798..a767945d0 100644
--- a/bbot/core/helpers/web/web.py
+++ b/bbot/core/helpers/web/web.py
@@ -58,7 +58,7 @@ def __init__(self, parent_helper):
self.ssl_verify = self.config.get("ssl_verify", False)
engine_debug = self.config.get("engine", {}).get("debug", False)
super().__init__(
- server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.radix_only},
+ server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.minimal},
debug=engine_debug,
)
diff --git a/bbot/modules/anubisdb.py b/bbot/modules/anubisdb.py
index b456365e5..597f5520d 100644
--- a/bbot/modules/anubisdb.py
+++ b/bbot/modules/anubisdb.py
@@ -38,7 +38,7 @@ async def abort_if(self, event):
return True, "DNS name is unresolved"
return await super().abort_if(event)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
results = set()
json = r.json()
if json:
diff --git a/bbot/modules/baddns.py b/bbot/modules/baddns.py
index 443606f7e..5e468b0d7 100644
--- a/bbot/modules/baddns.py
+++ b/bbot/modules/baddns.py
@@ -116,7 +116,7 @@ async def handle_event(self, event):
context=f'{{module}}\'s "{r_dict["module"]}" module found {{event.type}}: {r_dict["description"]}',
)
else:
- self.warning(f"Got unrecognized confidence level: {r['confidence']}")
+ self.warning(f"Got unrecognized confidence level: {r_dict['confidence']}")
found_domains = r_dict.get("found_domains", None)
if found_domains:
diff --git a/bbot/modules/bevigil.py b/bbot/modules/bevigil.py
index f3889e7fd..8e70fe414 100644
--- a/bbot/modules/bevigil.py
+++ b/bbot/modules/bevigil.py
@@ -60,14 +60,14 @@ async def request_urls(self, query):
url = f"{self.base_url}/{self.helpers.quote(query)}/urls/"
return await self.api_request(url)
- def parse_subdomains(self, r, query=None):
+ async def parse_subdomains(self, r, query=None):
results = set()
subdomains = r.json().get("subdomains")
if subdomains:
results.update(subdomains)
return results
- def parse_urls(self, r, query=None):
+ async def parse_urls(self, r, query=None):
results = set()
urls = r.json().get("urls")
if urls:
diff --git a/bbot/modules/binaryedge.py b/bbot/modules/binaryedge.py
index e9f6224b6..e712beec5 100644
--- a/bbot/modules/binaryedge.py
+++ b/bbot/modules/binaryedge.py
@@ -37,6 +37,6 @@ async def request_url(self, query):
url = f"{self.base_url}/query/domains/subdomain/{self.helpers.quote(query)}"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
j = r.json()
return j.get("events", [])
diff --git a/bbot/modules/bufferoverrun.py b/bbot/modules/bufferoverrun.py
index 1eba8ad4c..9523dc626 100644
--- a/bbot/modules/bufferoverrun.py
+++ b/bbot/modules/bufferoverrun.py
@@ -33,7 +33,7 @@ async def request_url(self, query):
url = f"{self.commercial_base_url if self.commercial else self.base_url}?q=.{query}"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
j = r.json()
subdomains_set = set()
if isinstance(j, dict):
@@ -44,5 +44,4 @@ def parse_results(self, r, query):
subdomain = parts[4].strip()
if subdomain and subdomain.endswith(f".{query}"):
subdomains_set.add(subdomain)
- for subdomain in subdomains_set:
- yield subdomain
+ return subdomains_set
diff --git a/bbot/modules/builtwith.py b/bbot/modules/builtwith.py
index 19e880034..9887f1822 100644
--- a/bbot/modules/builtwith.py
+++ b/bbot/modules/builtwith.py
@@ -62,7 +62,7 @@ async def request_redirects(self, query):
url = f"{self.base_url}/redirect1/api.json?KEY={{api_key}}&LOOKUP={query}"
return await self.api_request(url)
- def parse_domains(self, r, query):
+ async def parse_domains(self, r, query):
"""
This method returns a set of subdomains.
Each subdomain is an "FQDN" that was reported in the "Detailed Technology Profile" page on builtwith.com
@@ -92,7 +92,7 @@ def parse_domains(self, r, query):
self.verbose(f"No results for {query}: {error}")
return results_set
- def parse_redirects(self, r, query):
+ async def parse_redirects(self, r, query):
"""
This method creates a set.
Each entry in the set is either an Inbound or Outbound Redirect reported in the "Redirect Profile" page on builtwith.com
diff --git a/bbot/modules/c99.py b/bbot/modules/c99.py
index 7e703966b..7bb395fa1 100644
--- a/bbot/modules/c99.py
+++ b/bbot/modules/c99.py
@@ -26,7 +26,8 @@ async def request_url(self, query):
url = f"{self.base_url}/subdomainfinder?key={{api_key}}&domain={self.helpers.quote(query)}&json"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
j = r.json()
if isinstance(j, dict):
subdomains = j.get("subdomains", [])
@@ -34,4 +35,5 @@ def parse_results(self, r, query):
for s in subdomains:
subdomain = s.get("subdomain", "")
if subdomain:
- yield subdomain
+ results.add(subdomain)
+ return results
diff --git a/bbot/modules/certspotter.py b/bbot/modules/certspotter.py
index d4d770365..c6cbc6eb6 100644
--- a/bbot/modules/certspotter.py
+++ b/bbot/modules/certspotter.py
@@ -17,9 +17,11 @@ def request_url(self, query):
url = f"{self.base_url}/issuances?domain={self.helpers.quote(query)}&include_subdomains=true&expand=dns_names"
return self.api_request(url, timeout=self.http_timeout + 30)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
json = r.json()
if json:
for r in json:
for dns_name in r.get("dns_names", []):
- yield dns_name.lstrip(".*").rstrip(".")
+ results.add(dns_name.lstrip(".*").rstrip("."))
+ return results
diff --git a/bbot/modules/chaos.py b/bbot/modules/chaos.py
index cba4e7ea4..15a321046 100644
--- a/bbot/modules/chaos.py
+++ b/bbot/modules/chaos.py
@@ -26,7 +26,8 @@ async def request_url(self, query):
url = f"{self.base_url}/{domain}/subdomains"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
j = r.json()
subdomains_set = set()
if isinstance(j, dict):
@@ -39,4 +40,5 @@ def parse_results(self, r, query):
for s in subdomains_set:
full_subdomain = f"{s}.{domain}"
if full_subdomain and full_subdomain.endswith(f".{query}"):
- yield full_subdomain
+ results.add(full_subdomain)
+ return results
diff --git a/bbot/modules/columbus.py b/bbot/modules/columbus.py
index 6e3e9ce0b..781c3c94b 100644
--- a/bbot/modules/columbus.py
+++ b/bbot/modules/columbus.py
@@ -17,7 +17,7 @@ async def request_url(self, query):
url = f"{self.base_url}/{self.helpers.quote(query)}?days=365"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
results = set()
json = r.json()
if json and isinstance(json, list):
diff --git a/bbot/modules/crt.py b/bbot/modules/crt.py
index 441dbbb9b..05735c4e9 100644
--- a/bbot/modules/crt.py
+++ b/bbot/modules/crt.py
@@ -23,7 +23,8 @@ async def request_url(self, query):
url = self.helpers.add_get_params(self.base_url, params).geturl()
return await self.api_request(url, timeout=self.http_timeout + 30)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
j = r.json()
for cert_info in j:
if not type(cert_info) == dict:
@@ -35,4 +36,5 @@ def parse_results(self, r, query):
domain = cert_info.get("name_value")
if domain:
for d in domain.splitlines():
- yield d.lower()
+ results.add(d.lower())
+ return results
diff --git a/bbot/modules/digitorus.py b/bbot/modules/digitorus.py
index 48c060346..049343ac2 100644
--- a/bbot/modules/digitorus.py
+++ b/bbot/modules/digitorus.py
@@ -19,7 +19,7 @@ async def request_url(self, query):
url = f"{self.base_url}/{self.helpers.quote(query)}"
return await self.helpers.request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
results = set()
content = getattr(r, "text", "")
extract_regex = re.compile(r"[\w.-]+\." + query, re.I)
diff --git a/bbot/modules/dnscaa.py b/bbot/modules/dnscaa.py
index 1d18a811a..1465cd8fa 100644
--- a/bbot/modules/dnscaa.py
+++ b/bbot/modules/dnscaa.py
@@ -2,7 +2,7 @@
#
# Checks for and parses CAA DNS TXT records for IODEF reporting destination email addresses and/or URL's.
#
-# NOTE: when the target domain is initially resolved basic "dns_name_regex" matched targets will be extracted so we do not perform that again here.
+# NOTE: when the target domain is initially resolved basic "dns_name_extraction_regex" matched targets will be extracted so we do not perform that again here.
#
# Example CAA records,
# 0 iodef "mailto:dnsadmin@example.com"
@@ -23,7 +23,7 @@
import re
-from bbot.core.helpers.regexes import dns_name_regex, email_regex, url_regexes
+from bbot.core.helpers.regexes import dns_name_extraction_regex, email_regex, url_regexes
# Handle '0 iodef "mailto:support@hcaptcha.com"'
# Handle '1 iodef "https://some.host.tld/caa;"'
@@ -109,7 +109,7 @@ async def handle_event(self, event):
elif caa_match.group("property").lower().startswith("issue"):
if self._dns_names:
- for match in dns_name_regex.finditer(caa_match.group("text")):
+ for match in dns_name_extraction_regex.finditer(caa_match.group("text")):
start, end = match.span()
name = caa_match.group("text")[start:end]
diff --git a/bbot/modules/fullhunt.py b/bbot/modules/fullhunt.py
index 5736053e3..85106e582 100644
--- a/bbot/modules/fullhunt.py
+++ b/bbot/modules/fullhunt.py
@@ -35,5 +35,5 @@ async def request_url(self, query):
response = await self.api_request(url)
return response
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
return r.json().get("hosts", [])
diff --git a/bbot/modules/hackertarget.py b/bbot/modules/hackertarget.py
index adfa54458..b42352d47 100644
--- a/bbot/modules/hackertarget.py
+++ b/bbot/modules/hackertarget.py
@@ -18,12 +18,14 @@ async def request_url(self, query):
response = await self.api_request(url)
return response
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
for line in r.text.splitlines():
host = line.split(",")[0]
try:
self.helpers.validators.validate_host(host)
- yield host
+ results.add(host)
except ValueError:
self.debug(f"Error validating API result: {line}")
continue
+ return results
diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py
index bc777e66c..94032c554 100644
--- a/bbot/modules/internal/excavate.py
+++ b/bbot/modules/internal/excavate.py
@@ -527,9 +527,8 @@ class CSPExtractor(ExcavateRule):
async def process(self, yara_results, event, yara_rule_settings, discovery_context):
for identifier in yara_results.keys():
for csp_str in yara_results[identifier]:
- domains = await self.helpers.re.findall(bbot_regexes.dns_name_regex, csp_str)
- unique_domains = set(domains)
- for domain in unique_domains:
+ domains = await self.excavate.scan.extract_in_scope_hostnames(csp_str)
+ for domain in domains:
await self.report(domain, event, yara_rule_settings, discovery_context, event_type="DNS_NAME")
class EmailExtractor(ExcavateRule):
diff --git a/bbot/modules/internal/speculate.py b/bbot/modules/internal/speculate.py
index e52e4e1bb..84e9726bb 100644
--- a/bbot/modules/internal/speculate.py
+++ b/bbot/modules/internal/speculate.py
@@ -65,7 +65,7 @@ async def setup(self):
if not self.portscanner_enabled:
self.info(f"No portscanner enabled. Assuming open ports: {', '.join(str(x) for x in self.ports)}")
- target_len = len(self.scan.target)
+ target_len = len(self.scan.target.seeds)
if target_len > self.config.get("max_hosts", 65536):
if not self.portscanner_enabled:
self.hugewarning(
diff --git a/bbot/modules/leakix.py b/bbot/modules/leakix.py
index ba098f800..ac9e81f87 100644
--- a/bbot/modules/leakix.py
+++ b/bbot/modules/leakix.py
@@ -35,10 +35,12 @@ async def request_url(self, query):
response = await self.api_request(url)
return response
- def parse_results(self, r, query=None):
+ async def parse_results(self, r, query=None):
+ results = set()
json = r.json()
if json:
for entry in json:
subdomain = entry.get("subdomain", "")
if subdomain:
- yield subdomain
+ results.add(subdomain)
+ return results
diff --git a/bbot/modules/myssl.py b/bbot/modules/myssl.py
index 5c4a8021b..1a04364bc 100644
--- a/bbot/modules/myssl.py
+++ b/bbot/modules/myssl.py
@@ -17,7 +17,7 @@ async def request_url(self, query):
url = f"{self.base_url}?domain={self.helpers.quote(query)}"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
results = set()
json = r.json()
if json and isinstance(json, dict):
diff --git a/bbot/modules/otx.py b/bbot/modules/otx.py
index 01b65eff5..f0075bfc1 100644
--- a/bbot/modules/otx.py
+++ b/bbot/modules/otx.py
@@ -17,10 +17,12 @@ def request_url(self, query):
url = f"{self.base_url}/api/v1/indicators/domain/{self.helpers.quote(query)}/passive_dns"
return self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
j = r.json()
if isinstance(j, dict):
for entry in j.get("passive_dns", []):
subdomain = entry.get("hostname", "")
if subdomain:
- yield subdomain
+ results.add(subdomain)
+ return results
diff --git a/bbot/modules/passivetotal.py b/bbot/modules/passivetotal.py
index 0099d1e07..b20c7bbac 100644
--- a/bbot/modules/passivetotal.py
+++ b/bbot/modules/passivetotal.py
@@ -39,6 +39,8 @@ async def request_url(self, query):
url = f"{self.base_url}/enrichment/subdomains?query={self.helpers.quote(query)}"
return await self.api_request(url)
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
for subdomain in r.json().get("subdomains", []):
- yield f"{subdomain}.{query}"
+ results.add(f"{subdomain}.{query}")
+ return results
diff --git a/bbot/modules/rapiddns.py b/bbot/modules/rapiddns.py
index ad680131a..150728eca 100644
--- a/bbot/modules/rapiddns.py
+++ b/bbot/modules/rapiddns.py
@@ -18,11 +18,6 @@ async def request_url(self, query):
response = await self.api_request(url, timeout=self.http_timeout + 10)
return response
- def parse_results(self, r, query):
- results = set()
+ async def parse_results(self, r, query):
text = getattr(r, "text", "")
- for match in self.helpers.regexes.dns_name_regex.findall(text):
- match = match.lower()
- if match.endswith(query):
- results.add(match)
- return results
+ return await self.scan.extract_in_scope_hostnames(text)
diff --git a/bbot/modules/securitytrails.py b/bbot/modules/securitytrails.py
index c74450307..b92ac07dc 100644
--- a/bbot/modules/securitytrails.py
+++ b/bbot/modules/securitytrails.py
@@ -26,8 +26,10 @@ async def request_url(self, query):
response = await self.api_request(url)
return response
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
+ results = set()
j = r.json()
if isinstance(j, dict):
for host in j.get("subdomains", []):
- yield f"{host}.{query}"
+ results.add(f"{host}.{query}")
+ return results
diff --git a/bbot/modules/shodan_dns.py b/bbot/modules/shodan_dns.py
index 21140831e..2ad0bc505 100644
--- a/bbot/modules/shodan_dns.py
+++ b/bbot/modules/shodan_dns.py
@@ -22,5 +22,5 @@ async def handle_event(self, event):
def make_url(self, query):
return f"{self.base_url}/dns/domain/{self.helpers.quote(query)}?key={{api_key}}&page={{page}}"
- def parse_results(self, json, query):
+ async def parse_results(self, json, query):
return [f"{sub}.{query}" for sub in json.get("subdomains", [])]
diff --git a/bbot/modules/subdomaincenter.py b/bbot/modules/subdomaincenter.py
index 9fdce8c49..077ccf1a6 100644
--- a/bbot/modules/subdomaincenter.py
+++ b/bbot/modules/subdomaincenter.py
@@ -33,7 +33,7 @@ async def request_url(self, query):
break
return response
- def parse_results(self, r, query):
+ async def parse_results(self, r, query):
results = set()
json = r.json()
if json and isinstance(json, list):
diff --git a/bbot/modules/templates/subdomain_enum.py b/bbot/modules/templates/subdomain_enum.py
index 30267cc10..913b6c2ed 100644
--- a/bbot/modules/templates/subdomain_enum.py
+++ b/bbot/modules/templates/subdomain_enum.py
@@ -106,7 +106,7 @@ def make_query(self, event):
break
return ".".join([s for s in query.split(".") if s != "_wildcard"])
- def parse_results(self, r, query=None):
+ async def parse_results(self, r, query=None):
json = r.json()
if json:
for hostname in json:
@@ -123,7 +123,7 @@ async def query(self, query, request_fn=None, parse_fn=None):
self.info(f'Query "{query}" failed (no response)')
return []
try:
- results = list(parse_fn(response, query))
+ results = list(await parse_fn(response, query))
except Exception as e:
if response:
self.info(
@@ -144,7 +144,7 @@ async def query_paginated(self, query):
agen = self.api_page_iter(url, page_size=self.page_size, **self.api_page_iter_kwargs)
try:
async for response in agen:
- subdomains = self.parse_results(response, query)
+ subdomains = await self.parse_results(response, query)
self.verbose(f'Got {len(subdomains):,} subdomains for "{query}"')
if not subdomains:
break
diff --git a/bbot/modules/trickest.py b/bbot/modules/trickest.py
index 40f6ea704..246fdcfde 100644
--- a/bbot/modules/trickest.py
+++ b/bbot/modules/trickest.py
@@ -36,7 +36,7 @@ def make_url(self, query):
url += "&limit={page_size}&offset={offset}&select=hostname&orderby=hostname"
return url
- def parse_results(self, j, query):
+ async def parse_results(self, j, query):
results = j.get("results", [])
subdomains = set()
for item in results:
diff --git a/bbot/modules/virustotal.py b/bbot/modules/virustotal.py
index 14eec2a9b..b93241945 100644
--- a/bbot/modules/virustotal.py
+++ b/bbot/modules/virustotal.py
@@ -24,11 +24,6 @@ def prepare_api_request(self, url, kwargs):
kwargs["headers"]["x-apikey"] = self.api_key
return url, kwargs
- def parse_results(self, r, query):
- results = set()
+ async def parse_results(self, r, query):
text = getattr(r, "text", "")
- for match in self.helpers.regexes.dns_name_regex.findall(text):
- match = match.lower()
- if match.endswith(query):
- results.add(match)
- return results
+ return await self.scan.extract_in_scope_hostnames(text)
diff --git a/bbot/modules/zoomeye.py b/bbot/modules/zoomeye.py
index ffba419dd..ac7c2bd25 100644
--- a/bbot/modules/zoomeye.py
+++ b/bbot/modules/zoomeye.py
@@ -60,7 +60,7 @@ async def query(self, query):
agen = self.api_page_iter(url)
try:
async for j in agen:
- r = list(self.parse_results(j))
+ r = list(await self.parse_results(j))
if r:
results.update(set(r))
if not r or i >= (self.max_pages - 1):
@@ -70,6 +70,8 @@ async def query(self, query):
agen.aclose()
return results
- def parse_results(self, r):
+ async def parse_results(self, r):
+ results = set()
for entry in r.get("list", []):
- yield entry["name"]
+ results.add(entry["name"])
+ return results
diff --git a/bbot/presets/spider.yml b/bbot/presets/spider.yml
index 0ffb495c4..9e98ff453 100644
--- a/bbot/presets/spider.yml
+++ b/bbot/presets/spider.yml
@@ -3,6 +3,10 @@ description: Recursive web spider
modules:
- httpx
+blacklist:
+ # Prevent spider from invalidating sessions by logging out
+ - "RE:/.*(sign|log)[_-]?out"
+
config:
web:
# how many links to follow in a row
diff --git a/bbot/scanner/manager.py b/bbot/scanner/manager.py
index 8cbe098a5..4b129d524 100644
--- a/bbot/scanner/manager.py
+++ b/bbot/scanner/manager.py
@@ -38,7 +38,7 @@ async def init_events(self, events=None):
- It also marks the Scan object as finished with initialization by setting `_finished_init` to True.
"""
if events is None:
- events = self.scan.target.events
+ events = self.scan.target.seeds.events
async with self.scan._acatch(self.init_events), self._task_counter.count(self.init_events):
sorted_events = sorted(events, key=lambda e: len(e.data))
for event in [self.scan.root_event] + sorted_events:
@@ -49,7 +49,6 @@ async def init_events(self, events=None):
event.parent = self.scan.root_event
if event.module is None:
event.module = self.scan._make_dummy_module(name="TARGET", _type="TARGET")
- event.add_tag("target")
if event != self.scan.root_event:
event.discovery_context = f"Scan {self.scan.name} seeded with " + "{event.type}: {event.data}"
self.verbose(f"Target: {event}")
diff --git a/bbot/scanner/preset/args.py b/bbot/scanner/preset/args.py
index cf48dd4b9..591a52235 100644
--- a/bbot/scanner/preset/args.py
+++ b/bbot/scanner/preset/args.py
@@ -223,7 +223,7 @@ def create_parser(self, *args, **kwargs):
"--modules",
nargs="+",
default=[],
- help=f'Modules to enable. Choices: {",".join(self.preset.module_loader.scan_module_choices)}',
+ help=f'Modules to enable. Choices: {",".join(sorted(self.preset.module_loader.scan_module_choices))}',
metavar="MODULE",
)
modules.add_argument("-l", "--list-modules", action="store_true", help=f"List available modules.")
@@ -238,7 +238,7 @@ def create_parser(self, *args, **kwargs):
"--flags",
nargs="+",
default=[],
- help=f'Enable modules by flag. Choices: {",".join(self.preset.module_loader.flag_choices)}',
+ help=f'Enable modules by flag. Choices: {",".join(sorted(self.preset.module_loader.flag_choices))}',
metavar="FLAG",
)
modules.add_argument("-lf", "--list-flags", action="store_true", help=f"List available flags.")
@@ -300,7 +300,7 @@ def create_parser(self, *args, **kwargs):
"--output-modules",
nargs="+",
default=[],
- help=f'Output module(s). Choices: {",".join(self.preset.module_loader.output_module_choices)}',
+ help=f'Output module(s). Choices: {",".join(sorted(self.preset.module_loader.output_module_choices))}',
metavar="MODULE",
)
output.add_argument("--json", "-j", action="store_true", help="Output scan data in JSON format")
diff --git a/bbot/scanner/preset/path.py b/bbot/scanner/preset/path.py
index 730b16e63..9b8456612 100644
--- a/bbot/scanner/preset/path.py
+++ b/bbot/scanner/preset/path.py
@@ -33,7 +33,9 @@ def find(self, filename):
if "/" in str(filename):
if filename_path.parent not in paths_to_search:
paths_to_search.append(filename_path.parent)
- log.debug(f"Searching for preset in {paths_to_search}, file candidates: {file_candidates_str}")
+ log.debug(
+ f"Searching for preset in {[str(p) for p in paths_to_search]}, file candidates: {file_candidates_str}"
+ )
for path in paths_to_search:
for candidate in file_candidates:
for file in path.rglob(candidate):
diff --git a/bbot/scanner/preset/preset.py b/bbot/scanner/preset/preset.py
index 1b296d68d..0388fbcfa 100644
--- a/bbot/scanner/preset/preset.py
+++ b/bbot/scanner/preset/preset.py
@@ -241,7 +241,7 @@ def __init__(
# "presets" is alias to "include"
if presets and include:
raise ValueError(
- 'Cannot use both "presets" and "include" args at the same time (presets is only an alias to include). Please pick only one :)'
+ 'Cannot use both "presets" and "include" args at the same time (presets is an alias to include). Please pick one or the other :)'
)
if presets and not include:
include = presets
@@ -270,6 +270,12 @@ def target(self):
raise ValueError("Cannot access target before preset is baked (use ._seeds instead)")
return self._target
+ @property
+ def seeds(self):
+ if self._seeds is None:
+            raise ValueError("Cannot access seeds before preset is baked (use ._seeds instead)")
+ return self.target.seeds
+
@property
def whitelist(self):
if self._target is None:
@@ -755,11 +761,11 @@ def to_dict(self, include_target=False, full_config=False, redact_secrets=False)
# scope
if include_target:
- target = sorted(str(t.data) for t in self.target.seeds)
+ target = sorted(self.target.seeds.inputs)
whitelist = []
if self.target.whitelist is not None:
- whitelist = sorted(str(t.data) for t in self.target.whitelist)
- blacklist = sorted(str(t.data) for t in self.target.blacklist)
+ whitelist = sorted(self.target.whitelist.inputs)
+ blacklist = sorted(self.target.blacklist.inputs)
if target:
preset_dict["target"] = target
if whitelist and whitelist != target:
diff --git a/bbot/scanner/scanner.py b/bbot/scanner/scanner.py
index ff394a060..2b06ef1e2 100644
--- a/bbot/scanner/scanner.py
+++ b/bbot/scanner/scanner.py
@@ -269,7 +269,7 @@ async def _prep(self):
f.write(self.preset.to_yaml())
# log scan overview
- start_msg = f"Scan with {len(self.preset.scan_modules):,} modules seeded with {len(self.target):,} targets"
+ start_msg = f"Scan seeded with {len(self.seeds):,} targets"
details = []
if self.whitelist != self.target:
details.append(f"{len(self.whitelist):,} in whitelist")
@@ -362,7 +362,8 @@ async def async_start(self):
# distribute seed events
self.init_events_task = asyncio.create_task(
- self.ingress_module.init_events(self.target.events), name=f"{self.name}.ingress_module.init_events()"
+ self.ingress_module.init_events(self.target.seeds.events),
+ name=f"{self.name}.ingress_module.init_events()",
)
# main scan loop
@@ -896,6 +897,10 @@ def config(self):
def target(self):
return self.preset.target
+ @property
+ def seeds(self):
+ return self.preset.seeds
+
@property
def whitelist(self):
return self.preset.whitelist
diff --git a/bbot/scanner/target.py b/bbot/scanner/target.py
index aff8b3227..2163bddcd 100644
--- a/bbot/scanner/target.py
+++ b/bbot/scanner/target.py
@@ -1,112 +1,251 @@
-import re
-import copy
import logging
-import ipaddress
-import traceback
+import regex as re
from hashlib import sha1
-from contextlib import suppress
from radixtarget import RadixTarget
+from radixtarget.helpers import host_size_key
from bbot.errors import *
-from bbot.modules.base import BaseModule
-from bbot.core.helpers.misc import make_ip_type
from bbot.core.event import make_event, is_event
+from bbot.core.helpers.misc import is_dns_name, is_ip
+
log = logging.getLogger("bbot.core.target")
-class BBOTTarget:
+def special_target_type(regex_pattern):
+ def decorator(func):
+ func._regex = re.compile(regex_pattern, re.IGNORECASE)
+ return func
+
+ return decorator
+
+
+class BaseTarget(RadixTarget):
"""
- A convenient abstraction of a scan target that includes whitelisting and blacklisting
+ A collection of BBOT events that represent a scan target.
- Provides high-level functions like in_scope(), which includes both whitelist and blacklist checks.
+ Based on radixtarget, which allows extremely fast IP and DNS lookups.
+
+ This class is inherited by all three components of the BBOT target:
+ - Whitelist
+ - Blacklist
+ - Seeds
"""
- def __init__(self, *targets, whitelist=None, blacklist=None, strict_scope=False, scan=None):
- self.strict_scope = strict_scope
+ special_target_types = {
+ # regex-callback pairs for handling special target types
+ # these aren't defined explicitly; instead they are decorated with @special_target_type
+ # the function must return a list of events
+ }
+ tags = []
+
+ def __init__(self, *targets, scan=None, **kwargs):
self.scan = scan
- if len(targets) > 0:
- log.verbose(f"Creating events from {len(targets):,} targets")
- self.seeds = Target(*targets, strict_scope=self.strict_scope, scan=scan)
- if whitelist is None:
- whitelist = set([e.host for e in self.seeds if e.host])
+ self.events = set()
+ self.inputs = set()
+ # Register decorated methods
+ for method in dir(self):
+ if callable(getattr(self, method, None)):
+ func = getattr(self, method)
+ if hasattr(func, "_regex"):
+ self.special_target_types[func._regex] = func
+
+ super().__init__(*targets, **kwargs)
+
+ def get(self, event, **kwargs):
+ """
+ Override default .get() to accept events
+ """
+ if is_event(event):
+ host = event.host
+ # save resources by checking if the event is an IP or DNS name
+ elif is_ip(event, include_network=True) or is_dns_name(event):
+ host = event
+ elif isinstance(event, str):
+ event = self.make_event(event)
+ host = event.host
else:
- log.verbose(f"Creating events from {len(whitelist):,} whitelist entries")
- self.whitelist = Target(*whitelist, strict_scope=self.strict_scope, scan=scan, acl_mode=True)
- if blacklist is None:
- blacklist = []
- if blacklist:
- log.verbose(f"Creating events from {len(blacklist):,} blacklist entries")
- self.blacklist = Target(*blacklist, scan=scan, acl_mode=True)
- self._hash = None
+ raise ValueError(f"Invalid host/event: {event} ({type(event)})")
+ if not host:
+ if kwargs.get("raise_error", False):
+ raise KeyError(f"Host not found: '{event}'")
+ return None
+ results = super().get(host, **kwargs)
+ return results
+
+ def make_event(self, *args, **kwargs):
+ # if it's already an event, return it
+ if args and is_event(args[0]):
+ return args[0]
+ # otherwise make a new one
+ if not "tags" in kwargs:
+ kwargs["tags"] = set()
+ kwargs["tags"].update(self.tags)
+ return make_event(*args, dummy=True, scan=self.scan, **kwargs)
+
+ def add(self, targets):
+ if not isinstance(targets, (list, set, tuple)):
+ targets = [targets]
+ events = set()
+ for target in targets:
+ _events = []
+ special_target_type, _events = self.check_special_target_types(str(target))
+ if special_target_type:
+ self.inputs.add(str(target))
+ else:
+ event = self.make_event(target)
+ if event:
+ _events = [event]
+ for event in _events:
+ self.inputs.add(event.data)
+ events.add(event)
+
+ # sort by host size to ensure consistency
+ events = sorted(events, key=lambda e: (0 if not e.host else host_size_key(e.host)))
+ for event in events:
+ self.events.add(event)
+ self._add(event.host, data=event)
+
+ def check_special_target_types(self, target):
+ for regex, callback in self.special_target_types.items():
+ match = regex.match(target)
+ if match:
+ return True, callback(match)
+ return False, []
+
+ def __iter__(self):
+ yield from self.events
- def add(self, *args, **kwargs):
- self.seeds.add(*args, **kwargs)
- self._hash = None
- def get(self, host):
- return self.seeds.get(host)
+class ScanSeeds(BaseTarget):
+ """
+ Initial events used to seed a scan.
- def get_host(self, host):
- return self.seeds.get(host)
+ These are the targets specified by the user, e.g. via `-t` on the CLI.
+ """
- def __iter__(self):
- return iter(self.seeds)
+ tags = ["target"]
+
+ @special_target_type(r"^(?:ORG|ORG_STUB):(.*)")
+ def handle_org_stub(self, match):
+ org_stub_event = self.make_event(match.group(1), event_type="ORG_STUB")
+ if org_stub_event:
+ return [org_stub_event]
+ return []
+
+ @special_target_type(r"^(?:USER|USERNAME):(.*)")
+ def handle_username(self, match):
+ username_event = self.make_event(match.group(1), event_type="USERNAME")
+ if username_event:
+ return [username_event]
+ return []
+
+ def get(self, event, single=True, **kwargs):
+ results = super().get(event, **kwargs)
+ if results and single:
+ return next(iter(results))
+ return results
+
+ def _add(self, host, data):
+ """
+ Overrides the base method to enable having multiple events for the same host.
- def __len__(self):
- return len(self.seeds)
+ The "data" attribute of the node is now a set of events.
+ """
+ if host:
+ try:
+ event_set = self.get(host, raise_error=True, single=False)
+ event_set.add(data)
+ except KeyError:
+ event_set = {data}
+ super()._add(host, data=event_set)
- def __contains__(self, other):
- if isinstance(other, self.__class__):
- other = other.seeds
- return other in self.seeds
+ def _hash_value(self):
+ # seeds get hashed by event data
+ return sorted(str(e.data).encode() for e in self.events)
- def __bool__(self):
- return bool(self.seeds)
- def __eq__(self, other):
- return self.hash == other.hash
+class ACLTarget(BaseTarget):
+ def __init__(self, *args, **kwargs):
+ # ACL mode dedupes by host (and skips adding already-contained hosts) for efficiency
+ kwargs["acl_mode"] = True
+ super().__init__(*args, **kwargs)
- @property
- def hash(self):
- """
- A sha1 hash representing a BBOT target and all three of its components (seeds, whitelist, blacklist)
- This can be used to compare targets.
+class ScanWhitelist(ACLTarget):
+ """
+ A collection of BBOT events that represent a scan's whitelist.
+ """
- Examples:
- >>> target1 = BBOTTarget("evilcorp.com", blacklist=["prod.evilcorp.com"], whitelist=["test.evilcorp.com"])
- >>> target2 = BBOTTarget("evilcorp.com", blacklist=["prod.evilcorp.com"], whitelist=["test.evilcorp.com"])
- >>> target3 = BBOTTarget("evilcorp.com", blacklist=["prod.evilcorp.com"])
- >>> target1 == target2
- True
- >>> target1 == target3
- False
- """
- if self._hash is None:
- # Create a new SHA-1 hash object
- sha1_hash = sha1()
- # Update the SHA-1 object with the hash values of each object
- for target_hash in [t.hash for t in (self.seeds, self.whitelist, self.blacklist)]:
- # Convert the hash value to bytes and update the SHA-1 object
- sha1_hash.update(target_hash)
- self._hash = sha1_hash.digest()
- return self._hash
+ pass
- @property
- def scope_hash(self):
- """
- A sha1 hash representing only the whitelist and blacklist
- This is used to record the scope of a scan.
+class ScanBlacklist(ACLTarget):
+ """
+ A collection of BBOT events that represent a scan's blacklist.
+ """
+
+ def __init__(self, *args, **kwargs):
+ self.blacklist_regexes = set()
+ super().__init__(*args, **kwargs)
+
+ @special_target_type(r"^(?:RE|REGEX):(.*)")
+ def handle_regex(self, match):
+ pattern = match.group(1)
+ log.info(f"Blacklisting by custom regex: {pattern}")
+ blacklist_regex = re.compile(pattern, re.IGNORECASE)
+ self.blacklist_regexes.add(blacklist_regex)
+ return []
+
+ def get(self, event, **kwargs):
"""
- # Create a new SHA-1 hash object
- sha1_hash = sha1()
- # Update the SHA-1 object with the hash values of each object
- for target_hash in [t.hash for t in (self.whitelist, self.blacklist)]:
- # Convert the hash value to bytes and update the SHA-1 object
- sha1_hash.update(target_hash)
- return sha1_hash.digest()
+        For the blacklist, this override also checks the event against any custom regex patterns specified by the user.
+ """
+ event = self.make_event(event)
+ # first, check event's host against blacklist
+ try:
+ event_result = super().get(event, raise_error=True)
+ except KeyError:
+ event_result = None
+ if event_result is not None:
+ return event_result
+ # next, check event's host against regexes
+ host_or_url = event.host_filterable
+ if host_or_url:
+ for regex in self.blacklist_regexes:
+ if regex.search(str(host_or_url)):
+ return event
+ if kwargs.get("raise_error", False):
+ raise KeyError(f"Host not found: '{event.data}'")
+ return None
+
+ def _hash_value(self):
+ # regexes are included in blacklist hash
+ regex_patterns = [str(r.pattern).encode() for r in self.blacklist_regexes]
+ hosts = [str(h).encode() for h in self.sorted_hosts]
+ return hosts + regex_patterns
+
+
+class BBOTTarget:
+ """
+ A convenient abstraction of a scan target that contains three subtargets:
+ - seeds
+ - whitelist
+ - blacklist
+
+ Provides high-level functions like in_scope(), which includes both whitelist and blacklist checks.
+ """
+
+ def __init__(self, *seeds, whitelist=None, blacklist=None, strict_scope=False, scan=None):
+ self.scan = scan
+ self.strict_scope = strict_scope
+ self.seeds = ScanSeeds(*seeds, strict_dns_scope=strict_scope, scan=scan)
+ if whitelist is None:
+ whitelist = self.seeds.hosts
+ self.whitelist = ScanWhitelist(*whitelist, strict_dns_scope=strict_scope, scan=scan)
+ if blacklist is None:
+ blacklist = []
+ self.blacklist = ScanBlacklist(*blacklist, scan=scan)
@property
def json(self):
@@ -122,16 +261,20 @@ def json(self):
"scope_hash": self.scope_hash.hex(),
}
- def copy(self):
- self_copy = copy.copy(self)
- self_copy.seeds = self.seeds.copy()
- self_copy.whitelist = self.whitelist.copy()
- self_copy.blacklist = self.blacklist.copy()
- return self_copy
+ @property
+ def hash(self):
+ sha1_hash = sha1()
+ for target_hash in [t.hash for t in (self.seeds, self.whitelist, self.blacklist)]:
+ sha1_hash.update(target_hash)
+ return sha1_hash.digest()
@property
- def events(self):
- return self.seeds.events
+ def scope_hash(self):
+ sha1_hash = sha1()
+ # Consider only the hash values of the whitelist and blacklist
+ for target_hash in [t.hash for t in (self.whitelist, self.blacklist)]:
+ sha1_hash.update(target_hash)
+ return sha1_hash.digest()
def in_scope(self, host):
"""
@@ -167,8 +310,7 @@ def blacklisted(self, host):
>>> preset.blacklisted("http://www.evilcorp.com")
True
"""
- e = make_event(host, dummy=True)
- return e in self.blacklist
+ return host in self.blacklist
def whitelisted(self, host):
"""
@@ -184,360 +326,20 @@ def whitelisted(self, host):
>>> preset.whitelisted("http://www.evilcorp.com")
True
"""
- e = make_event(host, dummy=True)
- whitelist = self.whitelist
- if whitelist is None:
- whitelist = self.seeds
- return e in whitelist
+ return host in self.whitelist
@property
- def radix_only(self):
+ def minimal(self):
"""
A slimmer, serializable version of the target designed for simple scope checks
- This version doesn't have the events, only their hosts.
+ This version doesn't have the events, only their hosts. This allows it to be passed across process boundaries.
"""
return self.__class__(
- *[e.host for e in self.seeds if e.host],
- whitelist=None if self.whitelist is None else [e for e in self.whitelist],
- blacklist=[e for e in self.blacklist],
+ whitelist=self.whitelist.inputs,
+ blacklist=self.blacklist.inputs,
strict_scope=self.strict_scope,
)
-
-class Target:
- """
- A class representing a target. Can contain an unlimited number of hosts, IP or IP ranges, URLs, etc.
-
- Attributes:
- strict_scope (bool): Flag indicating whether to consider child domains in-scope.
- If set to True, only the exact hosts specified and not their children are considered part of the target.
-
- _radix (RadixTree): Radix tree for quick IP/DNS lookups.
- _events (set): Flat set of contained events.
-
- Examples:
- Basic usage
- >>> target = Target(scan, "evilcorp.com", "1.2.3.0/24")
- >>> len(target)
- 257
- >>> list(t.events)
- [
- DNS_NAME("evilcorp.com", module=TARGET, tags={'domain', 'distance-1', 'target'}),
- IP_RANGE("1.2.3.0/24", module=TARGET, tags={'ipv4', 'distance-1', 'target'})
- ]
- >>> "www.evilcorp.com" in target
- True
- >>> "1.2.3.4" in target
- True
- >>> "4.3.2.1" in target
- False
- >>> "https://admin.evilcorp.com" in target
- True
- >>> "bob@evilcorp.com" in target
- True
-
- Event correlation
- >>> target.get("www.evilcorp.com")
- DNS_NAME("evilcorp.com", module=TARGET, tags={'domain', 'distance-1', 'target'})
- >>> target.get("1.2.3.4")
- IP_RANGE("1.2.3.0/24", module=TARGET, tags={'ipv4', 'distance-1', 'target'})
-
- Target comparison
- >>> target2 = Targets(scan, "www.evilcorp.com")
- >>> target2 == target
- False
- >>> target2 in target
- True
- >>> target in target2
- False
-
- Notes:
- - Targets are only precise down to the individual host. Ports and protocols are not considered in scope calculations.
- - If you specify "https://evilcorp.com:8443" as a target, all of evilcorp.com (including subdomains and other ports and protocols) will be considered part of the target
- - If you do not want to include child subdomains, use `strict_scope=True`
- """
-
- def __init__(self, *targets, strict_scope=False, scan=None, acl_mode=False):
- """
- Initialize a Target object.
-
- Args:
- *targets: One or more targets (e.g., domain names, IP ranges) to be included in this Target.
- strict_scope (bool): Whether to consider subdomains of target domains in-scope
- scan (Scan): Reference to the Scan object that instantiated the Target.
- acl_mode (bool): Stricter deduplication for more efficient checks
-
- Notes:
- - If you are instantiating a target from within a BBOT module, use `self.helpers.make_target()` instead. (this removes the need to pass in a scan object.)
- - The strict_scope flag can be set to restrict scope calculation to only exactly-matching hosts and not their child subdomains.
- - Each target is processed and stored as an `Event` in the '_events' dictionary.
- """
- self.scan = scan
- self.strict_scope = strict_scope
- self.acl_mode = acl_mode
- self.special_event_types = {
- "ORG_STUB": re.compile(r"^(?:ORG|ORG_STUB):(.*)", re.IGNORECASE),
- "USERNAME": re.compile(r"^(?:USER|USERNAME):(.*)", re.IGNORECASE),
- }
- self._events = set()
- self._radix = RadixTarget()
-
- for target_event in self._make_events(targets):
- self._add_event(target_event)
-
- self._hash = None
-
- def add(self, t, event_type=None):
- """
- Add a target or merge events from another Target object into this Target.
-
- Args:
- t: The target to be added. It can be either a string, an event object, or another Target object.
-
- Attributes Modified:
- _events (dict): The dictionary is updated to include the new target's events.
-
- Examples:
- >>> target.add('example.com')
-
- Notes:
- - If `t` is of the same class as this Target, all its events are merged.
- - If `t` is an event, it is directly added to `_events`.
- """
- if not isinstance(t, (list, tuple, set)):
- t = [t]
- for single_target in t:
- if isinstance(single_target, self.__class__):
- for event in single_target.events:
- self._add_event(event)
- else:
- if is_event(single_target):
- event = single_target
- else:
- try:
- event = make_event(
- single_target, event_type=event_type, dummy=True, tags=["target"], scan=self.scan
- )
- except ValidationError as e:
- # allow commented lines
- if not str(t).startswith("#"):
- log.trace(traceback.format_exc())
- raise ValidationError(f'Could not add target "{t}": {e}')
- self._add_event(event)
-
- @property
- def events(self):
- """
- Returns all events in the target.
-
- Yields:
- Event object: One of the Event objects stored in the `_events` dictionary.
-
- Examples:
- >>> target = Target(scan, "example.com")
- >>> for event in target.events:
- ... print(event)
-
- Notes:
- - This property is read-only.
- """
- return self._events
-
- @property
- def hosts(self):
- return [e.host for e in self.events]
-
- def copy(self):
- """
- Creates and returns a copy of the Target object, including a shallow copy of the `_events` and `_radix` attributes.
-
- Returns:
- Target: A new Target object with the sameattributes as the original.
- A shallow copy of the `_events` dictionary is made.
-
- Examples:
- >>> original_target = Target(scan, "example.com")
- >>> copied_target = original_target.copy()
- >>> copied_target is original_target
- False
- >>> copied_target == original_target
- True
- >>> copied_target in original_target
- True
- >>> original_target in copied_target
- True
-
- Notes:
- - The `scan` object reference is kept intact in the copied Target object.
- """
- self_copy = self.__class__()
- self_copy._events = set(self._events)
- self_copy._radix = copy.copy(self._radix)
- return self_copy
-
- def get(self, host, single=True):
- """
- Gets the event associated with the specified host from the target's radix tree.
-
- Args:
- host (Event, Target, or str): The hostname, IP, URL, or event to look for.
- single (bool): Whether to return a single event. If False, return all events matching the host
-
- Returns:
- Event or None: Returns the Event object associated with the given host if it exists, otherwise returns None.
-
- Examples:
- >>> target = Target(scan, "evilcorp.com", "1.2.3.0/24")
- >>> target.get("www.evilcorp.com")
- DNS_NAME("evilcorp.com", module=TARGET, tags={'domain', 'distance-1', 'target'})
- >>> target.get("1.2.3.4")
- IP_RANGE("1.2.3.0/24", module=TARGET, tags={'ipv4', 'distance-1', 'target'})
-
- Notes:
- - The method returns the first event that matches the given host.
- - If `strict_scope` is False, it will also consider parent domains and IP ranges.
- """
- try:
- event = make_event(host, dummy=True)
- except ValidationError:
- return
- if event.host:
- return self.get_host(event.host, single=single)
-
- def get_host(self, host, single=True):
- """
- A more efficient version of .get() that only accepts hostnames and IP addresses
- """
- host = make_ip_type(host)
- with suppress(KeyError, StopIteration):
- result = self._radix.search(host)
- if result is not None:
- ret = set()
- for event in result:
- # if the result is a dns name and strict scope is enabled
- if isinstance(event.host, str) and self.strict_scope:
- # if the result doesn't exactly equal the host, abort
- if event.host != host:
- return
- if single:
- return event
- else:
- ret.add(event)
- if ret and not single:
- return ret
-
- def _sort_events(self, events):
- return sorted(events, key=lambda x: x._host_size)
-
- def _make_events(self, targets):
- events = []
- for target in targets:
- event_type = None
- for eventtype, regex in self.special_event_types.items():
- if isinstance(target, str):
- match = regex.match(target)
- if match:
- target = match.groups()[0]
- event_type = eventtype
- break
- events.append(make_event(target, event_type=event_type, dummy=True, scan=self.scan))
- return self._sort_events(events)
-
- def _add_event(self, event):
- skip = False
- if event.host:
- radix_data = self._radix.search(event.host)
- if self.acl_mode:
- # skip if the hostname/IP/subnet (or its parent) has already been added
- if radix_data is not None and not self.strict_scope:
- skip = True
- else:
- event_type = "IP_RANGE" if event.type == "IP_RANGE" else "DNS_NAME"
- event = make_event(event.host, event_type=event_type, dummy=True, scan=self.scan)
- if not skip:
- # if strict scope is enabled and it's not an exact host match, we add a whole new entry
- if radix_data is None or (self.strict_scope and event.host not in radix_data):
- radix_data = {event}
- self._radix.insert(event.host, radix_data)
- # otherwise, we add the event to the set
- else:
- radix_data.add(event)
- # clear hash
- self._hash = None
- elif self.acl_mode and not self.strict_scope:
- # skip if we're in ACL mode and there's no host
- skip = True
- if not skip:
- self._events.add(event)
-
- def _contains(self, other):
- if self.get(other) is not None:
- return True
- return False
-
- def __str__(self):
- return ",".join([str(e.data) for e in self.events][:5])
-
- def __iter__(self):
- yield from self.events
-
- def __contains__(self, other):
- # if "other" is a Target
- if isinstance(other, self.__class__):
- contained_in_self = [self._contains(e) for e in other.events]
- return all(contained_in_self)
- else:
- return self._contains(other)
-
- def __bool__(self):
- return bool(self._events)
-
def __eq__(self, other):
return self.hash == other.hash
-
- @property
- def hash(self):
- if self._hash is None:
- # Create a new SHA-1 hash object
- sha1_hash = sha1()
- # Update the SHA-1 object with the hash values of each object
- for event_type, event_hash in sorted([(e.type.encode(), e.data_hash) for e in self.events]):
- sha1_hash.update(event_type)
- sha1_hash.update(event_hash)
- if self.strict_scope:
- sha1_hash.update(b"\x00")
- self._hash = sha1_hash.digest()
- return self._hash
-
- def __len__(self):
- """
- Calculates and returns the total number of hosts within this target, not counting duplicate events.
-
- Returns:
- int: The total number of unique hosts present within the target's `_events`.
-
- Examples:
- >>> target = Target(scan, "evilcorp.com", "1.2.3.0/24")
- >>> len(target)
- 257
-
- Notes:
- - If a host is represented as an IP network, all individual IP addresses in that network are counted.
- - For other types of hosts, each unique event is counted as one.
- """
- num_hosts = 0
- for event in self._events:
- if isinstance(event.host, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
- num_hosts += event.host.num_addresses
- else:
- num_hosts += 1
- return num_hosts
-
-
-class TargetDummyModule(BaseModule):
- _type = "TARGET"
- name = "TARGET"
-
- def __init__(self, scan):
- self.scan = scan
diff --git a/bbot/test/test_step_1/test_bloom_filter.py b/bbot/test/test_step_1/test_bloom_filter.py
index e57c56110..22ec4db32 100644
--- a/bbot/test/test_step_1/test_bloom_filter.py
+++ b/bbot/test/test_step_1/test_bloom_filter.py
@@ -66,4 +66,6 @@ def generate_random_strings(n, length=10):
# ensure false positives are less than .02 percent
assert false_positive_percent < 0.02
+ bloom_filter.close()
+
await scan._cleanup()
diff --git a/bbot/test/test_step_1/test_cli.py b/bbot/test/test_step_1/test_cli.py
index 47db12d2a..acdd4011b 100644
--- a/bbot/test/test_step_1/test_cli.py
+++ b/bbot/test/test_step_1/test_cli.py
@@ -535,6 +535,13 @@ def test_cli_module_validation(monkeypatch, caplog):
]
)
+ # bad target
+ caplog.clear()
+ assert not caplog.text
+ monkeypatch.setattr("sys.argv", ["bbot", "-t", "asdf:::sdf"])
+ cli.main()
+ assert 'Unable to autodetect event type from "asdf:::sdf"' in caplog.text
+
# incorrect flag
caplog.clear()
assert not caplog.text
diff --git a/bbot/test/test_step_1/test_dns.py b/bbot/test/test_step_1/test_dns.py
index 16e949abf..d0bfb6833 100644
--- a/bbot/test/test_step_1/test_dns.py
+++ b/bbot/test/test_step_1/test_dns.py
@@ -106,7 +106,8 @@ async def test_dns_resolution(bbot_scanner):
assert "2606:4700:4700::1111" in await dnsengine.resolve("one.one.one.one", type="AAAA")
assert "one.one.one.one" in await dnsengine.resolve("1.1.1.1")
for rdtype in ("NS", "SOA", "MX", "TXT"):
- assert len(await dnsengine.resolve("google.com", type=rdtype)) > 0
+ results = await dnsengine.resolve("google.com", type=rdtype)
+ assert len(results) > 0
# batch resolution
batch_results = [r async for r in dnsengine.resolve_batch(["1.1.1.1", "one.one.one.one"])]
diff --git a/bbot/test/test_step_1/test_events.py b/bbot/test/test_step_1/test_events.py
index 1b1971d1d..8156fc796 100644
--- a/bbot/test/test_step_1/test_events.py
+++ b/bbot/test/test_step_1/test_events.py
@@ -42,6 +42,7 @@ async def test_events(events, helpers):
# ip tests
assert events.ipv4 == scan.make_event("8.8.8.8", dummy=True)
assert "8.8.8.8" in events.ipv4
+ assert events.ipv4.host_filterable == "8.8.8.8"
assert "8.8.8.8" == events.ipv4
assert "8.8.8.8" in events.netv4
assert "8.8.8.9" not in events.ipv4
@@ -59,11 +60,19 @@ async def test_events(events, helpers):
assert events.emoji not in events.ipv4
assert events.emoji not in events.netv6
assert events.netv6 not in events.emoji
- assert "dead::c0de" == scan.make_event(" [DEaD::c0De]:88", "DNS_NAME", dummy=True)
+ ipv6_event = scan.make_event(" [DEaD::c0De]:88", "DNS_NAME", dummy=True)
+ assert "dead::c0de" == ipv6_event
+ assert ipv6_event.host_filterable == "dead::c0de"
+ range_to_ip = scan.make_event("1.2.3.4/32", dummy=True)
+ assert range_to_ip.type == "IP_ADDRESS"
+ range_to_ip = scan.make_event("dead::beef/128", dummy=True)
+ assert range_to_ip.type == "IP_ADDRESS"
# hostname tests
assert events.domain.host == "publicapis.org"
+ assert events.domain.host_filterable == "publicapis.org"
assert events.subdomain.host == "api.publicapis.org"
+ assert events.subdomain.host_filterable == "api.publicapis.org"
assert events.domain.host_stem == "publicapis"
assert events.subdomain.host_stem == "api.publicapis"
assert "api.publicapis.org" in events.domain
@@ -86,7 +95,11 @@ async def test_events(events, helpers):
assert "port" not in e.json()
# url tests
- assert scan.make_event("http://evilcorp.com", dummy=True) == scan.make_event("http://evilcorp.com/", dummy=True)
+ url_no_trailing_slash = scan.make_event("http://evilcorp.com", dummy=True)
+ url_trailing_slash = scan.make_event("http://evilcorp.com/", dummy=True)
+ assert url_no_trailing_slash == url_trailing_slash
+ assert url_no_trailing_slash.host_filterable == "http://evilcorp.com/"
+ assert url_trailing_slash.host_filterable == "http://evilcorp.com/"
assert events.url_unverified.host == "api.publicapis.org"
assert events.url_unverified in events.domain
assert events.url_unverified in events.subdomain
@@ -129,6 +142,7 @@ async def test_events(events, helpers):
assert events.http_response.port == 80
assert events.http_response.parsed_url.scheme == "http"
assert events.http_response.with_port().geturl() == "http://example.com:80/"
+ assert events.http_response.host_filterable == "http://example.com/"
http_response = scan.make_event(
{
diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py
index d13f4f0aa..76cf63517 100644
--- a/bbot/test/test_step_1/test_helpers.py
+++ b/bbot/test/test_step_1/test_helpers.py
@@ -93,8 +93,23 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_httpserver):
ipaddress.ip_network("0.0.0.0/0"),
]
assert helpers.is_ip("127.0.0.1")
+ assert helpers.is_ip("127.0.0.1", include_network=True)
+ assert helpers.is_ip("127.0.0.1", version=4)
+ assert not helpers.is_ip("127.0.0.1", version=6)
assert not helpers.is_ip("127.0.0.0.1")
+ assert helpers.is_ip("dead::beef")
+ assert helpers.is_ip("dead::beef", include_network=True)
+ assert not helpers.is_ip("dead::beef", version=4)
+ assert helpers.is_ip("dead::beef", version=6)
+ assert not helpers.is_ip("dead:::beef")
+
+ assert not helpers.is_ip("1.2.3.4/24")
+ assert helpers.is_ip("1.2.3.4/24", include_network=True)
+ assert not helpers.is_ip("1.2.3.4/24", version=4)
+ assert helpers.is_ip("1.2.3.4/24", include_network=True, version=4)
+ assert not helpers.is_ip("1.2.3.4/24", include_network=True, version=6)
+
assert not helpers.is_ip_type("127.0.0.1")
assert helpers.is_ip_type(ipaddress.ip_address("127.0.0.1"))
assert not helpers.is_ip_type(ipaddress.ip_address("127.0.0.1"), network=True)
@@ -104,6 +119,8 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_httpserver):
assert not helpers.is_ip_type(ipaddress.ip_network("127.0.0.0/8"), network=False)
assert helpers.is_dns_name("evilcorp.com")
+ assert not helpers.is_dns_name("evilcorp.com:80")
+ assert not helpers.is_dns_name("http://evilcorp.com:80")
assert helpers.is_dns_name("evilcorp")
assert not helpers.is_dns_name("evilcorp", include_local=False)
assert helpers.is_dns_name("ドメイン.テスト")
diff --git a/bbot/test/test_step_1/test_presets.py b/bbot/test/test_step_1/test_presets.py
index cb7cbc5cb..1b11529ea 100644
--- a/bbot/test/test_step_1/test_presets.py
+++ b/bbot/test/test_step_1/test_presets.py
@@ -88,9 +88,13 @@ def test_preset_yaml(clean_default_config):
config={"preset_test_asdf": 1},
)
preset1 = preset1.bake()
- assert "evilcorp.com" in preset1.target
+ assert "evilcorp.com" in preset1.target.seeds
+ assert "evilcorp.ce" not in preset1.target.seeds
+ assert "asdf.www.evilcorp.ce" in preset1.target.seeds
assert "evilcorp.ce" in preset1.whitelist
+ assert "asdf.evilcorp.ce" in preset1.whitelist
assert "test.www.evilcorp.ce" in preset1.blacklist
+ assert "asdf.test.www.evilcorp.ce" in preset1.blacklist
assert "sslcert" in preset1.scan_modules
assert preset1.whitelisted("evilcorp.ce")
assert preset1.whitelisted("www.evilcorp.ce")
@@ -170,12 +174,14 @@ def test_preset_scope():
# test target merging
scan = Scanner("1.2.3.4", preset=Preset.from_dict({"target": ["evilcorp.com"]}))
- assert set([str(h) for h in scan.preset.target.seeds.hosts]) == {"1.2.3.4", "evilcorp.com"}
- assert set([e.data for e in scan.target]) == {"1.2.3.4", "evilcorp.com"}
+ assert set([str(h) for h in scan.preset.target.seeds.hosts]) == {"1.2.3.4/32", "evilcorp.com"}
+ assert set([e.data for e in scan.target.seeds]) == {"1.2.3.4", "evilcorp.com"}
+ assert set([e.data for e in scan.target.whitelist]) == {"1.2.3.4", "evilcorp.com"}
blank_preset = Preset()
blank_preset = blank_preset.bake()
- assert not blank_preset.target
+ assert not blank_preset.target.seeds
+ assert not blank_preset.target.whitelist
assert blank_preset.strict_scope == False
preset1 = Preset(
@@ -187,10 +193,11 @@ def test_preset_scope():
preset1_baked = preset1.bake()
# make sure target logic works as expected
- assert "evilcorp.com" in preset1_baked.target
- assert "asdf.evilcorp.com" in preset1_baked.target
- assert "asdf.www.evilcorp.ce" in preset1_baked.target
- assert not "evilcorp.ce" in preset1_baked.target
+ assert "evilcorp.com" in preset1_baked.target.seeds
+ assert not "evilcorp.com" in preset1_baked.target.whitelist
+ assert "asdf.evilcorp.com" in preset1_baked.target.seeds
+ assert not "asdf.evilcorp.com" in preset1_baked.target.whitelist
+ assert "asdf.evilcorp.ce" in preset1_baked.whitelist
assert "evilcorp.ce" in preset1_baked.whitelist
assert "test.www.evilcorp.ce" in preset1_baked.blacklist
assert not "evilcorp.ce" in preset1_baked.blacklist
@@ -217,17 +224,21 @@ def test_preset_scope():
preset1_baked = preset1.bake()
# targets should be merged
- assert "evilcorp.com" in preset1_baked.target
- assert "www.evilcorp.ce" in preset1_baked.target
- assert "evilcorp.org" in preset1_baked.target
+ assert "evilcorp.com" in preset1_baked.target.seeds
+ assert "www.evilcorp.ce" in preset1_baked.target.seeds
+ assert "evilcorp.org" in preset1_baked.target.seeds
# strict scope is enabled
- assert not "asdf.evilcorp.com" in preset1_baked.target
- assert not "asdf.www.evilcorp.ce" in preset1_baked.target
+ assert not "asdf.www.evilcorp.ce" in preset1_baked.target.seeds
+ assert not "asdf.evilcorp.org" in preset1_baked.target.seeds
+ assert not "asdf.evilcorp.com" in preset1_baked.target.seeds
+ assert not "asdf.www.evilcorp.ce" in preset1_baked.target.seeds
assert "evilcorp.ce" in preset1_baked.whitelist
assert "evilcorp.de" in preset1_baked.whitelist
assert not "asdf.evilcorp.de" in preset1_baked.whitelist
assert not "asdf.evilcorp.ce" in preset1_baked.whitelist
# blacklist should be merged, strict scope does not apply
+ assert "test.www.evilcorp.ce" in preset1_baked.blacklist
+ assert "test.www.evilcorp.de" in preset1_baked.blacklist
assert "asdf.test.www.evilcorp.ce" in preset1_baked.blacklist
assert "asdf.test.www.evilcorp.de" in preset1_baked.blacklist
assert not "asdf.test.www.evilcorp.org" in preset1_baked.blacklist
@@ -263,14 +274,14 @@ def test_preset_scope():
}
assert preset_whitelist_baked.to_dict(include_target=True) == {
"target": ["evilcorp.org"],
- "whitelist": ["1.2.3.0/24", "evilcorp.net"],
- "blacklist": ["evilcorp.co.uk"],
+ "whitelist": ["1.2.3.0/24", "http://evilcorp.net/"],
+ "blacklist": ["bob@evilcorp.co.uk", "evilcorp.co.uk:443"],
"config": {"modules": {"secretsdb": {"api_key": "deadbeef", "otherthing": "asdf"}}},
}
assert preset_whitelist_baked.to_dict(include_target=True, redact_secrets=True) == {
"target": ["evilcorp.org"],
- "whitelist": ["1.2.3.0/24", "evilcorp.net"],
- "blacklist": ["evilcorp.co.uk"],
+ "whitelist": ["1.2.3.0/24", "http://evilcorp.net/"],
+ "blacklist": ["bob@evilcorp.co.uk", "evilcorp.co.uk:443"],
"config": {"modules": {"secretsdb": {"otherthing": "asdf"}}},
}
@@ -278,7 +289,8 @@ def test_preset_scope():
assert not preset_nowhitelist_baked.in_scope("www.evilcorp.de")
assert not preset_nowhitelist_baked.in_scope("1.2.3.4/24")
- assert "www.evilcorp.org" in preset_whitelist_baked.target
+ assert "www.evilcorp.org" in preset_whitelist_baked.target.seeds
+ assert not "www.evilcorp.org" in preset_whitelist_baked.target.whitelist
assert "1.2.3.4" in preset_whitelist_baked.whitelist
assert not preset_whitelist_baked.in_scope("www.evilcorp.org")
assert not preset_whitelist_baked.in_scope("www.evilcorp.de")
@@ -291,17 +303,17 @@ def test_preset_scope():
assert preset_whitelist_baked.whitelisted("1.2.3.4/28")
assert preset_whitelist_baked.whitelisted("1.2.3.4/24")
- assert set([e.data for e in preset_nowhitelist_baked.target]) == {"evilcorp.com"}
- assert set([e.data for e in preset_whitelist_baked.target]) == {"evilcorp.org"}
+ assert set([e.data for e in preset_nowhitelist_baked.seeds]) == {"evilcorp.com"}
assert set([e.data for e in preset_nowhitelist_baked.whitelist]) == {"evilcorp.com"}
- assert set([e.data for e in preset_whitelist_baked.whitelist]) == {"1.2.3.0/24", "evilcorp.net"}
+ assert set([e.data for e in preset_whitelist_baked.seeds]) == {"evilcorp.org"}
+ assert set([e.data for e in preset_whitelist_baked.whitelist]) == {"1.2.3.0/24", "http://evilcorp.net/"}
preset_nowhitelist.merge(preset_whitelist)
preset_nowhitelist_baked = preset_nowhitelist.bake()
- assert set([e.data for e in preset_nowhitelist_baked.target]) == {"evilcorp.com", "evilcorp.org"}
- assert set([e.data for e in preset_nowhitelist_baked.whitelist]) == {"1.2.3.0/24", "evilcorp.net"}
- assert "www.evilcorp.org" in preset_nowhitelist_baked.target
- assert "www.evilcorp.com" in preset_nowhitelist_baked.target
+ assert set([e.data for e in preset_nowhitelist_baked.seeds]) == {"evilcorp.com", "evilcorp.org"}
+ assert set([e.data for e in preset_nowhitelist_baked.whitelist]) == {"1.2.3.0/24", "http://evilcorp.net/"}
+ assert "www.evilcorp.org" in preset_nowhitelist_baked.seeds
+ assert "www.evilcorp.com" in preset_nowhitelist_baked.seeds
assert "1.2.3.4" in preset_nowhitelist_baked.whitelist
assert not preset_nowhitelist_baked.in_scope("www.evilcorp.org")
assert not preset_nowhitelist_baked.in_scope("www.evilcorp.com")
@@ -313,10 +325,12 @@ def test_preset_scope():
preset_whitelist = Preset("evilcorp.org", whitelist=["1.2.3.4/24"])
preset_whitelist.merge(preset_nowhitelist)
preset_whitelist_baked = preset_whitelist.bake()
- assert set([e.data for e in preset_whitelist_baked.target]) == {"evilcorp.com", "evilcorp.org"}
+ assert set([e.data for e in preset_whitelist_baked.seeds]) == {"evilcorp.com", "evilcorp.org"}
assert set([e.data for e in preset_whitelist_baked.whitelist]) == {"1.2.3.0/24"}
- assert "www.evilcorp.org" in preset_whitelist_baked.target
- assert "www.evilcorp.com" in preset_whitelist_baked.target
+ assert "www.evilcorp.org" in preset_whitelist_baked.seeds
+ assert "www.evilcorp.com" in preset_whitelist_baked.seeds
+ assert not "www.evilcorp.org" in preset_whitelist_baked.target.whitelist
+ assert not "www.evilcorp.com" in preset_whitelist_baked.target.whitelist
assert "1.2.3.4" in preset_whitelist_baked.whitelist
assert not preset_whitelist_baked.in_scope("www.evilcorp.org")
assert not preset_whitelist_baked.in_scope("www.evilcorp.com")
@@ -328,18 +342,18 @@ def test_preset_scope():
preset_nowhitelist2 = Preset("evilcorp.de")
preset_nowhitelist1_baked = preset_nowhitelist1.bake()
preset_nowhitelist2_baked = preset_nowhitelist2.bake()
- assert set([e.data for e in preset_nowhitelist1_baked.target]) == {"evilcorp.com"}
- assert set([e.data for e in preset_nowhitelist2_baked.target]) == {"evilcorp.de"}
+ assert set([e.data for e in preset_nowhitelist1_baked.seeds]) == {"evilcorp.com"}
+ assert set([e.data for e in preset_nowhitelist2_baked.seeds]) == {"evilcorp.de"}
assert set([e.data for e in preset_nowhitelist1_baked.whitelist]) == {"evilcorp.com"}
assert set([e.data for e in preset_nowhitelist2_baked.whitelist]) == {"evilcorp.de"}
preset_nowhitelist1.merge(preset_nowhitelist2)
preset_nowhitelist1_baked = preset_nowhitelist1.bake()
- assert set([e.data for e in preset_nowhitelist1_baked.target]) == {"evilcorp.com", "evilcorp.de"}
- assert set([e.data for e in preset_nowhitelist2_baked.target]) == {"evilcorp.de"}
+ assert set([e.data for e in preset_nowhitelist1_baked.seeds]) == {"evilcorp.com", "evilcorp.de"}
+ assert set([e.data for e in preset_nowhitelist2_baked.seeds]) == {"evilcorp.de"}
assert set([e.data for e in preset_nowhitelist1_baked.whitelist]) == {"evilcorp.com", "evilcorp.de"}
assert set([e.data for e in preset_nowhitelist2_baked.whitelist]) == {"evilcorp.de"}
- assert "www.evilcorp.com" in preset_nowhitelist1_baked.target
- assert "www.evilcorp.de" in preset_nowhitelist1_baked.target
+ assert "www.evilcorp.com" in preset_nowhitelist1_baked.seeds
+ assert "www.evilcorp.de" in preset_nowhitelist1_baked.seeds
assert "www.evilcorp.com" in preset_nowhitelist1_baked.target.seeds
assert "www.evilcorp.de" in preset_nowhitelist1_baked.target.seeds
assert "www.evilcorp.com" in preset_nowhitelist1_baked.whitelist
@@ -356,8 +370,8 @@ def test_preset_scope():
preset_nowhitelist2.merge(preset_nowhitelist1)
preset_nowhitelist1_baked = preset_nowhitelist1.bake()
preset_nowhitelist2_baked = preset_nowhitelist2.bake()
- assert set([e.data for e in preset_nowhitelist1_baked.target]) == {"evilcorp.com"}
- assert set([e.data for e in preset_nowhitelist2_baked.target]) == {"evilcorp.com", "evilcorp.de"}
+ assert set([e.data for e in preset_nowhitelist1_baked.seeds]) == {"evilcorp.com"}
+ assert set([e.data for e in preset_nowhitelist2_baked.seeds]) == {"evilcorp.com", "evilcorp.de"}
assert set([e.data for e in preset_nowhitelist1_baked.whitelist]) == {"evilcorp.com"}
assert set([e.data for e in preset_nowhitelist2_baked.whitelist]) == {"evilcorp.com", "evilcorp.de"}
diff --git a/bbot/test/test_step_1/test_python_api.py b/bbot/test/test_step_1/test_python_api.py
index 60ab89286..eaa9636b1 100644
--- a/bbot/test/test_step_1/test_python_api.py
+++ b/bbot/test/test_step_1/test_python_api.py
@@ -84,6 +84,10 @@ def test_python_api_sync():
def test_python_api_validation():
from bbot.scanner import Scanner, Preset
+ # invalid target
+ with pytest.raises(ValidationError) as error:
+ Scanner("asdf:::asdf")
+ assert str(error.value) == 'Unable to autodetect event type from "asdf:::asdf"'
# invalid module
with pytest.raises(ValidationError) as error:
Scanner(modules=["asdf"])
diff --git a/bbot/test/test_step_1/test_scan.py b/bbot/test/test_step_1/test_scan.py
index 3f80807af..f5f845826 100644
--- a/bbot/test/test_step_1/test_scan.py
+++ b/bbot/test/test_step_1/test_scan.py
@@ -1,3 +1,5 @@
+from ipaddress import ip_network
+
from ..bbot_fixtures import *
@@ -12,6 +14,7 @@ async def test_scan(
"1.1.1.0",
"1.1.1.1/31",
"evilcorp.com",
+ "test.evilcorp.com",
blacklist=["1.1.1.1/28", "www.evilcorp.com"],
modules=["ipneighbor"],
)
@@ -31,8 +34,11 @@ async def test_scan(
assert not scan0.in_scope("test.www.evilcorp.com")
assert not scan0.in_scope("www.evilcorp.co.uk")
j = scan0.json
- assert set(j["target"]["seeds"]) == {"1.1.1.0", "1.1.1.0/31", "evilcorp.com"}
- assert set(j["target"]["whitelist"]) == {"1.1.1.0/31", "evilcorp.com"}
+ assert set(j["target"]["seeds"]) == {"1.1.1.0", "1.1.1.0/31", "evilcorp.com", "test.evilcorp.com"}
+ # we preserve the original whitelist inputs
+ assert set(j["target"]["whitelist"]) == {"1.1.1.0", "1.1.1.0/31", "evilcorp.com", "test.evilcorp.com"}
+ # but in the background they are collapsed
+ assert scan0.target.whitelist.hosts == {ip_network("1.1.1.0/31"), "evilcorp.com"}
assert set(j["target"]["blacklist"]) == {"1.1.1.0/28", "www.evilcorp.com"}
assert "ipneighbor" in j["preset"]["modules"]
diff --git a/bbot/test/test_step_1/test_target.py b/bbot/test/test_step_1/test_target.py
index 5b974bd45..0513d6abe 100644
--- a/bbot/test/test_step_1/test_target.py
+++ b/bbot/test/test_step_1/test_target.py
@@ -3,39 +3,31 @@
@pytest.mark.asyncio
async def test_target(bbot_scanner):
- import random
+ from radixtarget import RadixTarget
from ipaddress import ip_address, ip_network
- from bbot.scanner.target import Target, BBOTTarget
+ from bbot.scanner.target import BBOTTarget, ScanSeeds
scan1 = bbot_scanner("api.publicapis.org", "8.8.8.8/30", "2001:4860:4860::8888/126")
scan2 = bbot_scanner("8.8.8.8/29", "publicapis.org", "2001:4860:4860::8888/125")
scan3 = bbot_scanner("8.8.8.8/29", "publicapis.org", "2001:4860:4860::8888/125")
scan4 = bbot_scanner("8.8.8.8/29")
scan5 = bbot_scanner()
- assert not scan5.target
- assert len(scan1.target) == 9
- assert len(scan4.target) == 8
- assert "8.8.8.9" in scan1.target
- assert "8.8.8.12" not in scan1.target
- assert "8.8.8.8/31" in scan1.target
- assert "8.8.8.8/30" in scan1.target
- assert "8.8.8.8/29" not in scan1.target
- assert "2001:4860:4860::8889" in scan1.target
- assert "2001:4860:4860::888c" not in scan1.target
- assert "www.api.publicapis.org" in scan1.target
- assert "api.publicapis.org" in scan1.target
- assert "publicapis.org" not in scan1.target
- assert "bob@www.api.publicapis.org" in scan1.target
- assert "https://www.api.publicapis.org" in scan1.target
- assert "www.api.publicapis.org:80" in scan1.target
- assert scan1.make_event("https://[2001:4860:4860::8888]:80", dummy=True) in scan1.target
- assert scan1.make_event("[2001:4860:4860::8888]:80", "OPEN_TCP_PORT", dummy=True) in scan1.target
- assert scan1.make_event("[2001:4860:4860::888c]:80", "OPEN_TCP_PORT", dummy=True) not in scan1.target
- assert scan1.target in scan2.target
- assert scan2.target not in scan1.target
- assert scan3.target in scan2.target
- assert scan2.target == scan3.target
- assert scan4.target != scan1.target
+
+ # test different types of inputs
+ target = BBOTTarget("evilcorp.com", "1.2.3.4/8")
+ assert "www.evilcorp.com" in target.seeds
+ assert "www.evilcorp.com:80" in target.seeds
+ assert "http://www.evilcorp.com:80" in target.seeds
+ assert "1.2.3.4" in target.seeds
+ assert "1.2.3.4/24" in target.seeds
+ assert ip_address("1.2.3.4") in target.seeds
+ assert ip_network("1.2.3.4/24", strict=False) in target.seeds
+ event = scan1.make_event("https://www.evilcorp.com:80", dummy=True)
+ assert event in target.seeds
+ with pytest.raises(ValueError):
+ ["asdf"] in target.seeds
+ with pytest.raises(ValueError):
+ target.seeds.get(["asdf"])
assert not scan5.target.seeds
assert len(scan1.target.seeds) == 9
@@ -56,6 +48,36 @@ async def test_target(bbot_scanner):
assert scan1.make_event("https://[2001:4860:4860::8888]:80", dummy=True) in scan1.target.seeds
assert scan1.make_event("[2001:4860:4860::8888]:80", "OPEN_TCP_PORT", dummy=True) in scan1.target.seeds
assert scan1.make_event("[2001:4860:4860::888c]:80", "OPEN_TCP_PORT", dummy=True) not in scan1.target.seeds
+ assert scan1.target.seeds in scan2.target.seeds
+ assert scan2.target.seeds not in scan1.target.seeds
+ assert scan3.target.seeds in scan2.target.seeds
+ assert scan2.target.seeds == scan3.target.seeds
+ assert scan4.target.seeds != scan1.target.seeds
+
+ assert not scan5.target.whitelist
+ assert len(scan1.target.whitelist) == 9
+ assert len(scan4.target.whitelist) == 8
+ assert "8.8.8.9" in scan1.target.whitelist
+ assert "8.8.8.12" not in scan1.target.whitelist
+ assert "8.8.8.8/31" in scan1.target.whitelist
+ assert "8.8.8.8/30" in scan1.target.whitelist
+ assert "8.8.8.8/29" not in scan1.target.whitelist
+ assert "2001:4860:4860::8889" in scan1.target.whitelist
+ assert "2001:4860:4860::888c" not in scan1.target.whitelist
+ assert "www.api.publicapis.org" in scan1.target.whitelist
+ assert "api.publicapis.org" in scan1.target.whitelist
+ assert "publicapis.org" not in scan1.target.whitelist
+ assert "bob@www.api.publicapis.org" in scan1.target.whitelist
+ assert "https://www.api.publicapis.org" in scan1.target.whitelist
+ assert "www.api.publicapis.org:80" in scan1.target.whitelist
+ assert scan1.make_event("https://[2001:4860:4860::8888]:80", dummy=True) in scan1.target.whitelist
+ assert scan1.make_event("[2001:4860:4860::8888]:80", "OPEN_TCP_PORT", dummy=True) in scan1.target.whitelist
+ assert scan1.make_event("[2001:4860:4860::888c]:80", "OPEN_TCP_PORT", dummy=True) not in scan1.target.whitelist
+ assert scan1.target.whitelist in scan2.target.whitelist
+ assert scan2.target.whitelist not in scan1.target.whitelist
+ assert scan3.target.whitelist in scan2.target.whitelist
+ assert scan2.target.whitelist == scan3.target.whitelist
+ assert scan4.target.whitelist != scan1.target.whitelist
assert scan1.whitelisted("https://[2001:4860:4860::8888]:80")
assert scan1.whitelisted("[2001:4860:4860::8888]:80")
@@ -70,28 +92,34 @@ async def test_target(bbot_scanner):
assert scan2.target.seeds == scan3.target.seeds
assert scan4.target.seeds != scan1.target.seeds
- assert str(scan1.target.get("8.8.8.9").host) == "8.8.8.8/30"
- assert scan1.target.get("8.8.8.12") is None
- assert str(scan1.target.get("2001:4860:4860::8889").host) == "2001:4860:4860::8888/126"
- assert scan1.target.get("2001:4860:4860::888c") is None
- assert str(scan1.target.get("www.api.publicapis.org").host) == "api.publicapis.org"
- assert scan1.target.get("publicapis.org") is None
-
- target = Target("evilcorp.com")
+ assert str(scan1.target.seeds.get("8.8.8.9").host) == "8.8.8.8/30"
+ assert str(scan1.target.whitelist.get("8.8.8.9").host) == "8.8.8.8/30"
+ assert scan1.target.seeds.get("8.8.8.12") is None
+ assert scan1.target.whitelist.get("8.8.8.12") is None
+ assert str(scan1.target.seeds.get("2001:4860:4860::8889").host) == "2001:4860:4860::8888/126"
+ assert str(scan1.target.whitelist.get("2001:4860:4860::8889").host) == "2001:4860:4860::8888/126"
+ assert scan1.target.seeds.get("2001:4860:4860::888c") is None
+ assert scan1.target.whitelist.get("2001:4860:4860::888c") is None
+ assert str(scan1.target.seeds.get("www.api.publicapis.org").host) == "api.publicapis.org"
+ assert str(scan1.target.whitelist.get("www.api.publicapis.org").host) == "api.publicapis.org"
+ assert scan1.target.seeds.get("publicapis.org") is None
+ assert scan1.target.whitelist.get("publicapis.org") is None
+
+ target = RadixTarget("evilcorp.com")
assert not "com" in target
assert "evilcorp.com" in target
assert "www.evilcorp.com" in target
- strict_target = Target("evilcorp.com", strict_scope=True)
+ strict_target = RadixTarget("evilcorp.com", strict_dns_scope=True)
assert not "com" in strict_target
assert "evilcorp.com" in strict_target
assert not "www.evilcorp.com" in strict_target
- target = Target()
+ target = RadixTarget()
target.add("evilcorp.com")
assert not "com" in target
assert "evilcorp.com" in target
assert "www.evilcorp.com" in target
- strict_target = Target(strict_scope=True)
+ strict_target = RadixTarget(strict_dns_scope=True)
strict_target.add("evilcorp.com")
assert not "com" in strict_target
assert "evilcorp.com" in strict_target
@@ -99,16 +127,23 @@ async def test_target(bbot_scanner):
# test target hashing
- target1 = Target()
- target1.add("evilcorp.com")
- target1.add("1.2.3.4/24")
- target1.add("https://evilcorp.net:8080")
-
- target2 = Target()
- target2.add("bob@evilcorp.org")
- target2.add("evilcorp.com")
- target2.add("1.2.3.4/24")
- target2.add("https://evilcorp.net:8080")
+ target1 = BBOTTarget()
+ target1.whitelist.add("evilcorp.com")
+ target1.whitelist.add("1.2.3.4/24")
+ target1.whitelist.add("https://evilcorp.net:8080")
+ target1.seeds.add("evilcorp.com")
+ target1.seeds.add("1.2.3.4/24")
+ target1.seeds.add("https://evilcorp.net:8080")
+
+ target2 = BBOTTarget()
+ target2.whitelist.add("bob@evilcorp.org")
+ target2.whitelist.add("evilcorp.com")
+ target2.whitelist.add("1.2.3.4/24")
+ target2.whitelist.add("https://evilcorp.net:8080")
+ target2.seeds.add("bob@evilcorp.org")
+ target2.seeds.add("evilcorp.com")
+ target2.seeds.add("1.2.3.4/24")
+ target2.seeds.add("https://evilcorp.net:8080")
# make sure it's a sha1 hash
assert isinstance(target1.hash, bytes)
@@ -116,11 +151,22 @@ async def test_target(bbot_scanner):
# hashes shouldn't match yet
assert target1.hash != target2.hash
+ assert target1.scope_hash != target2.scope_hash
# add missing email
- target1.add("bob@evilcorp.org")
+ target1.whitelist.add("bob@evilcorp.org")
+ assert target1.hash != target2.hash
+ assert target1.scope_hash == target2.scope_hash
+ target1.seeds.add("bob@evilcorp.org")
# now they should match
assert target1.hash == target2.hash
+ # test default whitelist
+ bbottarget = BBOTTarget("http://1.2.3.4:8443", "bob@evilcorp.com")
+ assert bbottarget.seeds.hosts == {ip_network("1.2.3.4"), "evilcorp.com"}
+ assert bbottarget.whitelist.hosts == {ip_network("1.2.3.4"), "evilcorp.com"}
+ assert set([e.data for e in bbottarget.seeds.events]) == {"http://1.2.3.4:8443/", "bob@evilcorp.com"}
+ assert set([e.data for e in bbottarget.whitelist.events]) == {"1.2.3.4", "evilcorp.com"}
+
bbottarget1 = BBOTTarget("evilcorp.com", "evilcorp.net", whitelist=["1.2.3.4/24"], blacklist=["1.2.3.4"])
bbottarget2 = BBOTTarget("evilcorp.com", "evilcorp.net", whitelist=["1.2.3.0/24"], blacklist=["1.2.3.4"])
bbottarget3 = BBOTTarget("evilcorp.com", whitelist=["1.2.3.4/24"], blacklist=["1.2.3.4"])
@@ -137,14 +183,23 @@ async def test_target(bbot_scanner):
assert bbottarget1 == bbottarget2
assert bbottarget2 == bbottarget1
+ # 1 and 3 have different seeds
assert bbottarget1 != bbottarget3
assert bbottarget3 != bbottarget1
- bbottarget3.add("evilcorp.net")
+ # until we make them the same
+ bbottarget3.seeds.add("evilcorp.net")
assert bbottarget1 == bbottarget3
assert bbottarget3 == bbottarget1
- bbottarget1.add("http://evilcorp.co.nz")
- bbottarget2.add("evilcorp.co.nz")
+ # adding different events (but with same host) to whitelist should not change hash (since only hosts matter)
+ bbottarget1.whitelist.add("http://evilcorp.co.nz")
+ bbottarget2.whitelist.add("evilcorp.co.nz")
+ assert bbottarget1 == bbottarget2
+ assert bbottarget2 == bbottarget1
+
+ # but seeds should change hash
+ bbottarget1.seeds.add("http://evilcorp.co.nz")
+ bbottarget2.seeds.add("evilcorp.co.nz")
assert bbottarget1 != bbottarget2
assert bbottarget2 != bbottarget1
@@ -156,15 +211,11 @@ async def test_target(bbot_scanner):
assert bbottarget8 != bbottarget9
assert bbottarget9 != bbottarget8
- bbottarget10 = bbottarget9.copy()
- assert bbottarget10 == bbottarget9
- assert bbottarget9 == bbottarget10
-
# make sure duplicate events don't change hash
- target1 = Target("https://evilcorp.com")
- target2 = Target("https://evilcorp.com")
+ target1 = BBOTTarget("https://evilcorp.com")
+ target2 = BBOTTarget("https://evilcorp.com")
assert target1 == target2
- target1.add("https://evilcorp.com:443")
+ target1.seeds.add("https://evilcorp.com:443")
assert target1 == target2
# make sure hosts are collapsed in whitelist and blacklist
@@ -173,10 +224,12 @@ async def test_target(bbot_scanner):
whitelist=["evilcorp.net:443", "http://evilcorp.net:8080"],
blacklist=["http://evilcorp.org:8080", "evilcorp.org:443"],
)
- assert list(bbottarget) == ["http://evilcorp.com:8080"]
+ # base class is not iterable
+ with pytest.raises(TypeError):
+ assert list(bbottarget) == ["http://evilcorp.com:8080"]
assert list(bbottarget.seeds) == ["http://evilcorp.com:8080"]
- assert list(bbottarget.whitelist) == ["evilcorp.net"]
- assert list(bbottarget.blacklist) == ["evilcorp.org"]
+ assert set([e.data for e in bbottarget.whitelist]) == {"evilcorp.net:443", "http://evilcorp.net:8080/"}
+ assert set([e.data for e in bbottarget.blacklist]) == {"http://evilcorp.org:8080/", "evilcorp.org:443"}
# test org stub as target
for org_target in ("ORG:evilcorp", "ORG_STUB:evilcorp"):
@@ -205,16 +258,25 @@ async def test_target(bbot_scanner):
"http://www.evilcorp.net/",
"bob@fdsa.evilcorp.net",
}
- assert set([e.data for e in bbottarget.whitelist.events]) == {"evilcorp.com", "evilcorp.net"}
- assert set([e.data for e in bbottarget.blacklist.events]) == {"1.2.3.4", "4.3.2.0/24", "asdf.evilcorp.net"}
+ assert set([e.data for e in bbottarget.whitelist.events]) == {
+ "evilcorp.com",
+ "evilcorp.net",
+ "bob@www.evilcorp.com",
+ }
+ assert set([e.data for e in bbottarget.blacklist.events]) == {
+ "1.2.3.4",
+ "4.3.2.0/24",
+ "http://1.2.3.4/",
+ "bob@asdf.evilcorp.net",
+ }
assert set(bbottarget.seeds.hosts) == {ip_network("1.2.3.0/24"), "www.evilcorp.net", "fdsa.evilcorp.net"}
assert set(bbottarget.whitelist.hosts) == {"evilcorp.com", "evilcorp.net"}
- assert set(bbottarget.blacklist.hosts) == {ip_address("1.2.3.4"), ip_network("4.3.2.0/24"), "asdf.evilcorp.net"}
- assert bbottarget.hash == b"\x0b\x908\xe3\xef\n=\x13d\xdf\x00;\xack\x0c\xbc\xd2\xcc'\xba"
- assert bbottarget.scope_hash == b"\x00\xf5V\xfb.\xeb#\xcb\xf0q\xf9\xe9e\xb7\x1f\xe2T+\xdbw"
- assert bbottarget.seeds.hash == b"\xaf.\x86\x83\xa1C\xad\xb4\xe7`X\x94\xe2\xa0\x01\xc2\xe3:J\xc5"
- assert bbottarget.whitelist.hash == b"\xa0Af\x07n\x10\xd9\xb6\n\xa7TO\xb07\xcdW\xc4vLC"
- assert bbottarget.blacklist.hash == b"\xaf\x0e\x8a\xe9JZ\x86\xbe\xee\xa9\xa9\xdb0\xaf'#\x84 U/"
+ assert set(bbottarget.blacklist.hosts) == {ip_network("1.2.3.4/32"), ip_network("4.3.2.0/24"), "asdf.evilcorp.net"}
+ assert bbottarget.hash == b"\xb3iU\xa8#\x8aq\x84/\xc5\xf2;\x11\x11\x0c&\xea\x07\xd4Q"
+ assert bbottarget.scope_hash == b"f\xe1\x01c^3\xf5\xd24B\x87P\xa0Glq0p3J"
+ assert bbottarget.seeds.hash == b"V\n\xf5\x1d\x1f=i\xbc\\\x15o\xc2p\xb2\x84\x97\xfeR\xde\xc1"
+ assert bbottarget.whitelist.hash == b"\x8e\xd0\xa76\x8em4c\x0e\x1c\xfdA\x9d*sv}\xeb\xc4\xc4"
+ assert bbottarget.blacklist.hash == b'\xf7\xaf\xa1\xda4"C:\x13\xf42\xc3,\xc3\xa9\x9f\x15\x15n\\'
scan = bbot_scanner(
"http://www.evilcorp.net",
@@ -227,72 +289,35 @@ async def test_target(bbot_scanner):
scan_events = [e for e in events if e.type == "SCAN"]
assert len(scan_events) == 2
target_dict = scan_events[0].data["target"]
+
+ assert target_dict["seeds"] == ["1.2.3.0/24", "bob@fdsa.evilcorp.net", "http://www.evilcorp.net/"]
+ assert target_dict["whitelist"] == ["bob@www.evilcorp.com", "evilcorp.com", "evilcorp.net"]
+ assert target_dict["blacklist"] == ["1.2.3.4", "4.3.2.0/24", "bob@asdf.evilcorp.net", "http://1.2.3.4/"]
assert target_dict["strict_scope"] == False
- assert target_dict["hash"] == b"\x0b\x908\xe3\xef\n=\x13d\xdf\x00;\xack\x0c\xbc\xd2\xcc'\xba".hex()
- assert target_dict["scope_hash"] == b"\x00\xf5V\xfb.\xeb#\xcb\xf0q\xf9\xe9e\xb7\x1f\xe2T+\xdbw".hex()
- assert target_dict["seed_hash"] == b"\xaf.\x86\x83\xa1C\xad\xb4\xe7`X\x94\xe2\xa0\x01\xc2\xe3:J\xc5".hex()
- assert target_dict["whitelist_hash"] == b"\xa0Af\x07n\x10\xd9\xb6\n\xa7TO\xb07\xcdW\xc4vLC".hex()
- assert target_dict["blacklist_hash"] == b"\xaf\x0e\x8a\xe9JZ\x86\xbe\xee\xa9\xa9\xdb0\xaf'#\x84 U/".hex()
- assert target_dict["hash"] == "0b9038e3ef0a3d1364df003bac6b0cbcd2cc27ba"
- assert target_dict["scope_hash"] == "00f556fb2eeb23cbf071f9e965b71fe2542bdb77"
- assert target_dict["seed_hash"] == "af2e8683a143adb4e7605894e2a001c2e33a4ac5"
- assert target_dict["whitelist_hash"] == "a04166076e10d9b60aa7544fb037cd57c4764c43"
- assert target_dict["blacklist_hash"] == "af0e8ae94a5a86beeea9a9db30af27238420552f"
-
- # test target sorting
- big_subnet = scan.make_event("1.2.3.4/24", dummy=True)
- medium_subnet = scan.make_event("1.2.3.4/28", dummy=True)
- small_subnet = scan.make_event("1.2.3.4/30", dummy=True)
- ip_event = scan.make_event("1.2.3.4", dummy=True)
- parent_domain = scan.make_event("evilcorp.com", dummy=True)
- grandparent_domain = scan.make_event("www.evilcorp.com", dummy=True)
- greatgrandparent_domain = scan.make_event("api.www.evilcorp.com", dummy=True)
- target = Target()
- assert big_subnet._host_size == -256
- assert medium_subnet._host_size == -16
- assert small_subnet._host_size == -4
- assert ip_event._host_size == 1
- assert parent_domain._host_size == 12
- assert grandparent_domain._host_size == 16
- assert greatgrandparent_domain._host_size == 20
- events = [
- big_subnet,
- medium_subnet,
- small_subnet,
- ip_event,
- parent_domain,
- grandparent_domain,
- greatgrandparent_domain,
- ]
- random.shuffle(events)
- assert target._sort_events(events) == [
- big_subnet,
- medium_subnet,
- small_subnet,
- ip_event,
- parent_domain,
- grandparent_domain,
- greatgrandparent_domain,
- ]
+ assert target_dict["hash"] == "b36955a8238a71842fc5f23b11110c26ea07d451"
+ assert target_dict["seed_hash"] == "560af51d1f3d69bc5c156fc270b28497fe52dec1"
+ assert target_dict["whitelist_hash"] == "8ed0a7368e6d34630e1cfd419d2a73767debc4c4"
+ assert target_dict["blacklist_hash"] == "f7afa1da3422433a13f432c32cc3a99f15156e5c"
+ assert target_dict["scope_hash"] == "66e101635e33f5d234428750a0476c713070334a"
# make sure child subnets/IPs don't get added to whitelist/blacklist
- target = Target("1.2.3.4/24", "1.2.3.4/28", acl_mode=True)
- assert set(e.data for e in target) == {"1.2.3.0/24"}
- target = Target("1.2.3.4/28", "1.2.3.4/24", acl_mode=True)
- assert set(e.data for e in target) == {"1.2.3.0/24"}
- target = Target("1.2.3.4/28", "1.2.3.4", acl_mode=True)
- assert set(e.data for e in target) == {"1.2.3.0/28"}
- target = Target("1.2.3.4", "1.2.3.4/28", acl_mode=True)
- assert set(e.data for e in target) == {"1.2.3.0/28"}
+ target = RadixTarget("1.2.3.4/24", "1.2.3.4/28", acl_mode=True)
+ assert set(target) == {ip_network("1.2.3.0/24")}
+ target = RadixTarget("1.2.3.4/28", "1.2.3.4/24", acl_mode=True)
+ assert set(target) == {ip_network("1.2.3.0/24")}
+ target = RadixTarget("1.2.3.4/28", "1.2.3.4", acl_mode=True)
+ assert set(target) == {ip_network("1.2.3.0/28")}
+ target = RadixTarget("1.2.3.4", "1.2.3.4/28", acl_mode=True)
+ assert set(target) == {ip_network("1.2.3.0/28")}
# same but for domains
- target = Target("evilcorp.com", "www.evilcorp.com", acl_mode=True)
- assert set(e.data for e in target) == {"evilcorp.com"}
- target = Target("www.evilcorp.com", "evilcorp.com", acl_mode=True)
- assert set(e.data for e in target) == {"evilcorp.com"}
+ target = RadixTarget("evilcorp.com", "www.evilcorp.com", acl_mode=True)
+ assert set(target) == {"evilcorp.com"}
+ target = RadixTarget("www.evilcorp.com", "evilcorp.com", acl_mode=True)
+ assert set(target) == {"evilcorp.com"}
# make sure strict_scope doesn't mess us up
- target = Target("evilcorp.co.uk", "www.evilcorp.co.uk", acl_mode=True, strict_scope=True)
+ target = RadixTarget("evilcorp.co.uk", "www.evilcorp.co.uk", acl_mode=True, strict_dns_scope=True)
assert set(target.hosts) == {"evilcorp.co.uk", "www.evilcorp.co.uk"}
assert "evilcorp.co.uk" in target
assert "www.evilcorp.co.uk" in target
@@ -300,10 +325,83 @@ async def test_target(bbot_scanner):
assert not "api.www.evilcorp.co.uk" in target
# test 'single' boolean argument
- target = Target("http://evilcorp.com", "evilcorp.com:443")
+ target = ScanSeeds("http://evilcorp.com", "evilcorp.com:443")
assert "www.evilcorp.com" in target
+ assert "bob@evilcorp.com" in target
event = target.get("www.evilcorp.com")
assert event.host == "evilcorp.com"
events = target.get("www.evilcorp.com", single=False)
assert len(events) == 2
assert set([e.data for e in events]) == {"http://evilcorp.com/", "evilcorp.com:443"}
+
+
+@pytest.mark.asyncio
+async def test_blacklist_regex(bbot_scanner, bbot_httpserver):
+
+ from bbot.scanner.target import ScanBlacklist
+
+ blacklist = ScanBlacklist("evilcorp.com")
+ assert blacklist.inputs == {"evilcorp.com"}
+ assert "www.evilcorp.com" in blacklist
+ assert "http://www.evilcorp.com" in blacklist
+ blacklist.add("RE:test")
+ assert "RE:test" in blacklist.inputs
+ assert set(blacklist.inputs) == {"evilcorp.com", "RE:test"}
+ assert blacklist.blacklist_regexes
+ assert next(iter(blacklist.blacklist_regexes)).pattern == "test"
+ result1 = blacklist.get("test.com")
+ assert result1.type == "DNS_NAME"
+ assert result1.data == "test.com"
+ result2 = blacklist.get("www.evilcorp.com")
+ assert result2.type == "DNS_NAME"
+ assert result2.data == "evilcorp.com"
+ result2 = blacklist.get("www.evil.com")
+ assert result2 is None
+ with pytest.raises(KeyError):
+ blacklist.get("www.evil.com", raise_error=True)
+ assert "test.com" in blacklist
+ assert "http://evilcorp.com/test.aspx" in blacklist
+ assert not "http://tes.com" in blacklist
+
+ blacklist = ScanBlacklist("evilcorp.com", r"RE:[0-9]{6}\.aspx$")
+ assert "http://evilcorp.com" in blacklist
+ assert not "http://test.com/123456" in blacklist
+ assert not "http://test.com/12345.aspx?a=asdf" in blacklist
+ assert not "http://test.com/asdf/123456.aspx/asdf" in blacklist
+ assert "http://test.com/asdf/123456.aspx?a=asdf" in blacklist
+ assert "http://test.com/asdf/123456.aspx" in blacklist
+
+ bbot_httpserver.expect_request(uri="/").respond_with_data(
+ """
+
+
+ """
+ )
+ bbot_httpserver.expect_request(uri="/asdfevilasdf").respond_with_data("")
+ bbot_httpserver.expect_request(uri="/logout.aspx").respond_with_data("")
+
+ # make sure URL is detected normally
+ scan = bbot_scanner("http://127.0.0.1:8888/", presets=["spider"], config={"excavate": True}, debug=True)
+ assert set([r.pattern for r in scan.target.blacklist.blacklist_regexes]) == {r"/.*(sign|log)[_-]?out"}
+ events = [e async for e in scan.async_start()]
+ urls = [e.data for e in events if e.type == "URL"]
+ assert len(urls) == 2
+ assert set(urls) == {"http://127.0.0.1:8888/", "http://127.0.0.1:8888/asdfevil333asdf"}
+
+ # same scan again but with blacklist regex
+ scan = bbot_scanner(
+ "http://127.0.0.1:8888/",
+ blacklist=[r"RE:evil[0-9]{3}"],
+ presets=["spider"],
+ config={"excavate": True},
+ debug=True,
+ )
+ assert scan.target.blacklist.blacklist_regexes
+ assert set([r.pattern for r in scan.target.blacklist.blacklist_regexes]) == {
+ r"evil[0-9]{3}",
+ r"/.*(sign|log)[_-]?out",
+ }
+ events = [e async for e in scan.async_start()]
+ urls = [e.data for e in events if e.type == "URL"]
+ assert len(urls) == 1
+ assert set(urls) == {"http://127.0.0.1:8888/"}
diff --git a/bbot/test/test_step_2/module_tests/test_module_dastardly.py b/bbot/test/test_step_2/module_tests/test_module_dastardly.py
index cb4a501b8..83d081a14 100644
--- a/bbot/test/test_step_2/module_tests/test_module_dastardly.py
+++ b/bbot/test/test_step_2/module_tests/test_module_dastardly.py
@@ -44,7 +44,7 @@ async def setup_after_prep(self, module_test):
# get docker IP
docker_ip = await self.get_docker_ip(module_test)
- module_test.scan.target.add(docker_ip)
+ module_test.scan.target.seeds.add(docker_ip)
# replace 127.0.0.1 with docker host IP to allow dastardly access to local http server
old_filter_event = module_test.module.filter_event
diff --git a/bbot/test/test_step_2/module_tests/test_module_ffuf_shortnames.py b/bbot/test/test_step_2/module_tests/test_module_ffuf_shortnames.py
index 00c1f9b1e..85327e743 100644
--- a/bbot/test/test_step_2/module_tests/test_module_ffuf_shortnames.py
+++ b/bbot/test/test_step_2/module_tests/test_module_ffuf_shortnames.py
@@ -142,7 +142,7 @@ async def setup_after_prep(self, module_test):
tags=["shortname-file"],
)
)
- module_test.scan.target.seeds._events = set(seed_events)
+ module_test.scan.target.seeds.events = set(seed_events)
expect_args = {"method": "GET", "uri": "/administrator.aspx"}
respond_args = {"response_data": "alive"}
diff --git a/docs/scanning/index.md b/docs/scanning/index.md
index a7359730a..e82d9101f 100644
--- a/docs/scanning/index.md
+++ b/docs/scanning/index.md
@@ -178,6 +178,8 @@ Note that `--strict-scope` only applies to targets and whitelists, but not black
BBOT allows precise control over scope with whitelists and blacklists. These both use the same syntax as `--target`, meaning they accept the same event types, and you can specify an unlimited number of them, via a file, the CLI, or both.
+#### Whitelists
+
`--whitelist` enables you to override what's in scope. For example, if you want to run nuclei against `evilcorp.com`, but stay only inside their corporate IP range of `1.2.3.0/24`, you can accomplish this like so:
```bash
@@ -185,6 +187,8 @@ BBOT allows precise control over scope with whitelists and blacklists. These bot
bbot -t evilcorp.com --whitelist 1.2.3.0/24 -f subdomain-enum -m nmap nuclei --allow-deadly
```
+#### Blacklists
+
`--blacklist` takes ultimate precedence. Anything in the blacklist is completely excluded from the scan, even if it's in the whitelist.
```bash
@@ -192,6 +196,49 @@ bbot -t evilcorp.com --whitelist 1.2.3.0/24 -f subdomain-enum -m nmap nuclei --a
bbot -t evilcorp.com --blacklist internal.evilcorp.com -f subdomain-enum -m nmap nuclei --allow-deadly
```
+#### Blacklist by Regex
+
+Blacklists also accept regex patterns. These regexes are checked against the full URL, including the host and path.
+
+To specify a regex, prefix the pattern with `RE:`. For example, to exclude all events containing "signout", you could do:
+
+```bash
+bbot -t evilcorp.com --blacklist "RE:signout"
+```
+
+Note that this would blacklist both of the following events:
+
+- `[URL] http://evilcorp.com/signout.aspx`
+- `[DNS_NAME] signout.evilcorp.com`
+
+If you only want to blacklist the URL, you could narrow the regex like so:
+
+```bash
+bbot -t evilcorp.com --blacklist 'RE:signout\.aspx$'
+```
+
+Similar to targets and whitelists, blacklists can be specified in your preset. The `spider` preset makes use of this to prevent the spider from following logout links:
+
+```yaml title="spider.yml"
+description: Recursive web spider
+
+modules:
+ - httpx
+
+blacklist:
+ # Prevent spider from invalidating sessions by logging out
+ - "RE:/.*(sign|log)[_-]?out"
+
+config:
+ web:
+ # how many links to follow in a row
+ spider_distance: 2
+ # don't follow links whose directory depth is higher than 4
+ spider_depth: 4
+ # maximum number of links to follow per page
+ spider_links_per_page: 25
+```
+
## DNS Wildcards
BBOT has robust wildcard detection built-in. It can reliably detect wildcard domains, and will tag them accordingly:
diff --git a/poetry.lock b/poetry.lock
index 0b61edc1a..714d0182e 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -417,19 +417,19 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "cloudcheck"
-version = "5.0.1.595"
+version = "6.0.0.661"
description = "Check whether an IP address belongs to a cloud provider"
optional = false
python-versions = "<4.0,>=3.9"
files = [
- {file = "cloudcheck-5.0.1.595-py3-none-any.whl", hash = "sha256:68acec63b09400fa0409ae7f3ffa817cbc891bf8a2ac63f9610a3b049a4bf57d"},
- {file = "cloudcheck-5.0.1.595.tar.gz", hash = "sha256:38456074332ed2ba928e7073e3928a5223a6005a64124b4b342d8b9599ca10e0"},
+ {file = "cloudcheck-6.0.0.661-py3-none-any.whl", hash = "sha256:b8c45061d76eea14aa493e9dfd087e1aefccb1632c3bb8d49c77d273f721188c"},
+ {file = "cloudcheck-6.0.0.661.tar.gz", hash = "sha256:98a7b88f4784fad91faa3d6ea5749c7fe215462dbad63c34df1afc671f915795"},
]
[package.dependencies]
httpx = ">=0.26,<0.28"
pydantic = ">=2.4.2,<3.0.0"
-radixtarget = ">=1.0.0.14,<2.0.0.0"
+radixtarget = ">=2.0.0.32,<3.0.0.0"
regex = ">=2024.4.16,<2025.0.0"
[[package]]
@@ -2338,13 +2338,13 @@ cffi = {version = "*", markers = "implementation_name == \"pypy\""}
[[package]]
name = "radixtarget"
-version = "1.1.0.18"
+version = "2.0.0.50"
description = "Check whether an IP address belongs to a cloud provider"
optional = false
python-versions = "<4.0,>=3.9"
files = [
- {file = "radixtarget-1.1.0.18-py3-none-any.whl", hash = "sha256:05e95de6afb0ee4dfa31c53bd25a34a193ae5bb46dc7624e0424bbcfed2c4cea"},
- {file = "radixtarget-1.1.0.18.tar.gz", hash = "sha256:1a3306891a22f7ff2c71d6cd42202af8852cdb4fb68e9a1e9a76a3f60aa98ab6"},
+ {file = "radixtarget-2.0.0.50-py3-none-any.whl", hash = "sha256:fe1670a382d1ddaebc2cba3b16607d32085987eb5d71074cc0535e19a02406b7"},
+ {file = "radixtarget-2.0.0.50.tar.gz", hash = "sha256:73519eebb0596a67d4e9347a5e4602c95c9ff9dc8be4c64e6ab0247bc69a13e8"},
]
[[package]]
@@ -3136,4 +3136,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
-content-hash = "3dae2f970494ad6b7716cd18ca02c76d53248aa5f7bad8e4ae22a7e4d885f79e"
+content-hash = "0201017ae3c42fef3017d761f569dfb5845b3be1f0143c6c0b3129f1b43d6647"
diff --git a/pyproject.toml b/pyproject.toml
index d2494cc6c..914ceb326 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,7 +11,7 @@ readme = "README.md"
repository = "https://github.com/blacklanternsecurity/bbot"
homepage = "https://github.com/blacklanternsecurity/bbot"
documentation = "https://www.blacklanternsecurity.com/bbot/"
-keywords = ["python", "cli", "automation", "osint", "neo4j", "scanner", "python-library", "hacking", "recursion", "pentesting", "recon", "command-line-tool", "bugbounty", "subdomains", "security-tools", "subdomain-scanner", "osint-framework", "attack-surface", "subdomain-enumeration", "osint-tool"]
+keywords = ["python", "cli", "automation", "osint", "threat-intel", "intelligence", "neo4j", "scanner", "python-library", "hacking", "recursion", "pentesting", "recon", "command-line-tool", "bugbounty", "subdomains", "security-tools", "subdomain-scanner", "osint-framework", "attack-surface", "subdomain-enumeration", "osint-tool"]
classifiers = [
"Operating System :: POSIX :: Linux",
"Topic :: Security",
@@ -48,14 +48,14 @@ socksio = "^1.0.0"
jinja2 = "^3.1.3"
regex = "^2024.4.16"
unidecode = "^1.3.8"
-radixtarget = "^1.0.0.15"
-cloudcheck = "^5.0.0.350"
mmh3 = ">=4.1,<6.0"
setproctitle = "^1.3.3"
yara-python = "^4.5.1"
pyzmq = "^26.0.3"
httpx = "^0.27.0"
puremagic = "^1.28"
+cloudcheck = "^6.0.0.602"
+radixtarget = "^2.0.0.50"
[tool.poetry.group.dev.dependencies]
flake8 = ">=6,<8"